Bug 4564: Add Shard Builder class
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Status.Failure;
28 import akka.dispatch.Dispatchers;
29 import akka.dispatch.OnComplete;
30 import akka.japi.Creator;
31 import akka.pattern.Patterns;
32 import akka.persistence.SaveSnapshotSuccess;
33 import akka.testkit.TestActorRef;
34 import akka.util.Timeout;
35 import com.google.common.base.Function;
36 import com.google.common.base.Optional;
37 import com.google.common.util.concurrent.Futures;
38 import com.google.common.util.concurrent.ListenableFuture;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.io.IOException;
41 import java.util.Collections;
42 import java.util.HashSet;
43 import java.util.Set;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.Test;
49 import org.mockito.InOrder;
50 import org.opendaylight.controller.cluster.DataPersistenceProvider;
51 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
52 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
53 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
54 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
63 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
69 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
73 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
74 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
75 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
76 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
77 import org.opendaylight.controller.cluster.datastore.modification.Modification;
78 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
79 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
80 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
81 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
82 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
83 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
84 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
86 import org.opendaylight.controller.cluster.raft.RaftActorContext;
87 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
89 import org.opendaylight.controller.cluster.raft.Snapshot;
90 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
91 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
92 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
93 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
94 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
95 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
96 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
97 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
98 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
99 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
100 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
101 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
102 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
103 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
104 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
105 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
106 import org.opendaylight.yangtools.yang.common.QName;
107 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
109 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
111 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
122 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
123 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
124 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
125 import scala.concurrent.Await;
126 import scala.concurrent.Future;
127 import scala.concurrent.duration.FiniteDuration;
128
129 public class ShardTest extends AbstractShardTest {
130     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
131
132     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
133
134     @Test
135     public void testRegisterChangeListener() throws Exception {
136         new ShardTestKit(getSystem()) {{
137             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
138                     newShardProps(),  "testRegisterChangeListener");
139
140             waitUntilLeader(shard);
141
142             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
143
144             final MockDataChangeListener listener = new MockDataChangeListener(1);
145             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
146                     "testRegisterChangeListener-DataChangeListener");
147
148             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
149                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
150
151             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
152                     RegisterChangeListenerReply.class);
153             final String replyPath = reply.getListenerRegistrationPath().toString();
154             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
155                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
156
157             final YangInstanceIdentifier path = TestModel.TEST_PATH;
158             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
159
160             listener.waitForChangeEvents(path);
161
162             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
163             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
164         }};
165     }
166
167     @SuppressWarnings("serial")
168     @Test
169     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
170         // This test tests the timing window in which a change listener is registered before the
171         // shard becomes the leader. We verify that the listener is registered and notified of the
172         // existing data when the shard becomes the leader.
173         new ShardTestKit(getSystem()) {{
174             // For this test, we want to send the RegisterChangeListener message after the shard
175             // has recovered from persistence and before it becomes the leader. So we subclass
176             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
177             // we know that the shard has been initialized to a follower and has started the
178             // election process. The following 2 CountDownLatches are used to coordinate the
179             // ElectionTimeout with the sending of the RegisterChangeListener message.
180             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
181             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
182             final Creator<Shard> creator = new Creator<Shard>() {
183                 boolean firstElectionTimeout = true;
184
185                 @Override
186                 public Shard create() throws Exception {
187                     // Use a non persistent provider because this test actually invokes persist on the journal
188                     // this will cause all other messages to not be queued properly after that.
189                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
190                     // it does do a persist)
191                     return new Shard(newShardBuilder()) {
192                         @Override
193                         public void onReceiveCommand(final Object message) throws Exception {
194                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
195                                 // Got the first ElectionTimeout. We don't forward it to the
196                                 // base Shard yet until we've sent the RegisterChangeListener
197                                 // message. So we signal the onFirstElectionTimeout latch to tell
198                                 // the main thread to send the RegisterChangeListener message and
199                                 // start a thread to wait on the onChangeListenerRegistered latch,
200                                 // which the main thread signals after it has sent the message.
201                                 // After the onChangeListenerRegistered is triggered, we send the
202                                 // original ElectionTimeout message to proceed with the election.
203                                 firstElectionTimeout = false;
204                                 final ActorRef self = getSelf();
205                                 new Thread() {
206                                     @Override
207                                     public void run() {
208                                         Uninterruptibles.awaitUninterruptibly(
209                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
210                                         self.tell(message, self);
211                                     }
212                                 }.start();
213
214                                 onFirstElectionTimeout.countDown();
215                             } else {
216                                 super.onReceiveCommand(message);
217                             }
218                         }
219                     };
220                 }
221             };
222
223             final MockDataChangeListener listener = new MockDataChangeListener(1);
224             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
225                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
226
227             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
228                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
229                     "testRegisterChangeListenerWhenNotLeaderInitially");
230
231             // Write initial data into the in-memory store.
232             final YangInstanceIdentifier path = TestModel.TEST_PATH;
233             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
234
235             // Wait until the shard receives the first ElectionTimeout message.
236             assertEquals("Got first ElectionTimeout", true,
237                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
238
239             // Now send the RegisterChangeListener and wait for the reply.
240             shard.tell(new RegisterChangeListener(path, dclActor,
241                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
242
243             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
244                     RegisterChangeListenerReply.class);
245             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
246
247             // Sanity check - verify the shard is not the leader yet.
248             shard.tell(new FindLeader(), getRef());
249             final FindLeaderReply findLeadeReply =
250                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
251             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
252
253             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
254             // with the election process.
255             onChangeListenerRegistered.countDown();
256
257             // Wait for the shard to become the leader and notify our listener with the existing
258             // data in the store.
259             listener.waitForChangeEvents(path);
260
261             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
262             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
263         }};
264     }
265
266     @Test
267     public void testRegisterDataTreeChangeListener() throws Exception {
268         new ShardTestKit(getSystem()) {{
269             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
270                     newShardProps(), "testRegisterDataTreeChangeListener");
271
272             waitUntilLeader(shard);
273
274             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
275
276             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
277             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
278                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
279
280             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
281
282             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
283                     RegisterDataTreeChangeListenerReply.class);
284             final String replyPath = reply.getListenerRegistrationPath().toString();
285             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
286                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
287
288             final YangInstanceIdentifier path = TestModel.TEST_PATH;
289             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
290
291             listener.waitForChangeEvents();
292
293             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
294             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
295         }};
296     }
297
298     @SuppressWarnings("serial")
299     @Test
300     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
301         new ShardTestKit(getSystem()) {{
302             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
303             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
304             final Creator<Shard> creator = new Creator<Shard>() {
305                 boolean firstElectionTimeout = true;
306
307                 @Override
308                 public Shard create() throws Exception {
309                     return new Shard(Shard.builder().id(shardID).datastoreContext(
310                             dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
311                         @Override
312                         public void onReceiveCommand(final Object message) throws Exception {
313                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
314                                 firstElectionTimeout = false;
315                                 final ActorRef self = getSelf();
316                                 new Thread() {
317                                     @Override
318                                     public void run() {
319                                         Uninterruptibles.awaitUninterruptibly(
320                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
321                                         self.tell(message, self);
322                                     }
323                                 }.start();
324
325                                 onFirstElectionTimeout.countDown();
326                             } else {
327                                 super.onReceiveCommand(message);
328                             }
329                         }
330                     };
331                 }
332             };
333
334             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
335             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
336                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
337
338             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
339                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
340                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
341
342             final YangInstanceIdentifier path = TestModel.TEST_PATH;
343             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
344
345             assertEquals("Got first ElectionTimeout", true,
346                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
347
348             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
349             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
350                 RegisterDataTreeChangeListenerReply.class);
351             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
352
353             shard.tell(new FindLeader(), getRef());
354             final FindLeaderReply findLeadeReply =
355                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
356             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
357
358             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
359
360             onChangeListenerRegistered.countDown();
361
362             // TODO: investigate why we do not receive data chage events
363             listener.waitForChangeEvents();
364
365             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
366             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
367         }};
368     }
369
370     @Test
371     public void testCreateTransaction(){
372         new ShardTestKit(getSystem()) {{
373             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
374
375             waitUntilLeader(shard);
376
377             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
378
379             shard.tell(new CreateTransaction("txn-1",
380                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
381
382             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
383                     CreateTransactionReply.class);
384
385             final String path = reply.getTransactionActorPath().toString();
386             assertTrue("Unexpected transaction path " + path,
387                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
388
389             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
390         }};
391     }
392
393     @Test
394     public void testCreateTransactionOnChain(){
395         new ShardTestKit(getSystem()) {{
396             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
397
398             waitUntilLeader(shard);
399
400             shard.tell(new CreateTransaction("txn-1",
401                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
402                     getRef());
403
404             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
405                     CreateTransactionReply.class);
406
407             final String path = reply.getTransactionActorPath().toString();
408             assertTrue("Unexpected transaction path " + path,
409                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
410
411             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
412         }};
413     }
414
415     @SuppressWarnings("serial")
416     @Test
417     public void testPeerAddressResolved() throws Exception {
418         new ShardTestKit(getSystem()) {{
419             final CountDownLatch recoveryComplete = new CountDownLatch(1);
420             class TestShard extends Shard {
421                 TestShard() {
422                     super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
423                             peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
424                             schemaContext(SCHEMA_CONTEXT));
425                 }
426
427                 String getPeerAddress(String id) {
428                     return getRaftActorContext().getPeerAddress(id);
429                 }
430
431                 @Override
432                 protected void onRecoveryComplete() {
433                     try {
434                         super.onRecoveryComplete();
435                     } finally {
436                         recoveryComplete.countDown();
437                     }
438                 }
439             }
440
441             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
442                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
443                         @Override
444                         public TestShard create() throws Exception {
445                             return new TestShard();
446                         }
447                     })), "testPeerAddressResolved");
448
449             assertEquals("Recovery complete", true,
450                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
451
452             final String address = "akka://foobar";
453             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
454
455             assertEquals("getPeerAddress", address,
456                 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
457
458             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
459         }};
460     }
461
462     @Test
463     public void testApplySnapshot() throws Exception {
464
465         ShardTestKit testkit = new ShardTestKit(getSystem());
466
467         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
468                 "testApplySnapshot");
469
470         testkit.waitUntilLeader(shard);
471
472         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
473         store.setSchemaContext(SCHEMA_CONTEXT);
474
475         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
476                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
477                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
478                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
479
480         writeToStore(store, TestModel.TEST_PATH, container);
481
482         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
483         final NormalizedNode<?,?> expected = readStore(store, root);
484
485         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
486                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
487
488         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
489
490         final NormalizedNode<?,?> actual = readStore(shard, root);
491
492         assertEquals("Root node", expected, actual);
493
494         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
495     }
496
497     @Test
498     public void testApplyState() throws Exception {
499
500         ShardTestKit testkit = new ShardTestKit(getSystem());
501
502         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
503
504         testkit.waitUntilLeader(shard);
505
506         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
507
508         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
509                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
510
511         shard.underlyingActor().onReceiveCommand(applyState);
512
513         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
514         assertEquals("Applied state", node, actual);
515
516         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
517     }
518
519     @Test
520     public void testApplyStateWithCandidatePayload() throws Exception {
521
522         ShardTestKit testkit = new ShardTestKit(getSystem());
523
524         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
525
526         testkit.waitUntilLeader(shard);
527
528         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
529         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
530
531         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
532                 DataTreeCandidatePayload.create(candidate)));
533
534         shard.underlyingActor().onReceiveCommand(applyState);
535
536         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
537         assertEquals("Applied state", node, actual);
538
539         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
540     }
541
542     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
543         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
544         testStore.setSchemaContext(SCHEMA_CONTEXT);
545
546         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
547
548         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
549
550         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
551             SerializationUtils.serializeNormalizedNode(root),
552             Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
553         return testStore;
554     }
555
556     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
557         source.validate(mod);
558         final DataTreeCandidate candidate = source.prepare(mod);
559         source.commit(candidate);
560         return DataTreeCandidatePayload.create(candidate);
561     }
562
563     @Test
564     public void testDataTreeCandidateRecovery() throws Exception {
565         // Set up the InMemorySnapshotStore.
566         final DataTree source = setupInMemorySnapshotStore();
567
568         final DataTreeModification writeMod = source.takeSnapshot().newModification();
569         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
570         writeMod.ready();
571         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
572
573         // Set up the InMemoryJournal.
574         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
575
576         final int nListEntries = 16;
577         final Set<Integer> listEntryKeys = new HashSet<>();
578
579         // Add some ModificationPayload entries
580         for (int i = 1; i <= nListEntries; i++) {
581             listEntryKeys.add(Integer.valueOf(i));
582
583             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
584                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
585
586             final DataTreeModification mod = source.takeSnapshot().newModification();
587             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
588             mod.ready();
589             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
590                 payloadForModification(source, mod)));
591         }
592
593         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
594             new ApplyJournalEntries(nListEntries));
595
596         testRecovery(listEntryKeys);
597     }
598
599     @Test
600     public void testModicationRecovery() throws Exception {
601
602         // Set up the InMemorySnapshotStore.
603         setupInMemorySnapshotStore();
604
605         // Set up the InMemoryJournal.
606
607         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
608
609         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
610             new WriteModification(TestModel.OUTER_LIST_PATH,
611                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
612
613         final int nListEntries = 16;
614         final Set<Integer> listEntryKeys = new HashSet<>();
615
616         // Add some ModificationPayload entries
617         for(int i = 1; i <= nListEntries; i++) {
618             listEntryKeys.add(Integer.valueOf(i));
619             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
620                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
621             final Modification mod = new MergeModification(path,
622                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
623             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
624                     newModificationPayload(mod)));
625         }
626
627         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
628                 new ApplyJournalEntries(nListEntries));
629
630         testRecovery(listEntryKeys);
631     }
632
633     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
634         final MutableCompositeModification compMod = new MutableCompositeModification();
635         for(final Modification mod: mods) {
636             compMod.addModification(mod);
637         }
638
639         return new ModificationPayload(compMod);
640     }
641
642     @Test
643     public void testConcurrentThreePhaseCommits() throws Throwable {
644         new ShardTestKit(getSystem()) {{
645             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
646                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
647                     "testConcurrentThreePhaseCommits");
648
649             waitUntilLeader(shard);
650
651          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
652
653             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
654
655             final String transactionID1 = "tx1";
656             final MutableCompositeModification modification1 = new MutableCompositeModification();
657             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
658                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
659
660             final String transactionID2 = "tx2";
661             final MutableCompositeModification modification2 = new MutableCompositeModification();
662             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
663                     TestModel.OUTER_LIST_PATH,
664                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
665                     modification2);
666
667             final String transactionID3 = "tx3";
668             final MutableCompositeModification modification3 = new MutableCompositeModification();
669             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
670                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
671                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
672                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
673                     modification3);
674
675             final long timeoutSec = 5;
676             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
677             final Timeout timeout = new Timeout(duration);
678
679             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
680             // by the ShardTransaction.
681
682             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
683                     cohort1, modification1, true, false), getRef());
684             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
685                     expectMsgClass(duration, ReadyTransactionReply.class));
686             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
687
688             // Send the CanCommitTransaction message for the first Tx.
689
690             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
691             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
692                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
693             assertEquals("Can commit", true, canCommitReply.getCanCommit());
694
695             // Send the ForwardedReadyTransaction for the next 2 Tx's.
696
697             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
698                     cohort2, modification2, true, false), getRef());
699             expectMsgClass(duration, ReadyTransactionReply.class);
700
701             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
702                     cohort3, modification3, true, false), getRef());
703             expectMsgClass(duration, ReadyTransactionReply.class);
704
705             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
706             // processed after the first Tx completes.
707
708             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
709                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
710
711             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
712                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
713
714             // Send the CommitTransaction message for the first Tx. After it completes, it should
715             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
716
717             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
718             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
719
720             // Wait for the next 2 Tx's to complete.
721
722             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
723             final CountDownLatch commitLatch = new CountDownLatch(2);
724
725             class OnFutureComplete extends OnComplete<Object> {
726                 private final Class<?> expRespType;
727
728                 OnFutureComplete(final Class<?> expRespType) {
729                     this.expRespType = expRespType;
730                 }
731
732                 @Override
733                 public void onComplete(final Throwable error, final Object resp) {
734                     if(error != null) {
735                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
736                     } else {
737                         try {
738                             assertEquals("Commit response type", expRespType, resp.getClass());
739                             onSuccess(resp);
740                         } catch (final Exception e) {
741                             caughtEx.set(e);
742                         }
743                     }
744                 }
745
746                 void onSuccess(final Object resp) throws Exception {
747                 }
748             }
749
750             class OnCommitFutureComplete extends OnFutureComplete {
751                 OnCommitFutureComplete() {
752                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
753                 }
754
755                 @Override
756                 public void onComplete(final Throwable error, final Object resp) {
757                     super.onComplete(error, resp);
758                     commitLatch.countDown();
759                 }
760             }
761
762             class OnCanCommitFutureComplete extends OnFutureComplete {
763                 private final String transactionID;
764
765                 OnCanCommitFutureComplete(final String transactionID) {
766                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
767                     this.transactionID = transactionID;
768                 }
769
770                 @Override
771                 void onSuccess(final Object resp) throws Exception {
772                     final CanCommitTransactionReply canCommitReply =
773                             CanCommitTransactionReply.fromSerializable(resp);
774                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
775
776                     final Future<Object> commitFuture = Patterns.ask(shard,
777                             new CommitTransaction(transactionID).toSerializable(), timeout);
778                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
779                 }
780             }
781
782             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
783                     getSystem().dispatcher());
784
785             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
786                     getSystem().dispatcher());
787
788             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
789
790             if(caughtEx.get() != null) {
791                 throw caughtEx.get();
792             }
793
794             assertEquals("Commits complete", true, done);
795
796             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
797             inOrder.verify(cohort1).canCommit();
798             inOrder.verify(cohort1).preCommit();
799             inOrder.verify(cohort1).commit();
800             inOrder.verify(cohort2).canCommit();
801             inOrder.verify(cohort2).preCommit();
802             inOrder.verify(cohort2).commit();
803             inOrder.verify(cohort3).canCommit();
804             inOrder.verify(cohort3).preCommit();
805             inOrder.verify(cohort3).commit();
806
807             // Verify data in the data store.
808
809             verifyOuterListEntry(shard, 1);
810
811             verifyLastApplied(shard, 2);
812
813             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
814         }};
815     }
816
817     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
818             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
819         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
820     }
821
822     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
823             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
824             final int messagesSent) {
825         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
826         batched.addModification(new WriteModification(path, data));
827         batched.setReady(ready);
828         batched.setDoCommitOnReady(doCommitOnReady);
829         batched.setTotalMessagesSent(messagesSent);
830         return batched;
831     }
832
833     @Test
834     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
835         new ShardTestKit(getSystem()) {{
836             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
837                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
838                     "testBatchedModificationsWithNoCommitOnReady");
839
840             waitUntilLeader(shard);
841
842             final String transactionID = "tx";
843             final FiniteDuration duration = duration("5 seconds");
844
845             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
846             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
847                 @Override
848                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
849                     if(mockCohort.get() == null) {
850                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
851                     }
852
853                     return mockCohort.get();
854                 }
855             };
856
857             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
858
859             // Send a BatchedModifications to start a transaction.
860
861             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
862                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
863             expectMsgClass(duration, BatchedModificationsReply.class);
864
865             // Send a couple more BatchedModifications.
866
867             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
868                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
869             expectMsgClass(duration, BatchedModificationsReply.class);
870
871             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
872                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
873                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
874             expectMsgClass(duration, ReadyTransactionReply.class);
875
876             // Send the CanCommitTransaction message.
877
878             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
879             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
880                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
881             assertEquals("Can commit", true, canCommitReply.getCanCommit());
882
883             // Send the CanCommitTransaction message.
884
885             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
886             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
887
888             final InOrder inOrder = inOrder(mockCohort.get());
889             inOrder.verify(mockCohort.get()).canCommit();
890             inOrder.verify(mockCohort.get()).preCommit();
891             inOrder.verify(mockCohort.get()).commit();
892
893             // Verify data in the data store.
894
895             verifyOuterListEntry(shard, 1);
896
897             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
898         }};
899     }
900
901     @Test
902     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
903         new ShardTestKit(getSystem()) {{
904             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
905                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
906                     "testBatchedModificationsWithCommitOnReady");
907
908             waitUntilLeader(shard);
909
910             final String transactionID = "tx";
911             final FiniteDuration duration = duration("5 seconds");
912
913             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
914             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
915                 @Override
916                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
917                     if(mockCohort.get() == null) {
918                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
919                     }
920
921                     return mockCohort.get();
922                 }
923             };
924
925             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
926
927             // Send a BatchedModifications to start a transaction.
928
929             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
930                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
931             expectMsgClass(duration, BatchedModificationsReply.class);
932
933             // Send a couple more BatchedModifications.
934
935             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
936                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
937             expectMsgClass(duration, BatchedModificationsReply.class);
938
939             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
940                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
941                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
942
943             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
944
945             final InOrder inOrder = inOrder(mockCohort.get());
946             inOrder.verify(mockCohort.get()).canCommit();
947             inOrder.verify(mockCohort.get()).preCommit();
948             inOrder.verify(mockCohort.get()).commit();
949
950             // Verify data in the data store.
951
952             verifyOuterListEntry(shard, 1);
953
954             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
955         }};
956     }
957
958     @Test(expected=IllegalStateException.class)
959     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
960         new ShardTestKit(getSystem()) {{
961             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
962                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
963                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
964
965             waitUntilLeader(shard);
966
967             final String transactionID = "tx1";
968             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
969             batched.setReady(true);
970             batched.setTotalMessagesSent(2);
971
972             shard.tell(batched, getRef());
973
974             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
975
976             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
977
978             if(failure != null) {
979                 throw failure.cause();
980             }
981         }};
982     }
983
984     @Test
985     public void testBatchedModificationsWithOperationFailure() throws Throwable {
986         new ShardTestKit(getSystem()) {{
987             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
988                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
989                     "testBatchedModificationsWithOperationFailure");
990
991             waitUntilLeader(shard);
992
993             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
994             // write will not validate the children for performance reasons.
995
996             String transactionID = "tx1";
997
998             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
999                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
1000                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1001
1002             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
1003             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
1004             shard.tell(batched, getRef());
1005             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1006
1007             Throwable cause = failure.cause();
1008
1009             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1010             batched.setReady(true);
1011             batched.setTotalMessagesSent(2);
1012
1013             shard.tell(batched, getRef());
1014
1015             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1016             assertEquals("Failure cause", cause, failure.cause());
1017
1018             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1019         }};
1020     }
1021
1022     @SuppressWarnings("unchecked")
1023     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1024         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1025         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1026         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1027                 outerList.getValue() instanceof Iterable);
1028         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1029         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1030                 entry instanceof MapEntryNode);
1031         final MapEntryNode mapEntry = (MapEntryNode)entry;
1032         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1033                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1034         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1035         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1036     }
1037
1038     @Test
1039     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1040         new ShardTestKit(getSystem()) {{
1041             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1042                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1043                     "testBatchedModificationsOnTransactionChain");
1044
1045             waitUntilLeader(shard);
1046
1047             final String transactionChainID = "txChain";
1048             final String transactionID1 = "tx1";
1049             final String transactionID2 = "tx2";
1050
1051             final FiniteDuration duration = duration("5 seconds");
1052
1053             // Send a BatchedModifications to start a chained write transaction and ready it.
1054
1055             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1056             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1057             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1058                     containerNode, true, false, 1), getRef());
1059             expectMsgClass(duration, ReadyTransactionReply.class);
1060
1061             // Create a read Tx on the same chain.
1062
1063             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1064                     transactionChainID).toSerializable(), getRef());
1065
1066             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1067
1068             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1069             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1070             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1071
1072             // Commit the write transaction.
1073
1074             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1075             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1076                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1077             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1078
1079             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1080             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1081
1082             // Verify data in the data store.
1083
1084             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1085             assertEquals("Stored node", containerNode, actualNode);
1086
1087             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1088         }};
1089     }
1090
1091     @Test
1092     public void testOnBatchedModificationsWhenNotLeader() {
1093         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1094         new ShardTestKit(getSystem()) {{
1095             final Creator<Shard> creator = new Creator<Shard>() {
1096                 private static final long serialVersionUID = 1L;
1097
1098                 @Override
1099                 public Shard create() throws Exception {
1100                     return new Shard(newShardBuilder()) {
1101                         @Override
1102                         protected boolean isLeader() {
1103                             return overrideLeaderCalls.get() ? false : super.isLeader();
1104                         }
1105
1106                         @Override
1107                         protected ActorSelection getLeader() {
1108                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1109                                 super.getLeader();
1110                         }
1111                     };
1112                 }
1113             };
1114
1115             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1116                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1117
1118             waitUntilLeader(shard);
1119
1120             overrideLeaderCalls.set(true);
1121
1122             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1123
1124             shard.tell(batched, ActorRef.noSender());
1125
1126             expectMsgEquals(batched);
1127
1128             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1129         }};
1130     }
1131
1132     @Test
1133     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1134         new ShardTestKit(getSystem()) {{
1135             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1136                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1137                     "testForwardedReadyTransactionWithImmediateCommit");
1138
1139             waitUntilLeader(shard);
1140
1141             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1142
1143             final String transactionID = "tx1";
1144             final MutableCompositeModification modification = new MutableCompositeModification();
1145             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1146             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1147                     TestModel.TEST_PATH, containerNode, modification);
1148
1149             final FiniteDuration duration = duration("5 seconds");
1150
1151             // Simulate the ForwardedReadyTransaction messages that would be sent
1152             // by the ShardTransaction.
1153
1154             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1155                     cohort, modification, true, true), getRef());
1156
1157             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1158
1159             final InOrder inOrder = inOrder(cohort);
1160             inOrder.verify(cohort).canCommit();
1161             inOrder.verify(cohort).preCommit();
1162             inOrder.verify(cohort).commit();
1163
1164             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1165             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1166
1167             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1168         }};
1169     }
1170
1171     @Test
1172     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1173         new ShardTestKit(getSystem()) {{
1174             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1175                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1176                     "testReadyLocalTransactionWithImmediateCommit");
1177
1178             waitUntilLeader(shard);
1179
1180             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1181
1182             final DataTreeModification modification = dataStore.newModification();
1183
1184             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1185             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1186             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1187             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1188
1189             final String txId = "tx1";
1190             modification.ready();
1191             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1192
1193             shard.tell(readyMessage, getRef());
1194
1195             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1196
1197             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1198             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1199
1200             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1201         }};
1202     }
1203
1204     @Test
1205     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1206         new ShardTestKit(getSystem()) {{
1207             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1208                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1209                     "testReadyLocalTransactionWithThreePhaseCommit");
1210
1211             waitUntilLeader(shard);
1212
1213             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1214
1215             final DataTreeModification modification = dataStore.newModification();
1216
1217             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1218             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1219             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1220             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1221
1222             final String txId = "tx1";
1223                 modification.ready();
1224             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1225
1226             shard.tell(readyMessage, getRef());
1227
1228             expectMsgClass(ReadyTransactionReply.class);
1229
1230             // Send the CanCommitTransaction message.
1231
1232             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1233             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1234                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1235             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1236
1237             // Send the CanCommitTransaction message.
1238
1239             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1240             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1241
1242             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1243             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1244
1245             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1246         }};
1247     }
1248
1249     @Test
1250     public void testCommitWithPersistenceDisabled() throws Throwable {
1251         dataStoreContextBuilder.persistent(false);
1252         new ShardTestKit(getSystem()) {{
1253             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1254                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1255                     "testCommitWithPersistenceDisabled");
1256
1257             waitUntilLeader(shard);
1258
1259             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1260
1261             // Setup a simulated transactions with a mock cohort.
1262
1263             final String transactionID = "tx";
1264             final MutableCompositeModification modification = new MutableCompositeModification();
1265             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1266             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1267                 TestModel.TEST_PATH, containerNode, modification);
1268
1269             final FiniteDuration duration = duration("5 seconds");
1270
1271             // Simulate the ForwardedReadyTransaction messages that would be sent
1272             // by the ShardTransaction.
1273
1274             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1275                 cohort, modification, true, false), getRef());
1276             expectMsgClass(duration, ReadyTransactionReply.class);
1277
1278             // Send the CanCommitTransaction message.
1279
1280             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1281             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1282                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1283             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1284
1285             // Send the CanCommitTransaction message.
1286
1287             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1288             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1289
1290             final InOrder inOrder = inOrder(cohort);
1291             inOrder.verify(cohort).canCommit();
1292             inOrder.verify(cohort).preCommit();
1293             inOrder.verify(cohort).commit();
1294
1295             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1296             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1297
1298             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1299         }};
1300     }
1301
1302     private static DataTreeCandidateTip mockCandidate(final String name) {
1303         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1304         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1305         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1306         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1307         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1308         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1309         return mockCandidate;
1310     }
1311
1312     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1313         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1314         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1315         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1316         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1317         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1318         return mockCandidate;
1319     }
1320
1321     @Test
1322     public void testCommitWhenTransactionHasNoModifications(){
1323         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1324         // but here that need not happen
1325         new ShardTestKit(getSystem()) {
1326             {
1327                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1328                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1329                         "testCommitWhenTransactionHasNoModifications");
1330
1331                 waitUntilLeader(shard);
1332
1333                 final String transactionID = "tx1";
1334                 final MutableCompositeModification modification = new MutableCompositeModification();
1335                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1336                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1337                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1338                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1339                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1340
1341                 final FiniteDuration duration = duration("5 seconds");
1342
1343                 // Simulate the ForwardedReadyTransaction messages that would be sent
1344                 // by the ShardTransaction.
1345
1346                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1347                         cohort, modification, true, false), getRef());
1348                 expectMsgClass(duration, ReadyTransactionReply.class);
1349
1350                 // Send the CanCommitTransaction message.
1351
1352                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1353                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1354                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1355                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1356
1357                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1358                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1359
1360                 final InOrder inOrder = inOrder(cohort);
1361                 inOrder.verify(cohort).canCommit();
1362                 inOrder.verify(cohort).preCommit();
1363                 inOrder.verify(cohort).commit();
1364
1365                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1366                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1367
1368                 // Use MBean for verification
1369                 // Committed transaction count should increase as usual
1370                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1371
1372                 // Commit index should not advance because this does not go into the journal
1373                 assertEquals(-1, shardStats.getCommitIndex());
1374
1375                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1376
1377             }
1378         };
1379     }
1380
1381     @Test
1382     public void testCommitWhenTransactionHasModifications(){
1383         new ShardTestKit(getSystem()) {
1384             {
1385                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1386                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1387                         "testCommitWhenTransactionHasModifications");
1388
1389                 waitUntilLeader(shard);
1390
1391                 final String transactionID = "tx1";
1392                 final MutableCompositeModification modification = new MutableCompositeModification();
1393                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1394                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1395                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1396                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1397                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1398                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1399
1400                 final FiniteDuration duration = duration("5 seconds");
1401
1402                 // Simulate the ForwardedReadyTransaction messages that would be sent
1403                 // by the ShardTransaction.
1404
1405                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1406                         cohort, modification, true, false), getRef());
1407                 expectMsgClass(duration, ReadyTransactionReply.class);
1408
1409                 // Send the CanCommitTransaction message.
1410
1411                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1412                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1413                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1414                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1415
1416                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1417                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1418
1419                 final InOrder inOrder = inOrder(cohort);
1420                 inOrder.verify(cohort).canCommit();
1421                 inOrder.verify(cohort).preCommit();
1422                 inOrder.verify(cohort).commit();
1423
1424                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1425                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1426
1427                 // Use MBean for verification
1428                 // Committed transaction count should increase as usual
1429                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1430
1431                 // Commit index should advance as we do not have an empty modification
1432                 assertEquals(0, shardStats.getCommitIndex());
1433
1434                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1435
1436             }
1437         };
1438     }
1439
1440     @Test
1441     public void testCommitPhaseFailure() throws Throwable {
1442         new ShardTestKit(getSystem()) {{
1443             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1444                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1445                     "testCommitPhaseFailure");
1446
1447             waitUntilLeader(shard);
1448
1449             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1450             // commit phase.
1451
1452             final String transactionID1 = "tx1";
1453             final MutableCompositeModification modification1 = new MutableCompositeModification();
1454             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1455             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1456             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1457             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1458             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1459
1460             final String transactionID2 = "tx2";
1461             final MutableCompositeModification modification2 = new MutableCompositeModification();
1462             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1463             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1464
1465             final FiniteDuration duration = duration("5 seconds");
1466             final Timeout timeout = new Timeout(duration);
1467
1468             // Simulate the ForwardedReadyTransaction messages that would be sent
1469             // by the ShardTransaction.
1470
1471             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1472                     cohort1, modification1, true, false), getRef());
1473             expectMsgClass(duration, ReadyTransactionReply.class);
1474
1475             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1476                     cohort2, modification2, true, false), getRef());
1477             expectMsgClass(duration, ReadyTransactionReply.class);
1478
1479             // Send the CanCommitTransaction message for the first Tx.
1480
1481             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1482             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1483                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1484             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1485
1486             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1487             // processed after the first Tx completes.
1488
1489             final Future<Object> canCommitFuture = Patterns.ask(shard,
1490                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1491
1492             // Send the CommitTransaction message for the first Tx. This should send back an error
1493             // and trigger the 2nd Tx to proceed.
1494
1495             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1496             expectMsgClass(duration, akka.actor.Status.Failure.class);
1497
1498             // Wait for the 2nd Tx to complete the canCommit phase.
1499
1500             final CountDownLatch latch = new CountDownLatch(1);
1501             canCommitFuture.onComplete(new OnComplete<Object>() {
1502                 @Override
1503                 public void onComplete(final Throwable t, final Object resp) {
1504                     latch.countDown();
1505                 }
1506             }, getSystem().dispatcher());
1507
1508             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1509
1510             final InOrder inOrder = inOrder(cohort1, cohort2);
1511             inOrder.verify(cohort1).canCommit();
1512             inOrder.verify(cohort1).preCommit();
1513             inOrder.verify(cohort1).commit();
1514             inOrder.verify(cohort2).canCommit();
1515
1516             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1517         }};
1518     }
1519
1520     @Test
1521     public void testPreCommitPhaseFailure() throws Throwable {
1522         new ShardTestKit(getSystem()) {{
1523             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1524                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1525                     "testPreCommitPhaseFailure");
1526
1527             waitUntilLeader(shard);
1528
1529             final String transactionID1 = "tx1";
1530             final MutableCompositeModification modification1 = new MutableCompositeModification();
1531             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1532             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1533             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1534
1535             final String transactionID2 = "tx2";
1536             final MutableCompositeModification modification2 = new MutableCompositeModification();
1537             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1538             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1539
1540             final FiniteDuration duration = duration("5 seconds");
1541             final Timeout timeout = new Timeout(duration);
1542
1543             // Simulate the ForwardedReadyTransaction messages that would be sent
1544             // by the ShardTransaction.
1545
1546             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1547                     cohort1, modification1, true, false), getRef());
1548             expectMsgClass(duration, ReadyTransactionReply.class);
1549
1550             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1551                     cohort2, modification2, true, false), getRef());
1552             expectMsgClass(duration, ReadyTransactionReply.class);
1553
1554             // Send the CanCommitTransaction message for the first Tx.
1555
1556             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1557             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1558                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1559             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1560
1561             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1562             // processed after the first Tx completes.
1563
1564             final Future<Object> canCommitFuture = Patterns.ask(shard,
1565                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1566
1567             // Send the CommitTransaction message for the first Tx. This should send back an error
1568             // and trigger the 2nd Tx to proceed.
1569
1570             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1571             expectMsgClass(duration, akka.actor.Status.Failure.class);
1572
1573             // Wait for the 2nd Tx to complete the canCommit phase.
1574
1575             final CountDownLatch latch = new CountDownLatch(1);
1576             canCommitFuture.onComplete(new OnComplete<Object>() {
1577                 @Override
1578                 public void onComplete(final Throwable t, final Object resp) {
1579                     latch.countDown();
1580                 }
1581             }, getSystem().dispatcher());
1582
1583             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1584
1585             final InOrder inOrder = inOrder(cohort1, cohort2);
1586             inOrder.verify(cohort1).canCommit();
1587             inOrder.verify(cohort1).preCommit();
1588             inOrder.verify(cohort2).canCommit();
1589
1590             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1591         }};
1592     }
1593
1594     @Test
1595     public void testCanCommitPhaseFailure() throws Throwable {
1596         new ShardTestKit(getSystem()) {{
1597             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1598                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1599                     "testCanCommitPhaseFailure");
1600
1601             waitUntilLeader(shard);
1602
1603             final FiniteDuration duration = duration("5 seconds");
1604
1605             final String transactionID1 = "tx1";
1606             final MutableCompositeModification modification = new MutableCompositeModification();
1607             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1608             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1609
1610             // Simulate the ForwardedReadyTransaction messages that would be sent
1611             // by the ShardTransaction.
1612
1613             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1614                     cohort, modification, true, false), getRef());
1615             expectMsgClass(duration, ReadyTransactionReply.class);
1616
1617             // Send the CanCommitTransaction message.
1618
1619             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1620             expectMsgClass(duration, akka.actor.Status.Failure.class);
1621
1622             // Send another can commit to ensure the failed one got cleaned up.
1623
1624             reset(cohort);
1625
1626             final String transactionID2 = "tx2";
1627             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1628
1629             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1630                     cohort, modification, true, false), getRef());
1631             expectMsgClass(duration, ReadyTransactionReply.class);
1632
1633             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1634             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1635                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1636             assertEquals("getCanCommit", true, reply.getCanCommit());
1637
1638             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1639         }};
1640     }
1641
1642     @Test
1643     public void testCanCommitPhaseFalseResponse() throws Throwable {
1644         new ShardTestKit(getSystem()) {{
1645             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1646                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1647                     "testCanCommitPhaseFalseResponse");
1648
1649             waitUntilLeader(shard);
1650
1651             final FiniteDuration duration = duration("5 seconds");
1652
1653             final String transactionID1 = "tx1";
1654             final MutableCompositeModification modification = new MutableCompositeModification();
1655             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1656             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1657
1658             // Simulate the ForwardedReadyTransaction messages that would be sent
1659             // by the ShardTransaction.
1660
1661             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1662                     cohort, modification, true, false), getRef());
1663             expectMsgClass(duration, ReadyTransactionReply.class);
1664
1665             // Send the CanCommitTransaction message.
1666
1667             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1668             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1669                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1670             assertEquals("getCanCommit", false, reply.getCanCommit());
1671
1672             // Send another can commit to ensure the failed one got cleaned up.
1673
1674             reset(cohort);
1675
1676             final String transactionID2 = "tx2";
1677             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1678
1679             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1680                 cohort, modification, true, false), getRef());
1681             expectMsgClass(duration, ReadyTransactionReply.class);
1682
1683             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1684             reply = CanCommitTransactionReply.fromSerializable(
1685                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1686             assertEquals("getCanCommit", true, reply.getCanCommit());
1687
1688             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1689         }};
1690     }
1691
1692     @Test
1693     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1694         new ShardTestKit(getSystem()) {{
1695             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1696                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1697                     "testImmediateCommitWithCanCommitPhaseFailure");
1698
1699             waitUntilLeader(shard);
1700
1701             final FiniteDuration duration = duration("5 seconds");
1702
1703             final String transactionID1 = "tx1";
1704             final MutableCompositeModification modification = new MutableCompositeModification();
1705             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1706             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1707
1708             // Simulate the ForwardedReadyTransaction messages that would be sent
1709             // by the ShardTransaction.
1710
1711             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1712                     cohort, modification, true, true), getRef());
1713
1714             expectMsgClass(duration, akka.actor.Status.Failure.class);
1715
1716             // Send another can commit to ensure the failed one got cleaned up.
1717
1718             reset(cohort);
1719
1720             final String transactionID2 = "tx2";
1721             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1722             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1723             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1724             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1725             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1726             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1727             doReturn(candidateRoot).when(candidate).getRootNode();
1728             doReturn(candidate).when(cohort).getCandidate();
1729
1730             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1731                     cohort, modification, true, true), getRef());
1732
1733             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1734
1735             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1736         }};
1737     }
1738
1739     @Test
1740     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1741         new ShardTestKit(getSystem()) {{
1742             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1743                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1744                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1745
1746             waitUntilLeader(shard);
1747
1748             final FiniteDuration duration = duration("5 seconds");
1749
1750             final String transactionID = "tx1";
1751             final MutableCompositeModification modification = new MutableCompositeModification();
1752             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1753             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1754
1755             // Simulate the ForwardedReadyTransaction messages that would be sent
1756             // by the ShardTransaction.
1757
1758             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1759                     cohort, modification, true, true), getRef());
1760
1761             expectMsgClass(duration, akka.actor.Status.Failure.class);
1762
1763             // Send another can commit to ensure the failed one got cleaned up.
1764
1765             reset(cohort);
1766
1767             final String transactionID2 = "tx2";
1768             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1769             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1770             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1771             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1772             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1773             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1774             doReturn(candidateRoot).when(candidate).getRootNode();
1775             doReturn(candidate).when(cohort).getCandidate();
1776
1777             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1778                     cohort, modification, true, true), getRef());
1779
1780             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1781
1782             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1783         }};
1784     }
1785
1786     @Test
1787     public void testAbortBeforeFinishCommit() throws Throwable {
1788         new ShardTestKit(getSystem()) {{
1789             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1790                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1791                     "testAbortBeforeFinishCommit");
1792
1793             waitUntilLeader(shard);
1794
1795             final FiniteDuration duration = duration("5 seconds");
1796             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1797
1798             final String transactionID = "tx1";
1799             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1800                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1801                 @Override
1802                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1803                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1804
1805                     // Simulate an AbortTransaction message occurring during replication, after
1806                     // persisting and before finishing the commit to the in-memory store.
1807                     // We have no followers so due to optimizations in the RaftActor, it does not
1808                     // attempt replication and thus we can't send an AbortTransaction message b/c
1809                     // it would be processed too late after CommitTransaction completes. So we'll
1810                     // simulate an AbortTransaction message occurring during replication by calling
1811                     // the shard directly.
1812                     //
1813                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1814
1815                     return preCommitFuture;
1816                 }
1817             };
1818
1819             final MutableCompositeModification modification = new MutableCompositeModification();
1820             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1821                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1822                     modification, preCommit);
1823
1824             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1825                 cohort, modification, true, false), getRef());
1826             expectMsgClass(duration, ReadyTransactionReply.class);
1827
1828             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1829             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1830                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1831             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1832
1833             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1834             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1835
1836             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1837
1838             // Since we're simulating an abort occurring during replication and before finish commit,
1839             // the data should still get written to the in-memory store since we've gotten past
1840             // canCommit and preCommit and persisted the data.
1841             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1842
1843             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1844         }};
1845     }
1846
1847     @Test
1848     public void testTransactionCommitTimeout() throws Throwable {
1849         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1850
1851         new ShardTestKit(getSystem()) {{
1852             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1853                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1854                     "testTransactionCommitTimeout");
1855
1856             waitUntilLeader(shard);
1857
1858             final FiniteDuration duration = duration("5 seconds");
1859
1860             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1861
1862             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1863             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1864                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1865
1866             // Create 1st Tx - will timeout
1867
1868             final String transactionID1 = "tx1";
1869             final MutableCompositeModification modification1 = new MutableCompositeModification();
1870             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1871                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1872                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1873                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1874                     modification1);
1875
1876             // Create 2nd Tx
1877
1878             final String transactionID2 = "tx3";
1879             final MutableCompositeModification modification2 = new MutableCompositeModification();
1880             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1881                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1882             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1883                     listNodePath,
1884                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1885                     modification2);
1886
1887             // Ready the Tx's
1888
1889             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1890                     cohort1, modification1, true, false), getRef());
1891             expectMsgClass(duration, ReadyTransactionReply.class);
1892
1893             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1894                     cohort2, modification2, true, false), getRef());
1895             expectMsgClass(duration, ReadyTransactionReply.class);
1896
1897             // canCommit 1st Tx. We don't send the commit so it should timeout.
1898
1899             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1900             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1901
1902             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1903
1904             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1905             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1906
1907             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1908
1909             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1910             expectMsgClass(duration, akka.actor.Status.Failure.class);
1911
1912             // Commit the 2nd Tx.
1913
1914             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1915             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1916
1917             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1918             assertNotNull(listNodePath + " not found", node);
1919
1920             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1921         }};
1922     }
1923
1924     @Test
1925     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1926         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1927
1928         new ShardTestKit(getSystem()) {{
1929             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1930                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1931                     "testTransactionCommitQueueCapacityExceeded");
1932
1933             waitUntilLeader(shard);
1934
1935             final FiniteDuration duration = duration("5 seconds");
1936
1937             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1938
1939             final String transactionID1 = "tx1";
1940             final MutableCompositeModification modification1 = new MutableCompositeModification();
1941             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1942                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1943
1944             final String transactionID2 = "tx2";
1945             final MutableCompositeModification modification2 = new MutableCompositeModification();
1946             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1947                     TestModel.OUTER_LIST_PATH,
1948                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1949                     modification2);
1950
1951             final String transactionID3 = "tx3";
1952             final MutableCompositeModification modification3 = new MutableCompositeModification();
1953             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1954                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1955
1956             // Ready the Tx's
1957
1958             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1959                     cohort1, modification1, true, false), getRef());
1960             expectMsgClass(duration, ReadyTransactionReply.class);
1961
1962             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1963                     cohort2, modification2, true, false), getRef());
1964             expectMsgClass(duration, ReadyTransactionReply.class);
1965
1966             // The 3rd Tx should exceed queue capacity and fail.
1967
1968             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1969                     cohort3, modification3, true, false), getRef());
1970             expectMsgClass(duration, akka.actor.Status.Failure.class);
1971
1972             // canCommit 1st Tx.
1973
1974             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1975             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1976
1977             // canCommit the 2nd Tx - it should get queued.
1978
1979             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1980
1981             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1982
1983             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1984             expectMsgClass(duration, akka.actor.Status.Failure.class);
1985
1986             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1987         }};
1988     }
1989
1990     @Test
1991     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1992         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1993
1994         new ShardTestKit(getSystem()) {{
1995             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1996                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1997                     "testTransactionCommitWithPriorExpiredCohortEntries");
1998
1999             waitUntilLeader(shard);
2000
2001             final FiniteDuration duration = duration("5 seconds");
2002
2003             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2004
2005             final String transactionID1 = "tx1";
2006             final MutableCompositeModification modification1 = new MutableCompositeModification();
2007             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2008                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2009
2010             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2011                 cohort1, modification1, true, false), getRef());
2012             expectMsgClass(duration, ReadyTransactionReply.class);
2013
2014             final String transactionID2 = "tx2";
2015             final MutableCompositeModification modification2 = new MutableCompositeModification();
2016             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2017                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2018
2019             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2020                     cohort2, modification2, true, false), getRef());
2021             expectMsgClass(duration, ReadyTransactionReply.class);
2022
2023             final String transactionID3 = "tx3";
2024             final MutableCompositeModification modification3 = new MutableCompositeModification();
2025             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2026                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2027
2028             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2029                 cohort3, modification3, true, false), getRef());
2030             expectMsgClass(duration, ReadyTransactionReply.class);
2031
2032             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2033             // should expire from the queue and the last one should be processed.
2034
2035             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2036             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2037
2038             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2039         }};
2040     }
2041
2042     @Test
2043     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2044         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2045
2046         new ShardTestKit(getSystem()) {{
2047             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2048                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2049                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2050
2051             waitUntilLeader(shard);
2052
2053             final FiniteDuration duration = duration("5 seconds");
2054
2055             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2056
2057             final String transactionID1 = "tx1";
2058             final MutableCompositeModification modification1 = new MutableCompositeModification();
2059             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2060                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2061
2062             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2063                     cohort1, modification1, true, false), getRef());
2064             expectMsgClass(duration, ReadyTransactionReply.class);
2065
2066             // CanCommit the first one so it's the current in-progress CohortEntry.
2067
2068             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2069             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2070
2071             // Ready the second Tx.
2072
2073             final String transactionID2 = "tx2";
2074             final MutableCompositeModification modification2 = new MutableCompositeModification();
2075             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2076                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2077
2078             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2079                     cohort2, modification2, true, false), getRef());
2080             expectMsgClass(duration, ReadyTransactionReply.class);
2081
2082             // Ready the third Tx.
2083
2084             final String transactionID3 = "tx3";
2085             final DataTreeModification modification3 = dataStore.newModification();
2086             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2087                     .apply(modification3);
2088                 modification3.ready();
2089             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2090
2091             shard.tell(readyMessage, getRef());
2092
2093             // Commit the first Tx. After completing, the second should expire from the queue and the third
2094             // Tx committed.
2095
2096             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2097             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2098
2099             // Expect commit reply from the third Tx.
2100
2101             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2102
2103             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2104             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2105
2106             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2107         }};
2108     }
2109
2110     @Test
2111     public void testCanCommitBeforeReadyFailure() throws Throwable {
2112         new ShardTestKit(getSystem()) {{
2113             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2114                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2115                     "testCanCommitBeforeReadyFailure");
2116
2117             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2118             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2119
2120             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2121         }};
2122     }
2123
2124     @Test
2125     public void testAbortCurrentTransaction() throws Throwable {
2126         new ShardTestKit(getSystem()) {{
2127             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2128                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2129                     "testAbortCurrentTransaction");
2130
2131             waitUntilLeader(shard);
2132
2133             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2134
2135             final String transactionID1 = "tx1";
2136             final MutableCompositeModification modification1 = new MutableCompositeModification();
2137             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2138             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2139             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2140
2141             final String transactionID2 = "tx2";
2142             final MutableCompositeModification modification2 = new MutableCompositeModification();
2143             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2144             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2145
2146             final FiniteDuration duration = duration("5 seconds");
2147             final Timeout timeout = new Timeout(duration);
2148
2149             // Simulate the ForwardedReadyTransaction messages that would be sent
2150             // by the ShardTransaction.
2151
2152             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2153                     cohort1, modification1, true, false), getRef());
2154             expectMsgClass(duration, ReadyTransactionReply.class);
2155
2156             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2157                     cohort2, modification2, true, false), getRef());
2158             expectMsgClass(duration, ReadyTransactionReply.class);
2159
2160             // Send the CanCommitTransaction message for the first Tx.
2161
2162             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2163             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2164                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2165             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2166
2167             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2168             // processed after the first Tx completes.
2169
2170             final Future<Object> canCommitFuture = Patterns.ask(shard,
2171                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2172
2173             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2174             // Tx to proceed.
2175
2176             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2177             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2178
2179             // Wait for the 2nd Tx to complete the canCommit phase.
2180
2181             Await.ready(canCommitFuture, duration);
2182
2183             final InOrder inOrder = inOrder(cohort1, cohort2);
2184             inOrder.verify(cohort1).canCommit();
2185             inOrder.verify(cohort2).canCommit();
2186
2187             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2188         }};
2189     }
2190
2191     @Test
2192     public void testAbortQueuedTransaction() throws Throwable {
2193         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2194         new ShardTestKit(getSystem()) {{
2195             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2196             @SuppressWarnings("serial")
2197             final Creator<Shard> creator = new Creator<Shard>() {
2198                 @Override
2199                 public Shard create() throws Exception {
2200                     return new Shard(newShardBuilder()) {
2201                         @Override
2202                         public void onReceiveCommand(final Object message) throws Exception {
2203                             super.onReceiveCommand(message);
2204                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2205                                 if(cleaupCheckLatch.get() != null) {
2206                                     cleaupCheckLatch.get().countDown();
2207                                 }
2208                             }
2209                         }
2210                     };
2211                 }
2212             };
2213
2214             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2215                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2216                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
2217
2218             waitUntilLeader(shard);
2219
2220             final String transactionID = "tx1";
2221
2222             final MutableCompositeModification modification = new MutableCompositeModification();
2223             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2224             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2225
2226             final FiniteDuration duration = duration("5 seconds");
2227
2228             // Ready the tx.
2229
2230             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
2231                     cohort, modification, true, false), getRef());
2232             expectMsgClass(duration, ReadyTransactionReply.class);
2233
2234             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2235
2236             // Send the AbortTransaction message.
2237
2238             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2239             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2240
2241             verify(cohort).abort();
2242
2243             // Verify the tx cohort is removed from queue at the cleanup check interval.
2244
2245             cleaupCheckLatch.set(new CountDownLatch(1));
2246             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2247                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2248
2249             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2250
2251             // Now send CanCommitTransaction - should fail.
2252
2253             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2254
2255             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2256             assertTrue("Failure type", failure instanceof IllegalStateException);
2257
2258             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2259         }};
2260     }
2261
2262     @Test
2263     public void testCreateSnapshot() throws Exception {
2264         testCreateSnapshot(true, "testCreateSnapshot");
2265     }
2266
2267     @Test
2268     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2269         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2270     }
2271
2272     @SuppressWarnings("serial")
2273     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2274
2275         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2276
2277         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2278         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2279             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2280                 super(delegate);
2281             }
2282
2283             @Override
2284             public void saveSnapshot(final Object o) {
2285                 savedSnapshot.set(o);
2286                 super.saveSnapshot(o);
2287             }
2288         }
2289
2290         dataStoreContextBuilder.persistent(persistent);
2291
2292         new ShardTestKit(getSystem()) {{
2293             class TestShard extends Shard {
2294
2295                 protected TestShard(AbstractBuilder<?, ?> builder) {
2296                     super(builder);
2297                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2298                 }
2299
2300                 @Override
2301                 public void handleCommand(final Object message) {
2302                     super.handleCommand(message);
2303
2304                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2305                         latch.get().countDown();
2306                     }
2307                 }
2308
2309                 @Override
2310                 public RaftActorContext getRaftActorContext() {
2311                     return super.getRaftActorContext();
2312                 }
2313             }
2314
2315             final Creator<Shard> creator = new Creator<Shard>() {
2316                 @Override
2317                 public Shard create() throws Exception {
2318                     return new TestShard(newShardBuilder());
2319                 }
2320             };
2321
2322             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2323                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2324
2325             waitUntilLeader(shard);
2326             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2327
2328             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2329
2330             // Trigger creation of a snapshot by ensuring
2331             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2332             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2333             awaitAndValidateSnapshot(expectedRoot);
2334
2335             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2336             awaitAndValidateSnapshot(expectedRoot);
2337
2338             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2339         }
2340
2341             private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
2342                                               ) throws InterruptedException {
2343                 System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
2344                 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2345
2346                 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2347                         savedSnapshot.get() instanceof Snapshot);
2348
2349                 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2350
2351                 latch.set(new CountDownLatch(1));
2352                 savedSnapshot.set(null);
2353             }
2354
2355             private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2356
2357                 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2358                 assertEquals("Root node", expectedRoot, actual);
2359
2360            }
2361         };
2362     }
2363
2364     /**
2365      * This test simply verifies that the applySnapShot logic will work
2366      * @throws ReadFailedException
2367      * @throws DataValidationFailedException
2368      */
2369     @Test
2370     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2371         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2372         store.setSchemaContext(SCHEMA_CONTEXT);
2373
2374         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2375         putTransaction.write(TestModel.TEST_PATH,
2376             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2377         commitTransaction(store, putTransaction);
2378
2379
2380         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2381
2382         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2383
2384         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2385         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2386
2387         commitTransaction(store, writeTransaction);
2388
2389         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2390
2391         assertEquals(expected, actual);
2392     }
2393
2394     @Test
2395     public void testRecoveryApplicable(){
2396
2397         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2398                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2399
2400         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2401                 schemaContext(SCHEMA_CONTEXT).props();
2402
2403         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2404                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2405
2406         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2407                 schemaContext(SCHEMA_CONTEXT).props();
2408
2409         new ShardTestKit(getSystem()) {{
2410             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2411                     persistentProps, "testPersistence1");
2412
2413             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2414
2415             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2416
2417             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2418                     nonPersistentProps, "testPersistence2");
2419
2420             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2421
2422             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2423
2424         }};
2425
2426     }
2427
2428     @Test
2429     public void testOnDatastoreContext() {
2430         new ShardTestKit(getSystem()) {{
2431             dataStoreContextBuilder.persistent(true);
2432
2433             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2434
2435             assertEquals("isRecoveryApplicable", true,
2436                     shard.underlyingActor().persistence().isRecoveryApplicable());
2437
2438             waitUntilLeader(shard);
2439
2440             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2441
2442             assertEquals("isRecoveryApplicable", false,
2443                 shard.underlyingActor().persistence().isRecoveryApplicable());
2444
2445             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2446
2447             assertEquals("isRecoveryApplicable", true,
2448                 shard.underlyingActor().persistence().isRecoveryApplicable());
2449
2450             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2451         }};
2452     }
2453
2454     @Test
2455     public void testRegisterRoleChangeListener() throws Exception {
2456         new ShardTestKit(getSystem()) {
2457             {
2458                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2459                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2460                         "testRegisterRoleChangeListener");
2461
2462                 waitUntilLeader(shard);
2463
2464                 final TestActorRef<MessageCollectorActor> listener =
2465                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2466
2467                 shard.tell(new RegisterRoleChangeListener(), listener);
2468
2469                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2470
2471                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2472                     ShardLeaderStateChanged.class);
2473                 assertEquals("getLocalShardDataTree present", true,
2474                         leaderStateChanged.getLocalShardDataTree().isPresent());
2475                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2476                     leaderStateChanged.getLocalShardDataTree().get());
2477
2478                 MessageCollectorActor.clearMessages(listener);
2479
2480                 // Force a leader change
2481
2482                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2483
2484                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2485                         ShardLeaderStateChanged.class);
2486                 assertEquals("getLocalShardDataTree present", false,
2487                         leaderStateChanged.getLocalShardDataTree().isPresent());
2488
2489                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2490             }
2491         };
2492     }
2493
2494     @Test
2495     public void testFollowerInitialSyncStatus() throws Exception {
2496         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2497                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2498                 "testFollowerInitialSyncStatus");
2499
2500         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2501
2502         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2503
2504         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2505
2506         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2507
2508         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2509     }
2510
2511     private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2512         modification.ready();
2513         store.validate(modification);
2514         store.commit(store.prepare(modification));
2515     }
2516
2517     @Test
2518     public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
2519         new ShardTestKit(getSystem()) {{
2520             dataStoreContextBuilder.persistent(false);
2521             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
2522             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
2523             final Creator<Shard> creator = new Creator<Shard>() {
2524                 private static final long serialVersionUID = 1L;
2525                 boolean firstElectionTimeout = true;
2526
2527                 @Override
2528                 public Shard create() throws Exception {
2529                     return new Shard(newShardBuilder()) {
2530                         @Override
2531                         public void onReceiveCommand(final Object message) throws Exception {
2532                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
2533                                 firstElectionTimeout = false;
2534                                 final ActorRef self = getSelf();
2535                                 new Thread() {
2536                                     @Override
2537                                     public void run() {
2538                                         Uninterruptibles.awaitUninterruptibly(
2539                                             onChangeListenerRegistered, 5, TimeUnit.SECONDS);
2540                                         self.tell(message, self);
2541                                     }
2542                                 }.start();
2543
2544                                 onFirstElectionTimeout.countDown();
2545                             } else {
2546                                 super.onReceiveCommand(message);
2547                             }
2548                         }
2549                     };
2550                 }
2551             };
2552
2553             final MockDataChangeListener listener = new MockDataChangeListener(1);
2554             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2555                 "testDataChangeListenerOnFollower-DataChangeListener");
2556
2557             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2558                 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
2559                     withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
2560
2561             assertEquals("Got first ElectionTimeout", true,
2562                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
2563
2564             shard.tell(new FindLeader(), getRef());
2565             final FindLeaderReply findLeadeReply =
2566                 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2567             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
2568
2569             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2570
2571             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2572             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2573                 RegisterChangeListenerReply.class);
2574             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2575
2576             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2577
2578             onChangeListenerRegistered.countDown();
2579
2580             listener.waitForChangeEvents();
2581
2582             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2583             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2584         }};
2585     }
2586
2587     @Test
2588     public void testClusteredDataChangeListernerRegistration() throws Exception {
2589         dataStoreContextBuilder.persistent(false).build();
2590         new ShardTestKit(getSystem()) {{
2591             final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
2592                 .shardName("inventory").type("config").build();
2593
2594             final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
2595                 .shardName("inventory").type("config").build();
2596             final Creator<Shard> followerShardCreator = new Creator<Shard>() {
2597                 private static final long serialVersionUID = 1L;
2598
2599                 @Override
2600                 public Shard create() throws Exception {
2601                     return new Shard(Shard.builder().id(member1ShardID).datastoreContext(newDatastoreContext()).
2602                             peerAddresses(Collections.singletonMap(member2ShardID.toString(),
2603                                     "akka://test/user/" + member2ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {
2604                         @Override
2605                         public void onReceiveCommand(final Object message) throws Exception {
2606
2607                             if(!(message instanceof ElectionTimeout)) {
2608                                 super.onReceiveCommand(message);
2609                             }
2610                         }
2611                     };
2612                 }
2613             };
2614
2615             final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
2616                 private static final long serialVersionUID = 1L;
2617
2618                 @Override
2619                 public Shard create() throws Exception {
2620                     return new Shard(Shard.builder().id(member2ShardID).datastoreContext(newDatastoreContext()).
2621                             peerAddresses(Collections.singletonMap(member1ShardID.toString(),
2622                                     "akka://test/user/" + member1ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {};
2623                 }
2624             };
2625
2626
2627             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2628                 Props.create(new DelegatingShardCreator(followerShardCreator)),
2629                 member1ShardID.toString());
2630
2631             final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
2632                 Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
2633                 member2ShardID.toString());
2634             // Sleep to let election happen
2635             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
2636
2637             shard.tell(new FindLeader(), getRef());
2638             final FindLeaderReply findLeaderReply =
2639                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2640             assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
2641
2642             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2643             final MockDataChangeListener listener = new MockDataChangeListener(1);
2644             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2645                 "testDataChangeListenerOnFollower-DataChangeListener");
2646
2647             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2648             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2649                 RegisterChangeListenerReply.class);
2650             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2651
2652             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2653
2654             listener.waitForChangeEvents();
2655
2656             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2657             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2658         }};
2659     }
2660 }