Bug 4651: Implement handling of ClusteredDOMDataTreeChangeListener in CDS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Status.Failure;
28 import akka.dispatch.Dispatchers;
29 import akka.dispatch.OnComplete;
30 import akka.japi.Creator;
31 import akka.pattern.Patterns;
32 import akka.persistence.SaveSnapshotSuccess;
33 import akka.testkit.TestActorRef;
34 import akka.util.Timeout;
35 import com.google.common.base.Function;
36 import com.google.common.base.Optional;
37 import com.google.common.util.concurrent.Futures;
38 import com.google.common.util.concurrent.ListenableFuture;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.io.IOException;
41 import java.util.Collections;
42 import java.util.HashSet;
43 import java.util.Set;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.Test;
49 import org.mockito.InOrder;
50 import org.opendaylight.controller.cluster.DataPersistenceProvider;
51 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
52 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
53 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
54 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
63 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
69 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
73 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
74 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
75 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
76 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
77 import org.opendaylight.controller.cluster.datastore.modification.Modification;
78 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
79 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
80 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
81 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
82 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
83 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
84 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
86 import org.opendaylight.controller.cluster.raft.RaftActorContext;
87 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
89 import org.opendaylight.controller.cluster.raft.Snapshot;
90 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
91 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
92 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
93 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
94 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
95 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
96 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
97 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
98 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
99 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
100 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
101 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
102 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
103 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
104 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
105 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
106 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
107 import org.opendaylight.yangtools.yang.common.QName;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
110 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
111 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
122 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
123 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
124 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
125 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
126 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
127 import scala.concurrent.Await;
128 import scala.concurrent.Future;
129 import scala.concurrent.duration.FiniteDuration;
130
131 public class ShardTest extends AbstractShardTest {
132     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
133
134     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
135
136     @Test
137     public void testRegisterChangeListener() throws Exception {
138         new ShardTestKit(getSystem()) {{
139             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
140                     newShardProps(),  "testRegisterChangeListener");
141
142             waitUntilLeader(shard);
143
144             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
145
146             final MockDataChangeListener listener = new MockDataChangeListener(1);
147             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
148                     "testRegisterChangeListener-DataChangeListener");
149
150             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
151                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
152
153             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
154                     RegisterChangeListenerReply.class);
155             final String replyPath = reply.getListenerRegistrationPath().toString();
156             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
157                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
158
159             final YangInstanceIdentifier path = TestModel.TEST_PATH;
160             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
161
162             listener.waitForChangeEvents(path);
163
164             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
165             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
166         }};
167     }
168
169     @SuppressWarnings("serial")
170     @Test
171     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
172         // This test tests the timing window in which a change listener is registered before the
173         // shard becomes the leader. We verify that the listener is registered and notified of the
174         // existing data when the shard becomes the leader.
175         new ShardTestKit(getSystem()) {{
176             // For this test, we want to send the RegisterChangeListener message after the shard
177             // has recovered from persistence and before it becomes the leader. So we subclass
178             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
179             // we know that the shard has been initialized to a follower and has started the
180             // election process. The following 2 CountDownLatches are used to coordinate the
181             // ElectionTimeout with the sending of the RegisterChangeListener message.
182             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
183             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
184             final Creator<Shard> creator = new Creator<Shard>() {
185                 boolean firstElectionTimeout = true;
186
187                 @Override
188                 public Shard create() throws Exception {
189                     // Use a non persistent provider because this test actually invokes persist on the journal
190                     // this will cause all other messages to not be queued properly after that.
191                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
192                     // it does do a persist)
193                     return new Shard(newShardBuilder()) {
194                         @Override
195                         public void onReceiveCommand(final Object message) throws Exception {
196                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
197                                 // Got the first ElectionTimeout. We don't forward it to the
198                                 // base Shard yet until we've sent the RegisterChangeListener
199                                 // message. So we signal the onFirstElectionTimeout latch to tell
200                                 // the main thread to send the RegisterChangeListener message and
201                                 // start a thread to wait on the onChangeListenerRegistered latch,
202                                 // which the main thread signals after it has sent the message.
203                                 // After the onChangeListenerRegistered is triggered, we send the
204                                 // original ElectionTimeout message to proceed with the election.
205                                 firstElectionTimeout = false;
206                                 final ActorRef self = getSelf();
207                                 new Thread() {
208                                     @Override
209                                     public void run() {
210                                         Uninterruptibles.awaitUninterruptibly(
211                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
212                                         self.tell(message, self);
213                                     }
214                                 }.start();
215
216                                 onFirstElectionTimeout.countDown();
217                             } else {
218                                 super.onReceiveCommand(message);
219                             }
220                         }
221                     };
222                 }
223             };
224
225             final MockDataChangeListener listener = new MockDataChangeListener(1);
226             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
227                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
228
229             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
230                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
231                     "testRegisterChangeListenerWhenNotLeaderInitially");
232
233             // Write initial data into the in-memory store.
234             final YangInstanceIdentifier path = TestModel.TEST_PATH;
235             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
236
237             // Wait until the shard receives the first ElectionTimeout message.
238             assertEquals("Got first ElectionTimeout", true,
239                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
240
241             // Now send the RegisterChangeListener and wait for the reply.
242             shard.tell(new RegisterChangeListener(path, dclActor,
243                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
244
245             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
246                     RegisterChangeListenerReply.class);
247             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
248
249             // Sanity check - verify the shard is not the leader yet.
250             shard.tell(new FindLeader(), getRef());
251             final FindLeaderReply findLeadeReply =
252                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
253             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
254
255             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
256             // with the election process.
257             onChangeListenerRegistered.countDown();
258
259             // Wait for the shard to become the leader and notify our listener with the existing
260             // data in the store.
261             listener.waitForChangeEvents(path);
262
263             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
264             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
265         }};
266     }
267
268     @Test
269     public void testRegisterDataTreeChangeListener() throws Exception {
270         new ShardTestKit(getSystem()) {{
271             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
272                     newShardProps(), "testRegisterDataTreeChangeListener");
273
274             waitUntilLeader(shard);
275
276             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
277
278             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
279             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
280                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
281
282             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
283
284             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
285                     RegisterDataTreeChangeListenerReply.class);
286             final String replyPath = reply.getListenerRegistrationPath().toString();
287             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
288                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
289
290             final YangInstanceIdentifier path = TestModel.TEST_PATH;
291             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
292
293             listener.waitForChangeEvents();
294
295             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
296             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
297         }};
298     }
299
300     @SuppressWarnings("serial")
301     @Test
302     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
303         new ShardTestKit(getSystem()) {{
304             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
305             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
306             final Creator<Shard> creator = new Creator<Shard>() {
307                 boolean firstElectionTimeout = true;
308
309                 @Override
310                 public Shard create() throws Exception {
311                     return new Shard(Shard.builder().id(shardID).datastoreContext(
312                             dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
313                         @Override
314                         public void onReceiveCommand(final Object message) throws Exception {
315                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
316                                 firstElectionTimeout = false;
317                                 final ActorRef self = getSelf();
318                                 new Thread() {
319                                     @Override
320                                     public void run() {
321                                         Uninterruptibles.awaitUninterruptibly(
322                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
323                                         self.tell(message, self);
324                                     }
325                                 }.start();
326
327                                 onFirstElectionTimeout.countDown();
328                             } else {
329                                 super.onReceiveCommand(message);
330                             }
331                         }
332                     };
333                 }
334             };
335
336             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
337             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
338                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
339
340             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
341                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
342                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
343
344             final YangInstanceIdentifier path = TestModel.TEST_PATH;
345             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
346
347             assertEquals("Got first ElectionTimeout", true,
348                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
349
350             shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
351             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
352                 RegisterDataTreeChangeListenerReply.class);
353             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
354
355             shard.tell(new FindLeader(), getRef());
356             final FindLeaderReply findLeadeReply =
357                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
358             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
359
360             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
361
362             onChangeListenerRegistered.countDown();
363
364             // TODO: investigate why we do not receive data chage events
365             listener.waitForChangeEvents();
366
367             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
368             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
369         }};
370     }
371
372     @Test
373     public void testCreateTransaction(){
374         new ShardTestKit(getSystem()) {{
375             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
376
377             waitUntilLeader(shard);
378
379             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
380
381             shard.tell(new CreateTransaction("txn-1",
382                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
383
384             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
385                     CreateTransactionReply.class);
386
387             final String path = reply.getTransactionActorPath().toString();
388             assertTrue("Unexpected transaction path " + path,
389                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
390
391             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
392         }};
393     }
394
395     @Test
396     public void testCreateTransactionOnChain(){
397         new ShardTestKit(getSystem()) {{
398             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
399
400             waitUntilLeader(shard);
401
402             shard.tell(new CreateTransaction("txn-1",
403                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
404                     getRef());
405
406             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
407                     CreateTransactionReply.class);
408
409             final String path = reply.getTransactionActorPath().toString();
410             assertTrue("Unexpected transaction path " + path,
411                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
412
413             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
414         }};
415     }
416
417     @SuppressWarnings("serial")
418     @Test
419     public void testPeerAddressResolved() throws Exception {
420         new ShardTestKit(getSystem()) {{
421             final CountDownLatch recoveryComplete = new CountDownLatch(1);
422             class TestShard extends Shard {
423                 TestShard() {
424                     super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
425                             peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
426                             schemaContext(SCHEMA_CONTEXT));
427                 }
428
429                 String getPeerAddress(String id) {
430                     return getRaftActorContext().getPeerAddress(id);
431                 }
432
433                 @Override
434                 protected void onRecoveryComplete() {
435                     try {
436                         super.onRecoveryComplete();
437                     } finally {
438                         recoveryComplete.countDown();
439                     }
440                 }
441             }
442
443             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
444                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
445                         @Override
446                         public TestShard create() throws Exception {
447                             return new TestShard();
448                         }
449                     })), "testPeerAddressResolved");
450
451             assertEquals("Recovery complete", true,
452                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
453
454             final String address = "akka://foobar";
455             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
456
457             assertEquals("getPeerAddress", address,
458                 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
459
460             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
461         }};
462     }
463
464     @Test
465     public void testApplySnapshot() throws Exception {
466
467         ShardTestKit testkit = new ShardTestKit(getSystem());
468
469         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
470                 "testApplySnapshot");
471
472         testkit.waitUntilLeader(shard);
473
474         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
475         store.setSchemaContext(SCHEMA_CONTEXT);
476
477         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
478                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
479                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
480                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
481
482         writeToStore(store, TestModel.TEST_PATH, container);
483
484         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
485         final NormalizedNode<?,?> expected = readStore(store, root);
486
487         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
488                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
489
490         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
491
492         final NormalizedNode<?,?> actual = readStore(shard, root);
493
494         assertEquals("Root node", expected, actual);
495
496         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
497     }
498
499     @Test
500     public void testApplyState() throws Exception {
501
502         ShardTestKit testkit = new ShardTestKit(getSystem());
503
504         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
505
506         testkit.waitUntilLeader(shard);
507
508         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
509
510         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
511                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
512
513         shard.underlyingActor().onReceiveCommand(applyState);
514
515         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
516         assertEquals("Applied state", node, actual);
517
518         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
519     }
520
521     @Test
522     public void testApplyStateWithCandidatePayload() throws Exception {
523
524         ShardTestKit testkit = new ShardTestKit(getSystem());
525
526         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
527
528         testkit.waitUntilLeader(shard);
529
530         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
531         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
532
533         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
534                 DataTreeCandidatePayload.create(candidate)));
535
536         shard.underlyingActor().onReceiveCommand(applyState);
537
538         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
539         assertEquals("Applied state", node, actual);
540
541         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
542     }
543
544     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
545         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
546         testStore.setSchemaContext(SCHEMA_CONTEXT);
547
548         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
549
550         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
551
552         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
553             SerializationUtils.serializeNormalizedNode(root),
554             Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
555         return testStore;
556     }
557
558     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
559         source.validate(mod);
560         final DataTreeCandidate candidate = source.prepare(mod);
561         source.commit(candidate);
562         return DataTreeCandidatePayload.create(candidate);
563     }
564
565     @Test
566     public void testDataTreeCandidateRecovery() throws Exception {
567         // Set up the InMemorySnapshotStore.
568         final DataTree source = setupInMemorySnapshotStore();
569
570         final DataTreeModification writeMod = source.takeSnapshot().newModification();
571         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
572         writeMod.ready();
573         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
574
575         // Set up the InMemoryJournal.
576         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
577
578         final int nListEntries = 16;
579         final Set<Integer> listEntryKeys = new HashSet<>();
580
581         // Add some ModificationPayload entries
582         for (int i = 1; i <= nListEntries; i++) {
583             listEntryKeys.add(Integer.valueOf(i));
584
585             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
586                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
587
588             final DataTreeModification mod = source.takeSnapshot().newModification();
589             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
590             mod.ready();
591             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
592                 payloadForModification(source, mod)));
593         }
594
595         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
596             new ApplyJournalEntries(nListEntries));
597
598         testRecovery(listEntryKeys);
599     }
600
601     @Test
602     public void testModicationRecovery() throws Exception {
603
604         // Set up the InMemorySnapshotStore.
605         setupInMemorySnapshotStore();
606
607         // Set up the InMemoryJournal.
608
609         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
610
611         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
612             new WriteModification(TestModel.OUTER_LIST_PATH,
613                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
614
615         final int nListEntries = 16;
616         final Set<Integer> listEntryKeys = new HashSet<>();
617
618         // Add some ModificationPayload entries
619         for(int i = 1; i <= nListEntries; i++) {
620             listEntryKeys.add(Integer.valueOf(i));
621             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
622                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
623             final Modification mod = new MergeModification(path,
624                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
625             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
626                     newModificationPayload(mod)));
627         }
628
629         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
630                 new ApplyJournalEntries(nListEntries));
631
632         testRecovery(listEntryKeys);
633     }
634
635     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
636         final MutableCompositeModification compMod = new MutableCompositeModification();
637         for(final Modification mod: mods) {
638             compMod.addModification(mod);
639         }
640
641         return new ModificationPayload(compMod);
642     }
643
644     @Test
645     public void testConcurrentThreePhaseCommits() throws Throwable {
646         new ShardTestKit(getSystem()) {{
647             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
648                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
649                     "testConcurrentThreePhaseCommits");
650
651             waitUntilLeader(shard);
652
653          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
654
655             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
656
657             final String transactionID1 = "tx1";
658             final MutableCompositeModification modification1 = new MutableCompositeModification();
659             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
660                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
661
662             final String transactionID2 = "tx2";
663             final MutableCompositeModification modification2 = new MutableCompositeModification();
664             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
665                     TestModel.OUTER_LIST_PATH,
666                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
667                     modification2);
668
669             final String transactionID3 = "tx3";
670             final MutableCompositeModification modification3 = new MutableCompositeModification();
671             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
672                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
673                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
674                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
675                     modification3);
676
677             final long timeoutSec = 5;
678             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
679             final Timeout timeout = new Timeout(duration);
680
681             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
682             // by the ShardTransaction.
683
684             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
685                     cohort1, modification1, true, false), getRef());
686             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
687                     expectMsgClass(duration, ReadyTransactionReply.class));
688             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
689
690             // Send the CanCommitTransaction message for the first Tx.
691
692             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
693             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
694                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
695             assertEquals("Can commit", true, canCommitReply.getCanCommit());
696
697             // Send the ForwardedReadyTransaction for the next 2 Tx's.
698
699             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
700                     cohort2, modification2, true, false), getRef());
701             expectMsgClass(duration, ReadyTransactionReply.class);
702
703             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
704                     cohort3, modification3, true, false), getRef());
705             expectMsgClass(duration, ReadyTransactionReply.class);
706
707             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
708             // processed after the first Tx completes.
709
710             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
711                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
712
713             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
714                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
715
716             // Send the CommitTransaction message for the first Tx. After it completes, it should
717             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
718
719             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
720             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
721
722             // Wait for the next 2 Tx's to complete.
723
724             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
725             final CountDownLatch commitLatch = new CountDownLatch(2);
726
727             class OnFutureComplete extends OnComplete<Object> {
728                 private final Class<?> expRespType;
729
730                 OnFutureComplete(final Class<?> expRespType) {
731                     this.expRespType = expRespType;
732                 }
733
734                 @Override
735                 public void onComplete(final Throwable error, final Object resp) {
736                     if(error != null) {
737                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
738                     } else {
739                         try {
740                             assertEquals("Commit response type", expRespType, resp.getClass());
741                             onSuccess(resp);
742                         } catch (final Exception e) {
743                             caughtEx.set(e);
744                         }
745                     }
746                 }
747
748                 void onSuccess(final Object resp) throws Exception {
749                 }
750             }
751
752             class OnCommitFutureComplete extends OnFutureComplete {
753                 OnCommitFutureComplete() {
754                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
755                 }
756
757                 @Override
758                 public void onComplete(final Throwable error, final Object resp) {
759                     super.onComplete(error, resp);
760                     commitLatch.countDown();
761                 }
762             }
763
764             class OnCanCommitFutureComplete extends OnFutureComplete {
765                 private final String transactionID;
766
767                 OnCanCommitFutureComplete(final String transactionID) {
768                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
769                     this.transactionID = transactionID;
770                 }
771
772                 @Override
773                 void onSuccess(final Object resp) throws Exception {
774                     final CanCommitTransactionReply canCommitReply =
775                             CanCommitTransactionReply.fromSerializable(resp);
776                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
777
778                     final Future<Object> commitFuture = Patterns.ask(shard,
779                             new CommitTransaction(transactionID).toSerializable(), timeout);
780                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
781                 }
782             }
783
784             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
785                     getSystem().dispatcher());
786
787             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
788                     getSystem().dispatcher());
789
790             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
791
792             if(caughtEx.get() != null) {
793                 throw caughtEx.get();
794             }
795
796             assertEquals("Commits complete", true, done);
797
798             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
799             inOrder.verify(cohort1).canCommit();
800             inOrder.verify(cohort1).preCommit();
801             inOrder.verify(cohort1).commit();
802             inOrder.verify(cohort2).canCommit();
803             inOrder.verify(cohort2).preCommit();
804             inOrder.verify(cohort2).commit();
805             inOrder.verify(cohort3).canCommit();
806             inOrder.verify(cohort3).preCommit();
807             inOrder.verify(cohort3).commit();
808
809             // Verify data in the data store.
810
811             verifyOuterListEntry(shard, 1);
812
813             verifyLastApplied(shard, 2);
814
815             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
816         }};
817     }
818
819     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
820             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
821         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
822     }
823
824     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
825             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
826             final int messagesSent) {
827         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
828         batched.addModification(new WriteModification(path, data));
829         batched.setReady(ready);
830         batched.setDoCommitOnReady(doCommitOnReady);
831         batched.setTotalMessagesSent(messagesSent);
832         return batched;
833     }
834
835     @Test
836     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
837         new ShardTestKit(getSystem()) {{
838             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
839                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
840                     "testBatchedModificationsWithNoCommitOnReady");
841
842             waitUntilLeader(shard);
843
844             final String transactionID = "tx";
845             final FiniteDuration duration = duration("5 seconds");
846
847             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
848             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
849                 @Override
850                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
851                     if(mockCohort.get() == null) {
852                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
853                     }
854
855                     return mockCohort.get();
856                 }
857             };
858
859             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
860
861             // Send a BatchedModifications to start a transaction.
862
863             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
864                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
865             expectMsgClass(duration, BatchedModificationsReply.class);
866
867             // Send a couple more BatchedModifications.
868
869             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
870                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
871             expectMsgClass(duration, BatchedModificationsReply.class);
872
873             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
874                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
875                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
876             expectMsgClass(duration, ReadyTransactionReply.class);
877
878             // Send the CanCommitTransaction message.
879
880             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
881             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
882                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
883             assertEquals("Can commit", true, canCommitReply.getCanCommit());
884
885             // Send the CanCommitTransaction message.
886
887             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
888             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
889
890             final InOrder inOrder = inOrder(mockCohort.get());
891             inOrder.verify(mockCohort.get()).canCommit();
892             inOrder.verify(mockCohort.get()).preCommit();
893             inOrder.verify(mockCohort.get()).commit();
894
895             // Verify data in the data store.
896
897             verifyOuterListEntry(shard, 1);
898
899             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
900         }};
901     }
902
903     @Test
904     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
905         new ShardTestKit(getSystem()) {{
906             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
907                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
908                     "testBatchedModificationsWithCommitOnReady");
909
910             waitUntilLeader(shard);
911
912             final String transactionID = "tx";
913             final FiniteDuration duration = duration("5 seconds");
914
915             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
916             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
917                 @Override
918                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
919                     if(mockCohort.get() == null) {
920                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
921                     }
922
923                     return mockCohort.get();
924                 }
925             };
926
927             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
928
929             // Send a BatchedModifications to start a transaction.
930
931             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
932                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
933             expectMsgClass(duration, BatchedModificationsReply.class);
934
935             // Send a couple more BatchedModifications.
936
937             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
938                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
939             expectMsgClass(duration, BatchedModificationsReply.class);
940
941             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
942                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
943                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
944
945             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
946
947             final InOrder inOrder = inOrder(mockCohort.get());
948             inOrder.verify(mockCohort.get()).canCommit();
949             inOrder.verify(mockCohort.get()).preCommit();
950             inOrder.verify(mockCohort.get()).commit();
951
952             // Verify data in the data store.
953
954             verifyOuterListEntry(shard, 1);
955
956             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
957         }};
958     }
959
960     @Test(expected=IllegalStateException.class)
961     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
962         new ShardTestKit(getSystem()) {{
963             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
964                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
965                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
966
967             waitUntilLeader(shard);
968
969             final String transactionID = "tx1";
970             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
971             batched.setReady(true);
972             batched.setTotalMessagesSent(2);
973
974             shard.tell(batched, getRef());
975
976             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
977
978             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
979
980             if(failure != null) {
981                 throw failure.cause();
982             }
983         }};
984     }
985
986     @Test
987     public void testBatchedModificationsWithOperationFailure() throws Throwable {
988         new ShardTestKit(getSystem()) {{
989             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
990                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
991                     "testBatchedModificationsWithOperationFailure");
992
993             waitUntilLeader(shard);
994
995             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
996             // write will not validate the children for performance reasons.
997
998             String transactionID = "tx1";
999
1000             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1001                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
1002                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1003
1004             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
1005             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
1006             shard.tell(batched, getRef());
1007             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1008
1009             Throwable cause = failure.cause();
1010
1011             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1012             batched.setReady(true);
1013             batched.setTotalMessagesSent(2);
1014
1015             shard.tell(batched, getRef());
1016
1017             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1018             assertEquals("Failure cause", cause, failure.cause());
1019
1020             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1021         }};
1022     }
1023
1024     @SuppressWarnings("unchecked")
1025     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1026         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1027         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1028         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1029                 outerList.getValue() instanceof Iterable);
1030         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1031         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1032                 entry instanceof MapEntryNode);
1033         final MapEntryNode mapEntry = (MapEntryNode)entry;
1034         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1035                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1036         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1037         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1038     }
1039
1040     @Test
1041     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1042         new ShardTestKit(getSystem()) {{
1043             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1044                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1045                     "testBatchedModificationsOnTransactionChain");
1046
1047             waitUntilLeader(shard);
1048
1049             final String transactionChainID = "txChain";
1050             final String transactionID1 = "tx1";
1051             final String transactionID2 = "tx2";
1052
1053             final FiniteDuration duration = duration("5 seconds");
1054
1055             // Send a BatchedModifications to start a chained write transaction and ready it.
1056
1057             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1058             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1059             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1060                     containerNode, true, false, 1), getRef());
1061             expectMsgClass(duration, ReadyTransactionReply.class);
1062
1063             // Create a read Tx on the same chain.
1064
1065             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1066                     transactionChainID).toSerializable(), getRef());
1067
1068             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1069
1070             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1071             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1072             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1073
1074             // Commit the write transaction.
1075
1076             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1077             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1078                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1079             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1080
1081             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1082             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1083
1084             // Verify data in the data store.
1085
1086             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1087             assertEquals("Stored node", containerNode, actualNode);
1088
1089             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1090         }};
1091     }
1092
1093     @Test
1094     public void testOnBatchedModificationsWhenNotLeader() {
1095         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1096         new ShardTestKit(getSystem()) {{
1097             final Creator<Shard> creator = new Creator<Shard>() {
1098                 private static final long serialVersionUID = 1L;
1099
1100                 @Override
1101                 public Shard create() throws Exception {
1102                     return new Shard(newShardBuilder()) {
1103                         @Override
1104                         protected boolean isLeader() {
1105                             return overrideLeaderCalls.get() ? false : super.isLeader();
1106                         }
1107
1108                         @Override
1109                         protected ActorSelection getLeader() {
1110                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1111                                 super.getLeader();
1112                         }
1113                     };
1114                 }
1115             };
1116
1117             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1118                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1119
1120             waitUntilLeader(shard);
1121
1122             overrideLeaderCalls.set(true);
1123
1124             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1125
1126             shard.tell(batched, ActorRef.noSender());
1127
1128             expectMsgEquals(batched);
1129
1130             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1131         }};
1132     }
1133
1134     @Test
1135     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1136         new ShardTestKit(getSystem()) {{
1137             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1138                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1139                     "testForwardedReadyTransactionWithImmediateCommit");
1140
1141             waitUntilLeader(shard);
1142
1143             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1144
1145             final String transactionID = "tx1";
1146             final MutableCompositeModification modification = new MutableCompositeModification();
1147             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1148             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1149                     TestModel.TEST_PATH, containerNode, modification);
1150
1151             final FiniteDuration duration = duration("5 seconds");
1152
1153             // Simulate the ForwardedReadyTransaction messages that would be sent
1154             // by the ShardTransaction.
1155
1156             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1157                     cohort, modification, true, true), getRef());
1158
1159             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1160
1161             final InOrder inOrder = inOrder(cohort);
1162             inOrder.verify(cohort).canCommit();
1163             inOrder.verify(cohort).preCommit();
1164             inOrder.verify(cohort).commit();
1165
1166             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1167             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1168
1169             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1170         }};
1171     }
1172
1173     @Test
1174     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1175         new ShardTestKit(getSystem()) {{
1176             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1177                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1178                     "testReadyLocalTransactionWithImmediateCommit");
1179
1180             waitUntilLeader(shard);
1181
1182             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1183
1184             final DataTreeModification modification = dataStore.newModification();
1185
1186             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1187             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1188             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1189             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1190
1191             final String txId = "tx1";
1192             modification.ready();
1193             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1194
1195             shard.tell(readyMessage, getRef());
1196
1197             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1198
1199             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1200             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1201
1202             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1203         }};
1204     }
1205
1206     @Test
1207     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1208         new ShardTestKit(getSystem()) {{
1209             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1210                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1211                     "testReadyLocalTransactionWithThreePhaseCommit");
1212
1213             waitUntilLeader(shard);
1214
1215             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1216
1217             final DataTreeModification modification = dataStore.newModification();
1218
1219             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1220             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1221             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1222             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1223
1224             final String txId = "tx1";
1225                 modification.ready();
1226             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1227
1228             shard.tell(readyMessage, getRef());
1229
1230             expectMsgClass(ReadyTransactionReply.class);
1231
1232             // Send the CanCommitTransaction message.
1233
1234             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1235             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1236                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1237             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1238
1239             // Send the CanCommitTransaction message.
1240
1241             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1242             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1243
1244             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1245             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1246
1247             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1248         }};
1249     }
1250
1251     @Test
1252     public void testCommitWithPersistenceDisabled() throws Throwable {
1253         dataStoreContextBuilder.persistent(false);
1254         new ShardTestKit(getSystem()) {{
1255             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1256                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1257                     "testCommitWithPersistenceDisabled");
1258
1259             waitUntilLeader(shard);
1260
1261             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1262
1263             // Setup a simulated transactions with a mock cohort.
1264
1265             final String transactionID = "tx";
1266             final MutableCompositeModification modification = new MutableCompositeModification();
1267             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1268             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1269                 TestModel.TEST_PATH, containerNode, modification);
1270
1271             final FiniteDuration duration = duration("5 seconds");
1272
1273             // Simulate the ForwardedReadyTransaction messages that would be sent
1274             // by the ShardTransaction.
1275
1276             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1277                 cohort, modification, true, false), getRef());
1278             expectMsgClass(duration, ReadyTransactionReply.class);
1279
1280             // Send the CanCommitTransaction message.
1281
1282             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1283             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1284                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1285             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1286
1287             // Send the CanCommitTransaction message.
1288
1289             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1290             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1291
1292             final InOrder inOrder = inOrder(cohort);
1293             inOrder.verify(cohort).canCommit();
1294             inOrder.verify(cohort).preCommit();
1295             inOrder.verify(cohort).commit();
1296
1297             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1298             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1299
1300             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1301         }};
1302     }
1303
1304     private static DataTreeCandidateTip mockCandidate(final String name) {
1305         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1306         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1307         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1308         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1309         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1310         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1311         return mockCandidate;
1312     }
1313
1314     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1315         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1316         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1317         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1318         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1319         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1320         return mockCandidate;
1321     }
1322
1323     @Test
1324     public void testCommitWhenTransactionHasNoModifications(){
1325         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1326         // but here that need not happen
1327         new ShardTestKit(getSystem()) {
1328             {
1329                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1330                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1331                         "testCommitWhenTransactionHasNoModifications");
1332
1333                 waitUntilLeader(shard);
1334
1335                 final String transactionID = "tx1";
1336                 final MutableCompositeModification modification = new MutableCompositeModification();
1337                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1338                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1339                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1340                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1341                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1342
1343                 final FiniteDuration duration = duration("5 seconds");
1344
1345                 // Simulate the ForwardedReadyTransaction messages that would be sent
1346                 // by the ShardTransaction.
1347
1348                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1349                         cohort, modification, true, false), getRef());
1350                 expectMsgClass(duration, ReadyTransactionReply.class);
1351
1352                 // Send the CanCommitTransaction message.
1353
1354                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1355                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1356                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1357                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1358
1359                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1360                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1361
1362                 final InOrder inOrder = inOrder(cohort);
1363                 inOrder.verify(cohort).canCommit();
1364                 inOrder.verify(cohort).preCommit();
1365                 inOrder.verify(cohort).commit();
1366
1367                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1368                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1369
1370                 // Use MBean for verification
1371                 // Committed transaction count should increase as usual
1372                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1373
1374                 // Commit index should not advance because this does not go into the journal
1375                 assertEquals(-1, shardStats.getCommitIndex());
1376
1377                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1378
1379             }
1380         };
1381     }
1382
1383     @Test
1384     public void testCommitWhenTransactionHasModifications(){
1385         new ShardTestKit(getSystem()) {
1386             {
1387                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1388                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1389                         "testCommitWhenTransactionHasModifications");
1390
1391                 waitUntilLeader(shard);
1392
1393                 final String transactionID = "tx1";
1394                 final MutableCompositeModification modification = new MutableCompositeModification();
1395                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1396                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1397                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1398                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1399                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1400                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1401
1402                 final FiniteDuration duration = duration("5 seconds");
1403
1404                 // Simulate the ForwardedReadyTransaction messages that would be sent
1405                 // by the ShardTransaction.
1406
1407                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1408                         cohort, modification, true, false), getRef());
1409                 expectMsgClass(duration, ReadyTransactionReply.class);
1410
1411                 // Send the CanCommitTransaction message.
1412
1413                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1414                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1415                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1416                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1417
1418                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1419                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1420
1421                 final InOrder inOrder = inOrder(cohort);
1422                 inOrder.verify(cohort).canCommit();
1423                 inOrder.verify(cohort).preCommit();
1424                 inOrder.verify(cohort).commit();
1425
1426                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1427                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1428
1429                 // Use MBean for verification
1430                 // Committed transaction count should increase as usual
1431                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1432
1433                 // Commit index should advance as we do not have an empty modification
1434                 assertEquals(0, shardStats.getCommitIndex());
1435
1436                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1437
1438             }
1439         };
1440     }
1441
1442     @Test
1443     public void testCommitPhaseFailure() throws Throwable {
1444         new ShardTestKit(getSystem()) {{
1445             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1446                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1447                     "testCommitPhaseFailure");
1448
1449             waitUntilLeader(shard);
1450
1451             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1452             // commit phase.
1453
1454             final String transactionID1 = "tx1";
1455             final MutableCompositeModification modification1 = new MutableCompositeModification();
1456             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1457             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1458             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1459             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1460             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1461
1462             final String transactionID2 = "tx2";
1463             final MutableCompositeModification modification2 = new MutableCompositeModification();
1464             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1465             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1466
1467             final FiniteDuration duration = duration("5 seconds");
1468             final Timeout timeout = new Timeout(duration);
1469
1470             // Simulate the ForwardedReadyTransaction messages that would be sent
1471             // by the ShardTransaction.
1472
1473             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1474                     cohort1, modification1, true, false), getRef());
1475             expectMsgClass(duration, ReadyTransactionReply.class);
1476
1477             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1478                     cohort2, modification2, true, false), getRef());
1479             expectMsgClass(duration, ReadyTransactionReply.class);
1480
1481             // Send the CanCommitTransaction message for the first Tx.
1482
1483             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1484             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1485                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1486             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1487
1488             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1489             // processed after the first Tx completes.
1490
1491             final Future<Object> canCommitFuture = Patterns.ask(shard,
1492                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1493
1494             // Send the CommitTransaction message for the first Tx. This should send back an error
1495             // and trigger the 2nd Tx to proceed.
1496
1497             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1498             expectMsgClass(duration, akka.actor.Status.Failure.class);
1499
1500             // Wait for the 2nd Tx to complete the canCommit phase.
1501
1502             final CountDownLatch latch = new CountDownLatch(1);
1503             canCommitFuture.onComplete(new OnComplete<Object>() {
1504                 @Override
1505                 public void onComplete(final Throwable t, final Object resp) {
1506                     latch.countDown();
1507                 }
1508             }, getSystem().dispatcher());
1509
1510             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1511
1512             final InOrder inOrder = inOrder(cohort1, cohort2);
1513             inOrder.verify(cohort1).canCommit();
1514             inOrder.verify(cohort1).preCommit();
1515             inOrder.verify(cohort1).commit();
1516             inOrder.verify(cohort2).canCommit();
1517
1518             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1519         }};
1520     }
1521
1522     @Test
1523     public void testPreCommitPhaseFailure() throws Throwable {
1524         new ShardTestKit(getSystem()) {{
1525             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1526                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1527                     "testPreCommitPhaseFailure");
1528
1529             waitUntilLeader(shard);
1530
1531             final String transactionID1 = "tx1";
1532             final MutableCompositeModification modification1 = new MutableCompositeModification();
1533             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1534             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1535             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1536
1537             final String transactionID2 = "tx2";
1538             final MutableCompositeModification modification2 = new MutableCompositeModification();
1539             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1540             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1541
1542             final FiniteDuration duration = duration("5 seconds");
1543             final Timeout timeout = new Timeout(duration);
1544
1545             // Simulate the ForwardedReadyTransaction messages that would be sent
1546             // by the ShardTransaction.
1547
1548             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1549                     cohort1, modification1, true, false), getRef());
1550             expectMsgClass(duration, ReadyTransactionReply.class);
1551
1552             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1553                     cohort2, modification2, true, false), getRef());
1554             expectMsgClass(duration, ReadyTransactionReply.class);
1555
1556             // Send the CanCommitTransaction message for the first Tx.
1557
1558             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1559             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1560                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1561             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1562
1563             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1564             // processed after the first Tx completes.
1565
1566             final Future<Object> canCommitFuture = Patterns.ask(shard,
1567                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1568
1569             // Send the CommitTransaction message for the first Tx. This should send back an error
1570             // and trigger the 2nd Tx to proceed.
1571
1572             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1573             expectMsgClass(duration, akka.actor.Status.Failure.class);
1574
1575             // Wait for the 2nd Tx to complete the canCommit phase.
1576
1577             final CountDownLatch latch = new CountDownLatch(1);
1578             canCommitFuture.onComplete(new OnComplete<Object>() {
1579                 @Override
1580                 public void onComplete(final Throwable t, final Object resp) {
1581                     latch.countDown();
1582                 }
1583             }, getSystem().dispatcher());
1584
1585             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1586
1587             final InOrder inOrder = inOrder(cohort1, cohort2);
1588             inOrder.verify(cohort1).canCommit();
1589             inOrder.verify(cohort1).preCommit();
1590             inOrder.verify(cohort2).canCommit();
1591
1592             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1593         }};
1594     }
1595
1596     @Test
1597     public void testCanCommitPhaseFailure() throws Throwable {
1598         new ShardTestKit(getSystem()) {{
1599             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1600                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1601                     "testCanCommitPhaseFailure");
1602
1603             waitUntilLeader(shard);
1604
1605             final FiniteDuration duration = duration("5 seconds");
1606
1607             final String transactionID1 = "tx1";
1608             final MutableCompositeModification modification = new MutableCompositeModification();
1609             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1610             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1611
1612             // Simulate the ForwardedReadyTransaction messages that would be sent
1613             // by the ShardTransaction.
1614
1615             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1616                     cohort, modification, true, false), getRef());
1617             expectMsgClass(duration, ReadyTransactionReply.class);
1618
1619             // Send the CanCommitTransaction message.
1620
1621             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1622             expectMsgClass(duration, akka.actor.Status.Failure.class);
1623
1624             // Send another can commit to ensure the failed one got cleaned up.
1625
1626             reset(cohort);
1627
1628             final String transactionID2 = "tx2";
1629             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1630
1631             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1632                     cohort, modification, true, false), getRef());
1633             expectMsgClass(duration, ReadyTransactionReply.class);
1634
1635             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1636             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1637                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1638             assertEquals("getCanCommit", true, reply.getCanCommit());
1639
1640             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1641         }};
1642     }
1643
1644     @Test
1645     public void testCanCommitPhaseFalseResponse() throws Throwable {
1646         new ShardTestKit(getSystem()) {{
1647             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1648                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1649                     "testCanCommitPhaseFalseResponse");
1650
1651             waitUntilLeader(shard);
1652
1653             final FiniteDuration duration = duration("5 seconds");
1654
1655             final String transactionID1 = "tx1";
1656             final MutableCompositeModification modification = new MutableCompositeModification();
1657             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1658             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1659
1660             // Simulate the ForwardedReadyTransaction messages that would be sent
1661             // by the ShardTransaction.
1662
1663             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1664                     cohort, modification, true, false), getRef());
1665             expectMsgClass(duration, ReadyTransactionReply.class);
1666
1667             // Send the CanCommitTransaction message.
1668
1669             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1670             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1671                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1672             assertEquals("getCanCommit", false, reply.getCanCommit());
1673
1674             // Send another can commit to ensure the failed one got cleaned up.
1675
1676             reset(cohort);
1677
1678             final String transactionID2 = "tx2";
1679             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1680
1681             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1682                 cohort, modification, true, false), getRef());
1683             expectMsgClass(duration, ReadyTransactionReply.class);
1684
1685             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1686             reply = CanCommitTransactionReply.fromSerializable(
1687                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1688             assertEquals("getCanCommit", true, reply.getCanCommit());
1689
1690             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1691         }};
1692     }
1693
1694     @Test
1695     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1696         new ShardTestKit(getSystem()) {{
1697             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1698                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1699                     "testImmediateCommitWithCanCommitPhaseFailure");
1700
1701             waitUntilLeader(shard);
1702
1703             final FiniteDuration duration = duration("5 seconds");
1704
1705             final String transactionID1 = "tx1";
1706             final MutableCompositeModification modification = new MutableCompositeModification();
1707             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1708             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1709
1710             // Simulate the ForwardedReadyTransaction messages that would be sent
1711             // by the ShardTransaction.
1712
1713             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1714                     cohort, modification, true, true), getRef());
1715
1716             expectMsgClass(duration, akka.actor.Status.Failure.class);
1717
1718             // Send another can commit to ensure the failed one got cleaned up.
1719
1720             reset(cohort);
1721
1722             final String transactionID2 = "tx2";
1723             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1724             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1725             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1726             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1727             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1728             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1729             doReturn(candidateRoot).when(candidate).getRootNode();
1730             doReturn(candidate).when(cohort).getCandidate();
1731
1732             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1733                     cohort, modification, true, true), getRef());
1734
1735             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1736
1737             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1738         }};
1739     }
1740
1741     @Test
1742     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1743         new ShardTestKit(getSystem()) {{
1744             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1745                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1746                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1747
1748             waitUntilLeader(shard);
1749
1750             final FiniteDuration duration = duration("5 seconds");
1751
1752             final String transactionID = "tx1";
1753             final MutableCompositeModification modification = new MutableCompositeModification();
1754             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1755             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1756
1757             // Simulate the ForwardedReadyTransaction messages that would be sent
1758             // by the ShardTransaction.
1759
1760             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1761                     cohort, modification, true, true), getRef());
1762
1763             expectMsgClass(duration, akka.actor.Status.Failure.class);
1764
1765             // Send another can commit to ensure the failed one got cleaned up.
1766
1767             reset(cohort);
1768
1769             final String transactionID2 = "tx2";
1770             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1771             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1772             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1773             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1774             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1775             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1776             doReturn(candidateRoot).when(candidate).getRootNode();
1777             doReturn(candidate).when(cohort).getCandidate();
1778
1779             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1780                     cohort, modification, true, true), getRef());
1781
1782             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1783
1784             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1785         }};
1786     }
1787
1788     @Test
1789     public void testAbortBeforeFinishCommit() throws Throwable {
1790         new ShardTestKit(getSystem()) {{
1791             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1792                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1793                     "testAbortBeforeFinishCommit");
1794
1795             waitUntilLeader(shard);
1796
1797             final FiniteDuration duration = duration("5 seconds");
1798             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1799
1800             final String transactionID = "tx1";
1801             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1802                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1803                 @Override
1804                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1805                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1806
1807                     // Simulate an AbortTransaction message occurring during replication, after
1808                     // persisting and before finishing the commit to the in-memory store.
1809                     // We have no followers so due to optimizations in the RaftActor, it does not
1810                     // attempt replication and thus we can't send an AbortTransaction message b/c
1811                     // it would be processed too late after CommitTransaction completes. So we'll
1812                     // simulate an AbortTransaction message occurring during replication by calling
1813                     // the shard directly.
1814                     //
1815                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1816
1817                     return preCommitFuture;
1818                 }
1819             };
1820
1821             final MutableCompositeModification modification = new MutableCompositeModification();
1822             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1823                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1824                     modification, preCommit);
1825
1826             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1827                 cohort, modification, true, false), getRef());
1828             expectMsgClass(duration, ReadyTransactionReply.class);
1829
1830             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1831             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1832                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1833             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1834
1835             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1836             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1837
1838             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1839
1840             // Since we're simulating an abort occurring during replication and before finish commit,
1841             // the data should still get written to the in-memory store since we've gotten past
1842             // canCommit and preCommit and persisted the data.
1843             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1844
1845             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1846         }};
1847     }
1848
1849     @Test
1850     public void testTransactionCommitTimeout() throws Throwable {
1851         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1852
1853         new ShardTestKit(getSystem()) {{
1854             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1855                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1856                     "testTransactionCommitTimeout");
1857
1858             waitUntilLeader(shard);
1859
1860             final FiniteDuration duration = duration("5 seconds");
1861
1862             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1863
1864             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1865             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1866                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1867
1868             // Create 1st Tx - will timeout
1869
1870             final String transactionID1 = "tx1";
1871             final MutableCompositeModification modification1 = new MutableCompositeModification();
1872             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1873                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1874                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1875                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1876                     modification1);
1877
1878             // Create 2nd Tx
1879
1880             final String transactionID2 = "tx3";
1881             final MutableCompositeModification modification2 = new MutableCompositeModification();
1882             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1883                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1884             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1885                     listNodePath,
1886                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1887                     modification2);
1888
1889             // Ready the Tx's
1890
1891             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1892                     cohort1, modification1, true, false), getRef());
1893             expectMsgClass(duration, ReadyTransactionReply.class);
1894
1895             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1896                     cohort2, modification2, true, false), getRef());
1897             expectMsgClass(duration, ReadyTransactionReply.class);
1898
1899             // canCommit 1st Tx. We don't send the commit so it should timeout.
1900
1901             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1902             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1903
1904             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1905
1906             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1907             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1908
1909             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1910
1911             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1912             expectMsgClass(duration, akka.actor.Status.Failure.class);
1913
1914             // Commit the 2nd Tx.
1915
1916             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1917             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1918
1919             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1920             assertNotNull(listNodePath + " not found", node);
1921
1922             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1923         }};
1924     }
1925
1926     @Test
1927     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1928         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1929
1930         new ShardTestKit(getSystem()) {{
1931             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1932                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1933                     "testTransactionCommitQueueCapacityExceeded");
1934
1935             waitUntilLeader(shard);
1936
1937             final FiniteDuration duration = duration("5 seconds");
1938
1939             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1940
1941             final String transactionID1 = "tx1";
1942             final MutableCompositeModification modification1 = new MutableCompositeModification();
1943             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1944                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1945
1946             final String transactionID2 = "tx2";
1947             final MutableCompositeModification modification2 = new MutableCompositeModification();
1948             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1949                     TestModel.OUTER_LIST_PATH,
1950                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1951                     modification2);
1952
1953             final String transactionID3 = "tx3";
1954             final MutableCompositeModification modification3 = new MutableCompositeModification();
1955             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1956                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1957
1958             // Ready the Tx's
1959
1960             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1961                     cohort1, modification1, true, false), getRef());
1962             expectMsgClass(duration, ReadyTransactionReply.class);
1963
1964             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1965                     cohort2, modification2, true, false), getRef());
1966             expectMsgClass(duration, ReadyTransactionReply.class);
1967
1968             // The 3rd Tx should exceed queue capacity and fail.
1969
1970             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1971                     cohort3, modification3, true, false), getRef());
1972             expectMsgClass(duration, akka.actor.Status.Failure.class);
1973
1974             // canCommit 1st Tx.
1975
1976             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1977             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1978
1979             // canCommit the 2nd Tx - it should get queued.
1980
1981             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1982
1983             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1984
1985             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1986             expectMsgClass(duration, akka.actor.Status.Failure.class);
1987
1988             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1989         }};
1990     }
1991
1992     @Test
1993     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1994         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1995
1996         new ShardTestKit(getSystem()) {{
1997             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1998                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1999                     "testTransactionCommitWithPriorExpiredCohortEntries");
2000
2001             waitUntilLeader(shard);
2002
2003             final FiniteDuration duration = duration("5 seconds");
2004
2005             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2006
2007             final String transactionID1 = "tx1";
2008             final MutableCompositeModification modification1 = new MutableCompositeModification();
2009             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2010                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2011
2012             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2013                 cohort1, modification1, true, false), getRef());
2014             expectMsgClass(duration, ReadyTransactionReply.class);
2015
2016             final String transactionID2 = "tx2";
2017             final MutableCompositeModification modification2 = new MutableCompositeModification();
2018             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2019                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2020
2021             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2022                     cohort2, modification2, true, false), getRef());
2023             expectMsgClass(duration, ReadyTransactionReply.class);
2024
2025             final String transactionID3 = "tx3";
2026             final MutableCompositeModification modification3 = new MutableCompositeModification();
2027             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2028                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2029
2030             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2031                 cohort3, modification3, true, false), getRef());
2032             expectMsgClass(duration, ReadyTransactionReply.class);
2033
2034             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2035             // should expire from the queue and the last one should be processed.
2036
2037             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2038             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2039
2040             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2041         }};
2042     }
2043
2044     @Test
2045     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2046         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2047
2048         new ShardTestKit(getSystem()) {{
2049             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2050                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2051                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2052
2053             waitUntilLeader(shard);
2054
2055             final FiniteDuration duration = duration("5 seconds");
2056
2057             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2058
2059             final String transactionID1 = "tx1";
2060             final MutableCompositeModification modification1 = new MutableCompositeModification();
2061             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2062                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2063
2064             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2065                     cohort1, modification1, true, false), getRef());
2066             expectMsgClass(duration, ReadyTransactionReply.class);
2067
2068             // CanCommit the first one so it's the current in-progress CohortEntry.
2069
2070             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2071             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2072
2073             // Ready the second Tx.
2074
2075             final String transactionID2 = "tx2";
2076             final MutableCompositeModification modification2 = new MutableCompositeModification();
2077             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2078                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2079
2080             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2081                     cohort2, modification2, true, false), getRef());
2082             expectMsgClass(duration, ReadyTransactionReply.class);
2083
2084             // Ready the third Tx.
2085
2086             final String transactionID3 = "tx3";
2087             final DataTreeModification modification3 = dataStore.newModification();
2088             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2089                     .apply(modification3);
2090                 modification3.ready();
2091             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2092
2093             shard.tell(readyMessage, getRef());
2094
2095             // Commit the first Tx. After completing, the second should expire from the queue and the third
2096             // Tx committed.
2097
2098             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2099             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2100
2101             // Expect commit reply from the third Tx.
2102
2103             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2104
2105             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2106             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2107
2108             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2109         }};
2110     }
2111
2112     @Test
2113     public void testCanCommitBeforeReadyFailure() throws Throwable {
2114         new ShardTestKit(getSystem()) {{
2115             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2116                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2117                     "testCanCommitBeforeReadyFailure");
2118
2119             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2120             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2121
2122             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2123         }};
2124     }
2125
2126     @Test
2127     public void testAbortCurrentTransaction() throws Throwable {
2128         new ShardTestKit(getSystem()) {{
2129             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2130                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2131                     "testAbortCurrentTransaction");
2132
2133             waitUntilLeader(shard);
2134
2135             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2136
2137             final String transactionID1 = "tx1";
2138             final MutableCompositeModification modification1 = new MutableCompositeModification();
2139             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2140             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2141             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2142
2143             final String transactionID2 = "tx2";
2144             final MutableCompositeModification modification2 = new MutableCompositeModification();
2145             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2146             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2147
2148             final FiniteDuration duration = duration("5 seconds");
2149             final Timeout timeout = new Timeout(duration);
2150
2151             // Simulate the ForwardedReadyTransaction messages that would be sent
2152             // by the ShardTransaction.
2153
2154             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2155                     cohort1, modification1, true, false), getRef());
2156             expectMsgClass(duration, ReadyTransactionReply.class);
2157
2158             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2159                     cohort2, modification2, true, false), getRef());
2160             expectMsgClass(duration, ReadyTransactionReply.class);
2161
2162             // Send the CanCommitTransaction message for the first Tx.
2163
2164             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2165             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2166                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2167             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2168
2169             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2170             // processed after the first Tx completes.
2171
2172             final Future<Object> canCommitFuture = Patterns.ask(shard,
2173                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2174
2175             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2176             // Tx to proceed.
2177
2178             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2179             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2180
2181             // Wait for the 2nd Tx to complete the canCommit phase.
2182
2183             Await.ready(canCommitFuture, duration);
2184
2185             final InOrder inOrder = inOrder(cohort1, cohort2);
2186             inOrder.verify(cohort1).canCommit();
2187             inOrder.verify(cohort2).canCommit();
2188
2189             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2190         }};
2191     }
2192
2193     @Test
2194     public void testAbortQueuedTransaction() throws Throwable {
2195         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2196         new ShardTestKit(getSystem()) {{
2197             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2198             @SuppressWarnings("serial")
2199             final Creator<Shard> creator = new Creator<Shard>() {
2200                 @Override
2201                 public Shard create() throws Exception {
2202                     return new Shard(newShardBuilder()) {
2203                         @Override
2204                         public void onReceiveCommand(final Object message) throws Exception {
2205                             super.onReceiveCommand(message);
2206                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2207                                 if(cleaupCheckLatch.get() != null) {
2208                                     cleaupCheckLatch.get().countDown();
2209                                 }
2210                             }
2211                         }
2212                     };
2213                 }
2214             };
2215
2216             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2217                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2218                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
2219
2220             waitUntilLeader(shard);
2221
2222             final String transactionID = "tx1";
2223
2224             final MutableCompositeModification modification = new MutableCompositeModification();
2225             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2226             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2227
2228             final FiniteDuration duration = duration("5 seconds");
2229
2230             // Ready the tx.
2231
2232             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
2233                     cohort, modification, true, false), getRef());
2234             expectMsgClass(duration, ReadyTransactionReply.class);
2235
2236             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2237
2238             // Send the AbortTransaction message.
2239
2240             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2241             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2242
2243             verify(cohort).abort();
2244
2245             // Verify the tx cohort is removed from queue at the cleanup check interval.
2246
2247             cleaupCheckLatch.set(new CountDownLatch(1));
2248             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2249                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2250
2251             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2252
2253             // Now send CanCommitTransaction - should fail.
2254
2255             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2256
2257             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2258             assertTrue("Failure type", failure instanceof IllegalStateException);
2259
2260             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2261         }};
2262     }
2263
2264     @Test
2265     public void testCreateSnapshot() throws Exception {
2266         testCreateSnapshot(true, "testCreateSnapshot");
2267     }
2268
2269     @Test
2270     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2271         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2272     }
2273
2274     @SuppressWarnings("serial")
2275     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2276
2277         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2278
2279         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2280         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2281             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2282                 super(delegate);
2283             }
2284
2285             @Override
2286             public void saveSnapshot(final Object o) {
2287                 savedSnapshot.set(o);
2288                 super.saveSnapshot(o);
2289             }
2290         }
2291
2292         dataStoreContextBuilder.persistent(persistent);
2293
2294         new ShardTestKit(getSystem()) {{
2295             class TestShard extends Shard {
2296
2297                 protected TestShard(AbstractBuilder<?, ?> builder) {
2298                     super(builder);
2299                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2300                 }
2301
2302                 @Override
2303                 public void handleCommand(final Object message) {
2304                     super.handleCommand(message);
2305
2306                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2307                         latch.get().countDown();
2308                     }
2309                 }
2310
2311                 @Override
2312                 public RaftActorContext getRaftActorContext() {
2313                     return super.getRaftActorContext();
2314                 }
2315             }
2316
2317             final Creator<Shard> creator = new Creator<Shard>() {
2318                 @Override
2319                 public Shard create() throws Exception {
2320                     return new TestShard(newShardBuilder());
2321                 }
2322             };
2323
2324             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2325                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2326
2327             waitUntilLeader(shard);
2328             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2329
2330             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2331
2332             // Trigger creation of a snapshot by ensuring
2333             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2334             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2335             awaitAndValidateSnapshot(expectedRoot);
2336
2337             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2338             awaitAndValidateSnapshot(expectedRoot);
2339
2340             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2341         }
2342
2343             private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
2344                                               ) throws InterruptedException {
2345                 System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
2346                 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2347
2348                 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2349                         savedSnapshot.get() instanceof Snapshot);
2350
2351                 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2352
2353                 latch.set(new CountDownLatch(1));
2354                 savedSnapshot.set(null);
2355             }
2356
2357             private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2358
2359                 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2360                 assertEquals("Root node", expectedRoot, actual);
2361
2362            }
2363         };
2364     }
2365
2366     /**
2367      * This test simply verifies that the applySnapShot logic will work
2368      * @throws ReadFailedException
2369      * @throws DataValidationFailedException
2370      */
2371     @Test
2372     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2373         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
2374         store.setSchemaContext(SCHEMA_CONTEXT);
2375
2376         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2377         putTransaction.write(TestModel.TEST_PATH,
2378             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2379         commitTransaction(store, putTransaction);
2380
2381
2382         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2383
2384         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2385
2386         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2387         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2388
2389         commitTransaction(store, writeTransaction);
2390
2391         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2392
2393         assertEquals(expected, actual);
2394     }
2395
2396     @Test
2397     public void testRecoveryApplicable(){
2398
2399         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2400                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2401
2402         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2403                 schemaContext(SCHEMA_CONTEXT).props();
2404
2405         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2406                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2407
2408         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2409                 schemaContext(SCHEMA_CONTEXT).props();
2410
2411         new ShardTestKit(getSystem()) {{
2412             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2413                     persistentProps, "testPersistence1");
2414
2415             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2416
2417             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2418
2419             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2420                     nonPersistentProps, "testPersistence2");
2421
2422             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2423
2424             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2425
2426         }};
2427
2428     }
2429
2430     @Test
2431     public void testOnDatastoreContext() {
2432         new ShardTestKit(getSystem()) {{
2433             dataStoreContextBuilder.persistent(true);
2434
2435             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2436
2437             assertEquals("isRecoveryApplicable", true,
2438                     shard.underlyingActor().persistence().isRecoveryApplicable());
2439
2440             waitUntilLeader(shard);
2441
2442             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2443
2444             assertEquals("isRecoveryApplicable", false,
2445                 shard.underlyingActor().persistence().isRecoveryApplicable());
2446
2447             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2448
2449             assertEquals("isRecoveryApplicable", true,
2450                 shard.underlyingActor().persistence().isRecoveryApplicable());
2451
2452             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2453         }};
2454     }
2455
2456     @Test
2457     public void testRegisterRoleChangeListener() throws Exception {
2458         new ShardTestKit(getSystem()) {
2459             {
2460                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2461                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2462                         "testRegisterRoleChangeListener");
2463
2464                 waitUntilLeader(shard);
2465
2466                 final TestActorRef<MessageCollectorActor> listener =
2467                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2468
2469                 shard.tell(new RegisterRoleChangeListener(), listener);
2470
2471                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2472
2473                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2474                     ShardLeaderStateChanged.class);
2475                 assertEquals("getLocalShardDataTree present", true,
2476                         leaderStateChanged.getLocalShardDataTree().isPresent());
2477                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2478                     leaderStateChanged.getLocalShardDataTree().get());
2479
2480                 MessageCollectorActor.clearMessages(listener);
2481
2482                 // Force a leader change
2483
2484                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2485
2486                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2487                         ShardLeaderStateChanged.class);
2488                 assertEquals("getLocalShardDataTree present", false,
2489                         leaderStateChanged.getLocalShardDataTree().isPresent());
2490
2491                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2492             }
2493         };
2494     }
2495
2496     @Test
2497     public void testFollowerInitialSyncStatus() throws Exception {
2498         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2499                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2500                 "testFollowerInitialSyncStatus");
2501
2502         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2503
2504         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2505
2506         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2507
2508         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2509
2510         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2511     }
2512
2513     private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2514         modification.ready();
2515         store.validate(modification);
2516         store.commit(store.prepare(modification));
2517     }
2518
2519     @Test
2520     public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
2521         new ShardTestKit(getSystem()) {{
2522             String testName = "testClusteredDataChangeListenerDelayedRegistration";
2523             dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
2524
2525             final MockDataChangeListener listener = new MockDataChangeListener(1);
2526             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2527                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2528
2529             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2530                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2531                     actorFactory.generateActorId(testName + "-shard"));
2532
2533             waitUntilNoLeader(shard);
2534
2535             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2536
2537             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2538             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2539                 RegisterChangeListenerReply.class);
2540             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2541
2542             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2543
2544             shard.tell(new ElectionTimeout(), ActorRef.noSender());
2545
2546             listener.waitForChangeEvents();
2547         }};
2548     }
2549
2550     @Test
2551     public void testClusteredDataChangeListenerRegistration() throws Exception {
2552         new ShardTestKit(getSystem()) {{
2553             String testName = "testClusteredDataChangeListenerRegistration";
2554             final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2555                     actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2556
2557             final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2558                     actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2559
2560             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2561                     Shard.builder().id(followerShardID).
2562                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2563                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2564                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2565                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2566
2567             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2568                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2569                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2570                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2571                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2572
2573             leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
2574             String leaderPath = waitUntilLeader(followerShard);
2575             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2576
2577             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2578             final MockDataChangeListener listener = new MockDataChangeListener(1);
2579             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2580                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2581
2582             followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2583             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2584                 RegisterChangeListenerReply.class);
2585             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2586
2587             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2588
2589             listener.waitForChangeEvents();
2590         }};
2591     }
2592
2593     @Test
2594     public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
2595         new ShardTestKit(getSystem()) {{
2596             String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
2597             dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
2598
2599             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2600             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2601                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2602
2603             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2604                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2605                     actorFactory.generateActorId(testName + "-shard"));
2606
2607             waitUntilNoLeader(shard);
2608
2609             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2610
2611             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2612             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2613                     RegisterDataTreeChangeListenerReply.class);
2614             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2615
2616             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2617
2618             shard.tell(new ElectionTimeout(), ActorRef.noSender());
2619
2620             listener.waitForChangeEvents();
2621         }};
2622     }
2623
2624     @Test
2625     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2626         new ShardTestKit(getSystem()) {{
2627             String testName = "testClusteredDataTreeChangeListenerRegistration";
2628             final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2629                     actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2630
2631             final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2632                     actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2633
2634             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2635                     Shard.builder().id(followerShardID).
2636                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2637                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2638                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2639                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2640
2641             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2642                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2643                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2644                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2645                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2646
2647             leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
2648             String leaderPath = waitUntilLeader(followerShard);
2649             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2650
2651             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2652             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2653             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2654                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2655
2656             followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2657             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2658                     RegisterDataTreeChangeListenerReply.class);
2659             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2660
2661             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2662
2663             listener.waitForChangeEvents();
2664         }};
2665     }
2666
2667     @Test
2668     public void testServerRemoved() throws Exception {
2669         final TestActorRef<MessageCollectorActor> parent = TestActorRef.create(getSystem(), MessageCollectorActor.props());
2670
2671         final ActorRef shard = parent.underlyingActor().context().actorOf(
2672                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2673                 "testServerRemoved");
2674
2675         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2676
2677         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2678
2679     }
2680
2681 }