Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.hamcrest.CoreMatchers.containsString;
11 import static org.hamcrest.CoreMatchers.endsWith;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.assertFalse;
15 import static org.junit.Assert.assertNotNull;
16 import static org.junit.Assert.assertSame;
17 import static org.junit.Assert.assertTrue;
18 import static org.junit.Assert.fail;
19 import static org.mockito.ArgumentMatchers.any;
20 import static org.mockito.Mockito.doThrow;
21 import static org.mockito.Mockito.inOrder;
22 import static org.mockito.Mockito.mock;
23 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
24
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSelection;
27 import akka.actor.Props;
28 import akka.actor.Status.Failure;
29 import akka.dispatch.Dispatchers;
30 import akka.dispatch.OnComplete;
31 import akka.japi.Creator;
32 import akka.pattern.Patterns;
33 import akka.persistence.SaveSnapshotSuccess;
34 import akka.testkit.TestActorRef;
35 import akka.util.Timeout;
36 import com.google.common.base.Stopwatch;
37 import com.google.common.base.Throwables;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import java.time.Duration;
40 import java.util.Collections;
41 import java.util.HashSet;
42 import java.util.Map;
43 import java.util.Optional;
44 import java.util.Set;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.atomic.AtomicReference;
49 import org.junit.Test;
50 import org.mockito.InOrder;
51 import org.opendaylight.controller.cluster.DataPersistenceProvider;
52 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
53 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
56 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
57 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
58 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
61 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
63 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
64 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
65 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
66 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
67 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
68 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
69 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
70 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
71 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
72 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
73 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
74 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
75 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
76 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
77 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
78 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
79 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
80 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
81 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
82 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
83 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
84 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
86 import org.opendaylight.controller.cluster.raft.RaftActorContext;
87 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
88 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
89 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
90 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
91 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
92 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
93 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
94 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
95 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
96 import org.opendaylight.controller.cluster.raft.messages.Payload;
97 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
98 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
99 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
100 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
101 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
102 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
103 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
104 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
105 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
106 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
107 import org.opendaylight.yangtools.concepts.Identifier;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
111 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
113 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
116 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
117 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
118 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
119 import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
120 import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
121 import scala.concurrent.Await;
122 import scala.concurrent.Future;
123 import scala.concurrent.duration.FiniteDuration;
124
125 public class ShardTest extends AbstractShardTest {
126     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
127             + "InMemorySnapshotStore and journal recovery seq number will start from 1";
128
129     @Test
130     public void testRegisterDataTreeChangeListener() throws Exception {
131         final ShardTestKit testKit = new ShardTestKit(getSystem());
132         final TestActorRef<Shard> shard = actorFactory.createTestActor(
133             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
134             "testRegisterDataTreeChangeListener");
135
136         ShardTestKit.waitUntilLeader(shard);
137
138         shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
139
140         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
141         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
142             TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
143
144         shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), testKit.getRef());
145
146         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
147             RegisterDataTreeNotificationListenerReply.class);
148         final String replyPath = reply.getListenerRegistrationPath().toString();
149         assertTrue("Incorrect reply path: " + replyPath,
150             replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
151
152         final YangInstanceIdentifier path = TestModel.TEST_PATH;
153         writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
154
155         listener.waitForChangeEvents();
156         listener.verifyOnInitialDataEvent();
157
158         final MockDataTreeChangeListener listener2 = new MockDataTreeChangeListener(1);
159         final ActorRef dclActor2 = actorFactory.createActor(DataTreeChangeListenerActor.props(listener2,
160             TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener2");
161
162         shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor2, false), testKit.getRef());
163
164         testKit.expectMsgClass(Duration.ofSeconds(3), RegisterDataTreeNotificationListenerReply.class);
165
166         listener2.waitForChangeEvents();
167         listener2.verifyNoOnInitialDataEvent();
168     }
169
170     @SuppressWarnings("serial")
171     @Test
172     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
173         final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
174         final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
175         final Creator<Shard> creator = new Creator<>() {
176             boolean firstElectionTimeout = true;
177
178             @Override
179             public Shard create() {
180                 return new Shard(newShardBuilder()) {
181                     @Override
182                     public void handleCommand(final Object message) {
183                         if (message instanceof ElectionTimeout && firstElectionTimeout) {
184                             firstElectionTimeout = false;
185                             final ActorRef self = getSelf();
186                             new Thread(() -> {
187                                 Uninterruptibles.awaitUninterruptibly(
188                                         onChangeListenerRegistered, 5, TimeUnit.SECONDS);
189                                 self.tell(message, self);
190                             }).start();
191
192                             onFirstElectionTimeout.countDown();
193                         } else {
194                             super.handleCommand(message);
195                         }
196                     }
197                 };
198             }
199         };
200
201         setupInMemorySnapshotStore();
202
203         final YangInstanceIdentifier path = TestModel.TEST_PATH;
204         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
205         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
206                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
207
208         final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
209                 new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
210                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
211
212         final ShardTestKit testKit = new ShardTestKit(getSystem());
213         assertTrue("Got first ElectionTimeout", onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
214
215         shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), testKit.getRef());
216         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
217             RegisterDataTreeNotificationListenerReply.class);
218         assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
219
220         shard.tell(FindLeader.INSTANCE, testKit.getRef());
221         final FindLeaderReply findLeadeReply = testKit.expectMsgClass(Duration.ofSeconds(5), FindLeaderReply.class);
222         assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
223
224         onChangeListenerRegistered.countDown();
225
226         // TODO: investigate why we do not receive data chage events
227         listener.waitForChangeEvents();
228     }
229
230     @Test
231     public void testCreateTransaction() {
232         final ShardTestKit testKit = new ShardTestKit(getSystem());
233         final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
234
235         ShardTestKit.waitUntilLeader(shard);
236
237         shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), testKit.getRef());
238
239         shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
240             DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
241
242         final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
243             CreateTransactionReply.class);
244
245         final String path = reply.getTransactionPath().toString();
246         assertThat(path, containsString(String.format("/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
247             shardID.getShardName(), shardID.getMemberName().getName())));
248     }
249
250     @Test
251     public void testCreateTransactionOnChain() {
252         final ShardTestKit testKit = new ShardTestKit(getSystem());
253         final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
254
255         ShardTestKit.waitUntilLeader(shard);
256
257         shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
258             DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
259
260         final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
261             CreateTransactionReply.class);
262
263         final String path = reply.getTransactionPath().toString();
264         assertThat(path, containsString(String.format(
265             "/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
266             shardID.getShardName(), shardID.getMemberName().getName())));
267     }
268
269     @Test
270     public void testPeerAddressResolved() {
271         final ShardTestKit testKit = new ShardTestKit(getSystem());
272         final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
273                 "config");
274         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder()
275             .peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null))
276             .props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
277
278         final String address = "akka://foobar";
279         shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
280
281         shard.tell(GetOnDemandRaftState.INSTANCE, testKit.getRef());
282         final OnDemandRaftState state = testKit.expectMsgClass(OnDemandRaftState.class);
283         assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
284     }
285
286     @Test
287     public void testApplySnapshot() throws Exception {
288
289         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps()
290                 .withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
291
292         ShardTestKit.waitUntilLeader(shard);
293
294         final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
295             SCHEMA_CONTEXT);
296
297         final ContainerNode container = Builders.containerBuilder()
298             .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
299             .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
300                 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1))
301                 .build())
302             .build();
303
304         writeToStore(store, TestModel.TEST_PATH, container);
305
306         final YangInstanceIdentifier root = YangInstanceIdentifier.empty();
307         final NormalizedNode expected = readStore(store, root);
308
309         final Snapshot snapshot = Snapshot.create(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
310                 Collections.emptyList(), 1, 2, 3, 4, -1, null, null);
311
312         shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
313
314         final Stopwatch sw = Stopwatch.createStarted();
315         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
316             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
317
318             try {
319                 assertEquals("Root node", expected, readStore(shard, root));
320                 return;
321             } catch (final AssertionError e) {
322                 // try again
323             }
324         }
325
326         fail("Snapshot was not applied");
327     }
328
329     @Test
330     public void testApplyState() throws Exception {
331         final TestActorRef<Shard> shard = actorFactory.createTestActor(
332                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
333
334         ShardTestKit.waitUntilLeader(shard);
335
336         final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
337             SCHEMA_CONTEXT);
338
339         final DataTreeModification writeMod = store.takeSnapshot().newModification();
340         final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
341         writeMod.write(TestModel.TEST_PATH, node);
342         writeMod.ready();
343
344         final TransactionIdentifier tx = nextTransactionId();
345         shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
346
347         final Stopwatch sw = Stopwatch.createStarted();
348         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
349             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
350
351             final NormalizedNode actual = readStore(shard, TestModel.TEST_PATH);
352             if (actual != null) {
353                 assertEquals("Applied state", node, actual);
354                 return;
355             }
356         }
357
358         fail("State was not applied");
359     }
360
361     @Test
362     public void testDataTreeCandidateRecovery() throws Exception {
363         // Set up the InMemorySnapshotStore.
364         final DataTree source = setupInMemorySnapshotStore();
365
366         final DataTreeModification writeMod = source.takeSnapshot().newModification();
367         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
368         writeMod.ready();
369         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
370
371         // Set up the InMemoryJournal.
372         InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
373             payloadForModification(source, writeMod, nextTransactionId())));
374
375         final int nListEntries = 16;
376         final Set<Integer> listEntryKeys = new HashSet<>();
377
378         // Add some ModificationPayload entries
379         for (int i = 1; i <= nListEntries; i++) {
380             listEntryKeys.add(i);
381
382             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
383                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
384
385             final DataTreeModification mod = source.takeSnapshot().newModification();
386             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
387             mod.ready();
388
389             InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
390                 payloadForModification(source, mod, nextTransactionId())));
391         }
392
393         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
394             new ApplyJournalEntries(nListEntries));
395
396         testRecovery(listEntryKeys, true);
397     }
398
399     @Test
400     @SuppressWarnings("checkstyle:IllegalCatch")
401     public void testConcurrentThreePhaseCommits() throws Exception {
402         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
403         final CountDownLatch commitLatch = new CountDownLatch(2);
404
405         final long timeoutSec = 5;
406         final Duration duration = Duration.ofSeconds(timeoutSec);
407         final Timeout timeout = Timeout.create(duration);
408
409         final TestActorRef<Shard> shard = actorFactory.createTestActor(
410                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
411                 "testConcurrentThreePhaseCommits");
412
413         class OnFutureComplete extends OnComplete<Object> {
414             private final Class<?> expRespType;
415
416             OnFutureComplete(final Class<?> expRespType) {
417                 this.expRespType = expRespType;
418             }
419
420             @Override
421             public void onComplete(final Throwable error, final Object resp) {
422                 if (error != null) {
423                     caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
424                 } else {
425                     try {
426                         assertEquals("Commit response type", expRespType, resp.getClass());
427                         onSuccess(resp);
428                     } catch (final Exception e) {
429                         caughtEx.set(e);
430                     }
431                 }
432             }
433
434             void onSuccess(final Object resp) {
435             }
436         }
437
438         class OnCommitFutureComplete extends OnFutureComplete {
439             OnCommitFutureComplete() {
440                 super(CommitTransactionReply.class);
441             }
442
443             @Override
444             public void onComplete(final Throwable error, final Object resp) {
445                 super.onComplete(error, resp);
446                 commitLatch.countDown();
447             }
448         }
449
450         class OnCanCommitFutureComplete extends OnFutureComplete {
451             private final TransactionIdentifier transactionID;
452
453             OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
454                 super(CanCommitTransactionReply.class);
455                 this.transactionID = transactionID;
456             }
457
458             @Override
459             void onSuccess(final Object resp) {
460                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(resp);
461                 assertTrue("Can commit", canCommitReply.getCanCommit());
462
463                 final Future<Object> commitFuture = Patterns.ask(shard,
464                         new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
465                 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
466             }
467         }
468
469         final ShardTestKit testKit = new ShardTestKit(getSystem());
470         ShardTestKit.waitUntilLeader(shard);
471
472         final TransactionIdentifier transactionID1 = nextTransactionId();
473         final TransactionIdentifier transactionID2 = nextTransactionId();
474         final TransactionIdentifier transactionID3 = nextTransactionId();
475
476         final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
477             shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
478         final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
479         final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
480         final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
481
482         shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
483             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
484         final ReadyTransactionReply readyReply = ReadyTransactionReply
485                 .fromSerializable(testKit.expectMsgClass(duration, ReadyTransactionReply.class));
486
487         String pathSuffix = shard.path().toString().replaceFirst("akka://test", "");
488         assertThat(readyReply.getCohortPath(), endsWith(pathSuffix));
489         // Send the CanCommitTransaction message for the first Tx.
490
491         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
492         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
493                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
494         assertTrue("Can commit", canCommitReply.getCanCommit());
495
496         // Ready 2 more Tx's.
497
498         shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
499             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), testKit.getRef());
500         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
501
502         shard.tell(
503             prepareBatchedModifications(transactionID3,
504                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
505                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
506                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), testKit.getRef());
507         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
508
509         // Send the CanCommitTransaction message for the next 2 Tx's.
510         // These should get queued and
511         // processed after the first Tx completes.
512
513         final Future<Object> canCommitFuture1 = Patterns.ask(shard,
514             new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
515
516         final Future<Object> canCommitFuture2 = Patterns.ask(shard,
517             new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
518
519         // Send the CommitTransaction message for the first Tx. After it
520         // completes, it should
521         // trigger the 2nd Tx to proceed which should in turn then
522         // trigger the 3rd.
523
524         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
525         testKit.expectMsgClass(duration, CommitTransactionReply.class);
526
527         // Wait for the next 2 Tx's to complete.
528
529         canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher());
530
531         canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher());
532
533         final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
534
535         final Throwable t = caughtEx.get();
536         if (t != null) {
537             Throwables.propagateIfPossible(t, Exception.class);
538             throw new RuntimeException(t);
539         }
540
541         assertTrue("Commits complete", done);
542
543 //                final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
544 //                        cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
545 //                        cohort3.getPreCommit(), cohort3.getCommit());
546 //                inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
547 //                inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
548 //                inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
549 //                inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
550 //                inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
551 //                inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
552 //                inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
553 //                inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
554 //                inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
555
556         // Verify data in the data store.
557
558         verifyOuterListEntry(shard, 1);
559
560         verifyLastApplied(shard, 3);
561     }
562
563     @Test
564     public void testBatchedModificationsWithNoCommitOnReady() {
565         final ShardTestKit testKit = new ShardTestKit(getSystem());
566         final TestActorRef<Shard> shard = actorFactory.createTestActor(
567             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
568             "testBatchedModificationsWithNoCommitOnReady");
569
570         ShardTestKit.waitUntilLeader(shard);
571
572         final TransactionIdentifier transactionID = nextTransactionId();
573         final Duration duration = Duration.ofSeconds(5);
574
575         // Send a BatchedModifications to start a transaction.
576
577         shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
578             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
579         testKit.expectMsgClass(duration, BatchedModificationsReply.class);
580
581         // Send a couple more BatchedModifications.
582
583         shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
584                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
585             testKit.getRef());
586         testKit.expectMsgClass(duration, BatchedModificationsReply.class);
587
588         shard.tell(newBatchedModifications(transactionID,
589             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
590             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
591             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3),
592             testKit.getRef());
593         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
594
595         // Send the CanCommitTransaction message.
596
597         shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
598         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
599                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
600         assertTrue("Can commit", canCommitReply.getCanCommit());
601
602         // Send the CommitTransaction message.
603
604         shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
605         testKit.expectMsgClass(duration, CommitTransactionReply.class);
606
607         // Verify data in the data store.
608
609         verifyOuterListEntry(shard, 1);
610     }
611
612     @Test
613     public void testBatchedModificationsWithCommitOnReady() {
614         final ShardTestKit testKit = new ShardTestKit(getSystem());
615         final TestActorRef<Shard> shard = actorFactory.createTestActor(
616             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
617             "testBatchedModificationsWithCommitOnReady");
618
619         ShardTestKit.waitUntilLeader(shard);
620
621         final TransactionIdentifier transactionID = nextTransactionId();
622         final Duration duration = Duration.ofSeconds(5);
623
624         // Send a BatchedModifications to start a transaction.
625
626         shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
627             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
628         testKit.expectMsgClass(duration, BatchedModificationsReply.class);
629
630         // Send a couple more BatchedModifications.
631
632         shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
633             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
634             testKit.getRef());
635         testKit.expectMsgClass(duration, BatchedModificationsReply.class);
636
637         shard.tell(newBatchedModifications(transactionID,
638             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
639             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
640             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3),
641             testKit.getRef());
642
643         testKit.expectMsgClass(duration, CommitTransactionReply.class);
644
645         // Verify data in the data store.
646         verifyOuterListEntry(shard, 1);
647     }
648
649     @Test(expected = IllegalStateException.class)
650     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
651         final ShardTestKit testKit = new ShardTestKit(getSystem());
652         final TestActorRef<Shard> shard = actorFactory.createTestActor(
653             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
654             "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
655
656         ShardTestKit.waitUntilLeader(shard);
657
658         final TransactionIdentifier transactionID = nextTransactionId();
659         final BatchedModifications batched = new BatchedModifications(transactionID,
660             DataStoreVersions.CURRENT_VERSION);
661         batched.setReady();
662         batched.setTotalMessagesSent(2);
663
664         shard.tell(batched, testKit.getRef());
665
666         final Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
667
668         if (failure != null) {
669             Throwables.propagateIfPossible(failure.cause(), Exception.class);
670             throw new RuntimeException(failure.cause());
671         }
672     }
673
674     @Test
675     public void testBatchedModificationsWithOperationFailure() {
676         final ShardTestKit testKit = new ShardTestKit(getSystem());
677         final TestActorRef<Shard> shard = actorFactory.createTestActor(
678             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
679             "testBatchedModificationsWithOperationFailure");
680
681         ShardTestKit.waitUntilLeader(shard);
682
683         // Test merge with invalid data. An exception should occur when
684         // the merge is applied. Note that
685         // write will not validate the children for performance reasons.
686
687         final TransactionIdentifier transactionID = nextTransactionId();
688
689         final ContainerNode invalidData = Builders.containerBuilder()
690             .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
691             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
692             .build();
693
694         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
695         batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
696         shard.tell(batched, testKit.getRef());
697         Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
698
699         final Throwable cause = failure.cause();
700
701         batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
702         batched.setReady();
703         batched.setTotalMessagesSent(2);
704
705         shard.tell(batched, testKit.getRef());
706
707         failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
708         assertEquals("Failure cause", cause, failure.cause());
709     }
710
711     @Test
712     public void testBatchedModificationsOnTransactionChain() {
713         final ShardTestKit testKit = new ShardTestKit(getSystem());
714         final TestActorRef<Shard> shard = actorFactory.createTestActor(
715             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
716             "testBatchedModificationsOnTransactionChain");
717
718         ShardTestKit.waitUntilLeader(shard);
719
720         final LocalHistoryIdentifier historyId = nextHistoryId();
721         final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
722         final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
723
724         final Duration duration = Duration.ofSeconds(5);
725
726         // Send a BatchedModifications to start a chained write
727         // transaction and ready it.
728
729         final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
730         final YangInstanceIdentifier path = TestModel.TEST_PATH;
731         shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), testKit.getRef());
732         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
733
734         // Create a read Tx on the same chain.
735
736         shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
737             DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
738
739         final CreateTransactionReply createReply = testKit.expectMsgClass(Duration.ofSeconds(3),
740             CreateTransactionReply.class);
741
742         getSystem().actorSelection(createReply.getTransactionPath())
743         .tell(new ReadData(path, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
744         final ReadDataReply readReply = testKit.expectMsgClass(Duration.ofSeconds(3), ReadDataReply.class);
745         assertEquals("Read node", containerNode, readReply.getNormalizedNode());
746
747         // Commit the write transaction.
748
749         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
750         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
751                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
752         assertTrue("Can commit", canCommitReply.getCanCommit());
753
754         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
755         testKit.expectMsgClass(duration, CommitTransactionReply.class);
756
757         // Verify data in the data store.
758
759         final NormalizedNode actualNode = readStore(shard, path);
760         assertEquals("Stored node", containerNode, actualNode);
761     }
762
763     @Test
764     public void testOnBatchedModificationsWhenNotLeader() {
765         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
766         final ShardTestKit testKit = new ShardTestKit(getSystem());
767         final Creator<Shard> creator = new Creator<>() {
768             private static final long serialVersionUID = 1L;
769
770             @Override
771             public Shard create() {
772                 return new Shard(newShardBuilder()) {
773                     @Override
774                     protected boolean isLeader() {
775                         return overrideLeaderCalls.get() ? false : super.isLeader();
776                     }
777
778                     @Override
779                     public ActorSelection getLeader() {
780                         return overrideLeaderCalls.get() ? getSystem().actorSelection(testKit.getRef().path())
781                                 : super.getLeader();
782                     }
783                 };
784             }
785         };
786
787         final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
788             new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
789             "testOnBatchedModificationsWhenNotLeader");
790
791         ShardTestKit.waitUntilLeader(shard);
792
793         overrideLeaderCalls.set(true);
794
795         final BatchedModifications batched = new BatchedModifications(nextTransactionId(),
796             DataStoreVersions.CURRENT_VERSION);
797
798         shard.tell(batched, ActorRef.noSender());
799
800         testKit.expectMsgEquals(batched);
801     }
802
803     @Test
804     public void testTransactionMessagesWithNoLeader() {
805         final ShardTestKit testKit = new ShardTestKit(getSystem());
806         dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
807         .shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
808         final TestActorRef<Shard> shard = actorFactory.createTestActor(
809             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
810             "testTransactionMessagesWithNoLeader");
811
812         testKit.waitUntilNoLeader(shard);
813
814         final TransactionIdentifier txId = nextTransactionId();
815         shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
816         Failure failure = testKit.expectMsgClass(Failure.class);
817         assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
818
819         shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
820             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
821         failure = testKit.expectMsgClass(Failure.class);
822         assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
823
824         shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
825             testKit.getRef());
826         failure = testKit.expectMsgClass(Failure.class);
827         assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
828     }
829
830     @Test
831     public void testReadyWithReadWriteImmediateCommit() {
832         testReadyWithImmediateCommit(true);
833     }
834
835     @Test
836     public void testReadyWithWriteOnlyImmediateCommit() {
837         testReadyWithImmediateCommit(false);
838     }
839
840     private void testReadyWithImmediateCommit(final boolean readWrite) {
841         final ShardTestKit testKit = new ShardTestKit(getSystem());
842         final TestActorRef<Shard> shard = actorFactory.createTestActor(
843             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
844             "testReadyWithImmediateCommit-" + readWrite);
845
846         ShardTestKit.waitUntilLeader(shard);
847
848         final TransactionIdentifier transactionID = nextTransactionId();
849         final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
850         if (readWrite) {
851             shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH, containerNode, true),
852                 testKit.getRef());
853         } else {
854             shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true),
855                 testKit.getRef());
856         }
857
858         testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
859
860         final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH);
861         assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
862     }
863
864     @Test
865     public void testReadyLocalTransactionWithImmediateCommit() {
866         final ShardTestKit testKit = new ShardTestKit(getSystem());
867         final TestActorRef<Shard> shard = actorFactory.createTestActor(
868             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
869             "testReadyLocalTransactionWithImmediateCommit");
870
871         ShardTestKit.waitUntilLeader(shard);
872
873         final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
874
875         final DataTreeModification modification = dataStore.newModification();
876
877         final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
878         new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
879         final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
880                 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
881                 .build();
882         new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
883
884         final TransactionIdentifier txId = nextTransactionId();
885         modification.ready();
886         final ReadyLocalTransaction readyMessage =
887                 new ReadyLocalTransaction(txId, modification, true, Optional.empty());
888
889         shard.tell(readyMessage, testKit.getRef());
890
891         testKit.expectMsgClass(CommitTransactionReply.class);
892
893         final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
894         assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
895     }
896
897     @Test
898     public void testReadyLocalTransactionWithThreePhaseCommit() {
899         final ShardTestKit testKit = new ShardTestKit(getSystem());
900         final TestActorRef<Shard> shard = actorFactory.createTestActor(
901             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
902             "testReadyLocalTransactionWithThreePhaseCommit");
903
904         ShardTestKit.waitUntilLeader(shard);
905
906         final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
907
908         final DataTreeModification modification = dataStore.newModification();
909
910         final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
911         new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
912         final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
913                 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
914                 .build();
915         new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
916
917         final TransactionIdentifier txId = nextTransactionId();
918         modification.ready();
919         final ReadyLocalTransaction readyMessage =
920                 new ReadyLocalTransaction(txId, modification, false, Optional.empty());
921
922         shard.tell(readyMessage, testKit.getRef());
923
924         testKit.expectMsgClass(ReadyTransactionReply.class);
925
926         // Send the CanCommitTransaction message.
927
928         shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
929         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
930                 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
931         assertTrue("Can commit", canCommitReply.getCanCommit());
932
933         // Send the CanCommitTransaction message.
934
935         shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
936         testKit.expectMsgClass(CommitTransactionReply.class);
937
938         final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
939         assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
940     }
941
942     @Test
943     public void testReadWriteCommitWithPersistenceDisabled() {
944         dataStoreContextBuilder.persistent(false);
945         final ShardTestKit testKit = new ShardTestKit(getSystem());
946         final TestActorRef<Shard> shard = actorFactory.createTestActor(
947             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
948             "testCommitWithPersistenceDisabled");
949
950         ShardTestKit.waitUntilLeader(shard);
951
952         // Setup a simulated transactions with a mock cohort.
953
954         final Duration duration = Duration.ofSeconds(5);
955
956         final TransactionIdentifier transactionID = nextTransactionId();
957         final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
958         shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false),
959             testKit.getRef());
960         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
961
962         // Send the CanCommitTransaction message.
963
964         shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
965         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
966                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
967         assertTrue("Can commit", canCommitReply.getCanCommit());
968
969         // Send the CanCommitTransaction message.
970
971         shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
972         testKit.expectMsgClass(duration, CommitTransactionReply.class);
973
974         final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH);
975         assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
976     }
977
978     @Test
979     public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
980         testCommitWhenTransactionHasModifications(true);
981     }
982
983     @Test
984     public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
985         testCommitWhenTransactionHasModifications(false);
986     }
987
988     private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
989         final ShardTestKit testKit = new ShardTestKit(getSystem());
990         final DataTree dataTree = createDelegatingMockDataTree();
991         final TestActorRef<Shard> shard = actorFactory.createTestActor(
992             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
993             "testCommitWhenTransactionHasModifications-" + readWrite);
994
995         ShardTestKit.waitUntilLeader(shard);
996
997         final Duration duration = Duration.ofSeconds(5);
998         final TransactionIdentifier transactionID = nextTransactionId();
999
1000         if (readWrite) {
1001             shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
1002                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1003         } else {
1004             shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1005                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1006         }
1007
1008         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1009
1010         // Send the CanCommitTransaction message.
1011
1012         shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1013         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1014                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1015         assertTrue("Can commit", canCommitReply.getCanCommit());
1016
1017         shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1018         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1019
1020         final InOrder inOrder = inOrder(dataTree);
1021         inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1022         inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1023         inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1024
1025         // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
1026         // the journal
1027         Thread.sleep(HEARTBEAT_MILLIS * 2);
1028
1029         shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, testKit.getRef());
1030         final ShardStats shardStats = testKit.expectMsgClass(duration, ShardStats.class);
1031
1032         // Use MBean for verification
1033         // Committed transaction count should increase as usual
1034         assertEquals(1, shardStats.getCommittedTransactionsCount());
1035
1036         // Commit index should advance 1 to account for disabling metadata
1037         assertEquals(1, shardStats.getCommitIndex());
1038     }
1039
1040     @Test
1041     public void testCommitPhaseFailure() throws Exception {
1042         final ShardTestKit testKit = new ShardTestKit(getSystem());
1043         final DataTree dataTree = createDelegatingMockDataTree();
1044         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1045             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1046             "testCommitPhaseFailure");
1047
1048         ShardTestKit.waitUntilLeader(shard);
1049
1050         final Duration duration = Duration.ofSeconds(5);
1051         final Timeout timeout = Timeout.create(duration);
1052
1053         // Setup 2 simulated transactions with mock cohorts. The first
1054         // one fails in the
1055         // commit phase.
1056
1057         doThrow(new RuntimeException("mock commit failure")).when(dataTree)
1058         .commit(any(DataTreeCandidate.class));
1059
1060         final TransactionIdentifier transactionID1 = nextTransactionId();
1061         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1062             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1063         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1064
1065         final TransactionIdentifier transactionID2 = nextTransactionId();
1066         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1067             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1068         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1069
1070         // Send the CanCommitTransaction message for the first Tx.
1071
1072         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1073         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1074                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1075         assertTrue("Can commit", canCommitReply.getCanCommit());
1076
1077         // Send the CanCommitTransaction message for the 2nd Tx. This
1078         // should get queued and
1079         // processed after the first Tx completes.
1080
1081         final Future<Object> canCommitFuture = Patterns.ask(shard,
1082             new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1083
1084         // Send the CommitTransaction message for the first Tx. This
1085         // should send back an error
1086         // and trigger the 2nd Tx to proceed.
1087
1088         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1089         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1090
1091         // Wait for the 2nd Tx to complete the canCommit phase.
1092
1093         final CountDownLatch latch = new CountDownLatch(1);
1094         canCommitFuture.onComplete(new OnComplete<>() {
1095             @Override
1096             public void onComplete(final Throwable failure, final Object resp) {
1097                 latch.countDown();
1098             }
1099         }, getSystem().dispatcher());
1100
1101         assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
1102
1103         final InOrder inOrder = inOrder(dataTree);
1104         inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1105         inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1106
1107         // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
1108         //        validate performs wrapping and we capture that mock
1109         // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1110
1111         inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1112     }
1113
1114     @Test
1115     public void testPreCommitPhaseFailure() throws Exception {
1116         final ShardTestKit testKit = new ShardTestKit(getSystem());
1117         final DataTree dataTree = createDelegatingMockDataTree();
1118         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1119             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1120             "testPreCommitPhaseFailure");
1121
1122         ShardTestKit.waitUntilLeader(shard);
1123
1124         final Duration duration = Duration.ofSeconds(5);
1125         final Timeout timeout = Timeout.create(duration);
1126
1127         doThrow(new RuntimeException("mock preCommit failure")).when(dataTree)
1128         .prepare(any(DataTreeModification.class));
1129
1130         final TransactionIdentifier transactionID1 = nextTransactionId();
1131         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1132             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1133         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1134
1135         final TransactionIdentifier transactionID2 = nextTransactionId();
1136         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1137             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1138         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1139
1140         // Send the CanCommitTransaction message for the first Tx.
1141
1142         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1143         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1144                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1145         assertTrue("Can commit", canCommitReply.getCanCommit());
1146
1147         // Send the CanCommitTransaction message for the 2nd Tx. This
1148         // should get queued and
1149         // processed after the first Tx completes.
1150
1151         final Future<Object> canCommitFuture = Patterns.ask(shard,
1152             new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1153
1154         // Send the CommitTransaction message for the first Tx. This
1155         // should send back an error
1156         // and trigger the 2nd Tx to proceed.
1157
1158         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1159         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1160
1161         // Wait for the 2nd Tx to complete the canCommit phase.
1162
1163         final CountDownLatch latch = new CountDownLatch(1);
1164         canCommitFuture.onComplete(new OnComplete<>() {
1165             @Override
1166             public void onComplete(final Throwable failure, final Object resp) {
1167                 latch.countDown();
1168             }
1169         }, getSystem().dispatcher());
1170
1171         assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
1172
1173         final InOrder inOrder = inOrder(dataTree);
1174         inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1175         inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1176         inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1177     }
1178
1179     @Test
1180     public void testCanCommitPhaseFailure() throws Exception {
1181         final ShardTestKit testKit = new ShardTestKit(getSystem());
1182         final DataTree dataTree = createDelegatingMockDataTree();
1183         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1184             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1185             "testCanCommitPhaseFailure");
1186
1187         ShardTestKit.waitUntilLeader(shard);
1188
1189         final Duration duration = Duration.ofSeconds(5);
1190         final TransactionIdentifier transactionID1 = nextTransactionId();
1191
1192         doThrow(new DataValidationFailedException(YangInstanceIdentifier.empty(), "mock canCommit failure"))
1193         .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1194
1195         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1196             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1197         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1198
1199         // Send the CanCommitTransaction message.
1200
1201         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1202         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1203
1204         // Send another can commit to ensure the failed one got cleaned
1205         // up.
1206
1207         final TransactionIdentifier transactionID2 = nextTransactionId();
1208         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1209             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1210         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1211
1212         shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1213         final CanCommitTransactionReply reply = CanCommitTransactionReply
1214                 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
1215         assertTrue("getCanCommit", reply.getCanCommit());
1216     }
1217
1218     @Test
1219     public void testImmediateCommitWithCanCommitPhaseFailure() throws Exception {
1220         testImmediateCommitWithCanCommitPhaseFailure(true);
1221         testImmediateCommitWithCanCommitPhaseFailure(false);
1222     }
1223
1224     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
1225         final ShardTestKit testKit = new ShardTestKit(getSystem());
1226         final DataTree dataTree = createDelegatingMockDataTree();
1227         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1228             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1229             "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1230
1231         ShardTestKit.waitUntilLeader(shard);
1232
1233         doThrow(new DataValidationFailedException(YangInstanceIdentifier.empty(), "mock canCommit failure"))
1234         .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1235
1236         final Duration duration = Duration.ofSeconds(5);
1237
1238         final TransactionIdentifier transactionID1 = nextTransactionId();
1239
1240         if (readWrite) {
1241             shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1242                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1243         } else {
1244             shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1245                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1246         }
1247
1248         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1249
1250         // Send another can commit to ensure the failed one got cleaned
1251         // up.
1252
1253         final TransactionIdentifier transactionID2 = nextTransactionId();
1254         if (readWrite) {
1255             shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1256                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1257         } else {
1258             shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1259                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1260         }
1261
1262         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1263     }
1264
1265     @Test
1266     public void testAbortWithCommitPending() {
1267         final ShardTestKit testKit = new ShardTestKit(getSystem());
1268         final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1269             @Override
1270             void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
1271                 // Simulate an AbortTransaction message occurring during
1272                 // replication, after
1273                 // persisting and before finishing the commit to the
1274                 // in-memory store.
1275
1276                 doAbortTransaction(id, null);
1277                 super.persistPayload(id, payload, batchHint);
1278             }
1279         };
1280
1281         final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
1282             new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1283             "testAbortWithCommitPending");
1284
1285         ShardTestKit.waitUntilLeader(shard);
1286
1287         final Duration duration = Duration.ofSeconds(5);
1288
1289         final TransactionIdentifier transactionID = nextTransactionId();
1290
1291         shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1292             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1293         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1294
1295         shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1296         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1297
1298         shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1299         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1300
1301         final NormalizedNode node = readStore(shard, TestModel.TEST_PATH);
1302
1303         // Since we're simulating an abort occurring during replication
1304         // and before finish commit,
1305         // the data should still get written to the in-memory store
1306         // since we've gotten past
1307         // canCommit and preCommit and persisted the data.
1308         assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1309     }
1310
1311     @Test
1312     public void testTransactionCommitTimeout() throws Exception {
1313         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1314         final ShardTestKit testKit = new ShardTestKit(getSystem());
1315         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1316             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1317             "testTransactionCommitTimeout");
1318
1319         ShardTestKit.waitUntilLeader(shard);
1320
1321         final Duration duration = Duration.ofSeconds(5);
1322
1323         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1324         writeToStore(shard, TestModel.OUTER_LIST_PATH,
1325             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1326
1327         // Ready 2 Tx's - the first will timeout
1328
1329         final TransactionIdentifier transactionID1 = nextTransactionId();
1330         shard.tell(
1331             prepareBatchedModifications(transactionID1,
1332                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1333                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1334                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
1335             testKit.getRef());
1336         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1337
1338         final TransactionIdentifier transactionID2 = nextTransactionId();
1339         final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1340                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1341         shard.tell(
1342             prepareBatchedModifications(transactionID2, listNodePath,
1343                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), testKit.getRef());
1344         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1345
1346         // canCommit 1st Tx. We don't send the commit so it should
1347         // timeout.
1348
1349         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1350         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1351
1352         // canCommit the 2nd Tx - it should complete after the 1st Tx
1353         // times out.
1354
1355         shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1356         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1357
1358         // Try to commit the 1st Tx - should fail as it's not the
1359         // current Tx.
1360
1361         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1362         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1363
1364         // Commit the 2nd Tx.
1365
1366         shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1367         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1368
1369         final NormalizedNode node = readStore(shard, listNodePath);
1370         assertNotNull(listNodePath + " not found", node);
1371     }
1372
1373 //    @Test
1374 //    @Ignore
1375 //    public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1376 //        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1377 //
1378 //        new ShardTestKit(getSystem()) {{
1379 //            final TestActorRef<Shard> shard = actorFactory.createTestActor(
1380 //                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1381 //                    "testTransactionCommitQueueCapacityExceeded");
1382 //
1383 //            waitUntilLeader(shard);
1384 //
1385 //            final FiniteDuration duration = duration("5 seconds");
1386 //
1387 //            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1388 //
1389 //            final TransactionIdentifier transactionID1 = nextTransactionId();
1390 //            final MutableCompositeModification modification1 = new MutableCompositeModification();
1391 //            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1392 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1393 //                    modification1);
1394 //
1395 //            final TransactionIdentifier transactionID2 = nextTransactionId();
1396 //            final MutableCompositeModification modification2 = new MutableCompositeModification();
1397 //            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1398 //                    TestModel.OUTER_LIST_PATH,
1399 //                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1400 //                    modification2);
1401 //
1402 //            final TransactionIdentifier transactionID3 = nextTransactionId();
1403 //            final MutableCompositeModification modification3 = new MutableCompositeModification();
1404 //            final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1405 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1406 //                    modification3);
1407 //
1408 //            // Ready the Tx's
1409 //
1410 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
1411 //                    modification1), getRef());
1412 //            expectMsgClass(duration, ReadyTransactionReply.class);
1413 //
1414 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
1415 //                    modification2), getRef());
1416 //            expectMsgClass(duration, ReadyTransactionReply.class);
1417 //
1418 //            // The 3rd Tx should exceed queue capacity and fail.
1419 //
1420 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
1421 //                    modification3), getRef());
1422 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1423 //
1424 //            // canCommit 1st Tx.
1425 //
1426 //            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1427 //            expectMsgClass(duration, CanCommitTransactionReply.class);
1428 //
1429 //            // canCommit the 2nd Tx - it should get queued.
1430 //
1431 //            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1432 //
1433 //            // canCommit the 3rd Tx - should exceed queue capacity and fail.
1434 //
1435 //            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1436 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1437 //        }};
1438 //    }
1439
1440     @Test
1441     public void testTransactionCommitWithPriorExpiredCohortEntries() {
1442         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1443         final ShardTestKit testKit = new ShardTestKit(getSystem());
1444         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1445             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1446             "testTransactionCommitWithPriorExpiredCohortEntries");
1447
1448         ShardTestKit.waitUntilLeader(shard);
1449
1450         final Duration duration = Duration.ofSeconds(5);
1451
1452         final TransactionIdentifier transactionID1 = nextTransactionId();
1453         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1454             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1455         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1456
1457         final TransactionIdentifier transactionID2 = nextTransactionId();
1458         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1459             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1460         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1461
1462         final TransactionIdentifier transactionID3 = nextTransactionId();
1463         shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1464             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1465         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1466
1467         // All Tx's are readied. We'll send canCommit for the last one
1468         // but not the others. The others
1469         // should expire from the queue and the last one should be
1470         // processed.
1471
1472         shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1473         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1474     }
1475
1476     @Test
1477     public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
1478         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1479         final ShardTestKit testKit = new ShardTestKit(getSystem());
1480         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1481             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1482             "testTransactionCommitWithSubsequentExpiredCohortEntry");
1483
1484         ShardTestKit.waitUntilLeader(shard);
1485
1486         final Duration duration = Duration.ofSeconds(5);
1487
1488         final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1489
1490         final TransactionIdentifier transactionID1 = nextTransactionId();
1491         shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1492             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1493         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1494
1495         // CanCommit the first Tx so it's the current in-progress Tx.
1496
1497         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1498         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1499
1500         // Ready the second Tx.
1501
1502         final TransactionIdentifier transactionID2 = nextTransactionId();
1503         shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1504             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1505         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1506
1507         // Ready the third Tx.
1508
1509         final TransactionIdentifier transactionID3 = nextTransactionId();
1510         final DataTreeModification modification3 = dataStore.newModification();
1511         new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1512             .apply(modification3);
1513         modification3.ready();
1514         final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
1515             true, Optional.empty());
1516         shard.tell(readyMessage, testKit.getRef());
1517
1518         // Commit the first Tx. After completing, the second should
1519         // expire from the queue and the third
1520         // Tx committed.
1521
1522         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1523         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1524
1525         // Expect commit reply from the third Tx.
1526
1527         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1528
1529         final NormalizedNode node = readStore(shard, TestModel.TEST2_PATH);
1530         assertNotNull(TestModel.TEST2_PATH + " not found", node);
1531     }
1532
1533     @Test
1534     public void testCanCommitBeforeReadyFailure() {
1535         final ShardTestKit testKit = new ShardTestKit(getSystem());
1536         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1537             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1538             "testCanCommitBeforeReadyFailure");
1539
1540         shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), testKit.getRef());
1541         testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
1542     }
1543
1544     @Test
1545     public void testAbortAfterCanCommit() throws Exception {
1546         final ShardTestKit testKit = new ShardTestKit(getSystem());
1547         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1548             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
1549
1550         ShardTestKit.waitUntilLeader(shard);
1551
1552         final Duration duration = Duration.ofSeconds(5);
1553         final Timeout timeout = Timeout.create(duration);
1554
1555         // Ready 2 transactions - the first one will be aborted.
1556
1557         final TransactionIdentifier transactionID1 = nextTransactionId();
1558         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1559             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1560         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1561
1562         final TransactionIdentifier transactionID2 = nextTransactionId();
1563         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1564             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1565         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1566
1567         // Send the CanCommitTransaction message for the first Tx.
1568
1569         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1570         CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1571                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1572         assertTrue("Can commit", canCommitReply.getCanCommit());
1573
1574         // Send the CanCommitTransaction message for the 2nd Tx. This
1575         // should get queued and
1576         // processed after the first Tx completes.
1577
1578         final Future<Object> canCommitFuture = Patterns.ask(shard,
1579             new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1580
1581         // Send the AbortTransaction message for the first Tx. This
1582         // should trigger the 2nd
1583         // Tx to proceed.
1584
1585         shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1586         testKit.expectMsgClass(duration, AbortTransactionReply.class);
1587
1588         // Wait for the 2nd Tx to complete the canCommit phase.
1589
1590         canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture,
1591             FiniteDuration.create(5, TimeUnit.SECONDS));
1592         assertTrue("Can commit", canCommitReply.getCanCommit());
1593     }
1594
1595     @Test
1596     public void testAbortAfterReady() {
1597         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1598         final ShardTestKit testKit = new ShardTestKit(getSystem());
1599         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1600             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1601
1602         ShardTestKit.waitUntilLeader(shard);
1603
1604         final Duration duration = Duration.ofSeconds(5);
1605
1606         // Ready a tx.
1607
1608         final TransactionIdentifier transactionID1 = nextTransactionId();
1609         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1610             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1611         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1612
1613         // Send the AbortTransaction message.
1614
1615         shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1616         testKit.expectMsgClass(duration, AbortTransactionReply.class);
1617
1618         assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1619
1620         // Now send CanCommitTransaction - should fail.
1621
1622         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1623         final Throwable failure = testKit.expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1624         assertTrue("Failure type", failure instanceof IllegalStateException);
1625
1626         // Ready and CanCommit another and verify success.
1627
1628         final TransactionIdentifier transactionID2 = nextTransactionId();
1629         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1630             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1631         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1632
1633         shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1634         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1635     }
1636
1637     @Test
1638     public void testAbortQueuedTransaction() {
1639         final ShardTestKit testKit = new ShardTestKit(getSystem());
1640         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1641             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1642
1643         ShardTestKit.waitUntilLeader(shard);
1644
1645         final Duration duration = Duration.ofSeconds(5);
1646
1647         // Ready 3 tx's.
1648
1649         final TransactionIdentifier transactionID1 = nextTransactionId();
1650         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1651             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1652         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1653
1654         final TransactionIdentifier transactionID2 = nextTransactionId();
1655         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1656             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1657         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1658
1659         final TransactionIdentifier transactionID3 = nextTransactionId();
1660         shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1661                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), testKit.getRef());
1662         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1663
1664         // Abort the second tx while it's queued.
1665
1666         shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1667         testKit.expectMsgClass(duration, AbortTransactionReply.class);
1668
1669         // Commit the other 2.
1670
1671         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1672         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1673
1674         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1675         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1676
1677         shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1678         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1679
1680         shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1681         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1682
1683         assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1684     }
1685
1686     @Test
1687     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1688         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1689     }
1690
1691     @Test
1692     public void testCreateSnapshot() throws Exception {
1693         testCreateSnapshot(true, "testCreateSnapshot");
1694     }
1695
1696     private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception {
1697         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1698
1699         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1700         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1701             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1702                 super(delegate);
1703             }
1704
1705             @Override
1706             public void saveSnapshot(final Object obj) {
1707                 savedSnapshot.set(obj);
1708                 super.saveSnapshot(obj);
1709             }
1710         }
1711
1712         dataStoreContextBuilder.persistent(persistent);
1713
1714         final class TestShard extends Shard {
1715
1716             TestShard(final AbstractBuilder<?, ?> builder) {
1717                 super(builder);
1718                 setPersistence(new TestPersistentDataProvider(super.persistence()));
1719             }
1720
1721             @Override
1722             public void handleCommand(final Object message) {
1723                 super.handleCommand(message);
1724
1725                 // XXX:  commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
1726                 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1727                     latch.get().countDown();
1728                 }
1729             }
1730
1731             @Override
1732             public RaftActorContext getRaftActorContext() {
1733                 return super.getRaftActorContext();
1734             }
1735         }
1736
1737         final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1738
1739         final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
1740             new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), shardActorName);
1741
1742         ShardTestKit.waitUntilLeader(shard);
1743         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1744
1745         final NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.empty());
1746
1747         // Trigger creation of a snapshot by ensuring
1748         final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1749         raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1750         awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1751
1752         raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1753         awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1754     }
1755
1756     private static void awaitAndValidateSnapshot(final AtomicReference<CountDownLatch> latch,
1757             final AtomicReference<Object> savedSnapshot, final NormalizedNode expectedRoot)
1758                     throws InterruptedException {
1759         assertTrue("Snapshot saved", latch.get().await(5, TimeUnit.SECONDS));
1760
1761         assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
1762
1763         verifySnapshot((Snapshot) savedSnapshot.get(), expectedRoot);
1764
1765         latch.set(new CountDownLatch(1));
1766         savedSnapshot.set(null);
1767     }
1768
1769     private static void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) {
1770         final NormalizedNode actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot().getRootNode().get();
1771         assertEquals("Root node", expectedRoot, actual);
1772     }
1773
1774     /**
1775      * This test simply verifies that the applySnapShot logic will work.
1776      */
1777     @Test
1778     public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
1779         final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
1780             SCHEMA_CONTEXT);
1781
1782         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1783         putTransaction.write(TestModel.TEST_PATH,
1784             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1785         commitTransaction(store, putTransaction);
1786
1787
1788         final NormalizedNode expected = readStore(store, YangInstanceIdentifier.empty());
1789
1790         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
1791
1792         writeTransaction.delete(YangInstanceIdentifier.empty());
1793         writeTransaction.write(YangInstanceIdentifier.empty(), expected);
1794
1795         commitTransaction(store, writeTransaction);
1796
1797         final NormalizedNode actual = readStore(store, YangInstanceIdentifier.empty());
1798
1799         assertEquals(expected, actual);
1800     }
1801
1802     @Test
1803     public void testRecoveryApplicable() {
1804
1805         final DatastoreContext persistentContext = DatastoreContext.newBuilder()
1806                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1807
1808         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
1809                 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1810
1811         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
1812                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1813
1814         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
1815                 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1816
1817         final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
1818
1819         assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1820
1821         final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
1822
1823         assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1824     }
1825
1826     @Test
1827     public void testOnDatastoreContext() {
1828         dataStoreContextBuilder.persistent(true);
1829
1830         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
1831
1832         assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1833
1834         ShardTestKit.waitUntilLeader(shard);
1835
1836         shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1837
1838         assertFalse("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1839
1840         shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1841
1842         assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1843     }
1844
1845     @Test
1846     public void testRegisterRoleChangeListener() {
1847         final ShardTestKit testKit = new ShardTestKit(getSystem());
1848         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1849             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1850             "testRegisterRoleChangeListener");
1851
1852         ShardTestKit.waitUntilLeader(shard);
1853
1854         final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
1855
1856         shard.tell(new RegisterRoleChangeListener(), listener);
1857
1858         MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
1859
1860         ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1861             ShardLeaderStateChanged.class);
1862         assertTrue("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent());
1863         assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
1864             leaderStateChanged.getLocalShardDataTree().get());
1865
1866         MessageCollectorActor.clearMessages(listener);
1867
1868         // Force a leader change
1869
1870         shard.tell(new RequestVote(10000, "member2", 50, 50), testKit.getRef());
1871
1872         leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, ShardLeaderStateChanged.class);
1873         assertFalse("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent());
1874     }
1875
1876     @Test
1877     public void testFollowerInitialSyncStatus() {
1878         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1879                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1880                 "testFollowerInitialSyncStatus");
1881
1882         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false,
1883                 "member-1-shard-inventory-operational"));
1884
1885         assertFalse(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1886
1887         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true,
1888                 "member-1-shard-inventory-operational"));
1889
1890         assertTrue(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1891     }
1892
1893     @Test
1894     public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
1895         final ShardTestKit testKit = new ShardTestKit(getSystem());
1896         final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
1897         dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1898             .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1899
1900         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1901         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1902             TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1903
1904         setupInMemorySnapshotStore();
1905
1906         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1907             newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1908             actorFactory.generateActorId(testName + "-shard"));
1909
1910         testKit.waitUntilNoLeader(shard);
1911
1912         shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1913         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1914             RegisterDataTreeNotificationListenerReply.class);
1915         assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1916
1917         shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1918             .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1919
1920         listener.waitForChangeEvents();
1921     }
1922
1923     @Test
1924     public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
1925         final ShardTestKit testKit = new ShardTestKit(getSystem());
1926         final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
1927         dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1928             .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1929
1930         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
1931         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1932             TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1933
1934         setupInMemorySnapshotStore();
1935
1936         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1937             newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1938             actorFactory.generateActorId(testName + "-shard"));
1939
1940         testKit.waitUntilNoLeader(shard);
1941
1942         shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1943         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1944             RegisterDataTreeNotificationListenerReply.class);
1945         assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1946
1947         final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
1948         regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), testKit.getRef());
1949         testKit.expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
1950
1951         shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1952             .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1953
1954         listener.expectNoMoreChanges("Received unexpected change after close");
1955     }
1956
1957     @Test
1958     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
1959         final ShardTestKit testKit = new ShardTestKit(getSystem());
1960         final String testName = "testClusteredDataTreeChangeListenerRegistration";
1961         final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
1962             MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
1963
1964         final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
1965             MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
1966
1967         final TestActorRef<Shard> followerShard = actorFactory
1968                 .createTestActor(Shard.builder().id(followerShardID)
1969                     .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
1970                     .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
1971                         "akka://test/user/" + leaderShardID.toString()))
1972                     .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1973                     .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
1974
1975         final TestActorRef<Shard> leaderShard = actorFactory
1976                 .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
1977                     .peerAddresses(Collections.singletonMap(followerShardID.toString(),
1978                         "akka://test/user/" + followerShardID.toString()))
1979                     .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1980                     .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
1981
1982         leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1983         final String leaderPath = ShardTestKit.waitUntilLeader(followerShard);
1984         assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
1985
1986         final YangInstanceIdentifier path = TestModel.TEST_PATH;
1987         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1988         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
1989             actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1990
1991         followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1992         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1993             RegisterDataTreeNotificationListenerReply.class);
1994         assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1995
1996         writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1997
1998         listener.waitForChangeEvents();
1999     }
2000
2001     @Test
2002     public void testServerRemoved() {
2003         final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
2004                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
2005
2006         final ActorRef shard = parent.underlyingActor().context().actorOf(
2007                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2008                 "testServerRemoved");
2009
2010         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2011
2012         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2013     }
2014 }