888fe9e8a10c25d55d35200a7d976e76c2454b66
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.hamcrest.CoreMatchers.containsString;
11 import static org.hamcrest.CoreMatchers.endsWith;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.assertFalse;
15 import static org.junit.Assert.assertNotNull;
16 import static org.junit.Assert.assertNull;
17 import static org.junit.Assert.assertSame;
18 import static org.junit.Assert.assertTrue;
19 import static org.junit.Assert.fail;
20 import static org.mockito.ArgumentMatchers.any;
21 import static org.mockito.Mockito.doThrow;
22 import static org.mockito.Mockito.inOrder;
23 import static org.mockito.Mockito.mock;
24 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
25
26 import akka.actor.ActorRef;
27 import akka.actor.ActorSelection;
28 import akka.actor.Props;
29 import akka.actor.Status.Failure;
30 import akka.dispatch.Dispatchers;
31 import akka.dispatch.OnComplete;
32 import akka.japi.Creator;
33 import akka.pattern.Patterns;
34 import akka.persistence.SaveSnapshotSuccess;
35 import akka.testkit.TestActorRef;
36 import akka.util.Timeout;
37 import com.google.common.base.Stopwatch;
38 import com.google.common.base.Throwables;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.time.Duration;
41 import java.util.Collections;
42 import java.util.HashSet;
43 import java.util.Map;
44 import java.util.Optional;
45 import java.util.Set;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.atomic.AtomicBoolean;
49 import java.util.concurrent.atomic.AtomicReference;
50 import org.junit.Test;
51 import org.mockito.InOrder;
52 import org.opendaylight.controller.cluster.DataPersistenceProvider;
53 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
54 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
55 import org.opendaylight.controller.cluster.access.concepts.MemberName;
56 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
57 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
58 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
59 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
62 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
65 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
66 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
67 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
68 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
69 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
70 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
71 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
72 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
73 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
74 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
75 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
76 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
77 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
78 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
79 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
80 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
81 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
82 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
83 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
84 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
86 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
87 import org.opendaylight.controller.cluster.raft.RaftActorContext;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
89 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
90 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
91 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
92 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
93 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
94 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
95 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
96 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
97 import org.opendaylight.controller.cluster.raft.messages.Payload;
98 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
99 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
100 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
101 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
102 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
103 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
104 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
105 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
106 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
107 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
108 import org.opendaylight.yangtools.concepts.Identifier;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
111 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
114 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
115 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
116 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
117 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
118 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
119 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
120 import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
121 import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
122 import scala.concurrent.Await;
123 import scala.concurrent.Future;
124 import scala.concurrent.duration.FiniteDuration;
125
126 public class ShardTest extends AbstractShardTest {
127     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
128             + "InMemorySnapshotStore and journal recovery seq number will start from 1";
129
130     @Test
131     public void testRegisterDataTreeChangeListener() throws Exception {
132         final ShardTestKit testKit = new ShardTestKit(getSystem());
133         final TestActorRef<Shard> shard = actorFactory.createTestActor(
134             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
135             "testRegisterDataTreeChangeListener");
136
137         ShardTestKit.waitUntilLeader(shard);
138
139         shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
140
141         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
142         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
143             TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
144
145         shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), testKit.getRef());
146
147         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
148             RegisterDataTreeNotificationListenerReply.class);
149         final String replyPath = reply.getListenerRegistrationPath().toString();
150         assertTrue("Incorrect reply path: " + replyPath,
151             replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
152
153         final YangInstanceIdentifier path = TestModel.TEST_PATH;
154         writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
155
156         listener.waitForChangeEvents();
157         listener.verifyOnInitialDataEvent();
158
159         final MockDataTreeChangeListener listener2 = new MockDataTreeChangeListener(1);
160         final ActorRef dclActor2 = actorFactory.createActor(DataTreeChangeListenerActor.props(listener2,
161             TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener2");
162
163         shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor2, false), testKit.getRef());
164
165         testKit.expectMsgClass(Duration.ofSeconds(3), RegisterDataTreeNotificationListenerReply.class);
166
167         listener2.waitForChangeEvents();
168         listener2.verifyNoOnInitialDataEvent();
169     }
170
171     @SuppressWarnings("serial")
172     @Test
173     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
174         final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
175         final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
176         final Creator<Shard> creator = new Creator<>() {
177             boolean firstElectionTimeout = true;
178
179             @Override
180             public Shard create() {
181                 return new Shard(newShardBuilder()) {
182                     @Override
183                     public void handleCommand(final Object message) {
184                         if (message instanceof ElectionTimeout && firstElectionTimeout) {
185                             firstElectionTimeout = false;
186                             final ActorRef self = getSelf();
187                             new Thread(() -> {
188                                 Uninterruptibles.awaitUninterruptibly(
189                                         onChangeListenerRegistered, 5, TimeUnit.SECONDS);
190                                 self.tell(message, self);
191                             }).start();
192
193                             onFirstElectionTimeout.countDown();
194                         } else {
195                             super.handleCommand(message);
196                         }
197                     }
198                 };
199             }
200         };
201
202         setupInMemorySnapshotStore();
203
204         final YangInstanceIdentifier path = TestModel.TEST_PATH;
205         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
206         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
207                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
208
209         final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
210                 new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
211                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
212
213         final ShardTestKit testKit = new ShardTestKit(getSystem());
214         assertTrue("Got first ElectionTimeout", onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
215
216         shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), testKit.getRef());
217         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
218             RegisterDataTreeNotificationListenerReply.class);
219         assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
220
221         shard.tell(FindLeader.INSTANCE, testKit.getRef());
222         final FindLeaderReply findLeadeReply = testKit.expectMsgClass(Duration.ofSeconds(5), FindLeaderReply.class);
223         assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
224
225         onChangeListenerRegistered.countDown();
226
227         // TODO: investigate why we do not receive data chage events
228         listener.waitForChangeEvents();
229     }
230
231     @Test
232     public void testCreateTransaction() {
233         final ShardTestKit testKit = new ShardTestKit(getSystem());
234         final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
235
236         ShardTestKit.waitUntilLeader(shard);
237
238         shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), testKit.getRef());
239
240         shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
241             DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
242
243         final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
244             CreateTransactionReply.class);
245
246         final String path = reply.getTransactionPath().toString();
247         assertThat(path, containsString(String.format("/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
248             shardID.getShardName(), shardID.getMemberName().getName())));
249     }
250
251     @Test
252     public void testCreateTransactionOnChain() {
253         final ShardTestKit testKit = new ShardTestKit(getSystem());
254         final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
255
256         ShardTestKit.waitUntilLeader(shard);
257
258         shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
259             DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
260
261         final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
262             CreateTransactionReply.class);
263
264         final String path = reply.getTransactionPath().toString();
265         assertThat(path, containsString(String.format(
266             "/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
267             shardID.getShardName(), shardID.getMemberName().getName())));
268     }
269
270     @Test
271     public void testPeerAddressResolved() {
272         final ShardTestKit testKit = new ShardTestKit(getSystem());
273         final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
274                 "config");
275         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder()
276             .peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null))
277             .props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
278
279         final String address = "akka://foobar";
280         shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
281
282         shard.tell(GetOnDemandRaftState.INSTANCE, testKit.getRef());
283         final OnDemandRaftState state = testKit.expectMsgClass(OnDemandRaftState.class);
284         assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
285     }
286
287     @Test
288     public void testApplySnapshot() throws Exception {
289
290         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps()
291                 .withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
292
293         ShardTestKit.waitUntilLeader(shard);
294
295         final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
296             SCHEMA_CONTEXT);
297
298         final ContainerNode container = Builders.containerBuilder()
299             .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
300             .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
301                 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1))
302                 .build())
303             .build();
304
305         writeToStore(store, TestModel.TEST_PATH, container);
306
307         final YangInstanceIdentifier root = YangInstanceIdentifier.of();
308         final NormalizedNode expected = readStore(store, root);
309
310         final Snapshot snapshot = Snapshot.create(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
311                 Collections.emptyList(), 1, 2, 3, 4, -1, null, null);
312
313         shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
314
315         final Stopwatch sw = Stopwatch.createStarted();
316         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
317             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
318
319             try {
320                 assertEquals("Root node", expected, readStore(shard, root));
321                 return;
322             } catch (final AssertionError e) {
323                 // try again
324             }
325         }
326
327         fail("Snapshot was not applied");
328     }
329
330     @Test
331     public void testApplyState() throws Exception {
332         final TestActorRef<Shard> shard = actorFactory.createTestActor(
333                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
334
335         ShardTestKit.waitUntilLeader(shard);
336
337         final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
338             SCHEMA_CONTEXT);
339
340         final DataTreeModification writeMod = store.takeSnapshot().newModification();
341         final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
342         writeMod.write(TestModel.TEST_PATH, node);
343         writeMod.ready();
344
345         final TransactionIdentifier tx = nextTransactionId();
346         shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
347
348         final Stopwatch sw = Stopwatch.createStarted();
349         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
350             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
351
352             final NormalizedNode actual = readStore(shard, TestModel.TEST_PATH);
353             if (actual != null) {
354                 assertEquals("Applied state", node, actual);
355                 return;
356             }
357         }
358
359         fail("State was not applied");
360     }
361
362     @Test
363     public void testDataTreeCandidateRecovery() throws Exception {
364         // Set up the InMemorySnapshotStore.
365         final DataTree source = setupInMemorySnapshotStore();
366
367         final DataTreeModification writeMod = source.takeSnapshot().newModification();
368         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
369         writeMod.ready();
370         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
371
372         // Set up the InMemoryJournal.
373         InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
374             payloadForModification(source, writeMod, nextTransactionId())));
375
376         final int nListEntries = 16;
377         final Set<Integer> listEntryKeys = new HashSet<>();
378
379         // Add some ModificationPayload entries
380         for (int i = 1; i <= nListEntries; i++) {
381             listEntryKeys.add(i);
382
383             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
384                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
385
386             final DataTreeModification mod = source.takeSnapshot().newModification();
387             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
388             mod.ready();
389
390             InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
391                 payloadForModification(source, mod, nextTransactionId())));
392         }
393
394         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
395             new ApplyJournalEntries(nListEntries));
396
397         testRecovery(listEntryKeys, true);
398     }
399
400     @Test
401     @SuppressWarnings("checkstyle:IllegalCatch")
402     public void testConcurrentThreePhaseCommits() throws Exception {
403         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
404         final CountDownLatch commitLatch = new CountDownLatch(2);
405
406         final long timeoutSec = 5;
407         final Duration duration = Duration.ofSeconds(timeoutSec);
408         final Timeout timeout = Timeout.create(duration);
409
410         final TestActorRef<Shard> shard = actorFactory.createTestActor(
411                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
412                 "testConcurrentThreePhaseCommits");
413
414         class OnFutureComplete extends OnComplete<Object> {
415             private final Class<?> expRespType;
416
417             OnFutureComplete(final Class<?> expRespType) {
418                 this.expRespType = expRespType;
419             }
420
421             @Override
422             public void onComplete(final Throwable error, final Object resp) {
423                 if (error != null) {
424                     caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
425                 } else {
426                     try {
427                         assertEquals("Commit response type", expRespType, resp.getClass());
428                         onSuccess(resp);
429                     } catch (final Exception e) {
430                         caughtEx.set(e);
431                     }
432                 }
433             }
434
435             void onSuccess(final Object resp) {
436             }
437         }
438
439         class OnCommitFutureComplete extends OnFutureComplete {
440             OnCommitFutureComplete() {
441                 super(CommitTransactionReply.class);
442             }
443
444             @Override
445             public void onComplete(final Throwable error, final Object resp) {
446                 super.onComplete(error, resp);
447                 commitLatch.countDown();
448             }
449         }
450
451         class OnCanCommitFutureComplete extends OnFutureComplete {
452             private final TransactionIdentifier transactionID;
453
454             OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
455                 super(CanCommitTransactionReply.class);
456                 this.transactionID = transactionID;
457             }
458
459             @Override
460             void onSuccess(final Object resp) {
461                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(resp);
462                 assertTrue("Can commit", canCommitReply.getCanCommit());
463
464                 final Future<Object> commitFuture = Patterns.ask(shard,
465                         new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
466                 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
467             }
468         }
469
470         final ShardTestKit testKit = new ShardTestKit(getSystem());
471         ShardTestKit.waitUntilLeader(shard);
472
473         final TransactionIdentifier transactionID1 = nextTransactionId();
474         final TransactionIdentifier transactionID2 = nextTransactionId();
475         final TransactionIdentifier transactionID3 = nextTransactionId();
476
477         final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
478             shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
479         final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
480         final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
481         final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
482
483         shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
484             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
485         final ReadyTransactionReply readyReply = ReadyTransactionReply
486                 .fromSerializable(testKit.expectMsgClass(duration, ReadyTransactionReply.class));
487
488         String pathSuffix = shard.path().toString().replaceFirst("akka://test", "");
489         assertThat(readyReply.getCohortPath(), endsWith(pathSuffix));
490         // Send the CanCommitTransaction message for the first Tx.
491
492         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
493         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
494                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
495         assertTrue("Can commit", canCommitReply.getCanCommit());
496
497         // Ready 2 more Tx's.
498
499         shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
500             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), testKit.getRef());
501         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
502
503         shard.tell(
504             prepareBatchedModifications(transactionID3,
505                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
506                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
507                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), testKit.getRef());
508         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
509
510         // Send the CanCommitTransaction message for the next 2 Tx's.
511         // These should get queued and
512         // processed after the first Tx completes.
513
514         final Future<Object> canCommitFuture1 = Patterns.ask(shard,
515             new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
516
517         final Future<Object> canCommitFuture2 = Patterns.ask(shard,
518             new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
519
520         // Send the CommitTransaction message for the first Tx. After it
521         // completes, it should
522         // trigger the 2nd Tx to proceed which should in turn then
523         // trigger the 3rd.
524
525         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
526         testKit.expectMsgClass(duration, CommitTransactionReply.class);
527
528         // Wait for the next 2 Tx's to complete.
529
530         canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher());
531
532         canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher());
533
534         final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
535
536         final Throwable t = caughtEx.get();
537         if (t != null) {
538             Throwables.propagateIfPossible(t, Exception.class);
539             throw new RuntimeException(t);
540         }
541
542         assertTrue("Commits complete", done);
543
544 //                final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
545 //                        cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
546 //                        cohort3.getPreCommit(), cohort3.getCommit());
547 //                inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
548 //                inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
549 //                inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
550 //                inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
551 //                inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
552 //                inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
553 //                inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
554 //                inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
555 //                inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
556
557         // Verify data in the data store.
558
559         verifyOuterListEntry(shard, 1);
560
561         verifyLastApplied(shard, 3);
562     }
563
564     @Test
565     public void testBatchedModificationsWithNoCommitOnReady() {
566         final ShardTestKit testKit = new ShardTestKit(getSystem());
567         final TestActorRef<Shard> shard = actorFactory.createTestActor(
568             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
569             "testBatchedModificationsWithNoCommitOnReady");
570
571         ShardTestKit.waitUntilLeader(shard);
572
573         final TransactionIdentifier transactionID = nextTransactionId();
574         final Duration duration = Duration.ofSeconds(5);
575
576         // Send a BatchedModifications to start a transaction.
577
578         shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
579             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
580         testKit.expectMsgClass(duration, BatchedModificationsReply.class);
581
582         // Send a couple more BatchedModifications.
583
584         shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
585                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
586             testKit.getRef());
587         testKit.expectMsgClass(duration, BatchedModificationsReply.class);
588
589         shard.tell(newBatchedModifications(transactionID,
590             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
591             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
592             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3),
593             testKit.getRef());
594         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
595
596         // Send the CanCommitTransaction message.
597
598         shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
599         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
600                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
601         assertTrue("Can commit", canCommitReply.getCanCommit());
602
603         // Send the CommitTransaction message.
604
605         shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
606         testKit.expectMsgClass(duration, CommitTransactionReply.class);
607
608         // Verify data in the data store.
609
610         verifyOuterListEntry(shard, 1);
611     }
612
613     @Test
614     public void testBatchedModificationsWithCommitOnReady() {
615         final ShardTestKit testKit = new ShardTestKit(getSystem());
616         final TestActorRef<Shard> shard = actorFactory.createTestActor(
617             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
618             "testBatchedModificationsWithCommitOnReady");
619
620         ShardTestKit.waitUntilLeader(shard);
621
622         final TransactionIdentifier transactionID = nextTransactionId();
623         final Duration duration = Duration.ofSeconds(5);
624
625         // Send a BatchedModifications to start a transaction.
626
627         shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
628             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
629         testKit.expectMsgClass(duration, BatchedModificationsReply.class);
630
631         // Send a couple more BatchedModifications.
632
633         shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
634             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
635             testKit.getRef());
636         testKit.expectMsgClass(duration, BatchedModificationsReply.class);
637
638         shard.tell(newBatchedModifications(transactionID,
639             YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
640             .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
641             ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3),
642             testKit.getRef());
643
644         testKit.expectMsgClass(duration, CommitTransactionReply.class);
645
646         // Verify data in the data store.
647         verifyOuterListEntry(shard, 1);
648     }
649
650     @Test(expected = IllegalStateException.class)
651     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
652         final ShardTestKit testKit = new ShardTestKit(getSystem());
653         final TestActorRef<Shard> shard = actorFactory.createTestActor(
654             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
655             "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
656
657         ShardTestKit.waitUntilLeader(shard);
658
659         final TransactionIdentifier transactionID = nextTransactionId();
660         final BatchedModifications batched = new BatchedModifications(transactionID,
661             DataStoreVersions.CURRENT_VERSION);
662         batched.setReady();
663         batched.setTotalMessagesSent(2);
664
665         shard.tell(batched, testKit.getRef());
666
667         final Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
668
669         if (failure != null) {
670             Throwables.propagateIfPossible(failure.cause(), Exception.class);
671             throw new RuntimeException(failure.cause());
672         }
673     }
674
675     @Test
676     public void testBatchedModificationsWithOperationFailure() {
677         final ShardTestKit testKit = new ShardTestKit(getSystem());
678         final TestActorRef<Shard> shard = actorFactory.createTestActor(
679             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
680             "testBatchedModificationsWithOperationFailure");
681
682         ShardTestKit.waitUntilLeader(shard);
683
684         // Test merge with invalid data. An exception should occur when
685         // the merge is applied. Note that
686         // write will not validate the children for performance reasons.
687
688         final TransactionIdentifier transactionID = nextTransactionId();
689
690         final ContainerNode invalidData = Builders.containerBuilder()
691             .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
692             .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
693             .build();
694
695         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
696         batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
697         shard.tell(batched, testKit.getRef());
698         Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
699
700         final Throwable cause = failure.cause();
701
702         batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
703         batched.setReady();
704         batched.setTotalMessagesSent(2);
705
706         shard.tell(batched, testKit.getRef());
707
708         failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
709         assertEquals("Failure cause", cause, failure.cause());
710     }
711
712     @Test
713     public void testBatchedModificationsOnTransactionChain() {
714         final ShardTestKit testKit = new ShardTestKit(getSystem());
715         final TestActorRef<Shard> shard = actorFactory.createTestActor(
716             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
717             "testBatchedModificationsOnTransactionChain");
718
719         ShardTestKit.waitUntilLeader(shard);
720
721         final LocalHistoryIdentifier historyId = nextHistoryId();
722         final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
723         final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
724
725         final Duration duration = Duration.ofSeconds(5);
726
727         // Send a BatchedModifications to start a chained write
728         // transaction and ready it.
729
730         final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
731         final YangInstanceIdentifier path = TestModel.TEST_PATH;
732         shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), testKit.getRef());
733         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
734
735         // Create a read Tx on the same chain.
736
737         shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
738             DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
739
740         final CreateTransactionReply createReply = testKit.expectMsgClass(Duration.ofSeconds(3),
741             CreateTransactionReply.class);
742
743         getSystem().actorSelection(createReply.getTransactionPath())
744         .tell(new ReadData(path, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
745         final ReadDataReply readReply = testKit.expectMsgClass(Duration.ofSeconds(3), ReadDataReply.class);
746         assertEquals("Read node", containerNode, readReply.getNormalizedNode());
747
748         // Commit the write transaction.
749
750         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
751         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
752                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
753         assertTrue("Can commit", canCommitReply.getCanCommit());
754
755         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
756         testKit.expectMsgClass(duration, CommitTransactionReply.class);
757
758         // Verify data in the data store.
759
760         final NormalizedNode actualNode = readStore(shard, path);
761         assertEquals("Stored node", containerNode, actualNode);
762     }
763
764     @Test
765     public void testOnBatchedModificationsWhenNotLeader() {
766         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
767         final ShardTestKit testKit = new ShardTestKit(getSystem());
768         final Creator<Shard> creator = new Creator<>() {
769             private static final long serialVersionUID = 1L;
770
771             @Override
772             public Shard create() {
773                 return new Shard(newShardBuilder()) {
774                     @Override
775                     protected boolean isLeader() {
776                         return overrideLeaderCalls.get() ? false : super.isLeader();
777                     }
778
779                     @Override
780                     public ActorSelection getLeader() {
781                         return overrideLeaderCalls.get() ? getSystem().actorSelection(testKit.getRef().path())
782                                 : super.getLeader();
783                     }
784                 };
785             }
786         };
787
788         final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
789             new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
790             "testOnBatchedModificationsWhenNotLeader");
791
792         ShardTestKit.waitUntilLeader(shard);
793
794         overrideLeaderCalls.set(true);
795
796         final BatchedModifications batched = new BatchedModifications(nextTransactionId(),
797             DataStoreVersions.CURRENT_VERSION);
798
799         shard.tell(batched, ActorRef.noSender());
800
801         testKit.expectMsgEquals(batched);
802     }
803
804     @Test
805     public void testTransactionMessagesWithNoLeader() {
806         final ShardTestKit testKit = new ShardTestKit(getSystem());
807         dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
808         .shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
809         final TestActorRef<Shard> shard = actorFactory.createTestActor(
810             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
811             "testTransactionMessagesWithNoLeader");
812
813         testKit.waitUntilNoLeader(shard);
814
815         final TransactionIdentifier txId = nextTransactionId();
816         shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
817         Failure failure = testKit.expectMsgClass(Failure.class);
818         assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
819
820         shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
821             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
822         failure = testKit.expectMsgClass(Failure.class);
823         assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
824
825         shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
826             testKit.getRef());
827         failure = testKit.expectMsgClass(Failure.class);
828         assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
829     }
830
831     @Test
832     public void testReadyWithReadWriteImmediateCommit() {
833         testReadyWithImmediateCommit(true);
834     }
835
836     @Test
837     public void testReadyWithWriteOnlyImmediateCommit() {
838         testReadyWithImmediateCommit(false);
839     }
840
841     private void testReadyWithImmediateCommit(final boolean readWrite) {
842         final ShardTestKit testKit = new ShardTestKit(getSystem());
843         final TestActorRef<Shard> shard = actorFactory.createTestActor(
844             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
845             "testReadyWithImmediateCommit-" + readWrite);
846
847         ShardTestKit.waitUntilLeader(shard);
848
849         final TransactionIdentifier transactionID = nextTransactionId();
850         final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
851         if (readWrite) {
852             shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH, containerNode, true),
853                 testKit.getRef());
854         } else {
855             shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true),
856                 testKit.getRef());
857         }
858
859         testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
860
861         final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH);
862         assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
863     }
864
865     @Test
866     public void testReadyLocalTransactionWithImmediateCommit() {
867         final ShardTestKit testKit = new ShardTestKit(getSystem());
868         final TestActorRef<Shard> shard = actorFactory.createTestActor(
869             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
870             "testReadyLocalTransactionWithImmediateCommit");
871
872         ShardTestKit.waitUntilLeader(shard);
873
874         final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
875
876         final DataTreeModification modification = dataStore.newModification();
877
878         final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
879         new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
880         final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
881                 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
882                 .build();
883         new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
884
885         final TransactionIdentifier txId = nextTransactionId();
886         modification.ready();
887         final ReadyLocalTransaction readyMessage =
888                 new ReadyLocalTransaction(txId, modification, true, Optional.empty());
889
890         shard.tell(readyMessage, testKit.getRef());
891
892         testKit.expectMsgClass(CommitTransactionReply.class);
893
894         final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
895         assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
896     }
897
898     @Test
899     public void testReadyLocalTransactionWithThreePhaseCommit() {
900         final ShardTestKit testKit = new ShardTestKit(getSystem());
901         final TestActorRef<Shard> shard = actorFactory.createTestActor(
902             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
903             "testReadyLocalTransactionWithThreePhaseCommit");
904
905         ShardTestKit.waitUntilLeader(shard);
906
907         final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
908
909         final DataTreeModification modification = dataStore.newModification();
910
911         final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
912         new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
913         final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
914                 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
915                 .build();
916         new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
917
918         final TransactionIdentifier txId = nextTransactionId();
919         modification.ready();
920         final ReadyLocalTransaction readyMessage =
921                 new ReadyLocalTransaction(txId, modification, false, Optional.empty());
922
923         shard.tell(readyMessage, testKit.getRef());
924
925         testKit.expectMsgClass(ReadyTransactionReply.class);
926
927         // Send the CanCommitTransaction message.
928
929         shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
930         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
931                 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
932         assertTrue("Can commit", canCommitReply.getCanCommit());
933
934         // Send the CanCommitTransaction message.
935
936         shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
937         testKit.expectMsgClass(CommitTransactionReply.class);
938
939         final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
940         assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
941     }
942
943     @Test
944     public void testReadWriteCommitWithPersistenceDisabled() {
945         dataStoreContextBuilder.persistent(false);
946         final ShardTestKit testKit = new ShardTestKit(getSystem());
947         final TestActorRef<Shard> shard = actorFactory.createTestActor(
948             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
949             "testCommitWithPersistenceDisabled");
950
951         ShardTestKit.waitUntilLeader(shard);
952
953         // Setup a simulated transactions with a mock cohort.
954
955         final Duration duration = Duration.ofSeconds(5);
956
957         final TransactionIdentifier transactionID = nextTransactionId();
958         final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
959         shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false),
960             testKit.getRef());
961         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
962
963         // Send the CanCommitTransaction message.
964
965         shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
966         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
967                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
968         assertTrue("Can commit", canCommitReply.getCanCommit());
969
970         // Send the CanCommitTransaction message.
971
972         shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
973         testKit.expectMsgClass(duration, CommitTransactionReply.class);
974
975         final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH);
976         assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
977     }
978
979     @Test
980     public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
981         testCommitWhenTransactionHasModifications(true);
982     }
983
984     @Test
985     public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
986         testCommitWhenTransactionHasModifications(false);
987     }
988
989     private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
990         final ShardTestKit testKit = new ShardTestKit(getSystem());
991         final DataTree dataTree = createDelegatingMockDataTree();
992         final TestActorRef<Shard> shard = actorFactory.createTestActor(
993             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
994             "testCommitWhenTransactionHasModifications-" + readWrite);
995
996         ShardTestKit.waitUntilLeader(shard);
997
998         final Duration duration = Duration.ofSeconds(5);
999         final TransactionIdentifier transactionID = nextTransactionId();
1000
1001         if (readWrite) {
1002             shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
1003                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1004         } else {
1005             shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1006                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1007         }
1008
1009         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1010
1011         // Send the CanCommitTransaction message.
1012
1013         shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1014         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1015                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1016         assertTrue("Can commit", canCommitReply.getCanCommit());
1017
1018         shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1019         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1020
1021         final InOrder inOrder = inOrder(dataTree);
1022         inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1023         inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1024         inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1025
1026         // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
1027         // the journal
1028         Thread.sleep(HEARTBEAT_MILLIS * 2);
1029
1030         shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, testKit.getRef());
1031         final ShardStats shardStats = testKit.expectMsgClass(duration, ShardStats.class);
1032
1033         // Use MBean for verification
1034         // Committed transaction count should increase as usual
1035         assertEquals(1, shardStats.getCommittedTransactionsCount());
1036
1037         // Commit index should advance 1 to account for disabling metadata
1038         assertEquals(1, shardStats.getCommitIndex());
1039     }
1040
1041     @Test
1042     public void testCommitPhaseFailure() throws Exception {
1043         final ShardTestKit testKit = new ShardTestKit(getSystem());
1044         final DataTree dataTree = createDelegatingMockDataTree();
1045         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1046             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1047             "testCommitPhaseFailure");
1048
1049         ShardTestKit.waitUntilLeader(shard);
1050
1051         final Duration duration = Duration.ofSeconds(5);
1052         final Timeout timeout = Timeout.create(duration);
1053
1054         // Setup 2 simulated transactions with mock cohorts. The first
1055         // one fails in the
1056         // commit phase.
1057
1058         doThrow(new RuntimeException("mock commit failure")).when(dataTree)
1059         .commit(any(DataTreeCandidate.class));
1060
1061         final TransactionIdentifier transactionID1 = nextTransactionId();
1062         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1063             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1064         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1065
1066         final TransactionIdentifier transactionID2 = nextTransactionId();
1067         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1068             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1069         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1070
1071         // Send the CanCommitTransaction message for the first Tx.
1072
1073         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1074         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1075                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1076         assertTrue("Can commit", canCommitReply.getCanCommit());
1077
1078         // Send the CanCommitTransaction message for the 2nd Tx. This
1079         // should get queued and
1080         // processed after the first Tx completes.
1081
1082         final Future<Object> canCommitFuture = Patterns.ask(shard,
1083             new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1084
1085         // Send the CommitTransaction message for the first Tx. This
1086         // should send back an error
1087         // and trigger the 2nd Tx to proceed.
1088
1089         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1090         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1091
1092         // Wait for the 2nd Tx to complete the canCommit phase.
1093
1094         final CountDownLatch latch = new CountDownLatch(1);
1095         canCommitFuture.onComplete(new OnComplete<>() {
1096             @Override
1097             public void onComplete(final Throwable failure, final Object resp) {
1098                 latch.countDown();
1099             }
1100         }, getSystem().dispatcher());
1101
1102         assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
1103
1104         final InOrder inOrder = inOrder(dataTree);
1105         inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1106         inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1107
1108         // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
1109         //        validate performs wrapping and we capture that mock
1110         // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1111
1112         inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1113     }
1114
1115     @Test
1116     public void testPreCommitPhaseFailure() throws Exception {
1117         final ShardTestKit testKit = new ShardTestKit(getSystem());
1118         final DataTree dataTree = createDelegatingMockDataTree();
1119         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1120             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1121             "testPreCommitPhaseFailure");
1122
1123         ShardTestKit.waitUntilLeader(shard);
1124
1125         final Duration duration = Duration.ofSeconds(5);
1126         final Timeout timeout = Timeout.create(duration);
1127
1128         doThrow(new RuntimeException("mock preCommit failure")).when(dataTree)
1129         .prepare(any(DataTreeModification.class));
1130
1131         final TransactionIdentifier transactionID1 = nextTransactionId();
1132         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1133             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1134         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1135
1136         final TransactionIdentifier transactionID2 = nextTransactionId();
1137         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1138             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1139         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1140
1141         // Send the CanCommitTransaction message for the first Tx.
1142
1143         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1144         final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1145                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1146         assertTrue("Can commit", canCommitReply.getCanCommit());
1147
1148         // Send the CanCommitTransaction message for the 2nd Tx. This
1149         // should get queued and
1150         // processed after the first Tx completes.
1151
1152         final Future<Object> canCommitFuture = Patterns.ask(shard,
1153             new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1154
1155         // Send the CommitTransaction message for the first Tx. This
1156         // should send back an error
1157         // and trigger the 2nd Tx to proceed.
1158
1159         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1160         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1161
1162         // Wait for the 2nd Tx to complete the canCommit phase.
1163
1164         final CountDownLatch latch = new CountDownLatch(1);
1165         canCommitFuture.onComplete(new OnComplete<>() {
1166             @Override
1167             public void onComplete(final Throwable failure, final Object resp) {
1168                 latch.countDown();
1169             }
1170         }, getSystem().dispatcher());
1171
1172         assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
1173
1174         final InOrder inOrder = inOrder(dataTree);
1175         inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1176         inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1177         inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1178     }
1179
1180     @Test
1181     public void testCanCommitPhaseFailure() throws Exception {
1182         final ShardTestKit testKit = new ShardTestKit(getSystem());
1183         final DataTree dataTree = createDelegatingMockDataTree();
1184         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1185             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1186             "testCanCommitPhaseFailure");
1187
1188         ShardTestKit.waitUntilLeader(shard);
1189
1190         final Duration duration = Duration.ofSeconds(5);
1191         final TransactionIdentifier transactionID1 = nextTransactionId();
1192
1193         doThrow(new DataValidationFailedException(YangInstanceIdentifier.of(), "mock canCommit failure"))
1194         .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1195
1196         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1197             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1198         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1199
1200         // Send the CanCommitTransaction message.
1201
1202         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1203         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1204
1205         // Send another can commit to ensure the failed one got cleaned
1206         // up.
1207
1208         final TransactionIdentifier transactionID2 = nextTransactionId();
1209         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1210             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1211         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1212
1213         shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1214         final CanCommitTransactionReply reply = CanCommitTransactionReply
1215                 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
1216         assertTrue("getCanCommit", reply.getCanCommit());
1217     }
1218
1219     @Test
1220     public void testImmediateCommitWithCanCommitPhaseFailure() throws Exception {
1221         testImmediateCommitWithCanCommitPhaseFailure(true);
1222         testImmediateCommitWithCanCommitPhaseFailure(false);
1223     }
1224
1225     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
1226         final ShardTestKit testKit = new ShardTestKit(getSystem());
1227         final DataTree dataTree = createDelegatingMockDataTree();
1228         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1229             newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1230             "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1231
1232         ShardTestKit.waitUntilLeader(shard);
1233
1234         doThrow(new DataValidationFailedException(YangInstanceIdentifier.of(), "mock canCommit failure"))
1235         .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1236
1237         final Duration duration = Duration.ofSeconds(5);
1238
1239         final TransactionIdentifier transactionID1 = nextTransactionId();
1240
1241         if (readWrite) {
1242             shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1243                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1244         } else {
1245             shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1246                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1247         }
1248
1249         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1250
1251         // Send another can commit to ensure the failed one got cleaned
1252         // up.
1253
1254         final TransactionIdentifier transactionID2 = nextTransactionId();
1255         if (readWrite) {
1256             shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1257                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1258         } else {
1259             shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1260                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1261         }
1262
1263         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1264     }
1265
1266     @Test
1267     public void testAbortWithCommitPending() {
1268         final ShardTestKit testKit = new ShardTestKit(getSystem());
1269         final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1270             @Override
1271             void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
1272                 // Simulate an AbortTransaction message occurring during
1273                 // replication, after
1274                 // persisting and before finishing the commit to the
1275                 // in-memory store.
1276
1277                 doAbortTransaction(id, null);
1278                 super.persistPayload(id, payload, batchHint);
1279             }
1280         };
1281
1282         final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
1283             new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1284             "testAbortWithCommitPending");
1285
1286         ShardTestKit.waitUntilLeader(shard);
1287
1288         final Duration duration = Duration.ofSeconds(5);
1289
1290         final TransactionIdentifier transactionID = nextTransactionId();
1291
1292         shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1293             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1294         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1295
1296         shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1297         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1298
1299         shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1300         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1301
1302         final NormalizedNode node = readStore(shard, TestModel.TEST_PATH);
1303
1304         // Since we're simulating an abort occurring during replication
1305         // and before finish commit,
1306         // the data should still get written to the in-memory store
1307         // since we've gotten past
1308         // canCommit and preCommit and persisted the data.
1309         assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1310     }
1311
1312     @Test
1313     public void testTransactionCommitTimeout() throws Exception {
1314         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1315         final ShardTestKit testKit = new ShardTestKit(getSystem());
1316         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1317             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1318             "testTransactionCommitTimeout");
1319
1320         ShardTestKit.waitUntilLeader(shard);
1321
1322         final Duration duration = Duration.ofSeconds(5);
1323
1324         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1325         writeToStore(shard, TestModel.OUTER_LIST_PATH,
1326             ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1327
1328         // Ready 2 Tx's - the first will timeout
1329
1330         final TransactionIdentifier transactionID1 = nextTransactionId();
1331         shard.tell(
1332             prepareBatchedModifications(transactionID1,
1333                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1334                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1335                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
1336             testKit.getRef());
1337         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1338
1339         final TransactionIdentifier transactionID2 = nextTransactionId();
1340         final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1341                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1342         shard.tell(
1343             prepareBatchedModifications(transactionID2, listNodePath,
1344                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), testKit.getRef());
1345         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1346
1347         // canCommit 1st Tx. We don't send the commit so it should
1348         // timeout.
1349
1350         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1351         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1352
1353         // canCommit the 2nd Tx - it should complete after the 1st Tx
1354         // times out.
1355
1356         shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1357         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1358
1359         // Try to commit the 1st Tx - should fail as it's not the
1360         // current Tx.
1361
1362         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1363         testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1364
1365         // Commit the 2nd Tx.
1366
1367         shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1368         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1369
1370         final NormalizedNode node = readStore(shard, listNodePath);
1371         assertNotNull(listNodePath + " not found", node);
1372     }
1373
1374 //    @Test
1375 //    @Ignore
1376 //    public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1377 //        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1378 //
1379 //        new ShardTestKit(getSystem()) {{
1380 //            final TestActorRef<Shard> shard = actorFactory.createTestActor(
1381 //                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1382 //                    "testTransactionCommitQueueCapacityExceeded");
1383 //
1384 //            waitUntilLeader(shard);
1385 //
1386 //            final FiniteDuration duration = duration("5 seconds");
1387 //
1388 //            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1389 //
1390 //            final TransactionIdentifier transactionID1 = nextTransactionId();
1391 //            final MutableCompositeModification modification1 = new MutableCompositeModification();
1392 //            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1393 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1394 //                    modification1);
1395 //
1396 //            final TransactionIdentifier transactionID2 = nextTransactionId();
1397 //            final MutableCompositeModification modification2 = new MutableCompositeModification();
1398 //            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1399 //                    TestModel.OUTER_LIST_PATH,
1400 //                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1401 //                    modification2);
1402 //
1403 //            final TransactionIdentifier transactionID3 = nextTransactionId();
1404 //            final MutableCompositeModification modification3 = new MutableCompositeModification();
1405 //            final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1406 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1407 //                    modification3);
1408 //
1409 //            // Ready the Tx's
1410 //
1411 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
1412 //                    modification1), getRef());
1413 //            expectMsgClass(duration, ReadyTransactionReply.class);
1414 //
1415 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
1416 //                    modification2), getRef());
1417 //            expectMsgClass(duration, ReadyTransactionReply.class);
1418 //
1419 //            // The 3rd Tx should exceed queue capacity and fail.
1420 //
1421 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
1422 //                    modification3), getRef());
1423 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1424 //
1425 //            // canCommit 1st Tx.
1426 //
1427 //            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1428 //            expectMsgClass(duration, CanCommitTransactionReply.class);
1429 //
1430 //            // canCommit the 2nd Tx - it should get queued.
1431 //
1432 //            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1433 //
1434 //            // canCommit the 3rd Tx - should exceed queue capacity and fail.
1435 //
1436 //            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1437 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1438 //        }};
1439 //    }
1440
1441     @Test
1442     public void testTransactionCommitWithPriorExpiredCohortEntries() {
1443         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1444         final ShardTestKit testKit = new ShardTestKit(getSystem());
1445         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1446             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1447             "testTransactionCommitWithPriorExpiredCohortEntries");
1448
1449         ShardTestKit.waitUntilLeader(shard);
1450
1451         final Duration duration = Duration.ofSeconds(5);
1452
1453         final TransactionIdentifier transactionID1 = nextTransactionId();
1454         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1455             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1456         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1457
1458         final TransactionIdentifier transactionID2 = nextTransactionId();
1459         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1460             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1461         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1462
1463         final TransactionIdentifier transactionID3 = nextTransactionId();
1464         shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1465             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1466         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1467
1468         // All Tx's are readied. We'll send canCommit for the last one
1469         // but not the others. The others
1470         // should expire from the queue and the last one should be
1471         // processed.
1472
1473         shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1474         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1475     }
1476
1477     @Test
1478     public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
1479         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1480         final ShardTestKit testKit = new ShardTestKit(getSystem());
1481         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1482             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1483             "testTransactionCommitWithSubsequentExpiredCohortEntry");
1484
1485         ShardTestKit.waitUntilLeader(shard);
1486
1487         final Duration duration = Duration.ofSeconds(5);
1488
1489         final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1490
1491         final TransactionIdentifier transactionID1 = nextTransactionId();
1492         shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1493             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1494         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1495
1496         // CanCommit the first Tx so it's the current in-progress Tx.
1497
1498         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1499         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1500
1501         // Ready the second Tx.
1502
1503         final TransactionIdentifier transactionID2 = nextTransactionId();
1504         shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1505             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1506         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1507
1508         // Ready the third Tx.
1509
1510         final TransactionIdentifier transactionID3 = nextTransactionId();
1511         final DataTreeModification modification3 = dataStore.newModification();
1512         new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1513             .apply(modification3);
1514         modification3.ready();
1515         final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
1516             true, Optional.empty());
1517         shard.tell(readyMessage, testKit.getRef());
1518
1519         // Commit the first Tx. After completing, the second should
1520         // expire from the queue and the third
1521         // Tx committed.
1522
1523         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1524         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1525
1526         // Expect commit reply from the third Tx.
1527
1528         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1529
1530         final NormalizedNode node = readStore(shard, TestModel.TEST2_PATH);
1531         assertNotNull(TestModel.TEST2_PATH + " not found", node);
1532     }
1533
1534     @Test
1535     public void testCanCommitBeforeReadyFailure() {
1536         final ShardTestKit testKit = new ShardTestKit(getSystem());
1537         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1538             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1539             "testCanCommitBeforeReadyFailure");
1540
1541         shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), testKit.getRef());
1542         testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
1543     }
1544
1545     @Test
1546     public void testAbortAfterCanCommit() throws Exception {
1547         final ShardTestKit testKit = new ShardTestKit(getSystem());
1548         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1549             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
1550
1551         ShardTestKit.waitUntilLeader(shard);
1552
1553         final Duration duration = Duration.ofSeconds(5);
1554         final Timeout timeout = Timeout.create(duration);
1555
1556         // Ready 2 transactions - the first one will be aborted.
1557
1558         final TransactionIdentifier transactionID1 = nextTransactionId();
1559         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1560             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1561         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1562
1563         final TransactionIdentifier transactionID2 = nextTransactionId();
1564         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1565             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1566         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1567
1568         // Send the CanCommitTransaction message for the first Tx.
1569
1570         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1571         CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1572                 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1573         assertTrue("Can commit", canCommitReply.getCanCommit());
1574
1575         // Send the CanCommitTransaction message for the 2nd Tx. This
1576         // should get queued and
1577         // processed after the first Tx completes.
1578
1579         final Future<Object> canCommitFuture = Patterns.ask(shard,
1580             new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1581
1582         // Send the AbortTransaction message for the first Tx. This
1583         // should trigger the 2nd
1584         // Tx to proceed.
1585
1586         shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1587         testKit.expectMsgClass(duration, AbortTransactionReply.class);
1588
1589         // Wait for the 2nd Tx to complete the canCommit phase.
1590
1591         canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture,
1592             FiniteDuration.create(5, TimeUnit.SECONDS));
1593         assertTrue("Can commit", canCommitReply.getCanCommit());
1594     }
1595
1596     @Test
1597     public void testAbortAfterReady() {
1598         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1599         final ShardTestKit testKit = new ShardTestKit(getSystem());
1600         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1601             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1602
1603         ShardTestKit.waitUntilLeader(shard);
1604
1605         final Duration duration = Duration.ofSeconds(5);
1606
1607         // Ready a tx.
1608
1609         final TransactionIdentifier transactionID1 = nextTransactionId();
1610         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1611             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1612         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1613
1614         // Send the AbortTransaction message.
1615
1616         shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1617         testKit.expectMsgClass(duration, AbortTransactionReply.class);
1618
1619         assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1620
1621         // Now send CanCommitTransaction - should fail.
1622
1623         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1624         final Throwable failure = testKit.expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1625         assertTrue("Failure type", failure instanceof IllegalStateException);
1626
1627         // Ready and CanCommit another and verify success.
1628
1629         final TransactionIdentifier transactionID2 = nextTransactionId();
1630         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1631             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1632         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1633
1634         shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1635         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1636     }
1637
1638     @Test
1639     public void testAbortQueuedTransaction() {
1640         final ShardTestKit testKit = new ShardTestKit(getSystem());
1641         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1642             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1643
1644         ShardTestKit.waitUntilLeader(shard);
1645
1646         final Duration duration = Duration.ofSeconds(5);
1647
1648         // Ready 3 tx's.
1649
1650         final TransactionIdentifier transactionID1 = nextTransactionId();
1651         shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1652             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1653         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1654
1655         final TransactionIdentifier transactionID2 = nextTransactionId();
1656         shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1657             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1658         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1659
1660         final TransactionIdentifier transactionID3 = nextTransactionId();
1661         shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1662                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), testKit.getRef());
1663         testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1664
1665         // Abort the second tx while it's queued.
1666
1667         shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1668         testKit.expectMsgClass(duration, AbortTransactionReply.class);
1669
1670         // Commit the other 2.
1671
1672         shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1673         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1674
1675         shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1676         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1677
1678         shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1679         testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1680
1681         shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1682         testKit.expectMsgClass(duration, CommitTransactionReply.class);
1683
1684         assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1685     }
1686
1687     @Test
1688     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1689         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1690     }
1691
1692     @Test
1693     public void testCreateSnapshot() throws Exception {
1694         testCreateSnapshot(true, "testCreateSnapshot");
1695     }
1696
1697     private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception {
1698         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1699
1700         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1701         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1702             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1703                 super(delegate);
1704             }
1705
1706             @Override
1707             public void saveSnapshot(final Object obj) {
1708                 savedSnapshot.set(obj);
1709                 super.saveSnapshot(obj);
1710             }
1711         }
1712
1713         dataStoreContextBuilder.persistent(persistent);
1714
1715         final class TestShard extends Shard {
1716
1717             TestShard(final AbstractBuilder<?, ?> builder) {
1718                 super(builder);
1719                 setPersistence(new TestPersistentDataProvider(super.persistence()));
1720             }
1721
1722             @Override
1723             public void handleCommand(final Object message) {
1724                 super.handleCommand(message);
1725
1726                 // XXX:  commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
1727                 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1728                     latch.get().countDown();
1729                 }
1730             }
1731
1732             @Override
1733             public RaftActorContext getRaftActorContext() {
1734                 return super.getRaftActorContext();
1735             }
1736         }
1737
1738         final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1739
1740         final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
1741             new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), shardActorName);
1742
1743         ShardTestKit.waitUntilLeader(shard);
1744         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1745
1746         final NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.of());
1747
1748         // Trigger creation of a snapshot by ensuring
1749         final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1750         raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1751         awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1752
1753         raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1754         awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1755     }
1756
1757     private static void awaitAndValidateSnapshot(final AtomicReference<CountDownLatch> latch,
1758             final AtomicReference<Object> savedSnapshot, final NormalizedNode expectedRoot)
1759                     throws InterruptedException {
1760         assertTrue("Snapshot saved", latch.get().await(5, TimeUnit.SECONDS));
1761
1762         assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
1763
1764         verifySnapshot((Snapshot) savedSnapshot.get(), expectedRoot);
1765
1766         latch.set(new CountDownLatch(1));
1767         savedSnapshot.set(null);
1768     }
1769
1770     private static void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) {
1771         assertEquals("Root node", expectedRoot,
1772             ((ShardSnapshotState)snapshot.getState()).getSnapshot().getRootNode().orElseThrow());
1773     }
1774
1775     /**
1776      * This test simply verifies that the applySnapShot logic will work.
1777      */
1778     @Test
1779     public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
1780         final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
1781             SCHEMA_CONTEXT);
1782
1783         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1784         putTransaction.write(TestModel.TEST_PATH,
1785             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1786         commitTransaction(store, putTransaction);
1787
1788
1789         final NormalizedNode expected = readStore(store, YangInstanceIdentifier.of());
1790
1791         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
1792
1793         writeTransaction.delete(YangInstanceIdentifier.of());
1794         writeTransaction.write(YangInstanceIdentifier.of(), expected);
1795
1796         commitTransaction(store, writeTransaction);
1797
1798         final NormalizedNode actual = readStore(store, YangInstanceIdentifier.of());
1799
1800         assertEquals(expected, actual);
1801     }
1802
1803     @Test
1804     public void testRecoveryApplicable() {
1805
1806         final DatastoreContext persistentContext = DatastoreContext.newBuilder()
1807                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1808
1809         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
1810                 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1811
1812         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
1813                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1814
1815         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
1816                 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1817
1818         final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
1819
1820         assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1821
1822         final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
1823
1824         assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1825     }
1826
1827     @Test
1828     public void testOnDatastoreContext() {
1829         dataStoreContextBuilder.persistent(true);
1830
1831         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
1832
1833         assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1834
1835         ShardTestKit.waitUntilLeader(shard);
1836
1837         shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1838
1839         assertFalse("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1840
1841         shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1842
1843         assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1844     }
1845
1846     @Test
1847     public void testRegisterRoleChangeListener() {
1848         final ShardTestKit testKit = new ShardTestKit(getSystem());
1849         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1850             newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1851             "testRegisterRoleChangeListener");
1852
1853         ShardTestKit.waitUntilLeader(shard);
1854
1855         final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
1856
1857         shard.tell(new RegisterRoleChangeListener(), listener);
1858
1859         MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
1860
1861         ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1862             ShardLeaderStateChanged.class);
1863         final var dataTree = leaderStateChanged.localShardDataTree();
1864         assertNotNull("getLocalShardDataTree present", dataTree);
1865         assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(), dataTree);
1866
1867         MessageCollectorActor.clearMessages(listener);
1868
1869         // Force a leader change
1870
1871         shard.tell(new RequestVote(10000, "member2", 50, 50), testKit.getRef());
1872
1873         leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, ShardLeaderStateChanged.class);
1874         assertNull("getLocalShardDataTree present", leaderStateChanged.localShardDataTree());
1875     }
1876
1877     @Test
1878     public void testFollowerInitialSyncStatus() {
1879         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1880                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1881                 "testFollowerInitialSyncStatus");
1882
1883         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false,
1884                 "member-1-shard-inventory-operational"));
1885
1886         assertFalse(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1887
1888         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true,
1889                 "member-1-shard-inventory-operational"));
1890
1891         assertTrue(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1892     }
1893
1894     @Test
1895     public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
1896         final ShardTestKit testKit = new ShardTestKit(getSystem());
1897         final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
1898         dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1899             .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1900
1901         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1902         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1903             TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1904
1905         setupInMemorySnapshotStore();
1906
1907         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1908             newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1909             actorFactory.generateActorId(testName + "-shard"));
1910
1911         testKit.waitUntilNoLeader(shard);
1912
1913         shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1914         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1915             RegisterDataTreeNotificationListenerReply.class);
1916         assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1917
1918         shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1919             .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1920
1921         listener.waitForChangeEvents();
1922     }
1923
1924     @Test
1925     public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
1926         final ShardTestKit testKit = new ShardTestKit(getSystem());
1927         final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
1928         dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1929             .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1930
1931         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
1932         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1933             TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1934
1935         setupInMemorySnapshotStore();
1936
1937         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1938             newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1939             actorFactory.generateActorId(testName + "-shard"));
1940
1941         testKit.waitUntilNoLeader(shard);
1942
1943         shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1944         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1945             RegisterDataTreeNotificationListenerReply.class);
1946         assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1947
1948         final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
1949         regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), testKit.getRef());
1950         testKit.expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
1951
1952         shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1953             .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1954
1955         listener.expectNoMoreChanges("Received unexpected change after close");
1956     }
1957
1958     @Test
1959     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
1960         final ShardTestKit testKit = new ShardTestKit(getSystem());
1961         final String testName = "testClusteredDataTreeChangeListenerRegistration";
1962         final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
1963             MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
1964
1965         final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
1966             MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
1967
1968         final TestActorRef<Shard> followerShard = actorFactory
1969                 .createTestActor(Shard.builder().id(followerShardID)
1970                     .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
1971                     .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
1972                         "akka://test/user/" + leaderShardID.toString()))
1973                     .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1974                     .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
1975
1976         final TestActorRef<Shard> leaderShard = actorFactory
1977                 .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
1978                     .peerAddresses(Collections.singletonMap(followerShardID.toString(),
1979                         "akka://test/user/" + followerShardID.toString()))
1980                     .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1981                     .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
1982
1983         leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1984         final String leaderPath = ShardTestKit.waitUntilLeader(followerShard);
1985         assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
1986
1987         final YangInstanceIdentifier path = TestModel.TEST_PATH;
1988         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1989         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
1990             actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1991
1992         followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1993         final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1994             RegisterDataTreeNotificationListenerReply.class);
1995         assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1996
1997         writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1998
1999         listener.waitForChangeEvents();
2000     }
2001
2002     @Test
2003     public void testServerRemoved() {
2004         final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
2005                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
2006
2007         final ActorRef shard = parent.underlyingActor().context().actorOf(
2008                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2009                 "testServerRemoved");
2010
2011         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2012
2013         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2014     }
2015 }