ae751fa65d2b0a3a0917e672a1dfeab59f52a670
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Status.Failure;
28 import akka.dispatch.Dispatchers;
29 import akka.dispatch.OnComplete;
30 import akka.japi.Creator;
31 import akka.pattern.Patterns;
32 import akka.persistence.SaveSnapshotSuccess;
33 import akka.testkit.TestActorRef;
34 import akka.util.Timeout;
35 import com.google.common.base.Function;
36 import com.google.common.base.Optional;
37 import com.google.common.util.concurrent.Futures;
38 import com.google.common.util.concurrent.ListenableFuture;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.io.IOException;
41 import java.util.Collections;
42 import java.util.HashSet;
43 import java.util.Set;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.Test;
49 import org.mockito.InOrder;
50 import org.opendaylight.controller.cluster.DataPersistenceProvider;
51 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
52 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
62 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
69 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
73 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
74 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
75 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
76 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
77 import org.opendaylight.controller.cluster.datastore.modification.Modification;
78 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
79 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
80 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
81 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
82 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
83 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
84 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
86 import org.opendaylight.controller.cluster.raft.RaftActorContext;
87 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
89 import org.opendaylight.controller.cluster.raft.Snapshot;
90 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
91 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
92 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
93 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
94 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
95 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
96 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
97 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
98 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
99 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
100 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
101 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
102 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
103 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
104 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
105 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
106 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
107 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
108 import org.opendaylight.yangtools.yang.common.QName;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
111 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
113 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
115 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
122 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
123 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
124 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
125 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
126 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
127 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
128 import scala.concurrent.Await;
129 import scala.concurrent.Future;
130 import scala.concurrent.duration.FiniteDuration;
131
132 public class ShardTest extends AbstractShardTest {
133     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
134
135     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
136
137     @Test
138     public void testRegisterChangeListener() throws Exception {
139         new ShardTestKit(getSystem()) {{
140             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
141                     newShardProps(),  "testRegisterChangeListener");
142
143             waitUntilLeader(shard);
144
145             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
146
147             final MockDataChangeListener listener = new MockDataChangeListener(1);
148             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
149                     "testRegisterChangeListener-DataChangeListener");
150
151             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
152                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
153
154             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
155                     RegisterChangeListenerReply.class);
156             final String replyPath = reply.getListenerRegistrationPath().toString();
157             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
158                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
159
160             final YangInstanceIdentifier path = TestModel.TEST_PATH;
161             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
162
163             listener.waitForChangeEvents(path);
164
165             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
166             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
167         }};
168     }
169
170     @SuppressWarnings("serial")
171     @Test
172     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
173         // This test tests the timing window in which a change listener is registered before the
174         // shard becomes the leader. We verify that the listener is registered and notified of the
175         // existing data when the shard becomes the leader.
176         new ShardTestKit(getSystem()) {{
177             // For this test, we want to send the RegisterChangeListener message after the shard
178             // has recovered from persistence and before it becomes the leader. So we subclass
179             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
180             // we know that the shard has been initialized to a follower and has started the
181             // election process. The following 2 CountDownLatches are used to coordinate the
182             // ElectionTimeout with the sending of the RegisterChangeListener message.
183             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
184             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
185             final Creator<Shard> creator = new Creator<Shard>() {
186                 boolean firstElectionTimeout = true;
187
188                 @Override
189                 public Shard create() throws Exception {
190                     // Use a non persistent provider because this test actually invokes persist on the journal
191                     // this will cause all other messages to not be queued properly after that.
192                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
193                     // it does do a persist)
194                     return new Shard(newShardBuilder()) {
195                         @Override
196                         public void onReceiveCommand(final Object message) throws Exception {
197                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
198                                 // Got the first ElectionTimeout. We don't forward it to the
199                                 // base Shard yet until we've sent the RegisterChangeListener
200                                 // message. So we signal the onFirstElectionTimeout latch to tell
201                                 // the main thread to send the RegisterChangeListener message and
202                                 // start a thread to wait on the onChangeListenerRegistered latch,
203                                 // which the main thread signals after it has sent the message.
204                                 // After the onChangeListenerRegistered is triggered, we send the
205                                 // original ElectionTimeout message to proceed with the election.
206                                 firstElectionTimeout = false;
207                                 final ActorRef self = getSelf();
208                                 new Thread() {
209                                     @Override
210                                     public void run() {
211                                         Uninterruptibles.awaitUninterruptibly(
212                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
213                                         self.tell(message, self);
214                                     }
215                                 }.start();
216
217                                 onFirstElectionTimeout.countDown();
218                             } else {
219                                 super.onReceiveCommand(message);
220                             }
221                         }
222                     };
223                 }
224             };
225
226             final MockDataChangeListener listener = new MockDataChangeListener(1);
227             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
228                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
229
230             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
231                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
232                     "testRegisterChangeListenerWhenNotLeaderInitially");
233
234             // Write initial data into the in-memory store.
235             final YangInstanceIdentifier path = TestModel.TEST_PATH;
236             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
237
238             // Wait until the shard receives the first ElectionTimeout message.
239             assertEquals("Got first ElectionTimeout", true,
240                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
241
242             // Now send the RegisterChangeListener and wait for the reply.
243             shard.tell(new RegisterChangeListener(path, dclActor,
244                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
245
246             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
247                     RegisterChangeListenerReply.class);
248             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
249
250             // Sanity check - verify the shard is not the leader yet.
251             shard.tell(new FindLeader(), getRef());
252             final FindLeaderReply findLeadeReply =
253                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
254             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
255
256             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
257             // with the election process.
258             onChangeListenerRegistered.countDown();
259
260             // Wait for the shard to become the leader and notify our listener with the existing
261             // data in the store.
262             listener.waitForChangeEvents(path);
263
264             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
265             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
266         }};
267     }
268
269     @Test
270     public void testRegisterDataTreeChangeListener() throws Exception {
271         new ShardTestKit(getSystem()) {{
272             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
273                     newShardProps(), "testRegisterDataTreeChangeListener");
274
275             waitUntilLeader(shard);
276
277             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
278
279             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
280             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
281                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
282
283             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
284
285             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
286                     RegisterDataTreeChangeListenerReply.class);
287             final String replyPath = reply.getListenerRegistrationPath().toString();
288             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
289                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
290
291             final YangInstanceIdentifier path = TestModel.TEST_PATH;
292             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
293
294             listener.waitForChangeEvents();
295
296             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
297             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
298         }};
299     }
300
301     @SuppressWarnings("serial")
302     @Test
303     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
304         new ShardTestKit(getSystem()) {{
305             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
306             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
307             final Creator<Shard> creator = new Creator<Shard>() {
308                 boolean firstElectionTimeout = true;
309
310                 @Override
311                 public Shard create() throws Exception {
312                     return new Shard(Shard.builder().id(shardID).datastoreContext(
313                             dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
314                         @Override
315                         public void onReceiveCommand(final Object message) throws Exception {
316                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
317                                 firstElectionTimeout = false;
318                                 final ActorRef self = getSelf();
319                                 new Thread() {
320                                     @Override
321                                     public void run() {
322                                         Uninterruptibles.awaitUninterruptibly(
323                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
324                                         self.tell(message, self);
325                                     }
326                                 }.start();
327
328                                 onFirstElectionTimeout.countDown();
329                             } else {
330                                 super.onReceiveCommand(message);
331                             }
332                         }
333                     };
334                 }
335             };
336
337             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
338             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
339                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
340
341             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
342                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
343                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
344
345             final YangInstanceIdentifier path = TestModel.TEST_PATH;
346             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
347
348             assertEquals("Got first ElectionTimeout", true,
349                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
350
351             shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
352             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
353                 RegisterDataTreeChangeListenerReply.class);
354             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
355
356             shard.tell(new FindLeader(), getRef());
357             final FindLeaderReply findLeadeReply =
358                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
359             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
360
361             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
362
363             onChangeListenerRegistered.countDown();
364
365             // TODO: investigate why we do not receive data chage events
366             listener.waitForChangeEvents();
367
368             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
369             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
370         }};
371     }
372
373     @Test
374     public void testCreateTransaction(){
375         new ShardTestKit(getSystem()) {{
376             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
377
378             waitUntilLeader(shard);
379
380             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
381
382             shard.tell(new CreateTransaction("txn-1",
383                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
384
385             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
386                     CreateTransactionReply.class);
387
388             final String path = reply.getTransactionActorPath().toString();
389             assertTrue("Unexpected transaction path " + path,
390                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
391
392             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
393         }};
394     }
395
396     @Test
397     public void testCreateTransactionOnChain(){
398         new ShardTestKit(getSystem()) {{
399             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
400
401             waitUntilLeader(shard);
402
403             shard.tell(new CreateTransaction("txn-1",
404                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
405                     getRef());
406
407             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
408                     CreateTransactionReply.class);
409
410             final String path = reply.getTransactionActorPath().toString();
411             assertTrue("Unexpected transaction path " + path,
412                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
413
414             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
415         }};
416     }
417
418     @SuppressWarnings("serial")
419     @Test
420     public void testPeerAddressResolved() throws Exception {
421         new ShardTestKit(getSystem()) {{
422             final CountDownLatch recoveryComplete = new CountDownLatch(1);
423             class TestShard extends Shard {
424                 TestShard() {
425                     super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
426                             peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
427                             schemaContext(SCHEMA_CONTEXT));
428                 }
429
430                 String getPeerAddress(String id) {
431                     return getRaftActorContext().getPeerAddress(id);
432                 }
433
434                 @Override
435                 protected void onRecoveryComplete() {
436                     try {
437                         super.onRecoveryComplete();
438                     } finally {
439                         recoveryComplete.countDown();
440                     }
441                 }
442             }
443
444             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
445                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
446                         @Override
447                         public TestShard create() throws Exception {
448                             return new TestShard();
449                         }
450                     })), "testPeerAddressResolved");
451
452             assertEquals("Recovery complete", true,
453                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
454
455             final String address = "akka://foobar";
456             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
457
458             assertEquals("getPeerAddress", address,
459                 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
460
461             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
462         }};
463     }
464
465     @Test
466     public void testApplySnapshot() throws Exception {
467
468         ShardTestKit testkit = new ShardTestKit(getSystem());
469
470         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
471                 "testApplySnapshot");
472
473         testkit.waitUntilLeader(shard);
474
475         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
476         store.setSchemaContext(SCHEMA_CONTEXT);
477
478         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
479                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
480                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
481                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
482
483         writeToStore(store, TestModel.TEST_PATH, container);
484
485         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
486         final NormalizedNode<?,?> expected = readStore(store, root);
487
488         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
489                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
490
491         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
492
493         final NormalizedNode<?,?> actual = readStore(shard, root);
494
495         assertEquals("Root node", expected, actual);
496
497         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
498     }
499
500     @Test
501     public void testApplyState() throws Exception {
502
503         ShardTestKit testkit = new ShardTestKit(getSystem());
504
505         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
506
507         testkit.waitUntilLeader(shard);
508
509         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
510
511         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
512                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
513
514         shard.underlyingActor().onReceiveCommand(applyState);
515
516         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
517         assertEquals("Applied state", node, actual);
518
519         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
520     }
521
522     @Test
523     public void testApplyStateWithCandidatePayload() throws Exception {
524
525         ShardTestKit testkit = new ShardTestKit(getSystem());
526
527         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
528
529         testkit.waitUntilLeader(shard);
530
531         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
532         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
533
534         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
535                 DataTreeCandidatePayload.create(candidate)));
536
537         shard.underlyingActor().onReceiveCommand(applyState);
538
539         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
540         assertEquals("Applied state", node, actual);
541
542         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
543     }
544
545     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
546         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
547         testStore.setSchemaContext(SCHEMA_CONTEXT);
548
549         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
550
551         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
552
553         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
554                 SerializationUtils.serializeNormalizedNode(root),
555                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
556         return testStore;
557     }
558
559     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
560         source.validate(mod);
561         final DataTreeCandidate candidate = source.prepare(mod);
562         source.commit(candidate);
563         return DataTreeCandidatePayload.create(candidate);
564     }
565
566     @Test
567     public void testDataTreeCandidateRecovery() throws Exception {
568         // Set up the InMemorySnapshotStore.
569         final DataTree source = setupInMemorySnapshotStore();
570
571         final DataTreeModification writeMod = source.takeSnapshot().newModification();
572         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
573         writeMod.ready();
574         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
575
576         // Set up the InMemoryJournal.
577         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
578
579         final int nListEntries = 16;
580         final Set<Integer> listEntryKeys = new HashSet<>();
581
582         // Add some ModificationPayload entries
583         for (int i = 1; i <= nListEntries; i++) {
584             listEntryKeys.add(Integer.valueOf(i));
585
586             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
587                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
588
589             final DataTreeModification mod = source.takeSnapshot().newModification();
590             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
591             mod.ready();
592             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
593                 payloadForModification(source, mod)));
594         }
595
596         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
597             new ApplyJournalEntries(nListEntries));
598
599         testRecovery(listEntryKeys);
600     }
601
602     @Test
603     public void testModicationRecovery() throws Exception {
604
605         // Set up the InMemorySnapshotStore.
606         setupInMemorySnapshotStore();
607
608         // Set up the InMemoryJournal.
609
610         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
611
612         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
613             new WriteModification(TestModel.OUTER_LIST_PATH,
614                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
615
616         final int nListEntries = 16;
617         final Set<Integer> listEntryKeys = new HashSet<>();
618
619         // Add some ModificationPayload entries
620         for(int i = 1; i <= nListEntries; i++) {
621             listEntryKeys.add(Integer.valueOf(i));
622             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
623                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
624             final Modification mod = new MergeModification(path,
625                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
626             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
627                     newModificationPayload(mod)));
628         }
629
630         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
631                 new ApplyJournalEntries(nListEntries));
632
633         testRecovery(listEntryKeys);
634     }
635
636     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
637         final MutableCompositeModification compMod = new MutableCompositeModification();
638         for(final Modification mod: mods) {
639             compMod.addModification(mod);
640         }
641
642         return new ModificationPayload(compMod);
643     }
644
645     @Test
646     public void testConcurrentThreePhaseCommits() throws Throwable {
647         new ShardTestKit(getSystem()) {{
648             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
649                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
650                     "testConcurrentThreePhaseCommits");
651
652             waitUntilLeader(shard);
653
654          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
655
656             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
657
658             final String transactionID1 = "tx1";
659             final MutableCompositeModification modification1 = new MutableCompositeModification();
660             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
661                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
662
663             final String transactionID2 = "tx2";
664             final MutableCompositeModification modification2 = new MutableCompositeModification();
665             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
666                     TestModel.OUTER_LIST_PATH,
667                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
668                     modification2);
669
670             final String transactionID3 = "tx3";
671             final MutableCompositeModification modification3 = new MutableCompositeModification();
672             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
673                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
674                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
675                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
676                     modification3);
677
678             final long timeoutSec = 5;
679             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
680             final Timeout timeout = new Timeout(duration);
681
682             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
683             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
684                     expectMsgClass(duration, ReadyTransactionReply.class));
685             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
686
687             // Send the CanCommitTransaction message for the first Tx.
688
689             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
690             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
691                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
692             assertEquals("Can commit", true, canCommitReply.getCanCommit());
693
694             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
695             expectMsgClass(duration, ReadyTransactionReply.class);
696
697             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
698             expectMsgClass(duration, ReadyTransactionReply.class);
699
700             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
701             // processed after the first Tx completes.
702
703             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
704                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
705
706             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
707                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
708
709             // Send the CommitTransaction message for the first Tx. After it completes, it should
710             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
711
712             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
713             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
714
715             // Wait for the next 2 Tx's to complete.
716
717             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
718             final CountDownLatch commitLatch = new CountDownLatch(2);
719
720             class OnFutureComplete extends OnComplete<Object> {
721                 private final Class<?> expRespType;
722
723                 OnFutureComplete(final Class<?> expRespType) {
724                     this.expRespType = expRespType;
725                 }
726
727                 @Override
728                 public void onComplete(final Throwable error, final Object resp) {
729                     if(error != null) {
730                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
731                     } else {
732                         try {
733                             assertEquals("Commit response type", expRespType, resp.getClass());
734                             onSuccess(resp);
735                         } catch (final Exception e) {
736                             caughtEx.set(e);
737                         }
738                     }
739                 }
740
741                 void onSuccess(final Object resp) throws Exception {
742                 }
743             }
744
745             class OnCommitFutureComplete extends OnFutureComplete {
746                 OnCommitFutureComplete() {
747                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
748                 }
749
750                 @Override
751                 public void onComplete(final Throwable error, final Object resp) {
752                     super.onComplete(error, resp);
753                     commitLatch.countDown();
754                 }
755             }
756
757             class OnCanCommitFutureComplete extends OnFutureComplete {
758                 private final String transactionID;
759
760                 OnCanCommitFutureComplete(final String transactionID) {
761                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
762                     this.transactionID = transactionID;
763                 }
764
765                 @Override
766                 void onSuccess(final Object resp) throws Exception {
767                     final CanCommitTransactionReply canCommitReply =
768                             CanCommitTransactionReply.fromSerializable(resp);
769                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
770
771                     final Future<Object> commitFuture = Patterns.ask(shard,
772                             new CommitTransaction(transactionID).toSerializable(), timeout);
773                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
774                 }
775             }
776
777             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
778                     getSystem().dispatcher());
779
780             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
781                     getSystem().dispatcher());
782
783             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
784
785             if(caughtEx.get() != null) {
786                 throw caughtEx.get();
787             }
788
789             assertEquals("Commits complete", true, done);
790
791             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
792             inOrder.verify(cohort1).canCommit();
793             inOrder.verify(cohort1).preCommit();
794             inOrder.verify(cohort1).commit();
795             inOrder.verify(cohort2).canCommit();
796             inOrder.verify(cohort2).preCommit();
797             inOrder.verify(cohort2).commit();
798             inOrder.verify(cohort3).canCommit();
799             inOrder.verify(cohort3).preCommit();
800             inOrder.verify(cohort3).commit();
801
802             // Verify data in the data store.
803
804             verifyOuterListEntry(shard, 1);
805
806             verifyLastApplied(shard, 2);
807
808             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
809         }};
810     }
811
812     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
813             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
814         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
815     }
816
817     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
818             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
819             final int messagesSent) {
820         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
821         batched.addModification(new WriteModification(path, data));
822         batched.setReady(ready);
823         batched.setDoCommitOnReady(doCommitOnReady);
824         batched.setTotalMessagesSent(messagesSent);
825         return batched;
826     }
827
828     @Test
829     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
830         new ShardTestKit(getSystem()) {{
831             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
832                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
833                     "testBatchedModificationsWithNoCommitOnReady");
834
835             waitUntilLeader(shard);
836
837             final String transactionID = "tx";
838             final FiniteDuration duration = duration("5 seconds");
839
840             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
841             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
842                 @Override
843                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
844                     if(mockCohort.get() == null) {
845                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
846                     }
847
848                     return mockCohort.get();
849                 }
850             };
851
852             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
853
854             // Send a BatchedModifications to start a transaction.
855
856             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
857                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
858             expectMsgClass(duration, BatchedModificationsReply.class);
859
860             // Send a couple more BatchedModifications.
861
862             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
863                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
864             expectMsgClass(duration, BatchedModificationsReply.class);
865
866             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
867                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
868                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
869             expectMsgClass(duration, ReadyTransactionReply.class);
870
871             // Send the CanCommitTransaction message.
872
873             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
874             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
875                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
876             assertEquals("Can commit", true, canCommitReply.getCanCommit());
877
878             // Send the CanCommitTransaction message.
879
880             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
881             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
882
883             final InOrder inOrder = inOrder(mockCohort.get());
884             inOrder.verify(mockCohort.get()).canCommit();
885             inOrder.verify(mockCohort.get()).preCommit();
886             inOrder.verify(mockCohort.get()).commit();
887
888             // Verify data in the data store.
889
890             verifyOuterListEntry(shard, 1);
891
892             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
893         }};
894     }
895
896     @Test
897     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
898         new ShardTestKit(getSystem()) {{
899             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
900                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
901                     "testBatchedModificationsWithCommitOnReady");
902
903             waitUntilLeader(shard);
904
905             final String transactionID = "tx";
906             final FiniteDuration duration = duration("5 seconds");
907
908             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
909             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
910                 @Override
911                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
912                     if(mockCohort.get() == null) {
913                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
914                     }
915
916                     return mockCohort.get();
917                 }
918             };
919
920             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
921
922             // Send a BatchedModifications to start a transaction.
923
924             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
925                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
926             expectMsgClass(duration, BatchedModificationsReply.class);
927
928             // Send a couple more BatchedModifications.
929
930             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
931                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
932             expectMsgClass(duration, BatchedModificationsReply.class);
933
934             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
935                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
936                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
937
938             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
939
940             final InOrder inOrder = inOrder(mockCohort.get());
941             inOrder.verify(mockCohort.get()).canCommit();
942             inOrder.verify(mockCohort.get()).preCommit();
943             inOrder.verify(mockCohort.get()).commit();
944
945             // Verify data in the data store.
946
947             verifyOuterListEntry(shard, 1);
948
949             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
950         }};
951     }
952
953     @Test(expected=IllegalStateException.class)
954     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
955         new ShardTestKit(getSystem()) {{
956             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
957                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
958                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
959
960             waitUntilLeader(shard);
961
962             final String transactionID = "tx1";
963             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
964             batched.setReady(true);
965             batched.setTotalMessagesSent(2);
966
967             shard.tell(batched, getRef());
968
969             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
970
971             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
972
973             if(failure != null) {
974                 throw failure.cause();
975             }
976         }};
977     }
978
979     @Test
980     public void testBatchedModificationsWithOperationFailure() throws Throwable {
981         new ShardTestKit(getSystem()) {{
982             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
983                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
984                     "testBatchedModificationsWithOperationFailure");
985
986             waitUntilLeader(shard);
987
988             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
989             // write will not validate the children for performance reasons.
990
991             String transactionID = "tx1";
992
993             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
994                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
995                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
996
997             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
998             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
999             shard.tell(batched, getRef());
1000             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1001
1002             Throwable cause = failure.cause();
1003
1004             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1005             batched.setReady(true);
1006             batched.setTotalMessagesSent(2);
1007
1008             shard.tell(batched, getRef());
1009
1010             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1011             assertEquals("Failure cause", cause, failure.cause());
1012
1013             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1014         }};
1015     }
1016
1017     @SuppressWarnings("unchecked")
1018     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1019         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1020         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1021         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1022                 outerList.getValue() instanceof Iterable);
1023         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1024         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1025                 entry instanceof MapEntryNode);
1026         final MapEntryNode mapEntry = (MapEntryNode)entry;
1027         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1028                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1029         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1030         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1031     }
1032
1033     @Test
1034     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1035         new ShardTestKit(getSystem()) {{
1036             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1037                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1038                     "testBatchedModificationsOnTransactionChain");
1039
1040             waitUntilLeader(shard);
1041
1042             final String transactionChainID = "txChain";
1043             final String transactionID1 = "tx1";
1044             final String transactionID2 = "tx2";
1045
1046             final FiniteDuration duration = duration("5 seconds");
1047
1048             // Send a BatchedModifications to start a chained write transaction and ready it.
1049
1050             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1051             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1052             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1053                     containerNode, true, false, 1), getRef());
1054             expectMsgClass(duration, ReadyTransactionReply.class);
1055
1056             // Create a read Tx on the same chain.
1057
1058             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1059                     transactionChainID).toSerializable(), getRef());
1060
1061             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1062
1063             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1064             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1065             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1066
1067             // Commit the write transaction.
1068
1069             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1070             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1071                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1072             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1073
1074             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1075             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1076
1077             // Verify data in the data store.
1078
1079             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1080             assertEquals("Stored node", containerNode, actualNode);
1081
1082             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1083         }};
1084     }
1085
1086     @Test
1087     public void testOnBatchedModificationsWhenNotLeader() {
1088         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1089         new ShardTestKit(getSystem()) {{
1090             final Creator<Shard> creator = new Creator<Shard>() {
1091                 private static final long serialVersionUID = 1L;
1092
1093                 @Override
1094                 public Shard create() throws Exception {
1095                     return new Shard(newShardBuilder()) {
1096                         @Override
1097                         protected boolean isLeader() {
1098                             return overrideLeaderCalls.get() ? false : super.isLeader();
1099                         }
1100
1101                         @Override
1102                         protected ActorSelection getLeader() {
1103                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1104                                 super.getLeader();
1105                         }
1106                     };
1107                 }
1108             };
1109
1110             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1111                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1112
1113             waitUntilLeader(shard);
1114
1115             overrideLeaderCalls.set(true);
1116
1117             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1118
1119             shard.tell(batched, ActorRef.noSender());
1120
1121             expectMsgEquals(batched);
1122
1123             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1124         }};
1125     }
1126
1127     @Test
1128     public void testTransactionMessagesWithNoLeader() {
1129         new ShardTestKit(getSystem()) {{
1130             dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
1131                 shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
1132             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1133                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1134                     "testTransactionMessagesWithNoLeader");
1135
1136             waitUntilNoLeader(shard);
1137
1138             shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef());
1139             Failure failure = expectMsgClass(Failure.class);
1140             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
1141
1142             shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx",
1143                     DataStoreVersions.CURRENT_VERSION, true), getRef());
1144             failure = expectMsgClass(Failure.class);
1145             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
1146
1147             shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef());
1148             failure = expectMsgClass(Failure.class);
1149             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
1150         }};
1151     }
1152
1153     @Test
1154     public void testReadyWithImmediateCommit() throws Exception{
1155         testReadyWithImmediateCommit(true);
1156         testReadyWithImmediateCommit(false);
1157     }
1158
1159     public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
1160         new ShardTestKit(getSystem()) {{
1161             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1162                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1163                     "testReadyWithImmediateCommit-" + readWrite);
1164
1165             waitUntilLeader(shard);
1166
1167             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1168
1169             final String transactionID = "tx1";
1170             final MutableCompositeModification modification = new MutableCompositeModification();
1171             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1172             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1173                     TestModel.TEST_PATH, containerNode, modification);
1174
1175             final FiniteDuration duration = duration("5 seconds");
1176
1177             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1178
1179             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1180
1181             final InOrder inOrder = inOrder(cohort);
1182             inOrder.verify(cohort).canCommit();
1183             inOrder.verify(cohort).preCommit();
1184             inOrder.verify(cohort).commit();
1185
1186             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1187             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1188
1189             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1190         }};
1191     }
1192
1193     @Test
1194     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1195         new ShardTestKit(getSystem()) {{
1196             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1197                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1198                     "testReadyLocalTransactionWithImmediateCommit");
1199
1200             waitUntilLeader(shard);
1201
1202             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1203
1204             final DataTreeModification modification = dataStore.newModification();
1205
1206             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1207             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1208             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1209             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1210
1211             final String txId = "tx1";
1212             modification.ready();
1213             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1214
1215             shard.tell(readyMessage, getRef());
1216
1217             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1218
1219             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1220             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1221
1222             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1223         }};
1224     }
1225
1226     @Test
1227     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1228         new ShardTestKit(getSystem()) {{
1229             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1230                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1231                     "testReadyLocalTransactionWithThreePhaseCommit");
1232
1233             waitUntilLeader(shard);
1234
1235             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1236
1237             final DataTreeModification modification = dataStore.newModification();
1238
1239             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1240             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1241             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1242             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1243
1244             final String txId = "tx1";
1245                 modification.ready();
1246             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1247
1248             shard.tell(readyMessage, getRef());
1249
1250             expectMsgClass(ReadyTransactionReply.class);
1251
1252             // Send the CanCommitTransaction message.
1253
1254             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1255             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1256                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1257             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1258
1259             // Send the CanCommitTransaction message.
1260
1261             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1262             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1263
1264             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1265             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1266
1267             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1268         }};
1269     }
1270
1271     @Test
1272     public void testCommitWithPersistenceDisabled() throws Throwable {
1273         testCommitWithPersistenceDisabled(true);
1274         testCommitWithPersistenceDisabled(false);
1275     }
1276
1277     public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
1278         dataStoreContextBuilder.persistent(false);
1279         new ShardTestKit(getSystem()) {{
1280             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1281                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1282                     "testCommitWithPersistenceDisabled-" + readWrite);
1283
1284             waitUntilLeader(shard);
1285
1286             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1287
1288             // Setup a simulated transactions with a mock cohort.
1289
1290             final String transactionID = "tx";
1291             final MutableCompositeModification modification = new MutableCompositeModification();
1292             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1293             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1294                 TestModel.TEST_PATH, containerNode, modification);
1295
1296             final FiniteDuration duration = duration("5 seconds");
1297
1298             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1299             expectMsgClass(duration, ReadyTransactionReply.class);
1300
1301             // Send the CanCommitTransaction message.
1302
1303             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1304             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1305                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1306             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1307
1308             // Send the CanCommitTransaction message.
1309
1310             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1311             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1312
1313             final InOrder inOrder = inOrder(cohort);
1314             inOrder.verify(cohort).canCommit();
1315             inOrder.verify(cohort).preCommit();
1316             inOrder.verify(cohort).commit();
1317
1318             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1319             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1320
1321             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1322         }};
1323     }
1324
1325     private static DataTreeCandidateTip mockCandidate(final String name) {
1326         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1327         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1328         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1329         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1330         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1331         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1332         return mockCandidate;
1333     }
1334
1335     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1336         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1337         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1338         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1339         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1340         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1341         return mockCandidate;
1342     }
1343
1344     @Test
1345     public void testCommitWhenTransactionHasNoModifications() {
1346         testCommitWhenTransactionHasNoModifications(true);
1347         testCommitWhenTransactionHasNoModifications(false);
1348     }
1349
1350     public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
1351         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1352         // but here that need not happen
1353         new ShardTestKit(getSystem()) {
1354             {
1355                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1356                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1357                         "testCommitWhenTransactionHasNoModifications-" + readWrite);
1358
1359                 waitUntilLeader(shard);
1360
1361                 final String transactionID = "tx1";
1362                 final MutableCompositeModification modification = new MutableCompositeModification();
1363                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1364                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1365                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1366                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1367                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1368
1369                 final FiniteDuration duration = duration("5 seconds");
1370
1371                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1372                 expectMsgClass(duration, ReadyTransactionReply.class);
1373
1374                 // Send the CanCommitTransaction message.
1375
1376                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1377                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1378                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1379                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1380
1381                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1382                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1383
1384                 final InOrder inOrder = inOrder(cohort);
1385                 inOrder.verify(cohort).canCommit();
1386                 inOrder.verify(cohort).preCommit();
1387                 inOrder.verify(cohort).commit();
1388
1389                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1390                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1391
1392                 // Use MBean for verification
1393                 // Committed transaction count should increase as usual
1394                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1395
1396                 // Commit index should not advance because this does not go into the journal
1397                 assertEquals(-1, shardStats.getCommitIndex());
1398
1399                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1400
1401             }
1402         };
1403     }
1404
1405     @Test
1406     public void testCommitWhenTransactionHasModifications() {
1407         testCommitWhenTransactionHasModifications(true);
1408         testCommitWhenTransactionHasModifications(false);
1409     }
1410
1411     public void testCommitWhenTransactionHasModifications(final boolean readWrite){
1412         new ShardTestKit(getSystem()) {
1413             {
1414                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1415                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1416                         "testCommitWhenTransactionHasModifications-" + readWrite);
1417
1418                 waitUntilLeader(shard);
1419
1420                 final String transactionID = "tx1";
1421                 final MutableCompositeModification modification = new MutableCompositeModification();
1422                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1423                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1424                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1425                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1426                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1427                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1428
1429                 final FiniteDuration duration = duration("5 seconds");
1430
1431                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1432                 expectMsgClass(duration, ReadyTransactionReply.class);
1433
1434                 // Send the CanCommitTransaction message.
1435
1436                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1437                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1438                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1439                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1440
1441                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1442                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1443
1444                 final InOrder inOrder = inOrder(cohort);
1445                 inOrder.verify(cohort).canCommit();
1446                 inOrder.verify(cohort).preCommit();
1447                 inOrder.verify(cohort).commit();
1448
1449                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1450                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1451
1452                 // Use MBean for verification
1453                 // Committed transaction count should increase as usual
1454                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1455
1456                 // Commit index should advance as we do not have an empty modification
1457                 assertEquals(0, shardStats.getCommitIndex());
1458
1459                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1460
1461             }
1462         };
1463     }
1464
1465     @Test
1466     public void testCommitPhaseFailure() throws Throwable {
1467         testCommitPhaseFailure(true);
1468         testCommitPhaseFailure(false);
1469     }
1470
1471     public void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
1472         new ShardTestKit(getSystem()) {{
1473             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1474                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1475                     "testCommitPhaseFailure-" + readWrite);
1476
1477             waitUntilLeader(shard);
1478
1479             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1480             // commit phase.
1481
1482             final String transactionID1 = "tx1";
1483             final MutableCompositeModification modification1 = new MutableCompositeModification();
1484             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1485             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1486             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1487             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1488             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1489
1490             final String transactionID2 = "tx2";
1491             final MutableCompositeModification modification2 = new MutableCompositeModification();
1492             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1493             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1494
1495             final FiniteDuration duration = duration("5 seconds");
1496             final Timeout timeout = new Timeout(duration);
1497
1498             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1499             expectMsgClass(duration, ReadyTransactionReply.class);
1500
1501             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1502             expectMsgClass(duration, ReadyTransactionReply.class);
1503
1504             // Send the CanCommitTransaction message for the first Tx.
1505
1506             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1507             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1508                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1509             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1510
1511             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1512             // processed after the first Tx completes.
1513
1514             final Future<Object> canCommitFuture = Patterns.ask(shard,
1515                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1516
1517             // Send the CommitTransaction message for the first Tx. This should send back an error
1518             // and trigger the 2nd Tx to proceed.
1519
1520             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1521             expectMsgClass(duration, akka.actor.Status.Failure.class);
1522
1523             // Wait for the 2nd Tx to complete the canCommit phase.
1524
1525             final CountDownLatch latch = new CountDownLatch(1);
1526             canCommitFuture.onComplete(new OnComplete<Object>() {
1527                 @Override
1528                 public void onComplete(final Throwable t, final Object resp) {
1529                     latch.countDown();
1530                 }
1531             }, getSystem().dispatcher());
1532
1533             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1534
1535             final InOrder inOrder = inOrder(cohort1, cohort2);
1536             inOrder.verify(cohort1).canCommit();
1537             inOrder.verify(cohort1).preCommit();
1538             inOrder.verify(cohort1).commit();
1539             inOrder.verify(cohort2).canCommit();
1540
1541             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1542         }};
1543     }
1544
1545     @Test
1546     public void testPreCommitPhaseFailure() throws Throwable {
1547         testPreCommitPhaseFailure(true);
1548         testPreCommitPhaseFailure(false);
1549     }
1550
1551     public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
1552         new ShardTestKit(getSystem()) {{
1553             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1554                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1555                     "testPreCommitPhaseFailure-" + readWrite);
1556
1557             waitUntilLeader(shard);
1558
1559             final String transactionID1 = "tx1";
1560             final MutableCompositeModification modification1 = new MutableCompositeModification();
1561             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1562             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1563             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1564
1565             final String transactionID2 = "tx2";
1566             final MutableCompositeModification modification2 = new MutableCompositeModification();
1567             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1568             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1569
1570             final FiniteDuration duration = duration("5 seconds");
1571             final Timeout timeout = new Timeout(duration);
1572
1573             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1574             expectMsgClass(duration, ReadyTransactionReply.class);
1575
1576             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1577             expectMsgClass(duration, ReadyTransactionReply.class);
1578
1579             // Send the CanCommitTransaction message for the first Tx.
1580
1581             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1582             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1583                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1584             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1585
1586             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1587             // processed after the first Tx completes.
1588
1589             final Future<Object> canCommitFuture = Patterns.ask(shard,
1590                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1591
1592             // Send the CommitTransaction message for the first Tx. This should send back an error
1593             // and trigger the 2nd Tx to proceed.
1594
1595             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1596             expectMsgClass(duration, akka.actor.Status.Failure.class);
1597
1598             // Wait for the 2nd Tx to complete the canCommit phase.
1599
1600             final CountDownLatch latch = new CountDownLatch(1);
1601             canCommitFuture.onComplete(new OnComplete<Object>() {
1602                 @Override
1603                 public void onComplete(final Throwable t, final Object resp) {
1604                     latch.countDown();
1605                 }
1606             }, getSystem().dispatcher());
1607
1608             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1609
1610             final InOrder inOrder = inOrder(cohort1, cohort2);
1611             inOrder.verify(cohort1).canCommit();
1612             inOrder.verify(cohort1).preCommit();
1613             inOrder.verify(cohort2).canCommit();
1614
1615             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1616         }};
1617     }
1618
1619     @Test
1620     public void testCanCommitPhaseFailure() throws Throwable {
1621         testCanCommitPhaseFailure(true);
1622         testCanCommitPhaseFailure(false);
1623     }
1624
1625     public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1626         new ShardTestKit(getSystem()) {{
1627             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1628                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1629                     "testCanCommitPhaseFailure-" + readWrite);
1630
1631             waitUntilLeader(shard);
1632
1633             final FiniteDuration duration = duration("5 seconds");
1634
1635             final String transactionID1 = "tx1";
1636             final MutableCompositeModification modification = new MutableCompositeModification();
1637             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1638             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1639
1640             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1641             expectMsgClass(duration, ReadyTransactionReply.class);
1642
1643             // Send the CanCommitTransaction message.
1644
1645             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1646             expectMsgClass(duration, akka.actor.Status.Failure.class);
1647
1648             // Send another can commit to ensure the failed one got cleaned up.
1649
1650             reset(cohort);
1651
1652             final String transactionID2 = "tx2";
1653             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1654
1655             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1656             expectMsgClass(duration, ReadyTransactionReply.class);
1657
1658             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1659             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1660                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1661             assertEquals("getCanCommit", true, reply.getCanCommit());
1662
1663             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1664         }};
1665     }
1666
1667     @Test
1668     public void testCanCommitPhaseFalseResponse() throws Throwable {
1669         testCanCommitPhaseFalseResponse(true);
1670         testCanCommitPhaseFalseResponse(false);
1671     }
1672
1673     public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1674         new ShardTestKit(getSystem()) {{
1675             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1676                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1677                     "testCanCommitPhaseFalseResponse-" + readWrite);
1678
1679             waitUntilLeader(shard);
1680
1681             final FiniteDuration duration = duration("5 seconds");
1682
1683             final String transactionID1 = "tx1";
1684             final MutableCompositeModification modification = new MutableCompositeModification();
1685             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1686             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1687
1688             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1689             expectMsgClass(duration, ReadyTransactionReply.class);
1690
1691             // Send the CanCommitTransaction message.
1692
1693             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1694             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1695                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1696             assertEquals("getCanCommit", false, reply.getCanCommit());
1697
1698             // Send another can commit to ensure the failed one got cleaned up.
1699
1700             reset(cohort);
1701
1702             final String transactionID2 = "tx2";
1703             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1704
1705             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1706             expectMsgClass(duration, ReadyTransactionReply.class);
1707
1708             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1709             reply = CanCommitTransactionReply.fromSerializable(
1710                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1711             assertEquals("getCanCommit", true, reply.getCanCommit());
1712
1713             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1714         }};
1715     }
1716
1717     @Test
1718     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1719         testImmediateCommitWithCanCommitPhaseFailure(true);
1720         testImmediateCommitWithCanCommitPhaseFailure(false);
1721     }
1722
1723     public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1724         new ShardTestKit(getSystem()) {{
1725             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1726                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1727                     "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1728
1729             waitUntilLeader(shard);
1730
1731             final FiniteDuration duration = duration("5 seconds");
1732
1733             final String transactionID1 = "tx1";
1734             final MutableCompositeModification modification = new MutableCompositeModification();
1735             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1736             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1737
1738             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
1739
1740             expectMsgClass(duration, akka.actor.Status.Failure.class);
1741
1742             // Send another can commit to ensure the failed one got cleaned up.
1743
1744             reset(cohort);
1745
1746             final String transactionID2 = "tx2";
1747             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1748             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1749             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1750             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1751             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1752             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1753             doReturn(candidateRoot).when(candidate).getRootNode();
1754             doReturn(candidate).when(cohort).getCandidate();
1755
1756             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1757
1758             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1759
1760             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1761         }};
1762     }
1763
1764     @Test
1765     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1766         testImmediateCommitWithCanCommitPhaseFalseResponse(true);
1767         testImmediateCommitWithCanCommitPhaseFalseResponse(false);
1768     }
1769
1770     public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1771         new ShardTestKit(getSystem()) {{
1772             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1773                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1774                     "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
1775
1776             waitUntilLeader(shard);
1777
1778             final FiniteDuration duration = duration("5 seconds");
1779
1780             final String transactionID = "tx1";
1781             final MutableCompositeModification modification = new MutableCompositeModification();
1782             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1783             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1784
1785             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1786
1787             expectMsgClass(duration, akka.actor.Status.Failure.class);
1788
1789             // Send another can commit to ensure the failed one got cleaned up.
1790
1791             reset(cohort);
1792
1793             final String transactionID2 = "tx2";
1794             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1795             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1796             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1797             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1798             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1799             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1800             doReturn(candidateRoot).when(candidate).getRootNode();
1801             doReturn(candidate).when(cohort).getCandidate();
1802
1803             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1804
1805             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1806
1807             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1808         }};
1809     }
1810
1811     @Test
1812     public void testAbortBeforeFinishCommit() throws Throwable {
1813         testAbortBeforeFinishCommit(true);
1814         testAbortBeforeFinishCommit(false);
1815     }
1816
1817     public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
1818         new ShardTestKit(getSystem()) {{
1819             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1820                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1821                     "testAbortBeforeFinishCommit-" + readWrite);
1822
1823             waitUntilLeader(shard);
1824
1825             final FiniteDuration duration = duration("5 seconds");
1826             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1827
1828             final String transactionID = "tx1";
1829             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1830                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1831                 @Override
1832                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1833                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1834
1835                     // Simulate an AbortTransaction message occurring during replication, after
1836                     // persisting and before finishing the commit to the in-memory store.
1837                     // We have no followers so due to optimizations in the RaftActor, it does not
1838                     // attempt replication and thus we can't send an AbortTransaction message b/c
1839                     // it would be processed too late after CommitTransaction completes. So we'll
1840                     // simulate an AbortTransaction message occurring during replication by calling
1841                     // the shard directly.
1842                     //
1843                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1844
1845                     return preCommitFuture;
1846                 }
1847             };
1848
1849             final MutableCompositeModification modification = new MutableCompositeModification();
1850             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1851                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1852                     modification, preCommit);
1853
1854             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1855             expectMsgClass(duration, ReadyTransactionReply.class);
1856
1857             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1858             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1859                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1860             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1861
1862             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1863             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1864
1865             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1866
1867             // Since we're simulating an abort occurring during replication and before finish commit,
1868             // the data should still get written to the in-memory store since we've gotten past
1869             // canCommit and preCommit and persisted the data.
1870             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1871
1872             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1873         }};
1874     }
1875
1876     @Test
1877     public void testTransactionCommitTimeout() throws Throwable {
1878         testTransactionCommitTimeout(true);
1879         testTransactionCommitTimeout(false);
1880     }
1881
1882     public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
1883         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1884
1885         new ShardTestKit(getSystem()) {{
1886             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1887                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1888                     "testTransactionCommitTimeout-" + readWrite);
1889
1890             waitUntilLeader(shard);
1891
1892             final FiniteDuration duration = duration("5 seconds");
1893
1894             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1895
1896             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1897             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1898                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1899
1900             // Create 1st Tx - will timeout
1901
1902             final String transactionID1 = "tx1";
1903             final MutableCompositeModification modification1 = new MutableCompositeModification();
1904             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1905                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1906                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1907                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1908                     modification1);
1909
1910             // Create 2nd Tx
1911
1912             final String transactionID2 = "tx3";
1913             final MutableCompositeModification modification2 = new MutableCompositeModification();
1914             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1915                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1916             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1917                     listNodePath,
1918                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1919                     modification2);
1920
1921             // Ready the Tx's
1922
1923             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1924             expectMsgClass(duration, ReadyTransactionReply.class);
1925
1926             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1927             expectMsgClass(duration, ReadyTransactionReply.class);
1928
1929             // canCommit 1st Tx. We don't send the commit so it should timeout.
1930
1931             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1932             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1933
1934             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1935
1936             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1937             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1938
1939             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1940
1941             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1942             expectMsgClass(duration, akka.actor.Status.Failure.class);
1943
1944             // Commit the 2nd Tx.
1945
1946             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1947             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1948
1949             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1950             assertNotNull(listNodePath + " not found", node);
1951
1952             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1953         }};
1954     }
1955
1956     @Test
1957     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1958         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1959
1960         new ShardTestKit(getSystem()) {{
1961             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1962                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1963                     "testTransactionCommitQueueCapacityExceeded");
1964
1965             waitUntilLeader(shard);
1966
1967             final FiniteDuration duration = duration("5 seconds");
1968
1969             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1970
1971             final String transactionID1 = "tx1";
1972             final MutableCompositeModification modification1 = new MutableCompositeModification();
1973             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1974                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1975
1976             final String transactionID2 = "tx2";
1977             final MutableCompositeModification modification2 = new MutableCompositeModification();
1978             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1979                     TestModel.OUTER_LIST_PATH,
1980                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1981                     modification2);
1982
1983             final String transactionID3 = "tx3";
1984             final MutableCompositeModification modification3 = new MutableCompositeModification();
1985             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1986                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1987
1988             // Ready the Tx's
1989
1990             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1991             expectMsgClass(duration, ReadyTransactionReply.class);
1992
1993             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1994             expectMsgClass(duration, ReadyTransactionReply.class);
1995
1996             // The 3rd Tx should exceed queue capacity and fail.
1997
1998             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1999             expectMsgClass(duration, akka.actor.Status.Failure.class);
2000
2001             // canCommit 1st Tx.
2002
2003             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2004             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2005
2006             // canCommit the 2nd Tx - it should get queued.
2007
2008             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
2009
2010             // canCommit the 3rd Tx - should exceed queue capacity and fail.
2011
2012             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2013             expectMsgClass(duration, akka.actor.Status.Failure.class);
2014
2015             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2016         }};
2017     }
2018
2019     @Test
2020     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
2021         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2022
2023         new ShardTestKit(getSystem()) {{
2024             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2025                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2026                     "testTransactionCommitWithPriorExpiredCohortEntries");
2027
2028             waitUntilLeader(shard);
2029
2030             final FiniteDuration duration = duration("5 seconds");
2031
2032             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2033
2034             final String transactionID1 = "tx1";
2035             final MutableCompositeModification modification1 = new MutableCompositeModification();
2036             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2037                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2038
2039             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
2040             expectMsgClass(duration, ReadyTransactionReply.class);
2041
2042             final String transactionID2 = "tx2";
2043             final MutableCompositeModification modification2 = new MutableCompositeModification();
2044             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2045                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2046
2047             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
2048             expectMsgClass(duration, ReadyTransactionReply.class);
2049
2050             final String transactionID3 = "tx3";
2051             final MutableCompositeModification modification3 = new MutableCompositeModification();
2052             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2053                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2054
2055             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
2056             expectMsgClass(duration, ReadyTransactionReply.class);
2057
2058             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2059             // should expire from the queue and the last one should be processed.
2060
2061             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2062             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2063
2064             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2065         }};
2066     }
2067
2068     @Test
2069     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2070         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2071
2072         new ShardTestKit(getSystem()) {{
2073             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2074                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2075                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2076
2077             waitUntilLeader(shard);
2078
2079             final FiniteDuration duration = duration("5 seconds");
2080
2081             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2082
2083             final String transactionID1 = "tx1";
2084             final MutableCompositeModification modification1 = new MutableCompositeModification();
2085             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2086                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2087
2088             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
2089             expectMsgClass(duration, ReadyTransactionReply.class);
2090
2091             // CanCommit the first one so it's the current in-progress CohortEntry.
2092
2093             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2094             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2095
2096             // Ready the second Tx.
2097
2098             final String transactionID2 = "tx2";
2099             final MutableCompositeModification modification2 = new MutableCompositeModification();
2100             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2101                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2102
2103             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
2104             expectMsgClass(duration, ReadyTransactionReply.class);
2105
2106             // Ready the third Tx.
2107
2108             final String transactionID3 = "tx3";
2109             final DataTreeModification modification3 = dataStore.newModification();
2110             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2111                     .apply(modification3);
2112                 modification3.ready();
2113             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2114
2115             shard.tell(readyMessage, getRef());
2116
2117             // Commit the first Tx. After completing, the second should expire from the queue and the third
2118             // Tx committed.
2119
2120             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2121             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2122
2123             // Expect commit reply from the third Tx.
2124
2125             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2126
2127             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2128             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2129
2130             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2131         }};
2132     }
2133
2134     @Test
2135     public void testCanCommitBeforeReadyFailure() throws Throwable {
2136         new ShardTestKit(getSystem()) {{
2137             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2138                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2139                     "testCanCommitBeforeReadyFailure");
2140
2141             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2142             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2143
2144             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2145         }};
2146     }
2147
2148     @Test
2149     public void testAbortCurrentTransaction() throws Throwable {
2150         testAbortCurrentTransaction(true);
2151         testAbortCurrentTransaction(false);
2152     }
2153
2154     public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
2155         new ShardTestKit(getSystem()) {{
2156             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2157                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2158                     "testAbortCurrentTransaction-" + readWrite);
2159
2160             waitUntilLeader(shard);
2161
2162             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2163
2164             final String transactionID1 = "tx1";
2165             final MutableCompositeModification modification1 = new MutableCompositeModification();
2166             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2167             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2168             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2169
2170             final String transactionID2 = "tx2";
2171             final MutableCompositeModification modification2 = new MutableCompositeModification();
2172             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2173             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2174
2175             final FiniteDuration duration = duration("5 seconds");
2176             final Timeout timeout = new Timeout(duration);
2177
2178             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
2179             expectMsgClass(duration, ReadyTransactionReply.class);
2180
2181             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
2182             expectMsgClass(duration, ReadyTransactionReply.class);
2183
2184             // Send the CanCommitTransaction message for the first Tx.
2185
2186             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2187             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2188                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2189             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2190
2191             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2192             // processed after the first Tx completes.
2193
2194             final Future<Object> canCommitFuture = Patterns.ask(shard,
2195                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2196
2197             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2198             // Tx to proceed.
2199
2200             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2201             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2202
2203             // Wait for the 2nd Tx to complete the canCommit phase.
2204
2205             Await.ready(canCommitFuture, duration);
2206
2207             final InOrder inOrder = inOrder(cohort1, cohort2);
2208             inOrder.verify(cohort1).canCommit();
2209             inOrder.verify(cohort2).canCommit();
2210
2211             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2212         }};
2213     }
2214
2215     @Test
2216     public void testAbortQueuedTransaction() throws Throwable {
2217         testAbortQueuedTransaction(true);
2218         testAbortQueuedTransaction(false);
2219     }
2220
2221     public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
2222         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2223         new ShardTestKit(getSystem()) {{
2224             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2225             @SuppressWarnings("serial")
2226             final Creator<Shard> creator = new Creator<Shard>() {
2227                 @Override
2228                 public Shard create() throws Exception {
2229                     return new Shard(newShardBuilder()) {
2230                         @Override
2231                         public void onReceiveCommand(final Object message) throws Exception {
2232                             super.onReceiveCommand(message);
2233                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2234                                 if(cleaupCheckLatch.get() != null) {
2235                                     cleaupCheckLatch.get().countDown();
2236                                 }
2237                             }
2238                         }
2239                     };
2240                 }
2241             };
2242
2243             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2244                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2245                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
2246
2247             waitUntilLeader(shard);
2248
2249             final String transactionID = "tx1";
2250
2251             final MutableCompositeModification modification = new MutableCompositeModification();
2252             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2253             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2254
2255             final FiniteDuration duration = duration("5 seconds");
2256
2257             // Ready the tx.
2258
2259             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
2260             expectMsgClass(duration, ReadyTransactionReply.class);
2261
2262             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2263
2264             // Send the AbortTransaction message.
2265
2266             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2267             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2268
2269             verify(cohort).abort();
2270
2271             // Verify the tx cohort is removed from queue at the cleanup check interval.
2272
2273             cleaupCheckLatch.set(new CountDownLatch(1));
2274             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2275                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2276
2277             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2278
2279             // Now send CanCommitTransaction - should fail.
2280
2281             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2282
2283             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2284             assertTrue("Failure type", failure instanceof IllegalStateException);
2285
2286             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2287         }};
2288     }
2289
2290     @Test
2291     public void testCreateSnapshot() throws Exception {
2292         testCreateSnapshot(true, "testCreateSnapshot");
2293     }
2294
2295     @Test
2296     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2297         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2298     }
2299
2300     @SuppressWarnings("serial")
2301     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2302
2303         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2304
2305         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2306         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2307             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2308                 super(delegate);
2309             }
2310
2311             @Override
2312             public void saveSnapshot(final Object o) {
2313                 savedSnapshot.set(o);
2314                 super.saveSnapshot(o);
2315             }
2316         }
2317
2318         dataStoreContextBuilder.persistent(persistent);
2319
2320         new ShardTestKit(getSystem()) {{
2321             class TestShard extends Shard {
2322
2323                 protected TestShard(AbstractBuilder<?, ?> builder) {
2324                     super(builder);
2325                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2326                 }
2327
2328                 @Override
2329                 public void handleCommand(final Object message) {
2330                     super.handleCommand(message);
2331
2332                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2333                         latch.get().countDown();
2334                     }
2335                 }
2336
2337                 @Override
2338                 public RaftActorContext getRaftActorContext() {
2339                     return super.getRaftActorContext();
2340                 }
2341             }
2342
2343             final Creator<Shard> creator = new Creator<Shard>() {
2344                 @Override
2345                 public Shard create() throws Exception {
2346                     return new TestShard(newShardBuilder());
2347                 }
2348             };
2349
2350             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2351                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2352
2353             waitUntilLeader(shard);
2354             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2355
2356             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2357
2358             // Trigger creation of a snapshot by ensuring
2359             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2360             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2361             awaitAndValidateSnapshot(expectedRoot);
2362
2363             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2364             awaitAndValidateSnapshot(expectedRoot);
2365
2366             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2367         }
2368
2369             private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
2370                                               ) throws InterruptedException {
2371                 System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
2372                 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2373
2374                 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2375                         savedSnapshot.get() instanceof Snapshot);
2376
2377                 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2378
2379                 latch.set(new CountDownLatch(1));
2380                 savedSnapshot.set(null);
2381             }
2382
2383             private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2384
2385                 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2386                 assertEquals("Root node", expectedRoot, actual);
2387
2388            }
2389         };
2390     }
2391
2392     /**
2393      * This test simply verifies that the applySnapShot logic will work
2394      * @throws ReadFailedException
2395      * @throws DataValidationFailedException
2396      */
2397     @Test
2398     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2399         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
2400         store.setSchemaContext(SCHEMA_CONTEXT);
2401
2402         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2403         putTransaction.write(TestModel.TEST_PATH,
2404             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2405         commitTransaction(store, putTransaction);
2406
2407
2408         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2409
2410         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2411
2412         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2413         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2414
2415         commitTransaction(store, writeTransaction);
2416
2417         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2418
2419         assertEquals(expected, actual);
2420     }
2421
2422     @Test
2423     public void testRecoveryApplicable(){
2424
2425         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2426                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2427
2428         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2429                 schemaContext(SCHEMA_CONTEXT).props();
2430
2431         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2432                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2433
2434         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2435                 schemaContext(SCHEMA_CONTEXT).props();
2436
2437         new ShardTestKit(getSystem()) {{
2438             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2439                     persistentProps, "testPersistence1");
2440
2441             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2442
2443             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2444
2445             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2446                     nonPersistentProps, "testPersistence2");
2447
2448             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2449
2450             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2451
2452         }};
2453
2454     }
2455
2456     @Test
2457     public void testOnDatastoreContext() {
2458         new ShardTestKit(getSystem()) {{
2459             dataStoreContextBuilder.persistent(true);
2460
2461             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2462
2463             assertEquals("isRecoveryApplicable", true,
2464                     shard.underlyingActor().persistence().isRecoveryApplicable());
2465
2466             waitUntilLeader(shard);
2467
2468             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2469
2470             assertEquals("isRecoveryApplicable", false,
2471                 shard.underlyingActor().persistence().isRecoveryApplicable());
2472
2473             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2474
2475             assertEquals("isRecoveryApplicable", true,
2476                 shard.underlyingActor().persistence().isRecoveryApplicable());
2477
2478             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2479         }};
2480     }
2481
2482     @Test
2483     public void testRegisterRoleChangeListener() throws Exception {
2484         new ShardTestKit(getSystem()) {
2485             {
2486                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2487                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2488                         "testRegisterRoleChangeListener");
2489
2490                 waitUntilLeader(shard);
2491
2492                 final TestActorRef<MessageCollectorActor> listener =
2493                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2494
2495                 shard.tell(new RegisterRoleChangeListener(), listener);
2496
2497                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2498
2499                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2500                     ShardLeaderStateChanged.class);
2501                 assertEquals("getLocalShardDataTree present", true,
2502                         leaderStateChanged.getLocalShardDataTree().isPresent());
2503                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2504                     leaderStateChanged.getLocalShardDataTree().get());
2505
2506                 MessageCollectorActor.clearMessages(listener);
2507
2508                 // Force a leader change
2509
2510                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2511
2512                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2513                         ShardLeaderStateChanged.class);
2514                 assertEquals("getLocalShardDataTree present", false,
2515                         leaderStateChanged.getLocalShardDataTree().isPresent());
2516
2517                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2518             }
2519         };
2520     }
2521
2522     @Test
2523     public void testFollowerInitialSyncStatus() throws Exception {
2524         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2525                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2526                 "testFollowerInitialSyncStatus");
2527
2528         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2529
2530         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2531
2532         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2533
2534         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2535
2536         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2537     }
2538
2539     private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2540         modification.ready();
2541         store.validate(modification);
2542         store.commit(store.prepare(modification));
2543     }
2544
2545     @Test
2546     public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
2547         new ShardTestKit(getSystem()) {{
2548             String testName = "testClusteredDataChangeListenerDelayedRegistration";
2549             dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
2550
2551             final MockDataChangeListener listener = new MockDataChangeListener(1);
2552             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2553                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2554
2555             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2556                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2557                     actorFactory.generateActorId(testName + "-shard"));
2558
2559             waitUntilNoLeader(shard);
2560
2561             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2562
2563             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2564             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2565                 RegisterChangeListenerReply.class);
2566             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2567
2568             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2569
2570             shard.tell(new ElectionTimeout(), ActorRef.noSender());
2571
2572             listener.waitForChangeEvents();
2573         }};
2574     }
2575
2576     @Test
2577     public void testClusteredDataChangeListenerRegistration() throws Exception {
2578         new ShardTestKit(getSystem()) {{
2579             String testName = "testClusteredDataChangeListenerRegistration";
2580             final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2581                     actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2582
2583             final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2584                     actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2585
2586             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2587                     Shard.builder().id(followerShardID).
2588                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2589                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2590                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2591                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2592
2593             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2594                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2595                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2596                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2597                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2598
2599             leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
2600             String leaderPath = waitUntilLeader(followerShard);
2601             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2602
2603             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2604             final MockDataChangeListener listener = new MockDataChangeListener(1);
2605             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2606                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2607
2608             followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2609             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2610                 RegisterChangeListenerReply.class);
2611             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2612
2613             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2614
2615             listener.waitForChangeEvents();
2616         }};
2617     }
2618
2619     @Test
2620     public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
2621         new ShardTestKit(getSystem()) {{
2622             String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
2623             dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
2624
2625             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2626             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2627                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2628
2629             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2630                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2631                     actorFactory.generateActorId(testName + "-shard"));
2632
2633             waitUntilNoLeader(shard);
2634
2635             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2636
2637             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2638             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2639                     RegisterDataTreeChangeListenerReply.class);
2640             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2641
2642             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2643
2644             shard.tell(new ElectionTimeout(), ActorRef.noSender());
2645
2646             listener.waitForChangeEvents();
2647         }};
2648     }
2649
2650     @Test
2651     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2652         new ShardTestKit(getSystem()) {{
2653             String testName = "testClusteredDataTreeChangeListenerRegistration";
2654             final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2655                     actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2656
2657             final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2658                     actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2659
2660             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2661                     Shard.builder().id(followerShardID).
2662                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2663                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2664                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2665                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2666
2667             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2668                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2669                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2670                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2671                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2672
2673             leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
2674             String leaderPath = waitUntilLeader(followerShard);
2675             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2676
2677             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2678             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2679             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2680                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2681
2682             followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2683             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2684                     RegisterDataTreeChangeListenerReply.class);
2685             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2686
2687             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2688
2689             listener.waitForChangeEvents();
2690         }};
2691     }
2692
2693     @Test
2694     public void testServerRemoved() throws Exception {
2695         final TestActorRef<MessageCollectorActor> parent = TestActorRef.create(getSystem(), MessageCollectorActor.props());
2696
2697         final ActorRef shard = parent.underlyingActor().context().actorOf(
2698                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2699                 "testServerRemoved");
2700
2701         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2702
2703         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2704
2705     }
2706
2707 }