0121a15338a2ed807e910a793e6655a062ba35de
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Status.Failure;
28 import akka.dispatch.Dispatchers;
29 import akka.dispatch.OnComplete;
30 import akka.japi.Creator;
31 import akka.pattern.Patterns;
32 import akka.persistence.SaveSnapshotSuccess;
33 import akka.testkit.TestActorRef;
34 import akka.util.Timeout;
35 import com.google.common.base.Function;
36 import com.google.common.base.Optional;
37 import com.google.common.util.concurrent.Futures;
38 import com.google.common.util.concurrent.ListenableFuture;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.util.Collections;
41 import java.util.HashSet;
42 import java.util.Set;
43 import java.util.concurrent.CountDownLatch;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.Test;
48 import org.mockito.InOrder;
49 import org.opendaylight.controller.cluster.DataPersistenceProvider;
50 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
51 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
52 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
53 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
54 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
63 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
68 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
69 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
72 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
73 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
74 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
75 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
76 import org.opendaylight.controller.cluster.datastore.modification.Modification;
77 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
78 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
79 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
80 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
81 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
82 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
83 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
84 import org.opendaylight.controller.cluster.raft.RaftActorContext;
85 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
86 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
87 import org.opendaylight.controller.cluster.raft.Snapshot;
88 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
89 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
90 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
91 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
92 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
93 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
94 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
95 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
96 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
97 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
98 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
99 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
100 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
101 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
102 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
103 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
104 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
105 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
106 import org.opendaylight.yangtools.yang.common.QName;
107 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
109 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
111 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
122 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
123 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
124 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
125 import scala.concurrent.Await;
126 import scala.concurrent.Future;
127 import scala.concurrent.duration.FiniteDuration;
128
129 public class ShardTest extends AbstractShardTest {
    // QName built from the cars test-model namespace string, the "2014-03-13" revision string and the
    // local name "cars".
    private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");

    // Placeholder payload that the recovery tests add to the InMemoryJournal at sequence number 0; the
    // text of the constant itself explains why that slot has to be occupied.
    private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
133
134     @Test
135     public void testRegisterChangeListener() throws Exception {
136         new ShardTestKit(getSystem()) {{
137             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
138                     newShardProps(),  "testRegisterChangeListener");
139
140             waitUntilLeader(shard);
141
142             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
143
144             final MockDataChangeListener listener = new MockDataChangeListener(1);
145             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
146                     "testRegisterChangeListener-DataChangeListener");
147
148             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
149                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
150
151             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
152                     RegisterChangeListenerReply.class);
153             final String replyPath = reply.getListenerRegistrationPath().toString();
154             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
155                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
156
157             final YangInstanceIdentifier path = TestModel.TEST_PATH;
158             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
159
160             listener.waitForChangeEvents(path);
161
162             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
163             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
164         }};
165     }
166
167     @SuppressWarnings("serial")
168     @Test
169     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
170         // This test tests the timing window in which a change listener is registered before the
171         // shard becomes the leader. We verify that the listener is registered and notified of the
172         // existing data when the shard becomes the leader.
173         new ShardTestKit(getSystem()) {{
174             // For this test, we want to send the RegisterChangeListener message after the shard
175             // has recovered from persistence and before it becomes the leader. So we subclass
176             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
177             // we know that the shard has been initialized to a follower and has started the
178             // election process. The following 2 CountDownLatches are used to coordinate the
179             // ElectionTimeout with the sending of the RegisterChangeListener message.
180             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
181             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
182             final Creator<Shard> creator = new Creator<Shard>() {
183                 boolean firstElectionTimeout = true;
184
185                 @Override
186                 public Shard create() throws Exception {
187                     // Use a non persistent provider because this test actually invokes persist on the journal
188                     // this will cause all other messages to not be queued properly after that.
189                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
190                     // it does do a persist)
191                     return new Shard(newShardBuilder()) {
192                         @Override
193                         public void onReceiveCommand(final Object message) throws Exception {
194                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
195                                 // Got the first ElectionTimeout. We don't forward it to the
196                                 // base Shard yet until we've sent the RegisterChangeListener
197                                 // message. So we signal the onFirstElectionTimeout latch to tell
198                                 // the main thread to send the RegisterChangeListener message and
199                                 // start a thread to wait on the onChangeListenerRegistered latch,
200                                 // which the main thread signals after it has sent the message.
201                                 // After the onChangeListenerRegistered is triggered, we send the
202                                 // original ElectionTimeout message to proceed with the election.
203                                 firstElectionTimeout = false;
204                                 final ActorRef self = getSelf();
205                                 new Thread() {
206                                     @Override
207                                     public void run() {
208                                         Uninterruptibles.awaitUninterruptibly(
209                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
210                                         self.tell(message, self);
211                                     }
212                                 }.start();
213
214                                 onFirstElectionTimeout.countDown();
215                             } else {
216                                 super.onReceiveCommand(message);
217                             }
218                         }
219                     };
220                 }
221             };
222
223             final MockDataChangeListener listener = new MockDataChangeListener(1);
224             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
225                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
226
227             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
228                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
229                     "testRegisterChangeListenerWhenNotLeaderInitially");
230
231             // Write initial data into the in-memory store.
232             final YangInstanceIdentifier path = TestModel.TEST_PATH;
233             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
234
235             // Wait until the shard receives the first ElectionTimeout message.
236             assertEquals("Got first ElectionTimeout", true,
237                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
238
239             // Now send the RegisterChangeListener and wait for the reply.
240             shard.tell(new RegisterChangeListener(path, dclActor,
241                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
242
243             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
244                     RegisterChangeListenerReply.class);
245             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
246
247             // Sanity check - verify the shard is not the leader yet.
248             shard.tell(new FindLeader(), getRef());
249             final FindLeaderReply findLeadeReply =
250                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
251             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
252
253             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
254             // with the election process.
255             onChangeListenerRegistered.countDown();
256
257             // Wait for the shard to become the leader and notify our listener with the existing
258             // data in the store.
259             listener.waitForChangeEvents(path);
260
261             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
262             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
263         }};
264     }
265
266     @Test
267     public void testRegisterDataTreeChangeListener() throws Exception {
268         new ShardTestKit(getSystem()) {{
269             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
270                     newShardProps(), "testRegisterDataTreeChangeListener");
271
272             waitUntilLeader(shard);
273
274             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
275
276             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
277             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
278                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
279
280             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
281
282             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
283                     RegisterDataTreeChangeListenerReply.class);
284             final String replyPath = reply.getListenerRegistrationPath().toString();
285             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
286                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
287
288             final YangInstanceIdentifier path = TestModel.TEST_PATH;
289             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
290
291             listener.waitForChangeEvents();
292
293             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
294             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
295         }};
296     }
297
298     @SuppressWarnings("serial")
299     @Test
300     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
301         new ShardTestKit(getSystem()) {{
302             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
303             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
304             final Creator<Shard> creator = new Creator<Shard>() {
305                 boolean firstElectionTimeout = true;
306
307                 @Override
308                 public Shard create() throws Exception {
309                     return new Shard(Shard.builder().id(shardID).datastoreContext(
310                             dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
311                         @Override
312                         public void onReceiveCommand(final Object message) throws Exception {
313                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
314                                 firstElectionTimeout = false;
315                                 final ActorRef self = getSelf();
316                                 new Thread() {
317                                     @Override
318                                     public void run() {
319                                         Uninterruptibles.awaitUninterruptibly(
320                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
321                                         self.tell(message, self);
322                                     }
323                                 }.start();
324
325                                 onFirstElectionTimeout.countDown();
326                             } else {
327                                 super.onReceiveCommand(message);
328                             }
329                         }
330                     };
331                 }
332             };
333
334             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
335             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
336                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
337
338             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
339                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
340                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
341
342             final YangInstanceIdentifier path = TestModel.TEST_PATH;
343             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
344
345             assertEquals("Got first ElectionTimeout", true,
346                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
347
348             shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
349             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
350                 RegisterDataTreeChangeListenerReply.class);
351             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
352
353             shard.tell(new FindLeader(), getRef());
354             final FindLeaderReply findLeadeReply =
355                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
356             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
357
358             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
359
360             onChangeListenerRegistered.countDown();
361
362             // TODO: investigate why we do not receive data chage events
363             listener.waitForChangeEvents();
364
365             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
366             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
367         }};
368     }
369
370     @Test
371     public void testCreateTransaction(){
372         new ShardTestKit(getSystem()) {{
373             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
374
375             waitUntilLeader(shard);
376
377             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
378
379             shard.tell(new CreateTransaction("txn-1",
380                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
381
382             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
383                     CreateTransactionReply.class);
384
385             final String path = reply.getTransactionActorPath().toString();
386             assertTrue("Unexpected transaction path " + path,
387                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
388
389             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
390         }};
391     }
392
393     @Test
394     public void testCreateTransactionOnChain(){
395         new ShardTestKit(getSystem()) {{
396             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
397
398             waitUntilLeader(shard);
399
400             shard.tell(new CreateTransaction("txn-1",
401                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
402                     getRef());
403
404             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
405                     CreateTransactionReply.class);
406
407             final String path = reply.getTransactionActorPath().toString();
408             assertTrue("Unexpected transaction path " + path,
409                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
410
411             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
412         }};
413     }
414
415     @SuppressWarnings("serial")
416     @Test
417     public void testPeerAddressResolved() throws Exception {
418         new ShardTestKit(getSystem()) {{
419             final CountDownLatch recoveryComplete = new CountDownLatch(1);
420             class TestShard extends Shard {
421                 TestShard() {
422                     super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
423                             peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
424                             schemaContext(SCHEMA_CONTEXT));
425                 }
426
427                 String getPeerAddress(String id) {
428                     return getRaftActorContext().getPeerAddress(id);
429                 }
430
431                 @Override
432                 protected void onRecoveryComplete() {
433                     try {
434                         super.onRecoveryComplete();
435                     } finally {
436                         recoveryComplete.countDown();
437                     }
438                 }
439             }
440
441             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
442                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
443                         @Override
444                         public TestShard create() throws Exception {
445                             return new TestShard();
446                         }
447                     })), "testPeerAddressResolved");
448
449             assertEquals("Recovery complete", true,
450                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
451
452             final String address = "akka://foobar";
453             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
454
455             assertEquals("getPeerAddress", address,
456                 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
457
458             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
459         }};
460     }
461
462     @Test
463     public void testApplySnapshot() throws Exception {
464
465         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
466                 "testApplySnapshot");
467
468         ShardTestKit.waitUntilLeader(shard);
469
470         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
471         store.setSchemaContext(SCHEMA_CONTEXT);
472
473         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
474                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
475                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
476                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
477
478         writeToStore(store, TestModel.TEST_PATH, container);
479
480         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
481         final NormalizedNode<?,?> expected = readStore(store, root);
482
483         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
484                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
485
486         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
487
488         final NormalizedNode<?,?> actual = readStore(shard, root);
489
490         assertEquals("Root node", expected, actual);
491
492         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
493     }
494
495     @Test
496     public void testApplyState() throws Exception {
497         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
498
499         ShardTestKit.waitUntilLeader(shard);
500
501         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
502
503         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
504                 newDataTreeCandidatePayload(new WriteModification(TestModel.TEST_PATH, node))));
505
506         shard.underlyingActor().onReceiveCommand(applyState);
507
508         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
509         assertEquals("Applied state", node, actual);
510
511         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
512     }
513
514     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
515         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
516         testStore.setSchemaContext(SCHEMA_CONTEXT);
517
518         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
519
520         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
521
522         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
523                 SerializationUtils.serializeNormalizedNode(root),
524                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
525         return testStore;
526     }
527
528     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
529         source.validate(mod);
530         final DataTreeCandidate candidate = source.prepare(mod);
531         source.commit(candidate);
532         return DataTreeCandidatePayload.create(candidate);
533     }
534
535     @Test
536     public void testDataTreeCandidateRecovery() throws Exception {
537         // Set up the InMemorySnapshotStore.
538         final DataTree source = setupInMemorySnapshotStore();
539
540         final DataTreeModification writeMod = source.takeSnapshot().newModification();
541         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
542         writeMod.ready();
543         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
544
545         // Set up the InMemoryJournal.
546         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
547
548         final int nListEntries = 16;
549         final Set<Integer> listEntryKeys = new HashSet<>();
550
551         // Add some ModificationPayload entries
552         for (int i = 1; i <= nListEntries; i++) {
553             listEntryKeys.add(Integer.valueOf(i));
554
555             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
556                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
557
558             final DataTreeModification mod = source.takeSnapshot().newModification();
559             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
560             mod.ready();
561             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
562                 payloadForModification(source, mod)));
563         }
564
565         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
566             new ApplyJournalEntries(nListEntries));
567
568         testRecovery(listEntryKeys);
569     }
570
571     @Test
572     public void testModicationRecovery() throws Exception {
573
574         // Set up the InMemorySnapshotStore.
575         setupInMemorySnapshotStore();
576
577         // Set up the InMemoryJournal.
578
579         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
580
581         ShardDataTree shardDataTree = new ShardDataTree(SCHEMA_CONTEXT, TreeType.CONFIGURATION);
582
583         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newDataTreeCandidatePayload(
584                 shardDataTree,
585                 new WriteModification(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
586                 new WriteModification(TestModel.OUTER_LIST_PATH,
587                         ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
588
589         final int nListEntries = 16;
590         final Set<Integer> listEntryKeys = new HashSet<>();
591
592         // Add some ModificationPayload entries
593         for(int i = 1; i <= nListEntries; i++) {
594             listEntryKeys.add(Integer.valueOf(i));
595             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
596                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
597             final Modification mod = new MergeModification(path,
598                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
599             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
600                     newDataTreeCandidatePayload(shardDataTree, mod)));
601         }
602
603         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
604                 new ApplyJournalEntries(nListEntries));
605
606         testRecovery(listEntryKeys);
607     }
608
609     private static DataTreeCandidatePayload newDataTreeCandidatePayload(final Modification... mods) throws Exception {
610         return newDataTreeCandidatePayload(new ShardDataTree(SCHEMA_CONTEXT, TreeType.CONFIGURATION), mods);
611     }
612
613     private static DataTreeCandidatePayload newDataTreeCandidatePayload(ShardDataTree shardDataTree,
614             final Modification... mods) throws Exception {
615         DataTreeModification dataTreeModification = shardDataTree.newModification();
616         for(final Modification mod: mods) {
617             mod.apply(dataTreeModification);
618         }
619
620         return DataTreeCandidatePayload.create(shardDataTree.commit(dataTreeModification));
621     }
622
623     @Test
624     public void testConcurrentThreePhaseCommits() throws Throwable {
625         new ShardTestKit(getSystem()) {{
626             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
627                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
628                     "testConcurrentThreePhaseCommits");
629
630             waitUntilLeader(shard);
631
632          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
633
634             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
635
636             final String transactionID1 = "tx1";
637             final MutableCompositeModification modification1 = new MutableCompositeModification();
638             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
639                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
640
641             final String transactionID2 = "tx2";
642             final MutableCompositeModification modification2 = new MutableCompositeModification();
643             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
644                     TestModel.OUTER_LIST_PATH,
645                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
646                     modification2);
647
648             final String transactionID3 = "tx3";
649             final MutableCompositeModification modification3 = new MutableCompositeModification();
650             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
651                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
652                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
653                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
654                     modification3);
655
656             final long timeoutSec = 5;
657             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
658             final Timeout timeout = new Timeout(duration);
659
660             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
661             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
662                     expectMsgClass(duration, ReadyTransactionReply.class));
663             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
664
665             // Send the CanCommitTransaction message for the first Tx.
666
667             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
668             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
669                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
670             assertEquals("Can commit", true, canCommitReply.getCanCommit());
671
672             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
673             expectMsgClass(duration, ReadyTransactionReply.class);
674
675             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
676             expectMsgClass(duration, ReadyTransactionReply.class);
677
678             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
679             // processed after the first Tx completes.
680
681             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
682                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
683
684             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
685                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
686
687             // Send the CommitTransaction message for the first Tx. After it completes, it should
688             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
689
690             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
691             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
692
693             // Wait for the next 2 Tx's to complete.
694
695             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
696             final CountDownLatch commitLatch = new CountDownLatch(2);
697
698             class OnFutureComplete extends OnComplete<Object> {
699                 private final Class<?> expRespType;
700
701                 OnFutureComplete(final Class<?> expRespType) {
702                     this.expRespType = expRespType;
703                 }
704
705                 @Override
706                 public void onComplete(final Throwable error, final Object resp) {
707                     if(error != null) {
708                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
709                     } else {
710                         try {
711                             assertEquals("Commit response type", expRespType, resp.getClass());
712                             onSuccess(resp);
713                         } catch (final Exception e) {
714                             caughtEx.set(e);
715                         }
716                     }
717                 }
718
719                 void onSuccess(final Object resp) throws Exception {
720                 }
721             }
722
723             class OnCommitFutureComplete extends OnFutureComplete {
724                 OnCommitFutureComplete() {
725                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
726                 }
727
728                 @Override
729                 public void onComplete(final Throwable error, final Object resp) {
730                     super.onComplete(error, resp);
731                     commitLatch.countDown();
732                 }
733             }
734
735             class OnCanCommitFutureComplete extends OnFutureComplete {
736                 private final String transactionID;
737
738                 OnCanCommitFutureComplete(final String transactionID) {
739                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
740                     this.transactionID = transactionID;
741                 }
742
743                 @Override
744                 void onSuccess(final Object resp) throws Exception {
745                     final CanCommitTransactionReply canCommitReply =
746                             CanCommitTransactionReply.fromSerializable(resp);
747                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
748
749                     final Future<Object> commitFuture = Patterns.ask(shard,
750                             new CommitTransaction(transactionID).toSerializable(), timeout);
751                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
752                 }
753             }
754
755             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
756                     getSystem().dispatcher());
757
758             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
759                     getSystem().dispatcher());
760
761             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
762
763             if(caughtEx.get() != null) {
764                 throw caughtEx.get();
765             }
766
767             assertEquals("Commits complete", true, done);
768
769             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
770             inOrder.verify(cohort1).canCommit();
771             inOrder.verify(cohort1).preCommit();
772             inOrder.verify(cohort1).commit();
773             inOrder.verify(cohort2).canCommit();
774             inOrder.verify(cohort2).preCommit();
775             inOrder.verify(cohort2).commit();
776             inOrder.verify(cohort3).canCommit();
777             inOrder.verify(cohort3).preCommit();
778             inOrder.verify(cohort3).commit();
779
780             // Verify data in the data store.
781
782             verifyOuterListEntry(shard, 1);
783
784             verifyLastApplied(shard, 2);
785
786             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
787         }};
788     }
789
790     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
791             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
792         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
793     }
794
795     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
796             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
797             final int messagesSent) {
798         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
799         batched.addModification(new WriteModification(path, data));
800         batched.setReady(ready);
801         batched.setDoCommitOnReady(doCommitOnReady);
802         batched.setTotalMessagesSent(messagesSent);
803         return batched;
804     }
805
806     @Test
807     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
808         new ShardTestKit(getSystem()) {{
809             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
810                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
811                     "testBatchedModificationsWithNoCommitOnReady");
812
813             waitUntilLeader(shard);
814
815             final String transactionID = "tx";
816             final FiniteDuration duration = duration("5 seconds");
817
818             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
819             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
820                 @Override
821                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
822                     if(mockCohort.get() == null) {
823                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
824                     }
825
826                     return mockCohort.get();
827                 }
828             };
829
830             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
831
832             // Send a BatchedModifications to start a transaction.
833
834             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
835                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
836             expectMsgClass(duration, BatchedModificationsReply.class);
837
838             // Send a couple more BatchedModifications.
839
840             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
841                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
842             expectMsgClass(duration, BatchedModificationsReply.class);
843
844             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
845                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
846                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
847             expectMsgClass(duration, ReadyTransactionReply.class);
848
849             // Send the CanCommitTransaction message.
850
851             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
852             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
853                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
854             assertEquals("Can commit", true, canCommitReply.getCanCommit());
855
856             // Send the CanCommitTransaction message.
857
858             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
859             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
860
861             final InOrder inOrder = inOrder(mockCohort.get());
862             inOrder.verify(mockCohort.get()).canCommit();
863             inOrder.verify(mockCohort.get()).preCommit();
864             inOrder.verify(mockCohort.get()).commit();
865
866             // Verify data in the data store.
867
868             verifyOuterListEntry(shard, 1);
869
870             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
871         }};
872     }
873
874     @Test
875     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
876         new ShardTestKit(getSystem()) {{
877             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
878                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
879                     "testBatchedModificationsWithCommitOnReady");
880
881             waitUntilLeader(shard);
882
883             final String transactionID = "tx";
884             final FiniteDuration duration = duration("5 seconds");
885
886             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
887             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
888                 @Override
889                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
890                     if(mockCohort.get() == null) {
891                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
892                     }
893
894                     return mockCohort.get();
895                 }
896             };
897
898             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
899
900             // Send a BatchedModifications to start a transaction.
901
902             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
903                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
904             expectMsgClass(duration, BatchedModificationsReply.class);
905
906             // Send a couple more BatchedModifications.
907
908             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
909                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
910             expectMsgClass(duration, BatchedModificationsReply.class);
911
912             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
913                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
914                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
915
916             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
917
918             final InOrder inOrder = inOrder(mockCohort.get());
919             inOrder.verify(mockCohort.get()).canCommit();
920             inOrder.verify(mockCohort.get()).preCommit();
921             inOrder.verify(mockCohort.get()).commit();
922
923             // Verify data in the data store.
924
925             verifyOuterListEntry(shard, 1);
926
927             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
928         }};
929     }
930
931     @Test(expected=IllegalStateException.class)
932     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
933         new ShardTestKit(getSystem()) {{
934             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
935                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
936                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
937
938             waitUntilLeader(shard);
939
940             final String transactionID = "tx1";
941             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
942             batched.setReady(true);
943             batched.setTotalMessagesSent(2);
944
945             shard.tell(batched, getRef());
946
947             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
948
949             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
950
951             if(failure != null) {
952                 throw failure.cause();
953             }
954         }};
955     }
956
957     @Test
958     public void testBatchedModificationsWithOperationFailure() throws Throwable {
959         new ShardTestKit(getSystem()) {{
960             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
961                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
962                     "testBatchedModificationsWithOperationFailure");
963
964             waitUntilLeader(shard);
965
966             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
967             // write will not validate the children for performance reasons.
968
969             String transactionID = "tx1";
970
971             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
972                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
973                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
974
975             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
976             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
977             shard.tell(batched, getRef());
978             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
979
980             Throwable cause = failure.cause();
981
982             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
983             batched.setReady(true);
984             batched.setTotalMessagesSent(2);
985
986             shard.tell(batched, getRef());
987
988             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
989             assertEquals("Failure cause", cause, failure.cause());
990
991             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
992         }};
993     }
994
995     @SuppressWarnings("unchecked")
996     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
997         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
998         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
999         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1000                 outerList.getValue() instanceof Iterable);
1001         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1002         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1003                 entry instanceof MapEntryNode);
1004         final MapEntryNode mapEntry = (MapEntryNode)entry;
1005         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1006                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1007         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1008         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1009     }
1010
1011     @Test
1012     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1013         new ShardTestKit(getSystem()) {{
1014             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1015                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1016                     "testBatchedModificationsOnTransactionChain");
1017
1018             waitUntilLeader(shard);
1019
1020             final String transactionChainID = "txChain";
1021             final String transactionID1 = "tx1";
1022             final String transactionID2 = "tx2";
1023
1024             final FiniteDuration duration = duration("5 seconds");
1025
1026             // Send a BatchedModifications to start a chained write transaction and ready it.
1027
1028             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1029             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1030             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1031                     containerNode, true, false, 1), getRef());
1032             expectMsgClass(duration, ReadyTransactionReply.class);
1033
1034             // Create a read Tx on the same chain.
1035
1036             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1037                     transactionChainID).toSerializable(), getRef());
1038
1039             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1040
1041             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1042             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1043             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1044
1045             // Commit the write transaction.
1046
1047             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1048             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1049                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1050             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1051
1052             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1053             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1054
1055             // Verify data in the data store.
1056
1057             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1058             assertEquals("Stored node", containerNode, actualNode);
1059
1060             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1061         }};
1062     }
1063
1064     @Test
1065     public void testOnBatchedModificationsWhenNotLeader() {
1066         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1067         new ShardTestKit(getSystem()) {{
1068             final Creator<Shard> creator = new Creator<Shard>() {
1069                 private static final long serialVersionUID = 1L;
1070
1071                 @Override
1072                 public Shard create() throws Exception {
1073                     return new Shard(newShardBuilder()) {
1074                         @Override
1075                         protected boolean isLeader() {
1076                             return overrideLeaderCalls.get() ? false : super.isLeader();
1077                         }
1078
1079                         @Override
1080                         protected ActorSelection getLeader() {
1081                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1082                                 super.getLeader();
1083                         }
1084                     };
1085                 }
1086             };
1087
1088             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1089                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1090
1091             waitUntilLeader(shard);
1092
1093             overrideLeaderCalls.set(true);
1094
1095             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1096
1097             shard.tell(batched, ActorRef.noSender());
1098
1099             expectMsgEquals(batched);
1100
1101             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1102         }};
1103     }
1104
1105     @Test
1106     public void testTransactionMessagesWithNoLeader() {
1107         new ShardTestKit(getSystem()) {{
1108             dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
1109                 shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
1110             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1111                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1112                     "testTransactionMessagesWithNoLeader");
1113
1114             waitUntilNoLeader(shard);
1115
1116             shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef());
1117             Failure failure = expectMsgClass(Failure.class);
1118             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
1119
1120             shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx",
1121                     DataStoreVersions.CURRENT_VERSION, true), getRef());
1122             failure = expectMsgClass(Failure.class);
1123             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
1124
1125             shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef());
1126             failure = expectMsgClass(Failure.class);
1127             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
1128         }};
1129     }
1130
1131     @Test
1132     public void testReadyWithImmediateCommit() throws Exception{
1133         testReadyWithImmediateCommit(true);
1134         testReadyWithImmediateCommit(false);
1135     }
1136
1137     public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
1138         new ShardTestKit(getSystem()) {{
1139             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1140                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1141                     "testReadyWithImmediateCommit-" + readWrite);
1142
1143             waitUntilLeader(shard);
1144
1145             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1146
1147             final String transactionID = "tx1";
1148             final MutableCompositeModification modification = new MutableCompositeModification();
1149             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1150             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1151                     TestModel.TEST_PATH, containerNode, modification);
1152
1153             final FiniteDuration duration = duration("5 seconds");
1154
1155             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1156
1157             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1158
1159             final InOrder inOrder = inOrder(cohort);
1160             inOrder.verify(cohort).canCommit();
1161             inOrder.verify(cohort).preCommit();
1162             inOrder.verify(cohort).commit();
1163
1164             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1165             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1166
1167             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1168         }};
1169     }
1170
1171     @Test
1172     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1173         new ShardTestKit(getSystem()) {{
1174             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1175                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1176                     "testReadyLocalTransactionWithImmediateCommit");
1177
1178             waitUntilLeader(shard);
1179
1180             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1181
1182             final DataTreeModification modification = dataStore.newModification();
1183
1184             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1185             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1186             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1187             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1188
1189             final String txId = "tx1";
1190             modification.ready();
1191             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1192
1193             shard.tell(readyMessage, getRef());
1194
1195             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1196
1197             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1198             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1199
1200             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1201         }};
1202     }
1203
1204     @Test
1205     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1206         new ShardTestKit(getSystem()) {{
1207             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1208                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1209                     "testReadyLocalTransactionWithThreePhaseCommit");
1210
1211             waitUntilLeader(shard);
1212
1213             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1214
1215             final DataTreeModification modification = dataStore.newModification();
1216
1217             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1218             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1219             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1220             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1221
1222             final String txId = "tx1";
1223                 modification.ready();
1224             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1225
1226             shard.tell(readyMessage, getRef());
1227
1228             expectMsgClass(ReadyTransactionReply.class);
1229
1230             // Send the CanCommitTransaction message.
1231
1232             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1233             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1234                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1235             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1236
1237             // Send the CanCommitTransaction message.
1238
1239             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1240             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1241
1242             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1243             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1244
1245             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1246         }};
1247     }
1248
1249     @Test
1250     public void testCommitWithPersistenceDisabled() throws Throwable {
1251         testCommitWithPersistenceDisabled(true);
1252         testCommitWithPersistenceDisabled(false);
1253     }
1254
1255     public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
1256         dataStoreContextBuilder.persistent(false);
1257         new ShardTestKit(getSystem()) {{
1258             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1259                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1260                     "testCommitWithPersistenceDisabled-" + readWrite);
1261
1262             waitUntilLeader(shard);
1263
1264             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1265
1266             // Setup a simulated transactions with a mock cohort.
1267
1268             final String transactionID = "tx";
1269             final MutableCompositeModification modification = new MutableCompositeModification();
1270             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1271             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1272                 TestModel.TEST_PATH, containerNode, modification);
1273
1274             final FiniteDuration duration = duration("5 seconds");
1275
1276             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1277             expectMsgClass(duration, ReadyTransactionReply.class);
1278
1279             // Send the CanCommitTransaction message.
1280
1281             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1282             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1283                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1284             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1285
1286             // Send the CanCommitTransaction message.
1287
1288             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1289             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1290
1291             final InOrder inOrder = inOrder(cohort);
1292             inOrder.verify(cohort).canCommit();
1293             inOrder.verify(cohort).preCommit();
1294             inOrder.verify(cohort).commit();
1295
1296             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1297             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1298
1299             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1300         }};
1301     }
1302
1303     private static DataTreeCandidateTip mockCandidate(final String name) {
1304         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1305         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1306         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1307         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1308         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1309         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1310         return mockCandidate;
1311     }
1312
1313     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1314         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1315         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1316         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1317         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1318         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1319         return mockCandidate;
1320     }
1321
1322     @Test
1323     public void testCommitWhenTransactionHasNoModifications() {
1324         testCommitWhenTransactionHasNoModifications(true);
1325         testCommitWhenTransactionHasNoModifications(false);
1326     }
1327
1328     public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
1329         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1330         // but here that need not happen
1331         new ShardTestKit(getSystem()) {
1332             {
1333                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1334                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1335                         "testCommitWhenTransactionHasNoModifications-" + readWrite);
1336
1337                 waitUntilLeader(shard);
1338
1339                 final String transactionID = "tx1";
1340                 final MutableCompositeModification modification = new MutableCompositeModification();
1341                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1342                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1343                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1344                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1345                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1346
1347                 final FiniteDuration duration = duration("5 seconds");
1348
1349                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1350                 expectMsgClass(duration, ReadyTransactionReply.class);
1351
1352                 // Send the CanCommitTransaction message.
1353
1354                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1355                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1356                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1357                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1358
1359                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1360                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1361
1362                 final InOrder inOrder = inOrder(cohort);
1363                 inOrder.verify(cohort).canCommit();
1364                 inOrder.verify(cohort).preCommit();
1365                 inOrder.verify(cohort).commit();
1366
1367                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1368                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1369
1370                 // Use MBean for verification
1371                 // Committed transaction count should increase as usual
1372                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1373
1374                 // Commit index should not advance because this does not go into the journal
1375                 assertEquals(-1, shardStats.getCommitIndex());
1376
1377                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1378
1379             }
1380         };
1381     }
1382
1383     @Test
1384     public void testCommitWhenTransactionHasModifications() {
1385         testCommitWhenTransactionHasModifications(true);
1386         testCommitWhenTransactionHasModifications(false);
1387     }
1388
1389     public void testCommitWhenTransactionHasModifications(final boolean readWrite){
1390         new ShardTestKit(getSystem()) {
1391             {
1392                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1393                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1394                         "testCommitWhenTransactionHasModifications-" + readWrite);
1395
1396                 waitUntilLeader(shard);
1397
1398                 final String transactionID = "tx1";
1399                 final MutableCompositeModification modification = new MutableCompositeModification();
1400                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1401                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1402                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1403                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1404                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1405                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1406
1407                 final FiniteDuration duration = duration("5 seconds");
1408
1409                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1410                 expectMsgClass(duration, ReadyTransactionReply.class);
1411
1412                 // Send the CanCommitTransaction message.
1413
1414                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1415                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1416                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1417                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1418
1419                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1420                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1421
1422                 final InOrder inOrder = inOrder(cohort);
1423                 inOrder.verify(cohort).canCommit();
1424                 inOrder.verify(cohort).preCommit();
1425                 inOrder.verify(cohort).commit();
1426
1427                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1428                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1429
1430                 // Use MBean for verification
1431                 // Committed transaction count should increase as usual
1432                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1433
1434                 // Commit index should advance as we do not have an empty modification
1435                 assertEquals(0, shardStats.getCommitIndex());
1436
1437                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1438
1439             }
1440         };
1441     }
1442
1443     @Test
1444     public void testCommitPhaseFailure() throws Throwable {
1445         testCommitPhaseFailure(true);
1446         testCommitPhaseFailure(false);
1447     }
1448
1449     public void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
1450         new ShardTestKit(getSystem()) {{
1451             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1452                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1453                     "testCommitPhaseFailure-" + readWrite);
1454
1455             waitUntilLeader(shard);
1456
1457             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1458             // commit phase.
1459
1460             final String transactionID1 = "tx1";
1461             final MutableCompositeModification modification1 = new MutableCompositeModification();
1462             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1463             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1464             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1465             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1466             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1467
1468             final String transactionID2 = "tx2";
1469             final MutableCompositeModification modification2 = new MutableCompositeModification();
1470             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1471             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1472
1473             final FiniteDuration duration = duration("5 seconds");
1474             final Timeout timeout = new Timeout(duration);
1475
1476             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1477             expectMsgClass(duration, ReadyTransactionReply.class);
1478
1479             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1480             expectMsgClass(duration, ReadyTransactionReply.class);
1481
1482             // Send the CanCommitTransaction message for the first Tx.
1483
1484             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1485             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1486                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1487             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1488
1489             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1490             // processed after the first Tx completes.
1491
1492             final Future<Object> canCommitFuture = Patterns.ask(shard,
1493                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1494
1495             // Send the CommitTransaction message for the first Tx. This should send back an error
1496             // and trigger the 2nd Tx to proceed.
1497
1498             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1499             expectMsgClass(duration, akka.actor.Status.Failure.class);
1500
1501             // Wait for the 2nd Tx to complete the canCommit phase.
1502
1503             final CountDownLatch latch = new CountDownLatch(1);
1504             canCommitFuture.onComplete(new OnComplete<Object>() {
1505                 @Override
1506                 public void onComplete(final Throwable t, final Object resp) {
1507                     latch.countDown();
1508                 }
1509             }, getSystem().dispatcher());
1510
1511             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1512
1513             final InOrder inOrder = inOrder(cohort1, cohort2);
1514             inOrder.verify(cohort1).canCommit();
1515             inOrder.verify(cohort1).preCommit();
1516             inOrder.verify(cohort1).commit();
1517             inOrder.verify(cohort2).canCommit();
1518
1519             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1520         }};
1521     }
1522
1523     @Test
1524     public void testPreCommitPhaseFailure() throws Throwable {
1525         testPreCommitPhaseFailure(true);
1526         testPreCommitPhaseFailure(false);
1527     }
1528
1529     public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
1530         new ShardTestKit(getSystem()) {{
1531             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1532                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1533                     "testPreCommitPhaseFailure-" + readWrite);
1534
1535             waitUntilLeader(shard);
1536
1537             final String transactionID1 = "tx1";
1538             final MutableCompositeModification modification1 = new MutableCompositeModification();
1539             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1540             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1541             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1542
1543             final String transactionID2 = "tx2";
1544             final MutableCompositeModification modification2 = new MutableCompositeModification();
1545             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1546             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1547
1548             final FiniteDuration duration = duration("5 seconds");
1549             final Timeout timeout = new Timeout(duration);
1550
1551             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1552             expectMsgClass(duration, ReadyTransactionReply.class);
1553
1554             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1555             expectMsgClass(duration, ReadyTransactionReply.class);
1556
1557             // Send the CanCommitTransaction message for the first Tx.
1558
1559             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1560             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1561                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1562             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1563
1564             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1565             // processed after the first Tx completes.
1566
1567             final Future<Object> canCommitFuture = Patterns.ask(shard,
1568                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1569
1570             // Send the CommitTransaction message for the first Tx. This should send back an error
1571             // and trigger the 2nd Tx to proceed.
1572
1573             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1574             expectMsgClass(duration, akka.actor.Status.Failure.class);
1575
1576             // Wait for the 2nd Tx to complete the canCommit phase.
1577
1578             final CountDownLatch latch = new CountDownLatch(1);
1579             canCommitFuture.onComplete(new OnComplete<Object>() {
1580                 @Override
1581                 public void onComplete(final Throwable t, final Object resp) {
1582                     latch.countDown();
1583                 }
1584             }, getSystem().dispatcher());
1585
1586             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1587
1588             final InOrder inOrder = inOrder(cohort1, cohort2);
1589             inOrder.verify(cohort1).canCommit();
1590             inOrder.verify(cohort1).preCommit();
1591             inOrder.verify(cohort2).canCommit();
1592
1593             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1594         }};
1595     }
1596
1597     @Test
1598     public void testCanCommitPhaseFailure() throws Throwable {
1599         testCanCommitPhaseFailure(true);
1600         testCanCommitPhaseFailure(false);
1601     }
1602
1603     public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1604         new ShardTestKit(getSystem()) {{
1605             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1606                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1607                     "testCanCommitPhaseFailure-" + readWrite);
1608
1609             waitUntilLeader(shard);
1610
1611             final FiniteDuration duration = duration("5 seconds");
1612
1613             final String transactionID1 = "tx1";
1614             final MutableCompositeModification modification = new MutableCompositeModification();
1615             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1616             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1617
1618             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1619             expectMsgClass(duration, ReadyTransactionReply.class);
1620
1621             // Send the CanCommitTransaction message.
1622
1623             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1624             expectMsgClass(duration, akka.actor.Status.Failure.class);
1625
1626             // Send another can commit to ensure the failed one got cleaned up.
1627
1628             reset(cohort);
1629
1630             final String transactionID2 = "tx2";
1631             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1632
1633             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1634             expectMsgClass(duration, ReadyTransactionReply.class);
1635
1636             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1637             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1638                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1639             assertEquals("getCanCommit", true, reply.getCanCommit());
1640
1641             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1642         }};
1643     }
1644
1645     @Test
1646     public void testCanCommitPhaseFalseResponse() throws Throwable {
1647         testCanCommitPhaseFalseResponse(true);
1648         testCanCommitPhaseFalseResponse(false);
1649     }
1650
1651     public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1652         new ShardTestKit(getSystem()) {{
1653             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1654                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1655                     "testCanCommitPhaseFalseResponse-" + readWrite);
1656
1657             waitUntilLeader(shard);
1658
1659             final FiniteDuration duration = duration("5 seconds");
1660
1661             final String transactionID1 = "tx1";
1662             final MutableCompositeModification modification = new MutableCompositeModification();
1663             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1664             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1665
1666             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1667             expectMsgClass(duration, ReadyTransactionReply.class);
1668
1669             // Send the CanCommitTransaction message.
1670
1671             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1672             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1673                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1674             assertEquals("getCanCommit", false, reply.getCanCommit());
1675
1676             // Send another can commit to ensure the failed one got cleaned up.
1677
1678             reset(cohort);
1679
1680             final String transactionID2 = "tx2";
1681             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1682
1683             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1684             expectMsgClass(duration, ReadyTransactionReply.class);
1685
1686             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1687             reply = CanCommitTransactionReply.fromSerializable(
1688                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1689             assertEquals("getCanCommit", true, reply.getCanCommit());
1690
1691             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1692         }};
1693     }
1694
1695     @Test
1696     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1697         testImmediateCommitWithCanCommitPhaseFailure(true);
1698         testImmediateCommitWithCanCommitPhaseFailure(false);
1699     }
1700
1701     public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1702         new ShardTestKit(getSystem()) {{
1703             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1704                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1705                     "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1706
1707             waitUntilLeader(shard);
1708
1709             final FiniteDuration duration = duration("5 seconds");
1710
1711             final String transactionID1 = "tx1";
1712             final MutableCompositeModification modification = new MutableCompositeModification();
1713             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1714             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1715
1716             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
1717
1718             expectMsgClass(duration, akka.actor.Status.Failure.class);
1719
1720             // Send another can commit to ensure the failed one got cleaned up.
1721
1722             reset(cohort);
1723
1724             final String transactionID2 = "tx2";
1725             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1726             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1727             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1728             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1729             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1730             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1731             doReturn(candidateRoot).when(candidate).getRootNode();
1732             doReturn(candidate).when(cohort).getCandidate();
1733
1734             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1735
1736             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1737
1738             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1739         }};
1740     }
1741
1742     @Test
1743     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1744         testImmediateCommitWithCanCommitPhaseFalseResponse(true);
1745         testImmediateCommitWithCanCommitPhaseFalseResponse(false);
1746     }
1747
1748     public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1749         new ShardTestKit(getSystem()) {{
1750             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1751                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1752                     "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
1753
1754             waitUntilLeader(shard);
1755
1756             final FiniteDuration duration = duration("5 seconds");
1757
1758             final String transactionID = "tx1";
1759             final MutableCompositeModification modification = new MutableCompositeModification();
1760             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1761             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1762
1763             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1764
1765             expectMsgClass(duration, akka.actor.Status.Failure.class);
1766
1767             // Send another can commit to ensure the failed one got cleaned up.
1768
1769             reset(cohort);
1770
1771             final String transactionID2 = "tx2";
1772             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1773             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1774             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1775             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1776             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1777             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1778             doReturn(candidateRoot).when(candidate).getRootNode();
1779             doReturn(candidate).when(cohort).getCandidate();
1780
1781             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1782
1783             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1784
1785             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1786         }};
1787     }
1788
1789     @Test
1790     public void testAbortBeforeFinishCommit() throws Throwable {
1791         testAbortBeforeFinishCommit(true);
1792         testAbortBeforeFinishCommit(false);
1793     }
1794
1795     public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
1796         new ShardTestKit(getSystem()) {{
1797             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1798                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1799                     "testAbortBeforeFinishCommit-" + readWrite);
1800
1801             waitUntilLeader(shard);
1802
1803             final FiniteDuration duration = duration("5 seconds");
1804             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1805
1806             final String transactionID = "tx1";
1807             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1808                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1809                 @Override
1810                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1811                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1812
1813                     // Simulate an AbortTransaction message occurring during replication, after
1814                     // persisting and before finishing the commit to the in-memory store.
1815                     // We have no followers so due to optimizations in the RaftActor, it does not
1816                     // attempt replication and thus we can't send an AbortTransaction message b/c
1817                     // it would be processed too late after CommitTransaction completes. So we'll
1818                     // simulate an AbortTransaction message occurring during replication by calling
1819                     // the shard directly.
1820                     //
1821                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1822
1823                     return preCommitFuture;
1824                 }
1825             };
1826
1827             final MutableCompositeModification modification = new MutableCompositeModification();
1828             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1829                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1830                     modification, preCommit);
1831
1832             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1833             expectMsgClass(duration, ReadyTransactionReply.class);
1834
1835             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1836             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1837                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1838             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1839
1840             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1841             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1842
1843             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1844
1845             // Since we're simulating an abort occurring during replication and before finish commit,
1846             // the data should still get written to the in-memory store since we've gotten past
1847             // canCommit and preCommit and persisted the data.
1848             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1849
1850             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1851         }};
1852     }
1853
1854     @Test
1855     public void testTransactionCommitTimeout() throws Throwable {
1856         testTransactionCommitTimeout(true);
1857         testTransactionCommitTimeout(false);
1858     }
1859
1860     public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
1861         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1862
1863         new ShardTestKit(getSystem()) {{
1864             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1865                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1866                     "testTransactionCommitTimeout-" + readWrite);
1867
1868             waitUntilLeader(shard);
1869
1870             final FiniteDuration duration = duration("5 seconds");
1871
1872             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1873
1874             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1875             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1876                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1877
1878             // Create 1st Tx - will timeout
1879
1880             final String transactionID1 = "tx1";
1881             final MutableCompositeModification modification1 = new MutableCompositeModification();
1882             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1883                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1884                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1885                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1886                     modification1);
1887
1888             // Create 2nd Tx
1889
1890             final String transactionID2 = "tx3";
1891             final MutableCompositeModification modification2 = new MutableCompositeModification();
1892             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1893                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1894             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1895                     listNodePath,
1896                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1897                     modification2);
1898
1899             // Ready the Tx's
1900
1901             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1902             expectMsgClass(duration, ReadyTransactionReply.class);
1903
1904             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1905             expectMsgClass(duration, ReadyTransactionReply.class);
1906
1907             // canCommit 1st Tx. We don't send the commit so it should timeout.
1908
1909             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1910             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1911
1912             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1913
1914             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1915             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1916
1917             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1918
1919             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1920             expectMsgClass(duration, akka.actor.Status.Failure.class);
1921
1922             // Commit the 2nd Tx.
1923
1924             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1925             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1926
1927             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1928             assertNotNull(listNodePath + " not found", node);
1929
1930             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1931         }};
1932     }
1933
1934     @Test
1935     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1936         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1937
1938         new ShardTestKit(getSystem()) {{
1939             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1940                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1941                     "testTransactionCommitQueueCapacityExceeded");
1942
1943             waitUntilLeader(shard);
1944
1945             final FiniteDuration duration = duration("5 seconds");
1946
1947             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1948
1949             final String transactionID1 = "tx1";
1950             final MutableCompositeModification modification1 = new MutableCompositeModification();
1951             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1952                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1953
1954             final String transactionID2 = "tx2";
1955             final MutableCompositeModification modification2 = new MutableCompositeModification();
1956             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1957                     TestModel.OUTER_LIST_PATH,
1958                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1959                     modification2);
1960
1961             final String transactionID3 = "tx3";
1962             final MutableCompositeModification modification3 = new MutableCompositeModification();
1963             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1964                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1965
1966             // Ready the Tx's
1967
1968             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1969             expectMsgClass(duration, ReadyTransactionReply.class);
1970
1971             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1972             expectMsgClass(duration, ReadyTransactionReply.class);
1973
1974             // The 3rd Tx should exceed queue capacity and fail.
1975
1976             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1977             expectMsgClass(duration, akka.actor.Status.Failure.class);
1978
1979             // canCommit 1st Tx.
1980
1981             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1982             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1983
1984             // canCommit the 2nd Tx - it should get queued.
1985
1986             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1987
1988             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1989
1990             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1991             expectMsgClass(duration, akka.actor.Status.Failure.class);
1992
1993             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1994         }};
1995     }
1996
1997     @Test
1998     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1999         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2000
2001         new ShardTestKit(getSystem()) {{
2002             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2003                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2004                     "testTransactionCommitWithPriorExpiredCohortEntries");
2005
2006             waitUntilLeader(shard);
2007
2008             final FiniteDuration duration = duration("5 seconds");
2009
2010             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2011
2012             final String transactionID1 = "tx1";
2013             final MutableCompositeModification modification1 = new MutableCompositeModification();
2014             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2015                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2016
2017             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
2018             expectMsgClass(duration, ReadyTransactionReply.class);
2019
2020             final String transactionID2 = "tx2";
2021             final MutableCompositeModification modification2 = new MutableCompositeModification();
2022             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2023                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2024
2025             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
2026             expectMsgClass(duration, ReadyTransactionReply.class);
2027
2028             final String transactionID3 = "tx3";
2029             final MutableCompositeModification modification3 = new MutableCompositeModification();
2030             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2031                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2032
2033             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
2034             expectMsgClass(duration, ReadyTransactionReply.class);
2035
2036             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2037             // should expire from the queue and the last one should be processed.
2038
2039             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2040             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2041
2042             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2043         }};
2044     }
2045
2046     @Test
2047     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2048         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2049
2050         new ShardTestKit(getSystem()) {{
2051             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2052                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2053                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2054
2055             waitUntilLeader(shard);
2056
2057             final FiniteDuration duration = duration("5 seconds");
2058
2059             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2060
2061             final String transactionID1 = "tx1";
2062             final MutableCompositeModification modification1 = new MutableCompositeModification();
2063             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2064                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2065
2066             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
2067             expectMsgClass(duration, ReadyTransactionReply.class);
2068
2069             // CanCommit the first one so it's the current in-progress CohortEntry.
2070
2071             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2072             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2073
2074             // Ready the second Tx.
2075
2076             final String transactionID2 = "tx2";
2077             final MutableCompositeModification modification2 = new MutableCompositeModification();
2078             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2079                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2080
2081             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
2082             expectMsgClass(duration, ReadyTransactionReply.class);
2083
2084             // Ready the third Tx.
2085
2086             final String transactionID3 = "tx3";
2087             final DataTreeModification modification3 = dataStore.newModification();
2088             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2089                     .apply(modification3);
2090                 modification3.ready();
2091             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2092
2093             shard.tell(readyMessage, getRef());
2094
2095             // Commit the first Tx. After completing, the second should expire from the queue and the third
2096             // Tx committed.
2097
2098             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2099             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2100
2101             // Expect commit reply from the third Tx.
2102
2103             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2104
2105             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2106             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2107
2108             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2109         }};
2110     }
2111
2112     @Test
2113     public void testCanCommitBeforeReadyFailure() throws Throwable {
2114         new ShardTestKit(getSystem()) {{
2115             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2116                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2117                     "testCanCommitBeforeReadyFailure");
2118
2119             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2120             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2121
2122             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2123         }};
2124     }
2125
2126     @Test
2127     public void testAbortCurrentTransaction() throws Throwable {
2128         testAbortCurrentTransaction(true);
2129         testAbortCurrentTransaction(false);
2130     }
2131
2132     public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
2133         new ShardTestKit(getSystem()) {{
2134             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2135                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2136                     "testAbortCurrentTransaction-" + readWrite);
2137
2138             waitUntilLeader(shard);
2139
2140             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2141
2142             final String transactionID1 = "tx1";
2143             final MutableCompositeModification modification1 = new MutableCompositeModification();
2144             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2145             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2146             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2147
2148             final String transactionID2 = "tx2";
2149             final MutableCompositeModification modification2 = new MutableCompositeModification();
2150             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2151             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2152
2153             final FiniteDuration duration = duration("5 seconds");
2154             final Timeout timeout = new Timeout(duration);
2155
2156             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
2157             expectMsgClass(duration, ReadyTransactionReply.class);
2158
2159             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
2160             expectMsgClass(duration, ReadyTransactionReply.class);
2161
2162             // Send the CanCommitTransaction message for the first Tx.
2163
2164             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2165             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2166                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2167             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2168
2169             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2170             // processed after the first Tx completes.
2171
2172             final Future<Object> canCommitFuture = Patterns.ask(shard,
2173                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2174
2175             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2176             // Tx to proceed.
2177
2178             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2179             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2180
2181             // Wait for the 2nd Tx to complete the canCommit phase.
2182
2183             Await.ready(canCommitFuture, duration);
2184
2185             final InOrder inOrder = inOrder(cohort1, cohort2);
2186             inOrder.verify(cohort1).canCommit();
2187             inOrder.verify(cohort2).canCommit();
2188
2189             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2190         }};
2191     }
2192
2193     @Test
2194     public void testAbortQueuedTransaction() throws Throwable {
2195         testAbortQueuedTransaction(true);
2196         testAbortQueuedTransaction(false);
2197     }
2198
2199     public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
2200         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2201         new ShardTestKit(getSystem()) {{
2202             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2203             @SuppressWarnings("serial")
2204             final Creator<Shard> creator = new Creator<Shard>() {
2205                 @Override
2206                 public Shard create() throws Exception {
2207                     return new Shard(newShardBuilder()) {
2208                         @Override
2209                         public void onReceiveCommand(final Object message) throws Exception {
2210                             super.onReceiveCommand(message);
2211                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2212                                 if(cleaupCheckLatch.get() != null) {
2213                                     cleaupCheckLatch.get().countDown();
2214                                 }
2215                             }
2216                         }
2217                     };
2218                 }
2219             };
2220
2221             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2222                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2223                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
2224
2225             waitUntilLeader(shard);
2226
2227             final String transactionID = "tx1";
2228
2229             final MutableCompositeModification modification = new MutableCompositeModification();
2230             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2231             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2232
2233             final FiniteDuration duration = duration("5 seconds");
2234
2235             // Ready the tx.
2236
2237             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
2238             expectMsgClass(duration, ReadyTransactionReply.class);
2239
2240             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2241
2242             // Send the AbortTransaction message.
2243
2244             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2245             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2246
2247             verify(cohort).abort();
2248
2249             // Verify the tx cohort is removed from queue at the cleanup check interval.
2250
2251             cleaupCheckLatch.set(new CountDownLatch(1));
2252             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2253                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2254
2255             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2256
2257             // Now send CanCommitTransaction - should fail.
2258
2259             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2260
2261             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2262             assertTrue("Failure type", failure instanceof IllegalStateException);
2263
2264             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2265         }};
2266     }
2267
2268     @Test
2269     public void testCreateSnapshot() throws Exception {
2270         testCreateSnapshot(true, "testCreateSnapshot");
2271     }
2272
2273     @Test
2274     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2275         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2276     }
2277
2278     @SuppressWarnings("serial")
2279     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2280
2281         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2282
2283         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2284         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2285             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2286                 super(delegate);
2287             }
2288
2289             @Override
2290             public void saveSnapshot(final Object o) {
2291                 savedSnapshot.set(o);
2292                 super.saveSnapshot(o);
2293             }
2294         }
2295
2296         dataStoreContextBuilder.persistent(persistent);
2297
2298         new ShardTestKit(getSystem()) {{
2299             class TestShard extends Shard {
2300
2301                 protected TestShard(AbstractBuilder<?, ?> builder) {
2302                     super(builder);
2303                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2304                 }
2305
2306                 @Override
2307                 public void handleCommand(final Object message) {
2308                     super.handleCommand(message);
2309
2310                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2311                         latch.get().countDown();
2312                     }
2313                 }
2314
2315                 @Override
2316                 public RaftActorContext getRaftActorContext() {
2317                     return super.getRaftActorContext();
2318                 }
2319             }
2320
2321             final Creator<Shard> creator = new Creator<Shard>() {
2322                 @Override
2323                 public Shard create() throws Exception {
2324                     return new TestShard(newShardBuilder());
2325                 }
2326             };
2327
2328             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2329                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2330
2331             waitUntilLeader(shard);
2332             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2333
2334             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2335
2336             // Trigger creation of a snapshot by ensuring
2337             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2338             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2339             awaitAndValidateSnapshot(expectedRoot);
2340
2341             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2342             awaitAndValidateSnapshot(expectedRoot);
2343
2344             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2345         }
2346
2347             private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
2348                                               ) throws InterruptedException {
2349                 System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
2350                 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2351
2352                 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2353                         savedSnapshot.get() instanceof Snapshot);
2354
2355                 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2356
2357                 latch.set(new CountDownLatch(1));
2358                 savedSnapshot.set(null);
2359             }
2360
2361             private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2362
2363                 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2364                 assertEquals("Root node", expectedRoot, actual);
2365
2366            }
2367         };
2368     }
2369
2370     /**
2371      * This test simply verifies that the applySnapShot logic will work
2372      * @throws ReadFailedException
2373      * @throws DataValidationFailedException
2374      */
2375     @Test
2376     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2377         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
2378         store.setSchemaContext(SCHEMA_CONTEXT);
2379
2380         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2381         putTransaction.write(TestModel.TEST_PATH,
2382             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2383         commitTransaction(store, putTransaction);
2384
2385
2386         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2387
2388         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2389
2390         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2391         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2392
2393         commitTransaction(store, writeTransaction);
2394
2395         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2396
2397         assertEquals(expected, actual);
2398     }
2399
2400     @Test
2401     public void testRecoveryApplicable(){
2402
2403         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2404                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2405
2406         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2407                 schemaContext(SCHEMA_CONTEXT).props();
2408
2409         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2410                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2411
2412         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2413                 schemaContext(SCHEMA_CONTEXT).props();
2414
2415         new ShardTestKit(getSystem()) {{
2416             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2417                     persistentProps, "testPersistence1");
2418
2419             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2420
2421             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2422
2423             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2424                     nonPersistentProps, "testPersistence2");
2425
2426             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2427
2428             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2429
2430         }};
2431
2432     }
2433
2434     @Test
2435     public void testOnDatastoreContext() {
2436         new ShardTestKit(getSystem()) {{
2437             dataStoreContextBuilder.persistent(true);
2438
2439             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2440
2441             assertEquals("isRecoveryApplicable", true,
2442                     shard.underlyingActor().persistence().isRecoveryApplicable());
2443
2444             waitUntilLeader(shard);
2445
2446             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2447
2448             assertEquals("isRecoveryApplicable", false,
2449                 shard.underlyingActor().persistence().isRecoveryApplicable());
2450
2451             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2452
2453             assertEquals("isRecoveryApplicable", true,
2454                 shard.underlyingActor().persistence().isRecoveryApplicable());
2455
2456             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2457         }};
2458     }
2459
2460     @Test
2461     public void testRegisterRoleChangeListener() throws Exception {
2462         new ShardTestKit(getSystem()) {
2463             {
2464                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2465                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2466                         "testRegisterRoleChangeListener");
2467
2468                 waitUntilLeader(shard);
2469
2470                 final TestActorRef<MessageCollectorActor> listener =
2471                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2472
2473                 shard.tell(new RegisterRoleChangeListener(), listener);
2474
2475                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2476
2477                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2478                     ShardLeaderStateChanged.class);
2479                 assertEquals("getLocalShardDataTree present", true,
2480                         leaderStateChanged.getLocalShardDataTree().isPresent());
2481                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2482                     leaderStateChanged.getLocalShardDataTree().get());
2483
2484                 MessageCollectorActor.clearMessages(listener);
2485
2486                 // Force a leader change
2487
2488                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2489
2490                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2491                         ShardLeaderStateChanged.class);
2492                 assertEquals("getLocalShardDataTree present", false,
2493                         leaderStateChanged.getLocalShardDataTree().isPresent());
2494
2495                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2496             }
2497         };
2498     }
2499
2500     @Test
2501     public void testFollowerInitialSyncStatus() throws Exception {
2502         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2503                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2504                 "testFollowerInitialSyncStatus");
2505
2506         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2507
2508         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2509
2510         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2511
2512         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2513
2514         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2515     }
2516
2517     private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2518         modification.ready();
2519         store.validate(modification);
2520         store.commit(store.prepare(modification));
2521     }
2522
2523     @Test
2524     public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
2525         new ShardTestKit(getSystem()) {{
2526             String testName = "testClusteredDataChangeListenerDelayedRegistration";
2527             dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
2528
2529             final MockDataChangeListener listener = new MockDataChangeListener(1);
2530             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2531                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2532
2533             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2534                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2535                     actorFactory.generateActorId(testName + "-shard"));
2536
2537             waitUntilNoLeader(shard);
2538
2539             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2540
2541             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2542             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2543                 RegisterChangeListenerReply.class);
2544             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2545
2546             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2547
2548             shard.tell(new ElectionTimeout(), ActorRef.noSender());
2549
2550             listener.waitForChangeEvents();
2551         }};
2552     }
2553
2554     @Test
2555     public void testClusteredDataChangeListenerRegistration() throws Exception {
2556         new ShardTestKit(getSystem()) {{
2557             String testName = "testClusteredDataChangeListenerRegistration";
2558             final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2559                     actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2560
2561             final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2562                     actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2563
2564             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2565                     Shard.builder().id(followerShardID).
2566                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2567                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2568                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2569                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2570
2571             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2572                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2573                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2574                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2575                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2576
2577             leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
2578             String leaderPath = waitUntilLeader(followerShard);
2579             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2580
2581             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2582             final MockDataChangeListener listener = new MockDataChangeListener(1);
2583             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2584                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2585
2586             followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2587             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2588                 RegisterChangeListenerReply.class);
2589             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2590
2591             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2592
2593             listener.waitForChangeEvents();
2594         }};
2595     }
2596
2597     @Test
2598     public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
2599         new ShardTestKit(getSystem()) {{
2600             String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
2601             dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
2602
2603             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2604             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2605                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2606
2607             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2608                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2609                     actorFactory.generateActorId(testName + "-shard"));
2610
2611             waitUntilNoLeader(shard);
2612
2613             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2614
2615             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2616             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2617                     RegisterDataTreeChangeListenerReply.class);
2618             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2619
2620             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2621
2622             shard.tell(new ElectionTimeout(), ActorRef.noSender());
2623
2624             listener.waitForChangeEvents();
2625         }};
2626     }
2627
2628     @Test
2629     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2630         new ShardTestKit(getSystem()) {{
2631             String testName = "testClusteredDataTreeChangeListenerRegistration";
2632             final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2633                     actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2634
2635             final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2636                     actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2637
2638             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2639                     Shard.builder().id(followerShardID).
2640                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2641                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2642                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2643                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2644
2645             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2646                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2647                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2648                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2649                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2650
2651             leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
2652             String leaderPath = waitUntilLeader(followerShard);
2653             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2654
2655             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2656             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2657             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2658                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2659
2660             followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2661             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2662                     RegisterDataTreeChangeListenerReply.class);
2663             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2664
2665             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2666
2667             listener.waitForChangeEvents();
2668         }};
2669     }
2670
2671     @Test
2672     public void testServerRemoved() throws Exception {
2673         final TestActorRef<MessageCollectorActor> parent = TestActorRef.create(getSystem(), MessageCollectorActor.props());
2674
2675         final ActorRef shard = parent.underlyingActor().context().actorOf(
2676                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2677                 "testServerRemoved");
2678
2679         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2680
2681         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2682
2683     }
2684
2685 }