Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.hamcrest.CoreMatchers.containsString;
11 import static org.hamcrest.CoreMatchers.endsWith;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.assertFalse;
15 import static org.junit.Assert.assertNotNull;
16 import static org.junit.Assert.assertNull;
17 import static org.junit.Assert.assertSame;
18 import static org.junit.Assert.assertTrue;
19 import static org.junit.Assert.fail;
20 import static org.mockito.ArgumentMatchers.any;
21 import static org.mockito.Mockito.doThrow;
22 import static org.mockito.Mockito.inOrder;
23 import static org.mockito.Mockito.mock;
24 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
25
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.Props;
29 import akka.actor.Status.Failure;
30 import akka.dispatch.Dispatchers;
31 import akka.dispatch.OnComplete;
32 import akka.japi.Creator;
33 import akka.pattern.Patterns;
34 import akka.persistence.SaveSnapshotSuccess;
35 import akka.testkit.TestActorRef;
36 import akka.util.Timeout;
37 import com.google.common.base.Stopwatch;
38 import com.google.common.base.Throwables;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.time.Duration;
41 import java.util.Collections;
42 import java.util.HashSet;
43 import java.util.Map;
44 import java.util.Optional;
45 import java.util.Set;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.atomic.AtomicBoolean;
49 import java.util.concurrent.atomic.AtomicReference;
50 import org.junit.Test;
51 import org.mockito.InOrder;
52 import org.opendaylight.controller.cluster.DataPersistenceProvider;
53 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
54 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
55 import org.opendaylight.controller.cluster.access.concepts.MemberName;
56 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
57 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
58 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
59 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
62 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
65 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
66 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
67 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
68 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
69 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
70 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
71 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
72 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
73 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
74 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
75 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
76 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
77 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
78 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
79 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
80 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
81 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
82 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
83 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
84 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
86 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
87 import org.opendaylight.controller.cluster.raft.RaftActorContext;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
89 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
90 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
91 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
92 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
93 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
94 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
95 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
96 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
97 import org.opendaylight.controller.cluster.raft.messages.Payload;
98 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
99 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
100 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
101 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
102 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
103 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
104 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
105 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
106 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
107 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
108 import org.opendaylight.yangtools.concepts.Identifier;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
111 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
114 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
115 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
116 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
117 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
118 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
119 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
120 import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
121 import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
122 import scala.concurrent.Await;
123 import scala.concurrent.Future;
124 import scala.concurrent.duration.FiniteDuration;
125
126 public class ShardTest extends AbstractShardTest {
127     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
128             + "InMemorySnapshotStore and journal recovery seq number will start from 1";
129
130     @Test
131     public void testRegisterDataTreeChangeListener() throws Exception {
132         final ShardTestKit testKit = new ShardTestKit(getSystem());
133         final TestActorRef<Shard> shard = actorFactory.createTestActor(
134             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
135             "testRegisterDataTreeChangeListener");
136
137         ShardTestKit.waitUntilLeader(shard);
138
139         shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
140
141         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
142         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
143             TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
144
145         shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), testKit.getRef());
146
147         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
148             RegisterDataTreeNotificationListenerReply.class);
149         final String replyPath = reply.getListenerRegistrationPath().toString();
150         assertTrue("Incorrect reply path: " + replyPath,
151             replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
152
153         final YangInstanceIdentifier path = TestModel.TEST_PATH;
154         writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
155
156         listener.waitForChangeEvents();
157         listener.verifyOnInitialDataEvent();
158
159         final MockDataTreeChangeListener listener2 = new MockDataTreeChangeListener(1);
160         final ActorRef dclActor2 = actorFactory.createActor(DataTreeChangeListenerActor.props(listener2,
161             TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener2");
162
163         shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor2, false), testKit.getRef());
164
165         testKit.expectMsgClass(Duration.ofSeconds(3), RegisterDataTreeNotificationListenerReply.class);
166
167         listener2.waitForChangeEvents();
168         listener2.verifyNoOnInitialDataEvent();
169     }
170
171     @SuppressWarnings("serial")
172     @Test
173     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
174         final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
175         final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
176         final Creator<Shard> creator = new Creator<>() {
177             boolean firstElectionTimeout = true;
178
179             @Override
180             public Shard create() {
181                 return new Shard(newShardBuilder()) {
182                     @Override
183                     public void handleCommand(final Object message) {
184                         if (message instanceof ElectionTimeout && firstElectionTimeout) {
185                             firstElectionTimeout = false;
186                             final ActorRef self = getSelf();
187                             new Thread(() -> {
188                                 Uninterruptibles.awaitUninterruptibly(
189                                         onChangeListenerRegistered, 5, TimeUnit.SECONDS);
190                                 self.tell(message, self);
191                             }).start();
192
193                             onFirstElectionTimeout.countDown();
194                         } else {
195                             super.handleCommand(message);
196                         }
197                     }
198                 };
199             }
200         };
201
202         setupInMemorySnapshotStore();
203
204         final YangInstanceIdentifier path = TestModel.TEST_PATH;
205         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
206         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
207                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
208
209         final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
210                 new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
211                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
212
213         final ShardTestKit testKit = new ShardTestKit(getSystem());
214         assertTrue("Got first ElectionTimeout", onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
215
216         shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), testKit.getRef());
217         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
218             RegisterDataTreeNotificationListenerReply.class);
219         assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
220
221         shard.tell(FindLeader.INSTANCE, testKit.getRef());
222         final FindLeaderReply findLeadeReply = testKit.expectMsgClass(Duration.ofSeconds(5), FindLeaderReply.class);
223         assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
224
225         onChangeListenerRegistered.countDown();
226
227         // TODO: investigate why we do not receive data chage events
228         listener.waitForChangeEvents();
229     }
230
231     @Test
232     public void testCreateTransaction() {
233         final ShardTestKit testKit = new ShardTestKit(getSystem());
234         final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
235
236         ShardTestKit.waitUntilLeader(shard);
237
238         shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), testKit.getRef());
239
240         shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
241             DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
242
243         final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
244             CreateTransactionReply.class);
245
246         final String path = reply.getTransactionPath().toString();
247         assertThat(path, containsString(String.format("/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
248             shardID.getShardName(), shardID.getMemberName().getName())));
249     }
250
251     @Test
252     public void testCreateTransactionOnChain() {
253         final ShardTestKit testKit = new ShardTestKit(getSystem());
254         final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
255
256         ShardTestKit.waitUntilLeader(shard);
257
258         shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
259             DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
260
261         final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
262             CreateTransactionReply.class);
263
264         final String path = reply.getTransactionPath().toString();
265         assertThat(path, containsString(String.format(
266             "/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
267             shardID.getShardName(), shardID.getMemberName().getName())));
268     }
269
270     @Test
271     public void testPeerAddressResolved() {
272         final ShardTestKit testKit = new ShardTestKit(getSystem());
273         final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
274                 "config");
275         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder()
276             .peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null))
277             .props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
278
279         final String address = "akka://foobar";
280         shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
281
282         shard.tell(GetOnDemandRaftState.INSTANCE, testKit.getRef());
283         final OnDemandRaftState state = testKit.expectMsgClass(OnDemandRaftState.class);
284         assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
285     }
286
287     @Test
288     public void testApplySnapshot() throws Exception {
289
290         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps()
291                 .withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
292
293         ShardTestKit.waitUntilLeader(shard);
294
295         final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
296             SCHEMA_CONTEXT);
297
298         final ContainerNode container = Builders.containerBuilder()
299             .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
300             .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
301                 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1))
302                 .build())
303             .build();
304
305         writeToStore(store, TestModel.TEST_PATH, container);
306
307         final YangInstanceIdentifier root = YangInstanceIdentifier.of();
308         final NormalizedNode expected = readStore(store, root);
309
310         final Snapshot snapshot = Snapshot.create(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
311                 Collections.emptyList(), 1, 2, 3, 4, -1, null, null);
312
313         shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
314
315         final Stopwatch sw = Stopwatch.createStarted();
316         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
317             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
318
319             try {
320                 assertEquals("Root node", expected, readStore(shard, root));
321                 return;
322             } catch (final AssertionError e) {
323                 // try again
324             }
325         }
326
327         fail("Snapshot was not applied");
328     }
329
330     @Test
331     public void testApplyState() throws Exception {
332         final TestActorRef<Shard> shard = actorFactory.createTestActor(
333                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
334
335         ShardTestKit.waitUntilLeader(shard);
336
337         final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
338             SCHEMA_CONTEXT);
339
340         final DataTreeModification writeMod = store.takeSnapshot().newModification();
341         final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
342         writeMod.write(TestModel.TEST_PATH, node);
343         writeMod.ready();
344
345         final TransactionIdentifier tx = nextTransactionId();
346         shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
347
348         final Stopwatch sw = Stopwatch.createStarted();
349         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
350             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
351
352             final NormalizedNode actual = readStore(shard, TestModel.TEST_PATH);
353             if (actual != null) {
354                 assertEquals("Applied state", node, actual);
355                 return;
356             }
357         }
358
359         fail("State was not applied");
360     }
361
362     @Test
363     public void testDataTreeCandidateRecovery() throws Exception {
364         // Set up the InMemorySnapshotStore.
365         final DataTree source = setupInMemorySnapshotStore();
366
367         final DataTreeModification writeMod = source.takeSnapshot().newModification();
368         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
369         writeMod.ready();
370         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
371
372         // Set up the InMemoryJournal.
373         InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
374             payloadForModification(source, writeMod, nextTransactionId())));
375
376         final int nListEntries = 16;
377         final Set<Integer> listEntryKeys = new HashSet<>();
378
379         // Add some ModificationPayload entries
380         for (int i = 1; i <= nListEntries; i++) {
381             listEntryKeys.add(i);
382
383             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
384                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
385
386             final DataTreeModification mod = source.takeSnapshot().newModification();
387             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
388             mod.ready();
389
390             InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
391                 payloadForModification(source, mod, nextTransactionId())));
392         }
393
394         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
395             new ApplyJournalEntries(nListEntries));
396
397         testRecovery(listEntryKeys, true);
398     }
399
400     @Test
401     @SuppressWarnings("checkstyle:IllegalCatch")
402     public void testConcurrentThreePhaseCommits() throws Exception {
403         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
404         final CountDownLatch commitLatch = new CountDownLatch(2);
405
406         final long timeoutSec = 5;
407         final Duration duration = Duration.ofSeconds(timeoutSec);
408         final Timeout timeout = Timeout.create(duration);
409
410         final TestActorRef<Shard> shard = actorFactory.createTestActor(
411                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
412                 "testConcurrentThreePhaseCommits");
413
414         class OnFutureComplete extends OnComplete<Object> {
415             private final Class<?> expRespType;
416
417             OnFutureComplete(final Class<?> expRespType) {
418                 this.expRespType = expRespType;
419             }
420
421             @Override
422             public void onComplete(final Throwable error, final Object resp) {
423                 if (error != null) {
424                     caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
425                 } else {
426                     try {
427                         assertEquals("Commit response type", expRespType, resp.getClass());
428                         onSuccess(resp);
429                     } catch (final Exception e) {
430                         caughtEx.set(e);
431                     }
432                 }
433             }
434
435             void onSuccess(final Object resp) {
436             }
437         }
438
439         class OnCommitFutureComplete extends OnFutureComplete {
440             OnCommitFutureComplete() {
441                 super(CommitTransactionReply.class);
442             }
443
444             @Override
445             public void onComplete(final Throwable error, final Object resp) {
446                 super.onComplete(error, resp);
447                 commitLatch.countDown();
448             }
449         }
450
451         class OnCanCommitFutureComplete extends OnFutureComplete {
452             private final TransactionIdentifier transactionID;
453
454             OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
455                 super(CanCommitTransactionReply.class);
456                 this.transactionID = transactionID;
457             }
458
459             @Override
460             void onSuccess(final Object resp) {
461                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(resp);
462                 assertTrue("Can commit", canCommitReply.getCanCommit());
463
464                 final Future<Object> commitFuture = Patterns.ask(shard,
465                         new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
466                 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
467             }
468         }
469
470         final ShardTestKit testKit = new ShardTestKit(getSystem());
471         ShardTestKit.waitUntilLeader(shard);
472
473         final TransactionIdentifier transactionID1 = nextTransactionId();
474         final TransactionIdentifier transactionID2 = nextTransactionId();
475         final TransactionIdentifier transactionID3 = nextTransactionId();
476
477         final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
478             shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
479         final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
480         final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
481         final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
482
483         shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
484             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
485         final ReadyTransactionReply readyReply = ReadyTransactionReply
486                 .fromSerializable(testKit.expectMsgClass(duration, ReadyTransactionReply.class));
487
488         String pathSuffix = shard.path().toString().replaceFirst("akka://test", "");
489         assertThat(readyReply.getCohortPath(), endsWith(pathSuffix));
490         // Send the CanCommitTransaction message for the first Tx.
491
492         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
493         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
494                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
495         assertTrue("Can commit", canCommitReply.getCanCommit());
496
497         // Ready 2 more Tx's.
498
499         shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
500             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), testKit.getRef());
501         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
502
503         shard.tell(
504             prepareBatchedModifications(transactionID3,
505                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
506                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
507                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), testKit.getRef());
508         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
509
510         // Send the CanCommitTransaction message for the next 2 Tx's.
511         // These should get queued and
512         // processed after the first Tx completes.
513
514         final Future<Object> canCommitFuture1 = Patterns.ask(shard,
515             new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
516
517         final Future<Object> canCommitFuture2 = Patterns.ask(shard,
518             new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
519
520         // Send the CommitTransaction message for the first Tx. After it
521         // completes, it should
522         // trigger the 2nd Tx to proceed which should in turn then
523         // trigger the 3rd.
524
525         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
526         testKit.expectMsgClass(duration, CommitTransactionReply.class);
527
528         // Wait for the next 2 Tx's to complete.
529
530         canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher());
531
532         canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher());
533
534         final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
535
536         final Throwable t = caughtEx.get();
537         if (t != null) {
538             Throwables.propagateIfPossible(t, Exception.class);
539             throw new RuntimeException(t);
540         }
541
542         assertTrue("Commits complete", done);
543
544 //                final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
545 //                        cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
546 //                        cohort3.getPreCommit(), cohort3.getCommit());
547 //                inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
548 //                inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
549 //                inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
550 //                inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
551 //                inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
552 //                inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
553 //                inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
554 //                inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
555 //                inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
556
557         // Verify data in the data store.
558
559         verifyOuterListEntry(shard, 1);
560
561         verifyLastApplied(shard, 3);
562     }
563
564     @Test
565     public void testBatchedModificationsWithNoCommitOnReady() {
566         final ShardTestKit testKit = new ShardTestKit(getSystem());
567         final TestActorRef<Shard> shard = actorFactory.createTestActor(
568             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
569             "testBatchedModificationsWithNoCommitOnReady");
570
571         ShardTestKit.waitUntilLeader(shard);
572
573         final TransactionIdentifier transactionID = nextTransactionId();
574         final Duration duration = Duration.ofSeconds(5);
575
576         // Send a BatchedModifications to start a transaction.
577
578         shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
579             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
580         testKit.expectMsgClass(duration, BatchedModificationsReply.class);
581
582         // Send a couple more BatchedModifications.
583
584         shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
585                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
586             testKit.getRef());
587         testKit.expectMsgClass(duration, BatchedModificationsReply.class);
588
589         shard.tell(newBatchedModifications(transactionID,
590             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
591             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
592             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3),
593             testKit.getRef());
594         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
595
596         // Send the CanCommitTransaction message.
597
598         shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
599         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
600                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
601         assertTrue("Can commit", canCommitReply.getCanCommit());
602
603         // Send the CommitTransaction message.
604
605         shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
606         testKit.expectMsgClass(duration, CommitTransactionReply.class);
607
608         // Verify data in the data store.
609
610         verifyOuterListEntry(shard, 1);
611     }
612
613     @Test
614     public void testBatchedModificationsWithCommitOnReady() {
615         final ShardTestKit testKit = new ShardTestKit(getSystem());
616         final TestActorRef<Shard> shard = actorFactory.createTestActor(
617             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
618             "testBatchedModificationsWithCommitOnReady");
619
620         ShardTestKit.waitUntilLeader(shard);
621
622         final TransactionIdentifier transactionID = nextTransactionId();
623         final Duration duration = Duration.ofSeconds(5);
624
625         // Send a BatchedModifications to start a transaction.
626
627         shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
628             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
629         testKit.expectMsgClass(duration, BatchedModificationsReply.class);
630
631         // Send a couple more BatchedModifications.
632
633         shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
634             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
635             testKit.getRef());
636         testKit.expectMsgClass(duration, BatchedModificationsReply.class);
637
638         shard.tell(newBatchedModifications(transactionID,
639             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
640             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
641             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3),
642             testKit.getRef());
643
644         testKit.expectMsgClass(duration, CommitTransactionReply.class);
645
646         // Verify data in the data store.
647         verifyOuterListEntry(shard, 1);
648     }
649
650     @Deprecated(since = "9.0.0", forRemoval = true)
651     @Test(expected = IllegalStateException.class)
652     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
653         final ShardTestKit testKit = new ShardTestKit(getSystem());
654         final TestActorRef<Shard> shard = actorFactory.createTestActor(
655             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
656             "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
657
658         ShardTestKit.waitUntilLeader(shard);
659
660         final TransactionIdentifier transactionID = nextTransactionId();
661         final BatchedModifications batched = new BatchedModifications(transactionID,
662             DataStoreVersions.CURRENT_VERSION);
663         batched.setReady();
664         batched.setTotalMessagesSent(2);
665
666         shard.tell(batched, testKit.getRef());
667
668         final Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
669
670         if (failure != null) {
671             Throwables.propagateIfPossible(failure.cause(), Exception.class);
672             throw new RuntimeException(failure.cause());
673         }
674     }
675
676     @Test
677     @Deprecated(since = "9.0.0", forRemoval = true)
678     public void testBatchedModificationsWithOperationFailure() {
679         final ShardTestKit testKit = new ShardTestKit(getSystem());
680         final TestActorRef<Shard> shard = actorFactory.createTestActor(
681             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
682             "testBatchedModificationsWithOperationFailure");
683
684         ShardTestKit.waitUntilLeader(shard);
685
686         // Test merge with invalid data. An exception should occur when
687         // the merge is applied. Note that
688         // write will not validate the children for performance reasons.
689
690         final TransactionIdentifier transactionID = nextTransactionId();
691
692         final ContainerNode invalidData = Builders.containerBuilder()
693             .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
694             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
695             .build();
696
697         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
698         batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
699         shard.tell(batched, testKit.getRef());
700         Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
701
702         final Throwable cause = failure.cause();
703
704         batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
705         batched.setReady();
706         batched.setTotalMessagesSent(2);
707
708         shard.tell(batched, testKit.getRef());
709
710         failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
711         assertEquals("Failure cause", cause, failure.cause());
712     }
713
714     @Test
715     public void testBatchedModificationsOnTransactionChain() {
716         final ShardTestKit testKit = new ShardTestKit(getSystem());
717         final TestActorRef<Shard> shard = actorFactory.createTestActor(
718             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
719             "testBatchedModificationsOnTransactionChain");
720
721         ShardTestKit.waitUntilLeader(shard);
722
723         final LocalHistoryIdentifier historyId = nextHistoryId();
724         final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
725         final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
726
727         final Duration duration = Duration.ofSeconds(5);
728
729         // Send a BatchedModifications to start a chained write
730         // transaction and ready it.
731
732         final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
733         final YangInstanceIdentifier path = TestModel.TEST_PATH;
734         shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), testKit.getRef());
735         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
736
737         // Create a read Tx on the same chain.
738
739         shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
740             DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
741
742         final CreateTransactionReply createReply = testKit.expectMsgClass(Duration.ofSeconds(3),
743             CreateTransactionReply.class);
744
745         getSystem().actorSelection(createReply.getTransactionPath())
746         .tell(new ReadData(path, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
747         final ReadDataReply readReply = testKit.expectMsgClass(Duration.ofSeconds(3), ReadDataReply.class);
748         assertEquals("Read node", containerNode, readReply.getNormalizedNode());
749
750         // Commit the write transaction.
751
752         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
753         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
754                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
755         assertTrue("Can commit", canCommitReply.getCanCommit());
756
757         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
758         testKit.expectMsgClass(duration, CommitTransactionReply.class);
759
760         // Verify data in the data store.
761
762         final NormalizedNode actualNode = readStore(shard, path);
763         assertEquals("Stored node", containerNode, actualNode);
764     }
765
766     @Test
767     @Deprecated(since = "9.0.0", forRemoval = true)
768     public void testOnBatchedModificationsWhenNotLeader() {
769         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
770         final ShardTestKit testKit = new ShardTestKit(getSystem());
771         final Creator<Shard> creator = new Creator<>() {
772             private static final long serialVersionUID = 1L;
773
774             @Override
775             public Shard create() {
776                 return new Shard(newShardBuilder()) {
777                     @Override
778                     protected boolean isLeader() {
779                         return overrideLeaderCalls.get() ? false : super.isLeader();
780                     }
781
782                     @Override
783                     public ActorSelection getLeader() {
784                         return overrideLeaderCalls.get() ? getSystem().actorSelection(testKit.getRef().path())
785                                 : super.getLeader();
786                     }
787                 };
788             }
789         };
790
791         final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
792             new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
793             "testOnBatchedModificationsWhenNotLeader");
794
795         ShardTestKit.waitUntilLeader(shard);
796
797         overrideLeaderCalls.set(true);
798
799         final BatchedModifications batched = new BatchedModifications(nextTransactionId(),
800             DataStoreVersions.CURRENT_VERSION);
801
802         shard.tell(batched, ActorRef.noSender());
803
804         testKit.expectMsgEquals(batched);
805     }
806
807     @Test
808     @Deprecated(since = "9.0.0", forRemoval = true)
809     public void testTransactionMessagesWithNoLeader() {
810         final ShardTestKit testKit = new ShardTestKit(getSystem());
811         dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
812         .shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
813         final TestActorRef<Shard> shard = actorFactory.createTestActor(
814             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
815             "testTransactionMessagesWithNoLeader");
816
817         testKit.waitUntilNoLeader(shard);
818
819         final TransactionIdentifier txId = nextTransactionId();
820         shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
821         Failure failure = testKit.expectMsgClass(Failure.class);
822         assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
823
824         shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
825             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
826         failure = testKit.expectMsgClass(Failure.class);
827         assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
828
829         shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
830             testKit.getRef());
831         failure = testKit.expectMsgClass(Failure.class);
832         assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
833     }
834
835     @Test
836     public void testReadyWithReadWriteImmediateCommit() {
837         testReadyWithImmediateCommit(true);
838     }
839
840     @Test
841     public void testReadyWithWriteOnlyImmediateCommit() {
842         testReadyWithImmediateCommit(false);
843     }
844
845     private void testReadyWithImmediateCommit(final boolean readWrite) {
846         final ShardTestKit testKit = new ShardTestKit(getSystem());
847         final TestActorRef<Shard> shard = actorFactory.createTestActor(
848             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
849             "testReadyWithImmediateCommit-" + readWrite);
850
851         ShardTestKit.waitUntilLeader(shard);
852
853         final TransactionIdentifier transactionID = nextTransactionId();
854         final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
855         if (readWrite) {
856             shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH, containerNode, true),
857                 testKit.getRef());
858         } else {
859             shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true),
860                 testKit.getRef());
861         }
862
863         testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
864
865         final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH);
866         assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
867     }
868
869     @Test
870     public void testReadyLocalTransactionWithImmediateCommit() {
871         final ShardTestKit testKit = new ShardTestKit(getSystem());
872         final TestActorRef<Shard> shard = actorFactory.createTestActor(
873             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
874             "testReadyLocalTransactionWithImmediateCommit");
875
876         ShardTestKit.waitUntilLeader(shard);
877
878         final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
879
880         final DataTreeModification modification = dataStore.newModification();
881
882         final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
883         new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
884         final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
885                 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
886                 .build();
887         new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
888
889         final TransactionIdentifier txId = nextTransactionId();
890         modification.ready();
891         final ReadyLocalTransaction readyMessage =
892                 new ReadyLocalTransaction(txId, modification, true, Optional.empty());
893
894         shard.tell(readyMessage, testKit.getRef());
895
896         testKit.expectMsgClass(CommitTransactionReply.class);
897
898         final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
899         assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
900     }
901
902     @Test
903     public void testReadyLocalTransactionWithThreePhaseCommit() {
904         final ShardTestKit testKit = new ShardTestKit(getSystem());
905         final TestActorRef<Shard> shard = actorFactory.createTestActor(
906             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
907             "testReadyLocalTransactionWithThreePhaseCommit");
908
909         ShardTestKit.waitUntilLeader(shard);
910
911         final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
912
913         final DataTreeModification modification = dataStore.newModification();
914
915         final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
916         new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
917         final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
918                 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
919                 .build();
920         new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
921
922         final TransactionIdentifier txId = nextTransactionId();
923         modification.ready();
924         final ReadyLocalTransaction readyMessage =
925                 new ReadyLocalTransaction(txId, modification, false, Optional.empty());
926
927         shard.tell(readyMessage, testKit.getRef());
928
929         testKit.expectMsgClass(ReadyTransactionReply.class);
930
931         // Send the CanCommitTransaction message.
932
933         shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
934         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
935                 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
936         assertTrue("Can commit", canCommitReply.getCanCommit());
937
938         // Send the CanCommitTransaction message.
939
940         shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
941         testKit.expectMsgClass(CommitTransactionReply.class);
942
943         final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
944         assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
945     }
946
947     @Test
948     public void testReadWriteCommitWithPersistenceDisabled() {
949         dataStoreContextBuilder.persistent(false);
950         final ShardTestKit testKit = new ShardTestKit(getSystem());
951         final TestActorRef<Shard> shard = actorFactory.createTestActor(
952             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
953             "testCommitWithPersistenceDisabled");
954
955         ShardTestKit.waitUntilLeader(shard);
956
957         // Setup a simulated transactions with a mock cohort.
958
959         final Duration duration = Duration.ofSeconds(5);
960
961         final TransactionIdentifier transactionID = nextTransactionId();
962         final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
963         shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false),
964             testKit.getRef());
965         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
966
967         // Send the CanCommitTransaction message.
968
969         shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
970         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
971                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
972         assertTrue("Can commit", canCommitReply.getCanCommit());
973
974         // Send the CanCommitTransaction message.
975
976         shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
977         testKit.expectMsgClass(duration, CommitTransactionReply.class);
978
979         final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH);
980         assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
981     }
982
983     @Test
984     public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
985         testCommitWhenTransactionHasModifications(true);
986     }
987
988     @Test
989     public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
990         testCommitWhenTransactionHasModifications(false);
991     }
992
993     private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
994         final ShardTestKit testKit = new ShardTestKit(getSystem());
995         final DataTree dataTree = createDelegatingMockDataTree();
996         final TestActorRef<Shard> shard = actorFactory.createTestActor(
997             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
998             "testCommitWhenTransactionHasModifications-" + readWrite);
999
1000         ShardTestKit.waitUntilLeader(shard);
1001
1002         final Duration duration = Duration.ofSeconds(5);
1003         final TransactionIdentifier transactionID = nextTransactionId();
1004
1005         if (readWrite) {
1006             shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
1007                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1008         } else {
1009             shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1010                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1011         }
1012
1013         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1014
1015         // Send the CanCommitTransaction message.
1016
1017         shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1018         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1019                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1020         assertTrue("Can commit", canCommitReply.getCanCommit());
1021
1022         shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1023         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1024
1025         final InOrder inOrder = inOrder(dataTree);
1026         inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1027         inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1028         inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1029
1030         // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
1031         // the journal
1032         Thread.sleep(HEARTBEAT_MILLIS * 2);
1033
1034         shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, testKit.getRef());
1035         final ShardStats shardStats = testKit.expectMsgClass(duration, ShardStats.class);
1036
1037         // Use MBean for verification
1038         // Committed transaction count should increase as usual
1039         assertEquals(1, shardStats.getCommittedTransactionsCount());
1040
1041         // Commit index should advance 1 to account for disabling metadata
1042         assertEquals(1, shardStats.getCommitIndex());
1043     }
1044
1045     @Test
1046     public void testCommitPhaseFailure() throws Exception {
1047         final ShardTestKit testKit = new ShardTestKit(getSystem());
1048         final DataTree dataTree = createDelegatingMockDataTree();
1049         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1050             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1051             "testCommitPhaseFailure");
1052
1053         ShardTestKit.waitUntilLeader(shard);
1054
1055         final Duration duration = Duration.ofSeconds(5);
1056         final Timeout timeout = Timeout.create(duration);
1057
1058         // Setup 2 simulated transactions with mock cohorts. The first
1059         // one fails in the
1060         // commit phase.
1061
1062         doThrow(new RuntimeException("mock commit failure")).when(dataTree)
1063         .commit(any(DataTreeCandidate.class));
1064
1065         final TransactionIdentifier transactionID1 = nextTransactionId();
1066         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1067             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1068         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1069
1070         final TransactionIdentifier transactionID2 = nextTransactionId();
1071         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1072             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1073         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1074
1075         // Send the CanCommitTransaction message for the first Tx.
1076
1077         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1078         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1079                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1080         assertTrue("Can commit", canCommitReply.getCanCommit());
1081
1082         // Send the CanCommitTransaction message for the 2nd Tx. This
1083         // should get queued and
1084         // processed after the first Tx completes.
1085
1086         final Future<Object> canCommitFuture = Patterns.ask(shard,
1087             new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1088
1089         // Send the CommitTransaction message for the first Tx. This
1090         // should send back an error
1091         // and trigger the 2nd Tx to proceed.
1092
1093         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1094         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1095
1096         // Wait for the 2nd Tx to complete the canCommit phase.
1097
1098         final CountDownLatch latch = new CountDownLatch(1);
1099         canCommitFuture.onComplete(new OnComplete<>() {
1100             @Override
1101             public void onComplete(final Throwable failure, final Object resp) {
1102                 latch.countDown();
1103             }
1104         }, getSystem().dispatcher());
1105
1106         assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
1107
1108         final InOrder inOrder = inOrder(dataTree);
1109         inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1110         inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1111
1112         // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
1113         //        validate performs wrapping and we capture that mock
1114         // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1115
1116         inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1117     }
1118
1119     @Test
1120     public void testPreCommitPhaseFailure() throws Exception {
1121         final ShardTestKit testKit = new ShardTestKit(getSystem());
1122         final DataTree dataTree = createDelegatingMockDataTree();
1123         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1124             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1125             "testPreCommitPhaseFailure");
1126
1127         ShardTestKit.waitUntilLeader(shard);
1128
1129         final Duration duration = Duration.ofSeconds(5);
1130         final Timeout timeout = Timeout.create(duration);
1131
1132         doThrow(new RuntimeException("mock preCommit failure")).when(dataTree)
1133         .prepare(any(DataTreeModification.class));
1134
1135         final TransactionIdentifier transactionID1 = nextTransactionId();
1136         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1137             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1138         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1139
1140         final TransactionIdentifier transactionID2 = nextTransactionId();
1141         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1142             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1143         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1144
1145         // Send the CanCommitTransaction message for the first Tx.
1146
1147         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1148         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1149                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1150         assertTrue("Can commit", canCommitReply.getCanCommit());
1151
1152         // Send the CanCommitTransaction message for the 2nd Tx. This
1153         // should get queued and
1154         // processed after the first Tx completes.
1155
1156         final Future<Object> canCommitFuture = Patterns.ask(shard,
1157             new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1158
1159         // Send the CommitTransaction message for the first Tx. This
1160         // should send back an error
1161         // and trigger the 2nd Tx to proceed.
1162
1163         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1164         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1165
1166         // Wait for the 2nd Tx to complete the canCommit phase.
1167
1168         final CountDownLatch latch = new CountDownLatch(1);
1169         canCommitFuture.onComplete(new OnComplete<>() {
1170             @Override
1171             public void onComplete(final Throwable failure, final Object resp) {
1172                 latch.countDown();
1173             }
1174         }, getSystem().dispatcher());
1175
1176         assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
1177
1178         final InOrder inOrder = inOrder(dataTree);
1179         inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1180         inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1181         inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1182     }
1183
1184     @Test
1185     public void testCanCommitPhaseFailure() throws Exception {
1186         final ShardTestKit testKit = new ShardTestKit(getSystem());
1187         final DataTree dataTree = createDelegatingMockDataTree();
1188         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1189             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1190             "testCanCommitPhaseFailure");
1191
1192         ShardTestKit.waitUntilLeader(shard);
1193
1194         final Duration duration = Duration.ofSeconds(5);
1195         final TransactionIdentifier transactionID1 = nextTransactionId();
1196
1197         doThrow(new DataValidationFailedException(YangInstanceIdentifier.of(), "mock canCommit failure"))
1198         .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1199
1200         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1201             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1202         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1203
1204         // Send the CanCommitTransaction message.
1205
1206         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1207         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1208
1209         // Send another can commit to ensure the failed one got cleaned
1210         // up.
1211
1212         final TransactionIdentifier transactionID2 = nextTransactionId();
1213         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1214             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1215         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1216
1217         shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1218         final CanCommitTransactionReply reply = CanCommitTransactionReply
1219                 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
1220         assertTrue("getCanCommit", reply.getCanCommit());
1221     }
1222
1223     @Test
1224     public void testImmediateCommitWithCanCommitPhaseFailure() throws Exception {
1225         testImmediateCommitWithCanCommitPhaseFailure(true);
1226         testImmediateCommitWithCanCommitPhaseFailure(false);
1227     }
1228
1229     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
1230         final ShardTestKit testKit = new ShardTestKit(getSystem());
1231         final DataTree dataTree = createDelegatingMockDataTree();
1232         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1233             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1234             "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1235
1236         ShardTestKit.waitUntilLeader(shard);
1237
1238         doThrow(new DataValidationFailedException(YangInstanceIdentifier.of(), "mock canCommit failure"))
1239         .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1240
1241         final Duration duration = Duration.ofSeconds(5);
1242
1243         final TransactionIdentifier transactionID1 = nextTransactionId();
1244
1245         if (readWrite) {
1246             shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1247                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1248         } else {
1249             shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1250                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1251         }
1252
1253         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1254
1255         // Send another can commit to ensure the failed one got cleaned
1256         // up.
1257
1258         final TransactionIdentifier transactionID2 = nextTransactionId();
1259         if (readWrite) {
1260             shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1261                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1262         } else {
1263             shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1264                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1265         }
1266
1267         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1268     }
1269
1270     @Test
1271     public void testAbortWithCommitPending() {
1272         final ShardTestKit testKit = new ShardTestKit(getSystem());
1273         final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1274             @Override
1275             void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
1276                 // Simulate an AbortTransaction message occurring during
1277                 // replication, after
1278                 // persisting and before finishing the commit to the
1279                 // in-memory store.
1280
1281                 doAbortTransaction(id, null);
1282                 super.persistPayload(id, payload, batchHint);
1283             }
1284         };
1285
1286         final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
1287             new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1288             "testAbortWithCommitPending");
1289
1290         ShardTestKit.waitUntilLeader(shard);
1291
1292         final Duration duration = Duration.ofSeconds(5);
1293
1294         final TransactionIdentifier transactionID = nextTransactionId();
1295
1296         shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1297             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1298         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1299
1300         shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1301         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1302
1303         shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1304         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1305
1306         final NormalizedNode node = readStore(shard, TestModel.TEST_PATH);
1307
1308         // Since we're simulating an abort occurring during replication
1309         // and before finish commit,
1310         // the data should still get written to the in-memory store
1311         // since we've gotten past
1312         // canCommit and preCommit and persisted the data.
1313         assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1314     }
1315
1316     @Test
1317     public void testTransactionCommitTimeout() throws Exception {
1318         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1319         final ShardTestKit testKit = new ShardTestKit(getSystem());
1320         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1321             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1322             "testTransactionCommitTimeout");
1323
1324         ShardTestKit.waitUntilLeader(shard);
1325
1326         final Duration duration = Duration.ofSeconds(5);
1327
1328         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1329         writeToStore(shard, TestModel.OUTER_LIST_PATH,
1330             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1331
1332         // Ready 2 Tx's - the first will timeout
1333
1334         final TransactionIdentifier transactionID1 = nextTransactionId();
1335         shard.tell(
1336             prepareBatchedModifications(transactionID1,
1337                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1338                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1339                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
1340             testKit.getRef());
1341         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1342
1343         final TransactionIdentifier transactionID2 = nextTransactionId();
1344         final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1345                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1346         shard.tell(
1347             prepareBatchedModifications(transactionID2, listNodePath,
1348                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), testKit.getRef());
1349         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1350
1351         // canCommit 1st Tx. We don't send the commit so it should
1352         // timeout.
1353
1354         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1355         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1356
1357         // canCommit the 2nd Tx - it should complete after the 1st Tx
1358         // times out.
1359
1360         shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1361         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1362
1363         // Try to commit the 1st Tx - should fail as it's not the
1364         // current Tx.
1365
1366         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1367         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1368
1369         // Commit the 2nd Tx.
1370
1371         shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1372         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1373
1374         final NormalizedNode node = readStore(shard, listNodePath);
1375         assertNotNull(listNodePath + " not found", node);
1376     }
1377
1378 //    @Test
1379 //    @Ignore
1380 //    public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1381 //        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1382 //
1383 //        new ShardTestKit(getSystem()) {{
1384 //            final TestActorRef<Shard> shard = actorFactory.createTestActor(
1385 //                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1386 //                    "testTransactionCommitQueueCapacityExceeded");
1387 //
1388 //            waitUntilLeader(shard);
1389 //
1390 //            final FiniteDuration duration = duration("5 seconds");
1391 //
1392 //            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1393 //
1394 //            final TransactionIdentifier transactionID1 = nextTransactionId();
1395 //            final MutableCompositeModification modification1 = new MutableCompositeModification();
1396 //            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1397 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1398 //                    modification1);
1399 //
1400 //            final TransactionIdentifier transactionID2 = nextTransactionId();
1401 //            final MutableCompositeModification modification2 = new MutableCompositeModification();
1402 //            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1403 //                    TestModel.OUTER_LIST_PATH,
1404 //                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1405 //                    modification2);
1406 //
1407 //            final TransactionIdentifier transactionID3 = nextTransactionId();
1408 //            final MutableCompositeModification modification3 = new MutableCompositeModification();
1409 //            final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1410 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1411 //                    modification3);
1412 //
1413 //            // Ready the Tx's
1414 //
1415 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
1416 //                    modification1), getRef());
1417 //            expectMsgClass(duration, ReadyTransactionReply.class);
1418 //
1419 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
1420 //                    modification2), getRef());
1421 //            expectMsgClass(duration, ReadyTransactionReply.class);
1422 //
1423 //            // The 3rd Tx should exceed queue capacity and fail.
1424 //
1425 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
1426 //                    modification3), getRef());
1427 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1428 //
1429 //            // canCommit 1st Tx.
1430 //
1431 //            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1432 //            expectMsgClass(duration, CanCommitTransactionReply.class);
1433 //
1434 //            // canCommit the 2nd Tx - it should get queued.
1435 //
1436 //            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1437 //
1438 //            // canCommit the 3rd Tx - should exceed queue capacity and fail.
1439 //
1440 //            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1441 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1442 //        }};
1443 //    }
1444
1445     @Test
1446     public void testTransactionCommitWithPriorExpiredCohortEntries() {
1447         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1448         final ShardTestKit testKit = new ShardTestKit(getSystem());
1449         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1450             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1451             "testTransactionCommitWithPriorExpiredCohortEntries");
1452
1453         ShardTestKit.waitUntilLeader(shard);
1454
1455         final Duration duration = Duration.ofSeconds(5);
1456
1457         final TransactionIdentifier transactionID1 = nextTransactionId();
1458         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1459             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1460         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1461
1462         final TransactionIdentifier transactionID2 = nextTransactionId();
1463         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1464             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1465         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1466
1467         final TransactionIdentifier transactionID3 = nextTransactionId();
1468         shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1469             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1470         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1471
1472         // All Tx's are readied. We'll send canCommit for the last one
1473         // but not the others. The others
1474         // should expire from the queue and the last one should be
1475         // processed.
1476
1477         shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1478         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1479     }
1480
1481     @Test
1482     public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
1483         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1484         final ShardTestKit testKit = new ShardTestKit(getSystem());
1485         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1486             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1487             "testTransactionCommitWithSubsequentExpiredCohortEntry");
1488
1489         ShardTestKit.waitUntilLeader(shard);
1490
1491         final Duration duration = Duration.ofSeconds(5);
1492
1493         final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1494
1495         final TransactionIdentifier transactionID1 = nextTransactionId();
1496         shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1497             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1498         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1499
1500         // CanCommit the first Tx so it's the current in-progress Tx.
1501
1502         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1503         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1504
1505         // Ready the second Tx.
1506
1507         final TransactionIdentifier transactionID2 = nextTransactionId();
1508         shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1509             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1510         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1511
1512         // Ready the third Tx.
1513
1514         final TransactionIdentifier transactionID3 = nextTransactionId();
1515         final DataTreeModification modification3 = dataStore.newModification();
1516         new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1517             .apply(modification3);
1518         modification3.ready();
1519         final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
1520             true, Optional.empty());
1521         shard.tell(readyMessage, testKit.getRef());
1522
1523         // Commit the first Tx. After completing, the second should
1524         // expire from the queue and the third
1525         // Tx committed.
1526
1527         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1528         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1529
1530         // Expect commit reply from the third Tx.
1531
1532         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1533
1534         final NormalizedNode node = readStore(shard, TestModel.TEST2_PATH);
1535         assertNotNull(TestModel.TEST2_PATH + " not found", node);
1536     }
1537
1538     @Test
1539     public void testCanCommitBeforeReadyFailure() {
1540         final ShardTestKit testKit = new ShardTestKit(getSystem());
1541         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1542             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1543             "testCanCommitBeforeReadyFailure");
1544
1545         shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), testKit.getRef());
1546         testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
1547     }
1548
1549     @Test
1550     public void testAbortAfterCanCommit() throws Exception {
1551         final ShardTestKit testKit = new ShardTestKit(getSystem());
1552         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1553             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
1554
1555         ShardTestKit.waitUntilLeader(shard);
1556
1557         final Duration duration = Duration.ofSeconds(5);
1558         final Timeout timeout = Timeout.create(duration);
1559
1560         // Ready 2 transactions - the first one will be aborted.
1561
1562         final TransactionIdentifier transactionID1 = nextTransactionId();
1563         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1564             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1565         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1566
1567         final TransactionIdentifier transactionID2 = nextTransactionId();
1568         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1569             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1570         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1571
1572         // Send the CanCommitTransaction message for the first Tx.
1573
1574         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1575         CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1576                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1577         assertTrue("Can commit", canCommitReply.getCanCommit());
1578
1579         // Send the CanCommitTransaction message for the 2nd Tx. This
1580         // should get queued and
1581         // processed after the first Tx completes.
1582
1583         final Future<Object> canCommitFuture = Patterns.ask(shard,
1584             new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1585
1586         // Send the AbortTransaction message for the first Tx. This
1587         // should trigger the 2nd
1588         // Tx to proceed.
1589
1590         shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1591         testKit.expectMsgClass(duration, AbortTransactionReply.class);
1592
1593         // Wait for the 2nd Tx to complete the canCommit phase.
1594
1595         canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture,
1596             FiniteDuration.create(5, TimeUnit.SECONDS));
1597         assertTrue("Can commit", canCommitReply.getCanCommit());
1598     }
1599
1600     @Test
1601     public void testAbortAfterReady() {
1602         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1603         final ShardTestKit testKit = new ShardTestKit(getSystem());
1604         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1605             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1606
1607         ShardTestKit.waitUntilLeader(shard);
1608
1609         final Duration duration = Duration.ofSeconds(5);
1610
1611         // Ready a tx.
1612
1613         final TransactionIdentifier transactionID1 = nextTransactionId();
1614         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1615             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1616         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1617
1618         // Send the AbortTransaction message.
1619
1620         shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1621         testKit.expectMsgClass(duration, AbortTransactionReply.class);
1622
1623         assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1624
1625         // Now send CanCommitTransaction - should fail.
1626
1627         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1628         final Throwable failure = testKit.expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1629         assertTrue("Failure type", failure instanceof IllegalStateException);
1630
1631         // Ready and CanCommit another and verify success.
1632
1633         final TransactionIdentifier transactionID2 = nextTransactionId();
1634         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1635             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1636         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1637
1638         shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1639         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1640     }
1641
1642     @Test
1643     public void testAbortQueuedTransaction() {
1644         final ShardTestKit testKit = new ShardTestKit(getSystem());
1645         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1646             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1647
1648         ShardTestKit.waitUntilLeader(shard);
1649
1650         final Duration duration = Duration.ofSeconds(5);
1651
1652         // Ready 3 tx's.
1653
1654         final TransactionIdentifier transactionID1 = nextTransactionId();
1655         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1656             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1657         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1658
1659         final TransactionIdentifier transactionID2 = nextTransactionId();
1660         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1661             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1662         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1663
1664         final TransactionIdentifier transactionID3 = nextTransactionId();
1665         shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1666                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), testKit.getRef());
1667         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1668
1669         // Abort the second tx while it's queued.
1670
1671         shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1672         testKit.expectMsgClass(duration, AbortTransactionReply.class);
1673
1674         // Commit the other 2.
1675
1676         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1677         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1678
1679         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1680         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1681
1682         shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1683         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1684
1685         shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1686         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1687
1688         assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1689     }
1690
1691     @Test
1692     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1693         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1694     }
1695
1696     @Test
1697     public void testCreateSnapshot() throws Exception {
1698         testCreateSnapshot(true, "testCreateSnapshot");
1699     }
1700
1701     private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception {
1702         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1703
1704         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1705         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1706             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1707                 super(delegate);
1708             }
1709
1710             @Override
1711             public void saveSnapshot(final Object obj) {
1712                 savedSnapshot.set(obj);
1713                 super.saveSnapshot(obj);
1714             }
1715         }
1716
1717         dataStoreContextBuilder.persistent(persistent);
1718
1719         final class TestShard extends Shard {
1720
1721             TestShard(final AbstractBuilder<?, ?> builder) {
1722                 super(builder);
1723                 setPersistence(new TestPersistentDataProvider(super.persistence()));
1724             }
1725
1726             @Override
1727             public void handleCommand(final Object message) {
1728                 super.handleCommand(message);
1729
1730                 // XXX:  commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
1731                 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1732                     latch.get().countDown();
1733                 }
1734             }
1735
1736             @Override
1737             public RaftActorContext getRaftActorContext() {
1738                 return super.getRaftActorContext();
1739             }
1740         }
1741
1742         final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1743
1744         final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
1745             new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), shardActorName);
1746
1747         ShardTestKit.waitUntilLeader(shard);
1748         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1749
1750         final NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.of());
1751
1752         // Trigger creation of a snapshot by ensuring
1753         final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1754         raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1755         awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1756
1757         raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1758         awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1759     }
1760
1761     private static void awaitAndValidateSnapshot(final AtomicReference<CountDownLatch> latch,
1762             final AtomicReference<Object> savedSnapshot, final NormalizedNode expectedRoot)
1763                     throws InterruptedException {
1764         assertTrue("Snapshot saved", latch.get().await(5, TimeUnit.SECONDS));
1765
1766         assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
1767
1768         verifySnapshot((Snapshot) savedSnapshot.get(), expectedRoot);
1769
1770         latch.set(new CountDownLatch(1));
1771         savedSnapshot.set(null);
1772     }
1773
1774     private static void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) {
1775         assertEquals("Root node", expectedRoot,
1776             ((ShardSnapshotState)snapshot.getState()).getSnapshot().getRootNode().orElseThrow());
1777     }
1778
1779     /**
1780      * This test simply verifies that the applySnapShot logic will work.
1781      */
1782     @Test
1783     public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
1784         final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
1785             SCHEMA_CONTEXT);
1786
1787         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1788         putTransaction.write(TestModel.TEST_PATH,
1789             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1790         commitTransaction(store, putTransaction);
1791
1792
1793         final NormalizedNode expected = readStore(store, YangInstanceIdentifier.of());
1794
1795         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
1796
1797         writeTransaction.delete(YangInstanceIdentifier.of());
1798         writeTransaction.write(YangInstanceIdentifier.of(), expected);
1799
1800         commitTransaction(store, writeTransaction);
1801
1802         final NormalizedNode actual = readStore(store, YangInstanceIdentifier.of());
1803
1804         assertEquals(expected, actual);
1805     }
1806
1807     @Test
1808     public void testRecoveryApplicable() {
1809
1810         final DatastoreContext persistentContext = DatastoreContext.newBuilder()
1811                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1812
1813         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
1814                 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1815
1816         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
1817                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1818
1819         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
1820                 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1821
1822         final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
1823
1824         assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1825
1826         final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
1827
1828         assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1829     }
1830
1831     @Test
1832     public void testOnDatastoreContext() {
1833         dataStoreContextBuilder.persistent(true);
1834
1835         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
1836
1837         assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1838
1839         ShardTestKit.waitUntilLeader(shard);
1840
1841         shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1842
1843         assertFalse("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1844
1845         shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1846
1847         assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1848     }
1849
1850     @Test
1851     public void testRegisterRoleChangeListener() {
1852         final ShardTestKit testKit = new ShardTestKit(getSystem());
1853         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1854             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1855             "testRegisterRoleChangeListener");
1856
1857         ShardTestKit.waitUntilLeader(shard);
1858
1859         final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
1860
1861         shard.tell(new RegisterRoleChangeListener(), listener);
1862
1863         MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
1864
1865         ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1866             ShardLeaderStateChanged.class);
1867         final var dataTree = leaderStateChanged.localShardDataTree();
1868         assertNotNull("getLocalShardDataTree present", dataTree);
1869         assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(), dataTree);
1870
1871         MessageCollectorActor.clearMessages(listener);
1872
1873         // Force a leader change
1874
1875         shard.tell(new RequestVote(10000, "member2", 50, 50), testKit.getRef());
1876
1877         leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, ShardLeaderStateChanged.class);
1878         assertNull("getLocalShardDataTree present", leaderStateChanged.localShardDataTree());
1879     }
1880
1881     @Test
1882     public void testFollowerInitialSyncStatus() {
1883         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1884                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1885                 "testFollowerInitialSyncStatus");
1886
1887         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false,
1888                 "member-1-shard-inventory-operational"));
1889
1890         assertFalse(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1891
1892         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true,
1893                 "member-1-shard-inventory-operational"));
1894
1895         assertTrue(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1896     }
1897
1898     @Test
1899     public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
1900         final ShardTestKit testKit = new ShardTestKit(getSystem());
1901         final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
1902         dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1903             .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1904
1905         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1906         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1907             TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1908
1909         setupInMemorySnapshotStore();
1910
1911         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1912             newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1913             actorFactory.generateActorId(testName + "-shard"));
1914
1915         testKit.waitUntilNoLeader(shard);
1916
1917         shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1918         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1919             RegisterDataTreeNotificationListenerReply.class);
1920         assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1921
1922         shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1923             .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1924
1925         listener.waitForChangeEvents();
1926     }
1927
1928     @Test
1929     public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
1930         final ShardTestKit testKit = new ShardTestKit(getSystem());
1931         final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
1932         dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1933             .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1934
1935         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
1936         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1937             TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1938
1939         setupInMemorySnapshotStore();
1940
1941         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1942             newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1943             actorFactory.generateActorId(testName + "-shard"));
1944
1945         testKit.waitUntilNoLeader(shard);
1946
1947         shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1948         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1949             RegisterDataTreeNotificationListenerReply.class);
1950         assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1951
1952         final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
1953         regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), testKit.getRef());
1954         testKit.expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
1955
1956         shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1957             .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1958
1959         listener.expectNoMoreChanges("Received unexpected change after close");
1960     }
1961
1962     @Test
1963     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
1964         final ShardTestKit testKit = new ShardTestKit(getSystem());
1965         final String testName = "testClusteredDataTreeChangeListenerRegistration";
1966         final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
1967             MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
1968
1969         final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
1970             MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
1971
1972         final TestActorRef<Shard> followerShard = actorFactory
1973                 .createTestActor(Shard.builder().id(followerShardID)
1974                     .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
1975                     .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
1976                         "akka://test/user/" + leaderShardID.toString()))
1977                     .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1978                     .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
1979
1980         final TestActorRef<Shard> leaderShard = actorFactory
1981                 .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
1982                     .peerAddresses(Collections.singletonMap(followerShardID.toString(),
1983                         "akka://test/user/" + followerShardID.toString()))
1984                     .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1985                     .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
1986
1987         leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1988         final String leaderPath = ShardTestKit.waitUntilLeader(followerShard);
1989         assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
1990
1991         final YangInstanceIdentifier path = TestModel.TEST_PATH;
1992         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1993         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
1994             actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1995
1996         followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1997         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1998             RegisterDataTreeNotificationListenerReply.class);
1999         assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2000
2001         writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2002
2003         listener.waitForChangeEvents();
2004     }
2005
2006     @Test
2007     public void testServerRemoved() {
2008         final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
2009                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
2010
2011         final ActorRef shard = parent.underlyingActor().context().actorOf(
2012                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2013                 "testServerRemoved");
2014
2015         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2016
2017         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2018     }
2019 }