sal-distributed-datastore: use lambdas
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.doThrow;
19 import static org.mockito.Mockito.inOrder;
20 import static org.mockito.Mockito.mock;
21 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
22
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.Props;
26 import akka.actor.Status.Failure;
27 import akka.dispatch.Dispatchers;
28 import akka.dispatch.OnComplete;
29 import akka.japi.Creator;
30 import akka.pattern.Patterns;
31 import akka.persistence.SaveSnapshotSuccess;
32 import akka.testkit.TestActorRef;
33 import akka.util.Timeout;
34 import com.google.common.base.Stopwatch;
35 import com.google.common.base.Throwables;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import java.io.IOException;
38 import java.util.Collections;
39 import java.util.HashSet;
40 import java.util.Map;
41 import java.util.Set;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicReference;
46 import org.junit.Test;
47 import org.mockito.InOrder;
48 import org.opendaylight.controller.cluster.DataPersistenceProvider;
49 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
50 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
51 import org.opendaylight.controller.cluster.access.concepts.MemberName;
52 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
53 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
63 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
64 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
65 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
66 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
67 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
68 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
71 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
72 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
74 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
75 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
76 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
77 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
78 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
79 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
80 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
81 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
82 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
83 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
84 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
86 import org.opendaylight.controller.cluster.raft.RaftActorContext;
87 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
88 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
89 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
90 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
91 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
92 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
93 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
94 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
95 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
96 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
97 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
98 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
99 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
100 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
101 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
102 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
103 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
104 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
105 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
106 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
107 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
108 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
109 import org.opendaylight.yangtools.concepts.Identifier;
110 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
111 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
120 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
121 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
122 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
123 import scala.concurrent.Await;
124 import scala.concurrent.Future;
125 import scala.concurrent.duration.FiniteDuration;
126
127 public class ShardTest extends AbstractShardTest {
128     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
129             + "InMemorySnapshotStore and journal recovery seq number will start from 1";
130
131     @Test
132     public void testRegisterChangeListener() throws Exception {
133         new ShardTestKit(getSystem()) {
134             {
135                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
136                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
137                         "testRegisterChangeListener");
138
139                 waitUntilLeader(shard);
140
141                 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
142
143                 final MockDataChangeListener listener = new MockDataChangeListener(1);
144                 final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
145                         TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
146
147                 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
148                         AsyncDataBroker.DataChangeScope.BASE, true), getRef());
149
150                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
151                         RegisterDataTreeNotificationListenerReply.class);
152                 final String replyPath = reply.getListenerRegistrationPath().toString();
153                 assertTrue("Incorrect reply path: " + replyPath,
154                         replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
155
156                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
157                 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
158
159                 listener.waitForChangeEvents(path);
160             }
161         };
162     }
163
164     @SuppressWarnings("serial")
165     @Test
166     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
167         // This test tests the timing window in which a change listener is registered before the
168         // shard becomes the leader. We verify that the listener is registered and notified of the
169         // existing data when the shard becomes the leader.
170         // For this test, we want to send the RegisterChangeListener message after the shard
171         // has recovered from persistence and before it becomes the leader. So we subclass
172         // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
173         // we know that the shard has been initialized to a follower and has started the
174         // election process. The following 2 CountDownLatches are used to coordinate the
175         // ElectionTimeout with the sending of the RegisterChangeListener message.
176         final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
177         final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
178         final Creator<Shard> creator = new Creator<Shard>() {
179             boolean firstElectionTimeout = true;
180
181             @Override
182             public Shard create() throws Exception {
183                 // Use a non persistent provider because this test actually invokes persist on the journal
184                 // this will cause all other messages to not be queued properly after that.
185                 // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
186                 // it does do a persist)
187                 return new Shard(newShardBuilder()) {
188                     @Override
189                     public void handleCommand(final Object message) {
190                         if (message instanceof ElectionTimeout && firstElectionTimeout) {
191                             // Got the first ElectionTimeout. We don't forward it to the
192                             // base Shard yet until we've sent the RegisterChangeListener
193                             // message. So we signal the onFirstElectionTimeout latch to tell
194                             // the main thread to send the RegisterChangeListener message and
195                             // start a thread to wait on the onChangeListenerRegistered latch,
196                             // which the main thread signals after it has sent the message.
197                             // After the onChangeListenerRegistered is triggered, we send the
198                             // original ElectionTimeout message to proceed with the election.
199                             firstElectionTimeout = false;
200                             final ActorRef self = getSelf();
201                             new Thread(() -> {
202                                 Uninterruptibles.awaitUninterruptibly(
203                                         onChangeListenerRegistered, 5, TimeUnit.SECONDS);
204                                 self.tell(message, self);
205                             }).start();
206
207                             onFirstElectionTimeout.countDown();
208                         } else {
209                             super.handleCommand(message);
210                         }
211                     }
212                 };
213             }
214         };
215
216         setupInMemorySnapshotStore();
217
218         final YangInstanceIdentifier path = TestModel.TEST_PATH;
219         final MockDataChangeListener listener = new MockDataChangeListener(1);
220         final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
221                 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
222
223         final TestActorRef<Shard> shard = actorFactory.createTestActor(
224                 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
225                 "testRegisterChangeListenerWhenNotLeaderInitially");
226
227         new ShardTestKit(getSystem()) {
228             {
229                 // Wait until the shard receives the first ElectionTimeout
230                 // message.
231                 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
232
233                 // Now send the RegisterChangeListener and wait for the reply.
234                 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false),
235                         getRef());
236
237                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
238                         RegisterDataTreeNotificationListenerReply.class);
239                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
240
241                 // Sanity check - verify the shard is not the leader yet.
242                 shard.tell(FindLeader.INSTANCE, getRef());
243                 final FindLeaderReply findLeadeReply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
244                 assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
245
246                 // Signal the onChangeListenerRegistered latch to tell the
247                 // thread above to proceed
248                 // with the election process.
249                 onChangeListenerRegistered.countDown();
250
251                 // Wait for the shard to become the leader and notify our
252                 // listener with the existing
253                 // data in the store.
254                 listener.waitForChangeEvents(path);
255             }
256         };
257     }
258
259     @Test
260     public void testRegisterDataTreeChangeListener() throws Exception {
261         new ShardTestKit(getSystem()) {
262             {
263                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
264                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
265                         "testRegisterDataTreeChangeListener");
266
267                 waitUntilLeader(shard);
268
269                 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
270
271                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
272                 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
273                         TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
274
275                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
276
277                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
278                         RegisterDataTreeNotificationListenerReply.class);
279                 final String replyPath = reply.getListenerRegistrationPath().toString();
280                 assertTrue("Incorrect reply path: " + replyPath,
281                         replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
282
283                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
284                 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
285
286                 listener.waitForChangeEvents();
287             }
288         };
289     }
290
291     @SuppressWarnings("serial")
292     @Test
293     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
294         final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
295         final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
296         final Creator<Shard> creator = new Creator<Shard>() {
297             boolean firstElectionTimeout = true;
298
299             @Override
300             public Shard create() throws Exception {
301                 return new Shard(newShardBuilder()) {
302                     @Override
303                     public void handleCommand(final Object message) {
304                         if (message instanceof ElectionTimeout && firstElectionTimeout) {
305                             firstElectionTimeout = false;
306                             final ActorRef self = getSelf();
307                             new Thread(() -> {
308                                 Uninterruptibles.awaitUninterruptibly(
309                                         onChangeListenerRegistered, 5, TimeUnit.SECONDS);
310                                 self.tell(message, self);
311                             }).start();
312
313                             onFirstElectionTimeout.countDown();
314                         } else {
315                             super.handleCommand(message);
316                         }
317                     }
318                 };
319             }
320         };
321
322         setupInMemorySnapshotStore();
323
324         final YangInstanceIdentifier path = TestModel.TEST_PATH;
325         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
326         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
327                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
328
329         final TestActorRef<Shard> shard = actorFactory.createTestActor(
330                 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
331                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
332
333         new ShardTestKit(getSystem()) {
334             {
335                 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
336
337                 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
338                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
339                         RegisterDataTreeNotificationListenerReply.class);
340                 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
341
342                 shard.tell(FindLeader.INSTANCE, getRef());
343                 final FindLeaderReply findLeadeReply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
344                 assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
345
346                 onChangeListenerRegistered.countDown();
347
348                 // TODO: investigate why we do not receive data chage events
349                 listener.waitForChangeEvents();
350             }
351         };
352     }
353
354     @Test
355     public void testCreateTransaction() {
356         new ShardTestKit(getSystem()) {
357             {
358                 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
359
360                 waitUntilLeader(shard);
361
362                 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
363
364                 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
365                         DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
366
367                 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
368                         CreateTransactionReply.class);
369
370                 final String path = reply.getTransactionPath().toString();
371                 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
372                         "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
373                             shardID.getShardName(), shardID.getMemberName().getName())));
374             }
375         };
376     }
377
378     @Test
379     public void testCreateTransactionOnChain() {
380         new ShardTestKit(getSystem()) {
381             {
382                 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
383
384                 waitUntilLeader(shard);
385
386                 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
387                         DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
388
389                 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
390                         CreateTransactionReply.class);
391
392                 final String path = reply.getTransactionPath().toString();
393                 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
394                         "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
395                         shardID.getShardName(), shardID.getMemberName().getName())));
396             }
397         };
398     }
399
400     @Test
401     public void testPeerAddressResolved() throws Exception {
402         new ShardTestKit(getSystem()) {
403             {
404                 final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
405                         "config");
406                 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder()
407                         .peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null))
408                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
409
410                 final String address = "akka://foobar";
411                 shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
412
413                 shard.tell(GetOnDemandRaftState.INSTANCE, getRef());
414                 final OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
415                 assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
416             }
417         };
418     }
419
420     @Test
421     public void testApplySnapshot() throws Exception {
422
423         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps()
424                 .withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
425
426         ShardTestKit.waitUntilLeader(shard);
427
428         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
429         store.setSchemaContext(SCHEMA_CONTEXT);
430
431         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
432                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
433                     .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
434                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
435
436         writeToStore(store, TestModel.TEST_PATH, container);
437
438         final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
439         final NormalizedNode<?,?> expected = readStore(store, root);
440
441         final Snapshot snapshot = Snapshot.create(
442                 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
443                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4, -1, null, null);
444
445         shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
446
447         final Stopwatch sw = Stopwatch.createStarted();
448         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
449             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
450
451             try {
452                 assertEquals("Root node", expected, readStore(shard, root));
453                 return;
454             } catch (final AssertionError e) {
455                 // try again
456             }
457         }
458
459         fail("Snapshot was not applied");
460     }
461
462     @Test
463     public void testApplyState() throws Exception {
464         final TestActorRef<Shard> shard = actorFactory.createTestActor(
465                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
466
467         ShardTestKit.waitUntilLeader(shard);
468
469         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
470         store.setSchemaContext(SCHEMA_CONTEXT);
471
472         final DataTreeModification writeMod = store.takeSnapshot().newModification();
473         final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
474         writeMod.write(TestModel.TEST_PATH, node);
475         writeMod.ready();
476
477         final TransactionIdentifier tx = nextTransactionId();
478         shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
479
480         final Stopwatch sw = Stopwatch.createStarted();
481         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
482             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
483
484             final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
485             if (actual != null) {
486                 assertEquals("Applied state", node, actual);
487                 return;
488             }
489         }
490
491         fail("State was not applied");
492     }
493
494     @Test
495     public void testDataTreeCandidateRecovery() throws Exception {
496         // Set up the InMemorySnapshotStore.
497         final DataTree source = setupInMemorySnapshotStore();
498
499         final DataTreeModification writeMod = source.takeSnapshot().newModification();
500         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
501         writeMod.ready();
502         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
503
504         // Set up the InMemoryJournal.
505         InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
506             payloadForModification(source, writeMod, nextTransactionId())));
507
508         final int nListEntries = 16;
509         final Set<Integer> listEntryKeys = new HashSet<>();
510
511         // Add some ModificationPayload entries
512         for (int i = 1; i <= nListEntries; i++) {
513             listEntryKeys.add(Integer.valueOf(i));
514
515             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
516                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
517
518             final DataTreeModification mod = source.takeSnapshot().newModification();
519             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
520             mod.ready();
521
522             InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
523                 payloadForModification(source, mod, nextTransactionId())));
524         }
525
526         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
527             new ApplyJournalEntries(nListEntries));
528
529         testRecovery(listEntryKeys);
530     }
531
532     @Test
533     @SuppressWarnings("checkstyle:IllegalCatch")
534     public void testConcurrentThreePhaseCommits() throws Exception {
535         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
536         final CountDownLatch commitLatch = new CountDownLatch(2);
537
538         final long timeoutSec = 5;
539         final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
540         final Timeout timeout = new Timeout(duration);
541
542         final TestActorRef<Shard> shard = actorFactory.createTestActor(
543                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
544                 "testConcurrentThreePhaseCommits");
545
546         class OnFutureComplete extends OnComplete<Object> {
547             private final Class<?> expRespType;
548
549             OnFutureComplete(final Class<?> expRespType) {
550                 this.expRespType = expRespType;
551             }
552
553             @Override
554             public void onComplete(final Throwable error, final Object resp) {
555                 if (error != null) {
556                     caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
557                 } else {
558                     try {
559                         assertEquals("Commit response type", expRespType, resp.getClass());
560                         onSuccess(resp);
561                     } catch (final Exception e) {
562                         caughtEx.set(e);
563                     }
564                 }
565             }
566
567             void onSuccess(final Object resp) throws Exception {
568             }
569         }
570
571         class OnCommitFutureComplete extends OnFutureComplete {
572             OnCommitFutureComplete() {
573                 super(CommitTransactionReply.class);
574             }
575
576             @Override
577             public void onComplete(final Throwable error, final Object resp) {
578                 super.onComplete(error, resp);
579                 commitLatch.countDown();
580             }
581         }
582
583         class OnCanCommitFutureComplete extends OnFutureComplete {
584             private final TransactionIdentifier transactionID;
585
586             OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
587                 super(CanCommitTransactionReply.class);
588                 this.transactionID = transactionID;
589             }
590
591             @Override
592             void onSuccess(final Object resp) throws Exception {
593                 final CanCommitTransactionReply canCommitReply =
594                         CanCommitTransactionReply.fromSerializable(resp);
595                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
596
597                 final Future<Object> commitFuture = Patterns.ask(shard,
598                         new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
599                 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
600             }
601         }
602
603         new ShardTestKit(getSystem()) {
604             {
605                 waitUntilLeader(shard);
606
607                 final TransactionIdentifier transactionID1 = nextTransactionId();
608                 final TransactionIdentifier transactionID2 = nextTransactionId();
609                 final TransactionIdentifier transactionID3 = nextTransactionId();
610
611                 final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
612                         shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
613                 final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
614                 final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
615                 final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
616
617                 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
618                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
619                 final ReadyTransactionReply readyReply = ReadyTransactionReply
620                         .fromSerializable(expectMsgClass(duration, ReadyTransactionReply.class));
621                 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
622                 // Send the CanCommitTransaction message for the first Tx.
623
624                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
625                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
626                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
627                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
628
629                 // Ready 2 more Tx's.
630
631                 shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
632                         ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
633                 expectMsgClass(duration, ReadyTransactionReply.class);
634
635                 shard.tell(
636                         prepareBatchedModifications(transactionID3,
637                                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
638                                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
639                                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
640                         getRef());
641                 expectMsgClass(duration, ReadyTransactionReply.class);
642
643                 // Send the CanCommitTransaction message for the next 2 Tx's.
644                 // These should get queued and
645                 // processed after the first Tx completes.
646
647                 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
648                         new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
649
650                 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
651                         new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
652
653                 // Send the CommitTransaction message for the first Tx. After it
654                 // completes, it should
655                 // trigger the 2nd Tx to proceed which should in turn then
656                 // trigger the 3rd.
657
658                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
659                 expectMsgClass(duration, CommitTransactionReply.class);
660
661                 // Wait for the next 2 Tx's to complete.
662
663                 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher());
664
665                 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher());
666
667                 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
668
669                 if (caughtEx.get() != null) {
670                     Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class);
671                     Throwables.propagate(caughtEx.get());
672                 }
673
674                 assertEquals("Commits complete", true, done);
675
676 //                final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
677 //                        cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
678 //                        cohort3.getPreCommit(), cohort3.getCommit());
679 //                inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
680 //                inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
681 //                inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
682 //                inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
683 //                inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
684 //                inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
685 //                inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
686 //                inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
687 //                inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
688
689                 // Verify data in the data store.
690
691                 verifyOuterListEntry(shard, 1);
692
693                 verifyLastApplied(shard, 2);
694             }
695         };
696     }
697
698     @Test
699     public void testBatchedModificationsWithNoCommitOnReady() throws Exception {
700         new ShardTestKit(getSystem()) {
701             {
702                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
703                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
704                         "testBatchedModificationsWithNoCommitOnReady");
705
706                 waitUntilLeader(shard);
707
708                 final TransactionIdentifier transactionID = nextTransactionId();
709                 final FiniteDuration duration = duration("5 seconds");
710
711                 // Send a BatchedModifications to start a transaction.
712
713                 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
714                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
715                 expectMsgClass(duration, BatchedModificationsReply.class);
716
717                 // Send a couple more BatchedModifications.
718
719                 shard.tell(
720                         newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
721                                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
722                         getRef());
723                 expectMsgClass(duration, BatchedModificationsReply.class);
724
725                 shard.tell(newBatchedModifications(transactionID,
726                         YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
727                                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
728                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3),
729                         getRef());
730                 expectMsgClass(duration, ReadyTransactionReply.class);
731
732                 // Send the CanCommitTransaction message.
733
734                 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
735                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
736                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
737                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
738
739                 // Send the CommitTransaction message.
740
741                 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
742                 expectMsgClass(duration, CommitTransactionReply.class);
743
744                 // Verify data in the data store.
745
746                 verifyOuterListEntry(shard, 1);
747             }
748         };
749     }
750
751     @Test
752     public void testBatchedModificationsWithCommitOnReady() throws Exception {
753         new ShardTestKit(getSystem()) {
754             {
755                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
756                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
757                         "testBatchedModificationsWithCommitOnReady");
758
759                 waitUntilLeader(shard);
760
761                 final TransactionIdentifier transactionID = nextTransactionId();
762                 final FiniteDuration duration = duration("5 seconds");
763
764                 // Send a BatchedModifications to start a transaction.
765
766                 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
767                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
768                 expectMsgClass(duration, BatchedModificationsReply.class);
769
770                 // Send a couple more BatchedModifications.
771
772                 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
773                                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
774                         getRef());
775                 expectMsgClass(duration, BatchedModificationsReply.class);
776
777                 shard.tell(newBatchedModifications(transactionID,
778                         YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
779                                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
780                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3),
781                         getRef());
782
783                 expectMsgClass(duration, CommitTransactionReply.class);
784
785                 // Verify data in the data store.
786
787                 verifyOuterListEntry(shard, 1);
788             }
789         };
790     }
791
792     @Test(expected = IllegalStateException.class)
793     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
794         new ShardTestKit(getSystem()) {
795             {
796                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
797                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
798                         "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
799
800                 waitUntilLeader(shard);
801
802                 final TransactionIdentifier transactionID = nextTransactionId();
803                 final BatchedModifications batched = new BatchedModifications(transactionID,
804                         DataStoreVersions.CURRENT_VERSION);
805                 batched.setReady(true);
806                 batched.setTotalMessagesSent(2);
807
808                 shard.tell(batched, getRef());
809
810                 final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
811
812                 if (failure != null) {
813                     Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
814                     Throwables.propagate(failure.cause());
815                 }
816             }
817         };
818     }
819
820     @Test
821     public void testBatchedModificationsWithOperationFailure() throws Exception {
822         new ShardTestKit(getSystem()) {
823             {
824                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
825                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
826                         "testBatchedModificationsWithOperationFailure");
827
828                 waitUntilLeader(shard);
829
830                 // Test merge with invalid data. An exception should occur when
831                 // the merge is applied. Note that
832                 // write will not validate the children for performance reasons.
833
834                 final TransactionIdentifier transactionID = nextTransactionId();
835
836                 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
837                         .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
838                         .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
839
840                 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
841                 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
842                 shard.tell(batched, getRef());
843                 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
844
845                 final Throwable cause = failure.cause();
846
847                 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
848                 batched.setReady(true);
849                 batched.setTotalMessagesSent(2);
850
851                 shard.tell(batched, getRef());
852
853                 failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
854                 assertEquals("Failure cause", cause, failure.cause());
855             }
856         };
857     }
858
859     @Test
860     public void testBatchedModificationsOnTransactionChain() throws Exception {
861         new ShardTestKit(getSystem()) {
862             {
863                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
864                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
865                         "testBatchedModificationsOnTransactionChain");
866
867                 waitUntilLeader(shard);
868
869                 final LocalHistoryIdentifier historyId = nextHistoryId();
870                 final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
871                 final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
872
873                 final FiniteDuration duration = duration("5 seconds");
874
875                 // Send a BatchedModifications to start a chained write
876                 // transaction and ready it.
877
878                 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
879                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
880                 shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), getRef());
881                 expectMsgClass(duration, ReadyTransactionReply.class);
882
883                 // Create a read Tx on the same chain.
884
885                 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
886                         DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
887
888                 final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"),
889                         CreateTransactionReply.class);
890
891                 getSystem().actorSelection(createReply.getTransactionPath())
892                         .tell(new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
893                 final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
894                 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
895
896                 // Commit the write transaction.
897
898                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
899                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
900                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
901                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
902
903                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
904                 expectMsgClass(duration, CommitTransactionReply.class);
905
906                 // Verify data in the data store.
907
908                 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
909                 assertEquals("Stored node", containerNode, actualNode);
910             }
911         };
912     }
913
914     @Test
915     public void testOnBatchedModificationsWhenNotLeader() {
916         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
917         new ShardTestKit(getSystem()) {
918             {
919                 final Creator<Shard> creator = new Creator<Shard>() {
920                     private static final long serialVersionUID = 1L;
921
922                     @Override
923                     public Shard create() throws Exception {
924                         return new Shard(newShardBuilder()) {
925                             @Override
926                             protected boolean isLeader() {
927                                 return overrideLeaderCalls.get() ? false : super.isLeader();
928                             }
929
930                             @Override
931                             public ActorSelection getLeader() {
932                                 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path())
933                                         : super.getLeader();
934                             }
935                         };
936                     }
937                 };
938
939                 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
940                         .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
941                         "testOnBatchedModificationsWhenNotLeader");
942
943                 waitUntilLeader(shard);
944
945                 overrideLeaderCalls.set(true);
946
947                 final BatchedModifications batched = new BatchedModifications(nextTransactionId(),
948                         DataStoreVersions.CURRENT_VERSION);
949
950                 shard.tell(batched, ActorRef.noSender());
951
952                 expectMsgEquals(batched);
953             }
954         };
955     }
956
957     @Test
958     public void testTransactionMessagesWithNoLeader() {
959         new ShardTestKit(getSystem()) {
960             {
961                 dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
962                         .shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
963                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
964                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
965                         "testTransactionMessagesWithNoLeader");
966
967                 waitUntilNoLeader(shard);
968
969                 final TransactionIdentifier txId = nextTransactionId();
970                 shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), getRef());
971                 Failure failure = expectMsgClass(Failure.class);
972                 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
973
974                 shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
975                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
976                 failure = expectMsgClass(Failure.class);
977                 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
978
979                 shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef());
980                 failure = expectMsgClass(Failure.class);
981                 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
982             }
983         };
984     }
985
986     @Test
987     public void testReadyWithReadWriteImmediateCommit() throws Exception {
988         testReadyWithImmediateCommit(true);
989     }
990
991     @Test
992     public void testReadyWithWriteOnlyImmediateCommit() throws Exception {
993         testReadyWithImmediateCommit(false);
994     }
995
996     private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception {
997         new ShardTestKit(getSystem()) {
998             {
999                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1000                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1001                         "testReadyWithImmediateCommit-" + readWrite);
1002
1003                 waitUntilLeader(shard);
1004
1005                 final TransactionIdentifier transactionID = nextTransactionId();
1006                 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1007                 if (readWrite) {
1008                     shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
1009                             containerNode, true), getRef());
1010                 } else {
1011                     shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true),
1012                             getRef());
1013                 }
1014
1015                 expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
1016
1017                 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1018                 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1019             }
1020         };
1021     }
1022
1023     @Test
1024     public void testReadyLocalTransactionWithImmediateCommit() throws Exception {
1025         new ShardTestKit(getSystem()) {
1026             {
1027                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1028                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1029                         "testReadyLocalTransactionWithImmediateCommit");
1030
1031                 waitUntilLeader(shard);
1032
1033                 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1034
1035                 final DataTreeModification modification = dataStore.newModification();
1036
1037                 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1038                 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1039                 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1040                 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1041
1042                 final TransactionIdentifier txId = nextTransactionId();
1043                 modification.ready();
1044                 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1045
1046                 shard.tell(readyMessage, getRef());
1047
1048                 expectMsgClass(CommitTransactionReply.class);
1049
1050                 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1051                 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1052             }
1053         };
1054     }
1055
1056     @Test
1057     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception {
1058         new ShardTestKit(getSystem()) {
1059             {
1060                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1061                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1062                         "testReadyLocalTransactionWithThreePhaseCommit");
1063
1064                 waitUntilLeader(shard);
1065
1066                 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1067
1068                 final DataTreeModification modification = dataStore.newModification();
1069
1070                 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1071                 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1072                 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1073                 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1074
1075                 final TransactionIdentifier txId = nextTransactionId();
1076                 modification.ready();
1077                 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1078
1079                 shard.tell(readyMessage, getRef());
1080
1081                 expectMsgClass(ReadyTransactionReply.class);
1082
1083                 // Send the CanCommitTransaction message.
1084
1085                 shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1086                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1087                         .fromSerializable(expectMsgClass(CanCommitTransactionReply.class));
1088                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1089
1090                 // Send the CanCommitTransaction message.
1091
1092                 shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1093                 expectMsgClass(CommitTransactionReply.class);
1094
1095                 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1096                 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1097             }
1098         };
1099     }
1100
1101     @Test
1102     public void testReadWriteCommitWithPersistenceDisabled() throws Exception {
1103         dataStoreContextBuilder.persistent(false);
1104         new ShardTestKit(getSystem()) {
1105             {
1106                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1107                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1108                         "testCommitWithPersistenceDisabled");
1109
1110                 waitUntilLeader(shard);
1111
1112                 // Setup a simulated transactions with a mock cohort.
1113
1114                 final FiniteDuration duration = duration("5 seconds");
1115
1116                 final TransactionIdentifier transactionID = nextTransactionId();
1117                 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1118                 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false),
1119                         getRef());
1120                 expectMsgClass(duration, ReadyTransactionReply.class);
1121
1122                 // Send the CanCommitTransaction message.
1123
1124                 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1125                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1126                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1127                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1128
1129                 // Send the CanCommitTransaction message.
1130
1131                 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1132                 expectMsgClass(duration, CommitTransactionReply.class);
1133
1134                 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1135                 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1136             }
1137         };
1138     }
1139
1140     @Test
1141     public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
1142         testCommitWhenTransactionHasModifications(true);
1143     }
1144
1145     @Test
1146     public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
1147         testCommitWhenTransactionHasModifications(false);
1148     }
1149
1150     private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
1151         new ShardTestKit(getSystem()) {
1152             {
1153                 final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1154                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1155                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1156                         "testCommitWhenTransactionHasModifications-" + readWrite);
1157
1158                 waitUntilLeader(shard);
1159
1160                 final FiniteDuration duration = duration("5 seconds");
1161                 final TransactionIdentifier transactionID = nextTransactionId();
1162
1163                 if (readWrite) {
1164                     shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
1165                             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1166                 } else {
1167                     shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1168                             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1169                 }
1170
1171                 expectMsgClass(duration, ReadyTransactionReply.class);
1172
1173                 // Send the CanCommitTransaction message.
1174
1175                 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1176                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1177                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1178                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1179
1180                 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1181                 expectMsgClass(duration, CommitTransactionReply.class);
1182
1183                 final InOrder inOrder = inOrder(dataTree);
1184                 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1185                 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1186                 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1187
1188                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1189                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1190
1191                 // Use MBean for verification
1192                 // Committed transaction count should increase as usual
1193                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1194
1195                 // Commit index should advance as we do not have an empty
1196                 // modification
1197                 assertEquals(0, shardStats.getCommitIndex());
1198             }
1199         };
1200     }
1201
1202     @Test
1203     public void testCommitPhaseFailure() throws Exception {
1204         new ShardTestKit(getSystem()) {
1205             {
1206                 final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1207                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1208                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1209                         "testCommitPhaseFailure");
1210
1211                 waitUntilLeader(shard);
1212
1213                 final FiniteDuration duration = duration("5 seconds");
1214                 final Timeout timeout = new Timeout(duration);
1215
1216                 // Setup 2 simulated transactions with mock cohorts. The first
1217                 // one fails in the
1218                 // commit phase.
1219
1220                 doThrow(new RuntimeException("mock commit failure")).when(dataTree)
1221                         .commit(any(DataTreeCandidate.class));
1222
1223                 final TransactionIdentifier transactionID1 = nextTransactionId();
1224                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1225                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1226                 expectMsgClass(duration, ReadyTransactionReply.class);
1227
1228                 final TransactionIdentifier transactionID2 = nextTransactionId();
1229                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1230                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1231                 expectMsgClass(duration, ReadyTransactionReply.class);
1232
1233                 // Send the CanCommitTransaction message for the first Tx.
1234
1235                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1236                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1237                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1238                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1239
1240                 // Send the CanCommitTransaction message for the 2nd Tx. This
1241                 // should get queued and
1242                 // processed after the first Tx completes.
1243
1244                 final Future<Object> canCommitFuture = Patterns.ask(shard,
1245                         new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1246
1247                 // Send the CommitTransaction message for the first Tx. This
1248                 // should send back an error
1249                 // and trigger the 2nd Tx to proceed.
1250
1251                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1252                 expectMsgClass(duration, akka.actor.Status.Failure.class);
1253
1254                 // Wait for the 2nd Tx to complete the canCommit phase.
1255
1256                 final CountDownLatch latch = new CountDownLatch(1);
1257                 canCommitFuture.onComplete(new OnComplete<Object>() {
1258                     @Override
1259                     public void onComplete(final Throwable failure, final Object resp) {
1260                         latch.countDown();
1261                     }
1262                 }, getSystem().dispatcher());
1263
1264                 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1265
1266                 final InOrder inOrder = inOrder(dataTree);
1267                 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1268                 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1269
1270                 // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
1271                 //        validate performs wrapping and we capture that mock
1272                 // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1273
1274                 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1275             }
1276         };
1277     }
1278
1279     @Test
1280     public void testPreCommitPhaseFailure() throws Exception {
1281         new ShardTestKit(getSystem()) {
1282             {
1283                 final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1284                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1285                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1286                         "testPreCommitPhaseFailure");
1287
1288                 waitUntilLeader(shard);
1289
1290                 final FiniteDuration duration = duration("5 seconds");
1291                 final Timeout timeout = new Timeout(duration);
1292
1293                 doThrow(new RuntimeException("mock preCommit failure")).when(dataTree)
1294                         .prepare(any(DataTreeModification.class));
1295
1296                 final TransactionIdentifier transactionID1 = nextTransactionId();
1297                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1298                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1299                 expectMsgClass(duration, ReadyTransactionReply.class);
1300
1301                 final TransactionIdentifier transactionID2 = nextTransactionId();
1302                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1303                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1304                 expectMsgClass(duration, ReadyTransactionReply.class);
1305
1306                 // Send the CanCommitTransaction message for the first Tx.
1307
1308                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1309                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1310                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1311                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1312
1313                 // Send the CanCommitTransaction message for the 2nd Tx. This
1314                 // should get queued and
1315                 // processed after the first Tx completes.
1316
1317                 final Future<Object> canCommitFuture = Patterns.ask(shard,
1318                         new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1319
1320                 // Send the CommitTransaction message for the first Tx. This
1321                 // should send back an error
1322                 // and trigger the 2nd Tx to proceed.
1323
1324                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1325                 expectMsgClass(duration, akka.actor.Status.Failure.class);
1326
1327                 // Wait for the 2nd Tx to complete the canCommit phase.
1328
1329                 final CountDownLatch latch = new CountDownLatch(1);
1330                 canCommitFuture.onComplete(new OnComplete<Object>() {
1331                     @Override
1332                     public void onComplete(final Throwable failure, final Object resp) {
1333                         latch.countDown();
1334                     }
1335                 }, getSystem().dispatcher());
1336
1337                 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1338
1339                 final InOrder inOrder = inOrder(dataTree);
1340                 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1341                 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1342                 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1343             }
1344         };
1345     }
1346
1347     @Test
1348     public void testCanCommitPhaseFailure() throws Exception {
1349         new ShardTestKit(getSystem()) {
1350             {
1351                 final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1352                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1353                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1354                         "testCanCommitPhaseFailure");
1355
1356                 waitUntilLeader(shard);
1357
1358                 final FiniteDuration duration = duration("5 seconds");
1359                 final TransactionIdentifier transactionID1 = nextTransactionId();
1360
1361                 doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
1362                         .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1363
1364                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1365                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1366                 expectMsgClass(duration, ReadyTransactionReply.class);
1367
1368                 // Send the CanCommitTransaction message.
1369
1370                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1371                 expectMsgClass(duration, akka.actor.Status.Failure.class);
1372
1373                 // Send another can commit to ensure the failed one got cleaned
1374                 // up.
1375
1376                 final TransactionIdentifier transactionID2 = nextTransactionId();
1377                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1378                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1379                 expectMsgClass(duration, ReadyTransactionReply.class);
1380
1381                 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1382                 final CanCommitTransactionReply reply = CanCommitTransactionReply
1383                         .fromSerializable(expectMsgClass(CanCommitTransactionReply.class));
1384                 assertEquals("getCanCommit", true, reply.getCanCommit());
1385             }
1386         };
1387     }
1388
1389     @Test
1390     public void testImmediateCommitWithCanCommitPhaseFailure() throws Exception {
1391         testImmediateCommitWithCanCommitPhaseFailure(true);
1392         testImmediateCommitWithCanCommitPhaseFailure(false);
1393     }
1394
1395     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
1396         new ShardTestKit(getSystem()) {
1397             {
1398                 final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1399                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1400                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1401                         "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1402
1403                 waitUntilLeader(shard);
1404
1405                 doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
1406                         .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1407
1408                 final FiniteDuration duration = duration("5 seconds");
1409
1410                 final TransactionIdentifier transactionID1 = nextTransactionId();
1411
1412                 if (readWrite) {
1413                     shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1414                             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1415                 } else {
1416                     shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1417                             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1418                 }
1419
1420                 expectMsgClass(duration, akka.actor.Status.Failure.class);
1421
1422                 // Send another can commit to ensure the failed one got cleaned
1423                 // up.
1424
1425                 final TransactionIdentifier transactionID2 = nextTransactionId();
1426                 if (readWrite) {
1427                     shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1428                             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1429                 } else {
1430                     shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1431                             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1432                 }
1433
1434                 expectMsgClass(duration, CommitTransactionReply.class);
1435             }
1436         };
1437     }
1438
1439     @Test
1440     public void testAbortWithCommitPending() throws Exception {
1441         new ShardTestKit(getSystem()) {
1442             {
1443                 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1444                     @Override
1445                     void persistPayload(final Identifier id, final Payload payload,
1446                             final boolean batchHint) {
1447                         // Simulate an AbortTransaction message occurring during
1448                         // replication, after
1449                         // persisting and before finishing the commit to the
1450                         // in-memory store.
1451
1452                         doAbortTransaction(id, null);
1453                         super.persistPayload(id, payload, batchHint);
1454                     }
1455                 };
1456
1457                 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
1458                         .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1459                         "testAbortWithCommitPending");
1460
1461                 waitUntilLeader(shard);
1462
1463                 final FiniteDuration duration = duration("5 seconds");
1464
1465                 final TransactionIdentifier transactionID = nextTransactionId();
1466
1467                 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1468                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1469                 expectMsgClass(duration, ReadyTransactionReply.class);
1470
1471                 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1472                 expectMsgClass(duration, CanCommitTransactionReply.class);
1473
1474                 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1475                 expectMsgClass(duration, CommitTransactionReply.class);
1476
1477                 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1478
1479                 // Since we're simulating an abort occurring during replication
1480                 // and before finish commit,
1481                 // the data should still get written to the in-memory store
1482                 // since we've gotten past
1483                 // canCommit and preCommit and persisted the data.
1484                 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1485             }
1486         };
1487     }
1488
1489     @Test
1490     public void testTransactionCommitTimeout() throws Exception {
1491         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1492         new ShardTestKit(getSystem()) {
1493             {
1494                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1495                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1496                         "testTransactionCommitTimeout");
1497
1498                 waitUntilLeader(shard);
1499
1500                 final FiniteDuration duration = duration("5 seconds");
1501
1502                 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1503                 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1504                         ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1505
1506                 // Ready 2 Tx's - the first will timeout
1507
1508                 final TransactionIdentifier transactionID1 = nextTransactionId();
1509                 shard.tell(
1510                         prepareBatchedModifications(transactionID1,
1511                                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1512                                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1513                                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
1514                         getRef());
1515                 expectMsgClass(duration, ReadyTransactionReply.class);
1516
1517                 final TransactionIdentifier transactionID2 = nextTransactionId();
1518                 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1519                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1520                 shard.tell(
1521                         prepareBatchedModifications(transactionID2, listNodePath,
1522                                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false),
1523                         getRef());
1524                 expectMsgClass(duration, ReadyTransactionReply.class);
1525
1526                 // canCommit 1st Tx. We don't send the commit so it should
1527                 // timeout.
1528
1529                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1530                 expectMsgClass(duration, CanCommitTransactionReply.class);
1531
1532                 // canCommit the 2nd Tx - it should complete after the 1st Tx
1533                 // times out.
1534
1535                 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1536                 expectMsgClass(duration, CanCommitTransactionReply.class);
1537
1538                 // Try to commit the 1st Tx - should fail as it's not the
1539                 // current Tx.
1540
1541                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1542                 expectMsgClass(duration, akka.actor.Status.Failure.class);
1543
1544                 // Commit the 2nd Tx.
1545
1546                 shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1547                 expectMsgClass(duration, CommitTransactionReply.class);
1548
1549                 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1550                 assertNotNull(listNodePath + " not found", node);
1551             }
1552         };
1553     }
1554
1555 //    @Test
1556 //    @Ignore
1557 //    public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1558 //        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1559 //
1560 //        new ShardTestKit(getSystem()) {{
1561 //            final TestActorRef<Shard> shard = actorFactory.createTestActor(
1562 //                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1563 //                    "testTransactionCommitQueueCapacityExceeded");
1564 //
1565 //            waitUntilLeader(shard);
1566 //
1567 //            final FiniteDuration duration = duration("5 seconds");
1568 //
1569 //            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1570 //
1571 //            final TransactionIdentifier transactionID1 = nextTransactionId();
1572 //            final MutableCompositeModification modification1 = new MutableCompositeModification();
1573 //            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1574 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1575 //                    modification1);
1576 //
1577 //            final TransactionIdentifier transactionID2 = nextTransactionId();
1578 //            final MutableCompositeModification modification2 = new MutableCompositeModification();
1579 //            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1580 //                    TestModel.OUTER_LIST_PATH,
1581 //                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1582 //                    modification2);
1583 //
1584 //            final TransactionIdentifier transactionID3 = nextTransactionId();
1585 //            final MutableCompositeModification modification3 = new MutableCompositeModification();
1586 //            final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1587 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1588 //                    modification3);
1589 //
1590 //            // Ready the Tx's
1591 //
1592 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
1593 //                    modification1), getRef());
1594 //            expectMsgClass(duration, ReadyTransactionReply.class);
1595 //
1596 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
1597 //                    modification2), getRef());
1598 //            expectMsgClass(duration, ReadyTransactionReply.class);
1599 //
1600 //            // The 3rd Tx should exceed queue capacity and fail.
1601 //
1602 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
1603 //                    modification3), getRef());
1604 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1605 //
1606 //            // canCommit 1st Tx.
1607 //
1608 //            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1609 //            expectMsgClass(duration, CanCommitTransactionReply.class);
1610 //
1611 //            // canCommit the 2nd Tx - it should get queued.
1612 //
1613 //            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1614 //
1615 //            // canCommit the 3rd Tx - should exceed queue capacity and fail.
1616 //
1617 //            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1618 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1619 //        }};
1620 //    }
1621
1622     @Test
1623     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Exception {
1624         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1625         new ShardTestKit(getSystem()) {
1626             {
1627                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1628                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1629                         "testTransactionCommitWithPriorExpiredCohortEntries");
1630
1631                 waitUntilLeader(shard);
1632
1633                 final FiniteDuration duration = duration("5 seconds");
1634
1635                 final TransactionIdentifier transactionID1 = nextTransactionId();
1636                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1637                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1638                 expectMsgClass(duration, ReadyTransactionReply.class);
1639
1640                 final TransactionIdentifier transactionID2 = nextTransactionId();
1641                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1642                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1643                 expectMsgClass(duration, ReadyTransactionReply.class);
1644
1645                 final TransactionIdentifier transactionID3 = nextTransactionId();
1646                 shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1647                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1648                 expectMsgClass(duration, ReadyTransactionReply.class);
1649
1650                 // All Tx's are readied. We'll send canCommit for the last one
1651                 // but not the others. The others
1652                 // should expire from the queue and the last one should be
1653                 // processed.
1654
1655                 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1656                 expectMsgClass(duration, CanCommitTransactionReply.class);
1657             }
1658         };
1659     }
1660
1661     @Test
1662     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Exception {
1663         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1664         new ShardTestKit(getSystem()) {
1665             {
1666                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1667                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1668                         "testTransactionCommitWithSubsequentExpiredCohortEntry");
1669
1670                 waitUntilLeader(shard);
1671
1672                 final FiniteDuration duration = duration("5 seconds");
1673
1674                 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1675
1676                 final TransactionIdentifier transactionID1 = nextTransactionId();
1677                 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1678                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1679                 expectMsgClass(duration, ReadyTransactionReply.class);
1680
1681                 // CanCommit the first Tx so it's the current in-progress Tx.
1682
1683                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1684                 expectMsgClass(duration, CanCommitTransactionReply.class);
1685
1686                 // Ready the second Tx.
1687
1688                 final TransactionIdentifier transactionID2 = nextTransactionId();
1689                 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1690                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1691                 expectMsgClass(duration, ReadyTransactionReply.class);
1692
1693                 // Ready the third Tx.
1694
1695                 final TransactionIdentifier transactionID3 = nextTransactionId();
1696                 final DataTreeModification modification3 = dataStore.newModification();
1697                 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1698                         .apply(modification3);
1699                 modification3.ready();
1700                 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
1701                         true);
1702                 shard.tell(readyMessage, getRef());
1703
1704                 // Commit the first Tx. After completing, the second should
1705                 // expire from the queue and the third
1706                 // Tx committed.
1707
1708                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1709                 expectMsgClass(duration, CommitTransactionReply.class);
1710
1711                 // Expect commit reply from the third Tx.
1712
1713                 expectMsgClass(duration, CommitTransactionReply.class);
1714
1715                 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1716                 assertNotNull(TestModel.TEST2_PATH + " not found", node);
1717             }
1718         };
1719     }
1720
1721     @Test
1722     public void testCanCommitBeforeReadyFailure() throws Exception {
1723         new ShardTestKit(getSystem()) {
1724             {
1725                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1726                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1727                         "testCanCommitBeforeReadyFailure");
1728
1729                 shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), getRef());
1730                 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1731             }
1732         };
1733     }
1734
1735     @Test
1736     public void testAbortAfterCanCommit() throws Exception {
1737         new ShardTestKit(getSystem()) {
1738             {
1739                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1740                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
1741
1742                 waitUntilLeader(shard);
1743
1744                 final FiniteDuration duration = duration("5 seconds");
1745                 final Timeout timeout = new Timeout(duration);
1746
1747                 // Ready 2 transactions - the first one will be aborted.
1748
1749                 final TransactionIdentifier transactionID1 = nextTransactionId();
1750                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1751                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1752                 expectMsgClass(duration, ReadyTransactionReply.class);
1753
1754                 final TransactionIdentifier transactionID2 = nextTransactionId();
1755                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1756                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1757                 expectMsgClass(duration, ReadyTransactionReply.class);
1758
1759                 // Send the CanCommitTransaction message for the first Tx.
1760
1761                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1762                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1763                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1764                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1765
1766                 // Send the CanCommitTransaction message for the 2nd Tx. This
1767                 // should get queued and
1768                 // processed after the first Tx completes.
1769
1770                 final Future<Object> canCommitFuture = Patterns.ask(shard,
1771                         new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1772
1773                 // Send the AbortTransaction message for the first Tx. This
1774                 // should trigger the 2nd
1775                 // Tx to proceed.
1776
1777                 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1778                 expectMsgClass(duration, AbortTransactionReply.class);
1779
1780                 // Wait for the 2nd Tx to complete the canCommit phase.
1781
1782                 canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
1783                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1784             }
1785         };
1786     }
1787
1788     @Test
1789     public void testAbortAfterReady() throws Exception {
1790         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1791         new ShardTestKit(getSystem()) {
1792             {
1793                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1794                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1795
1796                 waitUntilLeader(shard);
1797
1798                 final FiniteDuration duration = duration("5 seconds");
1799
1800                 // Ready a tx.
1801
1802                 final TransactionIdentifier transactionID1 = nextTransactionId();
1803                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1804                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1805                 expectMsgClass(duration, ReadyTransactionReply.class);
1806
1807                 // Send the AbortTransaction message.
1808
1809                 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1810                 expectMsgClass(duration, AbortTransactionReply.class);
1811
1812                 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1813
1814                 // Now send CanCommitTransaction - should fail.
1815
1816                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1817                 final Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1818                 assertTrue("Failure type", failure instanceof IllegalStateException);
1819
1820                 // Ready and CanCommit another and verify success.
1821
1822                 final TransactionIdentifier transactionID2 = nextTransactionId();
1823                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1824                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1825                 expectMsgClass(duration, ReadyTransactionReply.class);
1826
1827                 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1828                 expectMsgClass(duration, CanCommitTransactionReply.class);
1829             }
1830         };
1831     }
1832
1833     @Test
1834     public void testAbortQueuedTransaction() throws Exception {
1835         new ShardTestKit(getSystem()) {
1836             {
1837                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1838                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1839
1840                 waitUntilLeader(shard);
1841
1842                 final FiniteDuration duration = duration("5 seconds");
1843
1844                 // Ready 3 tx's.
1845
1846                 final TransactionIdentifier transactionID1 = nextTransactionId();
1847                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1848                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1849                 expectMsgClass(duration, ReadyTransactionReply.class);
1850
1851                 final TransactionIdentifier transactionID2 = nextTransactionId();
1852                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1853                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1854                 expectMsgClass(duration, ReadyTransactionReply.class);
1855
1856                 final TransactionIdentifier transactionID3 = nextTransactionId();
1857                 shard.tell(
1858                         newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1859                                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1),
1860                         getRef());
1861                 expectMsgClass(duration, ReadyTransactionReply.class);
1862
1863                 // Abort the second tx while it's queued.
1864
1865                 shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1866                 expectMsgClass(duration, AbortTransactionReply.class);
1867
1868                 // Commit the other 2.
1869
1870                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1871                 expectMsgClass(duration, CanCommitTransactionReply.class);
1872
1873                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1874                 expectMsgClass(duration, CommitTransactionReply.class);
1875
1876                 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1877                 expectMsgClass(duration, CanCommitTransactionReply.class);
1878
1879                 shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1880                 expectMsgClass(duration, CommitTransactionReply.class);
1881
1882                 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1883             }
1884         };
1885     }
1886
1887     @Test
1888     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1889         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1890     }
1891
1892     @Test
1893     public void testCreateSnapshot() throws Exception {
1894         testCreateSnapshot(true, "testCreateSnapshot");
1895     }
1896
1897     private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception {
1898         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1899
1900         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1901         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1902             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1903                 super(delegate);
1904             }
1905
1906             @Override
1907             public void saveSnapshot(final Object obj) {
1908                 savedSnapshot.set(obj);
1909                 super.saveSnapshot(obj);
1910             }
1911         }
1912
1913         dataStoreContextBuilder.persistent(persistent);
1914
1915         class TestShard extends Shard {
1916
1917             protected TestShard(final AbstractBuilder<?, ?> builder) {
1918                 super(builder);
1919                 setPersistence(new TestPersistentDataProvider(super.persistence()));
1920             }
1921
1922             @Override
1923             public void handleCommand(final Object message) {
1924                 super.handleCommand(message);
1925
1926                 // XXX:  commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
1927                 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1928                     latch.get().countDown();
1929                 }
1930             }
1931
1932             @Override
1933             public RaftActorContext getRaftActorContext() {
1934                 return super.getRaftActorContext();
1935             }
1936         }
1937
1938         new ShardTestKit(getSystem()) {
1939             {
1940                 final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1941
1942                 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
1943                         .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1944                         shardActorName);
1945
1946                 waitUntilLeader(shard);
1947                 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1948
1949                 final NormalizedNode<?, ?> expectedRoot = readStore(shard, YangInstanceIdentifier.EMPTY);
1950
1951                 // Trigger creation of a snapshot by ensuring
1952                 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1953                 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1954                 awaitAndValidateSnapshot(expectedRoot);
1955
1956                 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1957                 awaitAndValidateSnapshot(expectedRoot);
1958             }
1959
1960             private void awaitAndValidateSnapshot(final NormalizedNode<?, ?> expectedRoot)
1961                     throws InterruptedException, IOException {
1962                 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1963
1964                 assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
1965
1966                 verifySnapshot((Snapshot) savedSnapshot.get(), expectedRoot);
1967
1968                 latch.set(new CountDownLatch(1));
1969                 savedSnapshot.set(null);
1970             }
1971
1972             private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot)
1973                     throws IOException {
1974                 final NormalizedNode<?, ?> actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot()
1975                         .getRootNode().get();
1976                 assertEquals("Root node", expectedRoot, actual);
1977             }
1978         };
1979     }
1980
1981     /**
1982      * This test simply verifies that the applySnapShot logic will work.
1983      */
1984     @Test
1985     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
1986         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1987         store.setSchemaContext(SCHEMA_CONTEXT);
1988
1989         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1990         putTransaction.write(TestModel.TEST_PATH,
1991             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1992         commitTransaction(store, putTransaction);
1993
1994
1995         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.EMPTY);
1996
1997         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
1998
1999         writeTransaction.delete(YangInstanceIdentifier.EMPTY);
2000         writeTransaction.write(YangInstanceIdentifier.EMPTY, expected);
2001
2002         commitTransaction(store, writeTransaction);
2003
2004         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.EMPTY);
2005
2006         assertEquals(expected, actual);
2007     }
2008
2009     @Test
2010     public void testRecoveryApplicable() {
2011
2012         final DatastoreContext persistentContext = DatastoreContext.newBuilder()
2013                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2014
2015         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
2016                 .schemaContext(SCHEMA_CONTEXT).props();
2017
2018         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
2019                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2020
2021         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
2022                 .schemaContext(SCHEMA_CONTEXT).props();
2023
2024         new ShardTestKit(getSystem()) {
2025             {
2026                 final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
2027
2028                 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2029
2030                 final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
2031
2032                 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2033             }
2034         };
2035     }
2036
2037     @Test
2038     public void testOnDatastoreContext() {
2039         new ShardTestKit(getSystem()) {
2040             {
2041                 dataStoreContextBuilder.persistent(true);
2042
2043                 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(),
2044                         "testOnDatastoreContext");
2045
2046                 assertEquals("isRecoveryApplicable", true,
2047                         shard.underlyingActor().persistence().isRecoveryApplicable());
2048
2049                 waitUntilLeader(shard);
2050
2051                 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2052
2053                 assertEquals("isRecoveryApplicable", false,
2054                         shard.underlyingActor().persistence().isRecoveryApplicable());
2055
2056                 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2057
2058                 assertEquals("isRecoveryApplicable", true,
2059                         shard.underlyingActor().persistence().isRecoveryApplicable());
2060             }
2061         };
2062     }
2063
2064     @Test
2065     public void testRegisterRoleChangeListener() throws Exception {
2066         new ShardTestKit(getSystem()) {
2067             {
2068                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2069                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2070                         "testRegisterRoleChangeListener");
2071
2072                 waitUntilLeader(shard);
2073
2074                 final TestActorRef<MessageCollectorActor> listener =
2075                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2076
2077                 shard.tell(new RegisterRoleChangeListener(), listener);
2078
2079                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2080
2081                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2082                     ShardLeaderStateChanged.class);
2083                 assertEquals("getLocalShardDataTree present", true,
2084                         leaderStateChanged.getLocalShardDataTree().isPresent());
2085                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2086                     leaderStateChanged.getLocalShardDataTree().get());
2087
2088                 MessageCollectorActor.clearMessages(listener);
2089
2090                 // Force a leader change
2091
2092                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2093
2094                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2095                         ShardLeaderStateChanged.class);
2096                 assertEquals("getLocalShardDataTree present", false,
2097                         leaderStateChanged.getLocalShardDataTree().isPresent());
2098             }
2099         };
2100     }
2101
2102     @Test
2103     public void testFollowerInitialSyncStatus() throws Exception {
2104         final TestActorRef<Shard> shard = actorFactory.createTestActor(
2105                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2106                 "testFollowerInitialSyncStatus");
2107
2108         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false,
2109                 "member-1-shard-inventory-operational"));
2110
2111         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2112
2113         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true,
2114                 "member-1-shard-inventory-operational"));
2115
2116         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2117     }
2118
2119     @Test
2120     public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception {
2121         new ShardTestKit(getSystem()) {
2122             {
2123                 final String testName = "testClusteredDataChangeListenerWithDelayedRegistration";
2124                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
2125                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2126
2127                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2128                 final MockDataChangeListener listener = new MockDataChangeListener(1);
2129                 final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
2130                         actorFactory.generateActorId(testName + "-DataChangeListener"));
2131
2132                 setupInMemorySnapshotStore();
2133
2134                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2135                         newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2136                         actorFactory.generateActorId(testName + "-shard"));
2137
2138                 waitUntilNoLeader(shard);
2139
2140                 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
2141                         getRef());
2142                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
2143                         RegisterDataTreeNotificationListenerReply.class);
2144                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2145
2146                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
2147                         .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2148
2149                 listener.waitForChangeEvents();
2150             }
2151         };
2152     }
2153
2154     @Test
2155     public void testClusteredDataChangeListenerRegistration() throws Exception {
2156         new ShardTestKit(getSystem()) {
2157             {
2158                 final String testName = "testClusteredDataChangeListenerRegistration";
2159                 final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
2160                         MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
2161
2162                 final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
2163                         MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
2164
2165                 final TestActorRef<Shard> followerShard = actorFactory
2166                         .createTestActor(Shard.builder().id(followerShardID)
2167                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
2168                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2169                                         "akka://test/user/" + leaderShardID.toString()))
2170                                 .schemaContext(SCHEMA_CONTEXT).props()
2171                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2172
2173                 final TestActorRef<Shard> leaderShard = actorFactory
2174                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
2175                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
2176                                         "akka://test/user/" + followerShardID.toString()))
2177                                 .schemaContext(SCHEMA_CONTEXT).props()
2178                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2179
2180                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
2181                 final String leaderPath = waitUntilLeader(followerShard);
2182                 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2183
2184                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2185                 final MockDataChangeListener listener = new MockDataChangeListener(1);
2186                 final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
2187                         actorFactory.generateActorId(testName + "-DataChangeListener"));
2188
2189                 followerShard.tell(
2190                         new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
2191                         getRef());
2192                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
2193                         RegisterDataTreeNotificationListenerReply.class);
2194                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2195
2196                 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2197
2198                 listener.waitForChangeEvents();
2199             }
2200         };
2201     }
2202
2203     @Test
2204     public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
2205         new ShardTestKit(getSystem()) {
2206             {
2207                 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
2208                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
2209                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2210
2211                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2212                 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
2213                         TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2214
2215                 setupInMemorySnapshotStore();
2216
2217                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2218                         newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2219                         actorFactory.generateActorId(testName + "-shard"));
2220
2221                 waitUntilNoLeader(shard);
2222
2223                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2224                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
2225                         RegisterDataTreeNotificationListenerReply.class);
2226                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2227
2228                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
2229                         .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2230
2231                 listener.waitForChangeEvents();
2232             }
2233         };
2234     }
2235
2236     @Test
2237     public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
2238         new ShardTestKit(getSystem()) {
2239             {
2240                 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
2241                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
2242                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2243
2244                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
2245                 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
2246                         TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2247
2248                 setupInMemorySnapshotStore();
2249
2250                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2251                         newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2252                         actorFactory.generateActorId(testName + "-shard"));
2253
2254                 waitUntilNoLeader(shard);
2255
2256                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2257                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
2258                         RegisterDataTreeNotificationListenerReply.class);
2259                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2260
2261                 final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
2262                 regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
2263                 expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
2264
2265                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
2266                         .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2267
2268                 listener.expectNoMoreChanges("Received unexpected change after close");
2269             }
2270         };
2271     }
2272
2273     @Test
2274     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2275         new ShardTestKit(getSystem()) {
2276             {
2277                 final String testName = "testClusteredDataTreeChangeListenerRegistration";
2278                 final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
2279                         MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
2280
2281                 final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
2282                         MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
2283
2284                 final TestActorRef<Shard> followerShard = actorFactory
2285                         .createTestActor(Shard.builder().id(followerShardID)
2286                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
2287                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2288                                         "akka://test/user/" + leaderShardID.toString()))
2289                                 .schemaContext(SCHEMA_CONTEXT).props()
2290                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2291
2292                 final TestActorRef<Shard> leaderShard = actorFactory
2293                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
2294                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
2295                                         "akka://test/user/" + followerShardID.toString()))
2296                                 .schemaContext(SCHEMA_CONTEXT).props()
2297                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2298
2299                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
2300                 final String leaderPath = waitUntilLeader(followerShard);
2301                 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2302
2303                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2304                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2305                 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
2306                         actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2307
2308                 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2309                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
2310                         RegisterDataTreeNotificationListenerReply.class);
2311                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2312
2313                 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2314
2315                 listener.waitForChangeEvents();
2316             }
2317         };
2318     }
2319
2320     @Test
2321     public void testServerRemoved() throws Exception {
2322         final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
2323
2324         final ActorRef shard = parent.underlyingActor().context().actorOf(
2325                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2326                 "testServerRemoved");
2327
2328         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2329
2330         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2331     }
2332 }