2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertSame;
14 import static org.junit.Assert.assertTrue;
15 import static org.junit.Assert.fail;
16 import static org.mockito.ArgumentMatchers.any;
17 import static org.mockito.Mockito.doThrow;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSelection;
24 import akka.actor.Props;
25 import akka.actor.Status.Failure;
26 import akka.dispatch.Dispatchers;
27 import akka.dispatch.OnComplete;
28 import akka.japi.Creator;
29 import akka.pattern.Patterns;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Stopwatch;
34 import com.google.common.base.Throwables;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.time.Duration;
37 import java.util.Collections;
38 import java.util.HashSet;
40 import java.util.Optional;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicReference;
46 import org.junit.Test;
47 import org.mockito.InOrder;
48 import org.opendaylight.controller.cluster.DataPersistenceProvider;
49 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
50 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
51 import org.opendaylight.controller.cluster.access.concepts.MemberName;
52 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
53 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
62 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
65 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
67 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
71 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
74 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
75 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
76 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
77 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
78 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
79 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
80 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
81 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
82 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
83 import org.opendaylight.controller.cluster.raft.RaftActorContext;
84 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
85 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
86 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
87 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
88 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
89 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
90 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
91 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
92 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
93 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
94 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
95 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
96 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
97 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
98 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
99 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
100 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
101 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
102 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
103 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
104 import org.opendaylight.yangtools.concepts.Identifier;
105 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
106 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
108 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
116 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
117 import scala.concurrent.Await;
118 import scala.concurrent.Future;
119 import scala.concurrent.duration.FiniteDuration;
121 public class ShardTest extends AbstractShardTest {
// Placeholder journal payload: testDataTreeCandidateRecovery adds it to the InMemoryJournal at
// sequence number 0. The literal itself records why such a placeholder entry is needed.
122 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
123 + "InMemorySnapshotStore and journal recovery seq number will start from 1";
/**
 * Registers two data tree change listeners on TestModel.TEST_PATH, one before and one after a
 * container node is written there. Checks the first registration's reply path, then verifies the
 * first listener with verifyOnInitialDataEvent() and the second with verifyNoOnInitialDataEvent().
 */
126 public void testRegisterDataTreeChangeListener() throws Exception {
127 final ShardTestKit testKit = new ShardTestKit(getSystem());
128 final TestActorRef<Shard> shard = actorFactory.createTestActor(
129 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
130 "testRegisterDataTreeChangeListener");
132 ShardTestKit.waitUntilLeader(shard);
134 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// First listener: registered before any data is written to the store.
136 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
137 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
138 TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
140 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), testKit.getRef());
// The reply is awaited for up to 3 seconds; its registration path must sit under this test's shard actor.
142 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
143 RegisterDataTreeNotificationListenerReply.class);
144 final String replyPath = reply.getListenerRegistrationPath().toString();
145 assertTrue("Incorrect reply path: " + replyPath,
146 replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
148 final YangInstanceIdentifier path = TestModel.TEST_PATH;
149 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
151 listener.waitForChangeEvents();
152 listener.verifyOnInitialDataEvent();
// Second listener: registered on the same path after the write above.
154 final MockDataTreeChangeListener listener2 = new MockDataTreeChangeListener(1);
155 final ActorRef dclActor2 = actorFactory.createActor(DataTreeChangeListenerActor.props(listener2,
156 TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener2");
158 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor2, false), testKit.getRef());
160 testKit.expectMsgClass(Duration.ofSeconds(3), RegisterDataTreeNotificationListenerReply.class);
162 listener2.waitForChangeEvents();
163 listener2.verifyNoOnInitialDataEvent();
/**
 * Registers a data tree change listener while the shard's first ElectionTimeout is being held back
 * by an overridden handleCommand, asserts via FindLeader that the shard reports no leader at that
 * point, then releases the latch and waits for the listener's change events.
 */
166 @SuppressWarnings("serial")
168 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
// onFirstElectionTimeout: counted down by the shard once the first ElectionTimeout has been intercepted.
// onChangeListenerRegistered: counted down by the test after the listener registration has been checked.
169 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
170 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
171 final Creator<Shard> creator = new Creator<>() {
172 boolean firstElectionTimeout = true;
175 public Shard create() {
176 return new Shard(newShardBuilder()) {
178 public void handleCommand(final Object message) {
// Only the first ElectionTimeout is intercepted; the flag is cleared so later ones are not.
179 if (message instanceof ElectionTimeout && firstElectionTimeout) {
180 firstElectionTimeout = false;
181 final ActorRef self = getSelf();
// Wait up to 5 seconds for the test's signal, then re-send the same message to this actor.
183 Uninterruptibles.awaitUninterruptibly(
184 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
185 self.tell(message, self);
188 onFirstElectionTimeout.countDown();
190 super.handleCommand(message);
197 setupInMemorySnapshotStore();
199 final YangInstanceIdentifier path = TestModel.TEST_PATH;
200 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
201 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
202 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
204 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
205 new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
206 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
208 final ShardTestKit testKit = new ShardTestKit(getSystem());
209 assertTrue("Got first ElectionTimeout", onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
211 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), testKit.getRef());
212 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
213 RegisterDataTreeNotificationListenerReply.class);
// NOTE(review): the assertion message below looks like a typo for "getListenerRegistrationPath";
// it is a runtime string, so it is left untouched here.
214 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
// Confirm that the shard reports no leader while the election timeout is still held back.
216 shard.tell(FindLeader.INSTANCE, testKit.getRef());
217 final FindLeaderReply findLeadeReply = testKit.expectMsgClass(Duration.ofSeconds(5), FindLeaderReply.class);
218 assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
// Release the held-back ElectionTimeout.
220 onChangeListenerRegistered.countDown();
222 // TODO: investigate why we do not receive data change events
223 listener.waitForChangeEvents();
/**
 * Sends an UpdateSchemaContext followed by a READ_ONLY CreateTransaction to the shard and checks
 * that the transaction path in the CreateTransactionReply starts with the expected prefix under
 * this test's shard actor.
 */
227 public void testCreateTransaction() {
228 final ShardTestKit testKit = new ShardTestKit(getSystem());
229 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
231 ShardTestKit.waitUntilLeader(shard);
233 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), testKit.getRef());
235 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
236 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
238 final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
239 CreateTransactionReply.class);
// The expected prefix is formatted from this shard's name and member name.
241 final String path = reply.getTransactionPath().toString();
242 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
243 "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
244 shardID.getShardName(), shardID.getMemberName().getName())));
/**
 * Sends a READ_ONLY CreateTransaction to the shard and checks that the transaction path in the
 * CreateTransactionReply starts with the expected prefix under this test's shard actor.
 */
248 public void testCreateTransactionOnChain() {
249 final ShardTestKit testKit = new ShardTestKit(getSystem());
250 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
252 ShardTestKit.waitUntilLeader(shard);
254 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
255 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
257 final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
258 CreateTransactionReply.class);
// The expected prefix is formatted from this shard's name and member name.
260 final String path = reply.getTransactionPath().toString();
261 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
262 "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
263 shardID.getShardName(), shardID.getMemberName().getName())));
/**
 * Starts a shard with a single peer whose address is initially null, sends PeerAddressResolved
 * for that peer, and checks through GetOnDemandRaftState that the peer address reported by the
 * shard equals the resolved address.
 */
267 public void testPeerAddressResolved() {
268 final ShardTestKit testKit = new ShardTestKit(getSystem());
269 final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
// The peer is registered with a null address so that it has to be resolved afterwards.
271 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder()
272 .peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null))
273 .props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
275 final String address = "akka://foobar";
276 shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
278 shard.tell(GetOnDemandRaftState.INSTANCE, testKit.getRef());
279 final OnDemandRaftState state = testKit.expectMsgClass(OnDemandRaftState.class);
280 assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
/**
 * Builds a snapshot from a standalone data tree holding one outer-list entry, sends it to the
 * shard as an ApplySnapshot message, and polls the shard's store until its root node equals the
 * snapshot's root node. Fails with "Snapshot was not applied" otherwise.
 */
284 public void testApplySnapshot() throws Exception {
286 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps()
287 .withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
289 ShardTestKit.waitUntilLeader(shard);
// Separate data tree used only to produce the expected root node for the snapshot.
291 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
294 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
295 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
296 .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
297 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
299 writeToStore(store, TestModel.TEST_PATH, container);
301 final YangInstanceIdentifier root = YangInstanceIdentifier.empty();
302 final NormalizedNode expected = readStore(store, root);
304 final Snapshot snapshot = Snapshot.create(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
305 Collections.emptyList(), 1, 2, 3, 4, -1, null, null);
307 shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
// Poll for roughly 5 seconds, sleeping 75 ms per iteration, comparing the shard's root node
// against the expected one; an AssertionError from the comparison is caught inside the loop.
309 final Stopwatch sw = Stopwatch.createStarted();
310 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
311 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
314 assertEquals("Root node", expected, readStore(shard, root));
316 } catch (final AssertionError e) {
321 fail("Snapshot was not applied");
/**
 * Calls applyState directly on the underlying Shard actor with a payload that writes a container
 * node at TestModel.TEST_PATH, then polls the shard's store until that node can be read back.
 * Fails with "State was not applied" otherwise.
 */
325 public void testApplyState() throws Exception {
326 final TestActorRef<Shard> shard = actorFactory.createTestActor(
327 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
329 ShardTestKit.waitUntilLeader(shard);
// Separate data tree used to build the modification that becomes the payload.
331 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
334 final DataTreeModification writeMod = store.takeSnapshot().newModification();
335 final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
336 writeMod.write(TestModel.TEST_PATH, node);
339 final TransactionIdentifier tx = nextTransactionId();
340 shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
// Poll for roughly 5 seconds, sleeping 75 ms per iteration, until the node is non-null in the shard's store.
342 final Stopwatch sw = Stopwatch.createStarted();
343 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
344 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
346 final NormalizedNode actual = readStore(shard, TestModel.TEST_PATH);
347 if (actual != null) {
348 assertEquals("Applied state", node, actual);
353 fail("State was not applied");
/**
 * Seeds the in-memory journal with a dummy entry, a write of the empty outer list, sixteen
 * list-entry merges and a final ApplyJournalEntries, then delegates to testRecovery with the set
 * of list-entry keys that were added.
 */
357 public void testDataTreeCandidateRecovery() throws Exception {
358 // Set up the InMemorySnapshotStore.
359 final DataTree source = setupInMemorySnapshotStore();
361 final DataTreeModification writeMod = source.takeSnapshot().newModification();
362 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
// Journal sequence 0 holds only the placeholder string.
364 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
366 // Set up the InMemoryJournal.
367 InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
368 payloadForModification(source, writeMod, nextTransactionId())));
370 final int nListEntries = 16;
371 final Set<Integer> listEntryKeys = new HashSet<>();
373 // Add some ModificationPayload entries
374 for (int i = 1; i <= nListEntries; i++) {
375 listEntryKeys.add(i);
377 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
378 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
380 final DataTreeModification mod = source.takeSnapshot().newModification();
381 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
// Journal sequence i + 1 follows the entries already added at sequences 0 and 1.
384 InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
385 payloadForModification(source, mod, nextTransactionId())));
// The last journal entry is an ApplyJournalEntries built from nListEntries.
388 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
389 new ApplyJournalEntries(nListEntries));
391 testRecovery(listEntryKeys, true);
/**
 * Readies three transactions, can-commits the first, issues CanCommitTransaction asks for the
 * other two, then commits the first and waits on a latch for the remaining two commits to finish.
 * A failure captured by the asynchronous callbacks is rethrown on the test thread. Finally the
 * outer list entry and the shard's last-applied value are verified.
 */
395 @SuppressWarnings("checkstyle:IllegalCatch")
396 public void testConcurrentThreePhaseCommits() throws Exception {
// caughtEx carries a failure from the future callbacks back to the test thread.
// commitLatch is counted down once per completed commit of the 2nd and 3rd transactions.
397 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
398 final CountDownLatch commitLatch = new CountDownLatch(2);
400 final long timeoutSec = 5;
401 final Duration duration = Duration.ofSeconds(timeoutSec);
402 final Timeout timeout = Timeout.create(duration);
404 final TestActorRef<Shard> shard = actorFactory.createTestActor(
405 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
406 "testConcurrentThreePhaseCommits");
// Base callback: records failures into caughtEx and checks the response type.
408 class OnFutureComplete extends OnComplete<Object> {
409 private final Class<?> expRespType;
411 OnFutureComplete(final Class<?> expRespType) {
412 this.expRespType = expRespType;
416 public void onComplete(final Throwable error, final Object resp) {
418 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
421 assertEquals("Commit response type", expRespType, resp.getClass());
423 } catch (final Exception e) {
429 void onSuccess(final Object resp) {
// Commit callback: runs the base handling, then releases one count of commitLatch.
433 class OnCommitFutureComplete extends OnFutureComplete {
434 OnCommitFutureComplete() {
435 super(CommitTransactionReply.class);
439 public void onComplete(final Throwable error, final Object resp) {
440 super.onComplete(error, resp);
441 commitLatch.countDown();
// Can-commit callback: on success, asks the shard to commit the same transaction.
445 class OnCanCommitFutureComplete extends OnFutureComplete {
446 private final TransactionIdentifier transactionID;
448 OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
449 super(CanCommitTransactionReply.class);
450 this.transactionID = transactionID;
454 void onSuccess(final Object resp) {
455 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(resp);
456 assertTrue("Can commit", canCommitReply.getCanCommit());
458 final Future<Object> commitFuture = Patterns.ask(shard,
459 new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
460 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
464 final ShardTestKit testKit = new ShardTestKit(getSystem());
465 ShardTestKit.waitUntilLeader(shard);
467 final TransactionIdentifier transactionID1 = nextTransactionId();
468 final TransactionIdentifier transactionID2 = nextTransactionId();
469 final TransactionIdentifier transactionID3 = nextTransactionId();
// NOTE(review): cohort1..cohort3 are only referenced by the commented-out in-order verification further down.
471 final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
472 shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
473 final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
474 final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
475 final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
// Ready the first Tx and check that the cohort path in the reply is the shard's own path.
477 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
478 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
479 final ReadyTransactionReply readyReply = ReadyTransactionReply
480 .fromSerializable(testKit.expectMsgClass(duration, ReadyTransactionReply.class));
481 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
482 // Send the CanCommitTransaction message for the first Tx.
484 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
485 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
486 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
487 assertTrue("Can commit", canCommitReply.getCanCommit());
489 // Ready 2 more Tx's.
491 shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
492 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), testKit.getRef());
493 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
496 prepareBatchedModifications(transactionID3,
497 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
498 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
499 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), testKit.getRef());
500 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
502 // Send the CanCommitTransaction message for the next 2 Tx's.
503 // These should get queued and
504 // processed after the first Tx completes.
506 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
507 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
509 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
510 new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
512 // Send the CommitTransaction message for the first Tx. After it
513 // completes, it should
514 // trigger the 2nd Tx to proceed which should in turn then
517 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
518 testKit.expectMsgClass(duration, CommitTransactionReply.class);
520 // Wait for the next 2 Tx's to complete.
522 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher());
524 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher());
526 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
// Rethrow a failure captured by the callbacks: as-is when possible, otherwise wrapped in a RuntimeException.
528 final Throwable t = caughtEx.get();
530 Throwables.propagateIfPossible(t, Exception.class);
531 throw new RuntimeException(t);
534 assertTrue("Commits complete", done);
// NOTE(review): the in-order cohort verification below is commented out.
536 // final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
537 // cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
538 // cohort3.getPreCommit(), cohort3.getCommit());
539 // inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
540 // inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
541 // inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
542 // inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
543 // inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
544 // inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
545 // inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
546 // inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
547 // inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
549 // Verify data in the data store.
551 verifyOuterListEntry(shard, 1);
553 verifyLastApplied(shard, 3);
/**
 * Sends three BatchedModifications for a single transaction. The first two are answered with
 * BatchedModificationsReply and the third with ReadyTransactionReply. The test then drives
 * CanCommitTransaction and CommitTransaction explicitly and verifies the outer list entry.
 */
557 public void testBatchedModificationsWithNoCommitOnReady() {
558 final ShardTestKit testKit = new ShardTestKit(getSystem());
559 final TestActorRef<Shard> shard = actorFactory.createTestActor(
560 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
561 "testBatchedModificationsWithNoCommitOnReady");
563 ShardTestKit.waitUntilLeader(shard);
565 final TransactionIdentifier transactionID = nextTransactionId();
566 final Duration duration = Duration.ofSeconds(5);
568 // Send a BatchedModifications to start a transaction.
// NOTE(review): presumably the trailing arguments of newBatchedModifications are
// (ready, doCommitOnReady, messagesSent) — confirm against the helper's signature.
570 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
571 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
572 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
574 // Send a couple more BatchedModifications.
576 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
577 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
579 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
// The third batch is answered with ReadyTransactionReply instead of BatchedModificationsReply.
581 shard.tell(newBatchedModifications(transactionID,
582 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
583 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
584 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3),
586 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
588 // Send the CanCommitTransaction message.
590 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
591 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
592 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
593 assertTrue("Can commit", canCommitReply.getCanCommit());
595 // Send the CommitTransaction message.
597 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
598 testKit.expectMsgClass(duration, CommitTransactionReply.class);
600 // Verify data in the data store.
602 verifyOuterListEntry(shard, 1);
/**
 * Sends three BatchedModifications for a single transaction. The first two are answered with
 * BatchedModificationsReply; the third is answered directly with CommitTransactionReply, with no
 * separate CanCommitTransaction or CommitTransaction sent by the test. The outer list entry is
 * then verified.
 */
606 public void testBatchedModificationsWithCommitOnReady() {
607 final ShardTestKit testKit = new ShardTestKit(getSystem());
608 final TestActorRef<Shard> shard = actorFactory.createTestActor(
609 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
610 "testBatchedModificationsWithCommitOnReady");
612 ShardTestKit.waitUntilLeader(shard);
614 final TransactionIdentifier transactionID = nextTransactionId();
615 final Duration duration = Duration.ofSeconds(5);
617 // Send a BatchedModifications to start a transaction.
// NOTE(review): presumably the trailing arguments of newBatchedModifications are
// (ready, doCommitOnReady, messagesSent) — confirm against the helper's signature.
619 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
620 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
621 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
623 // Send a couple more BatchedModifications.
625 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
626 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
628 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
// The third batch passes true for both flags and is answered with CommitTransactionReply.
630 shard.tell(newBatchedModifications(transactionID,
631 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
632 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
633 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3),
636 testKit.expectMsgClass(duration, CommitTransactionReply.class);
638 // Verify data in the data store.
639 verifyOuterListEntry(shard, 1);
/**
 * Sends a single BatchedModifications whose total-messages-sent count is set to 2 and expects a
 * Failure reply. The failure's cause is rethrown so that the IllegalStateException declared in
 * the Test annotation reaches the test runner.
 */
642 @Test(expected = IllegalStateException.class)
643 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
644 final ShardTestKit testKit = new ShardTestKit(getSystem());
645 final TestActorRef<Shard> shard = actorFactory.createTestActor(
646 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
647 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
649 ShardTestKit.waitUntilLeader(shard);
651 final TransactionIdentifier transactionID = nextTransactionId();
652 final BatchedModifications batched = new BatchedModifications(transactionID,
653 DataStoreVersions.CURRENT_VERSION);
// Declares two messages sent although only this one message is actually sent.
655 batched.setTotalMessagesSent(2);
657 shard.tell(batched, testKit.getRef());
659 final Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
// Rethrow the cause: as-is when possible, otherwise wrapped in a RuntimeException.
661 if (failure != null) {
662 Throwables.propagateIfPossible(failure.cause(), Exception.class);
663 throw new RuntimeException(failure.cause());
/**
 * Sends a BatchedModifications that merges invalid data and expects a Failure reply, then sends a
 * second BatchedModifications for the same transaction and asserts that its Failure carries a
 * cause equal to the first one's.
 */
668 public void testBatchedModificationsWithOperationFailure() {
669 final ShardTestKit testKit = new ShardTestKit(getSystem());
670 final TestActorRef<Shard> shard = actorFactory.createTestActor(
671 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
672 "testBatchedModificationsWithOperationFailure");
674 ShardTestKit.waitUntilLeader(shard);
676 // Test merge with invalid data. An exception should occur when
677 // the merge is applied. Note that
678 // write will not validate the children for performance reasons.
680 final TransactionIdentifier transactionID = nextTransactionId();
// The container carries a JUNK_QNAME leaf, which serves as the invalid child.
682 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
683 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
684 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
686 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
687 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
688 shard.tell(batched, testKit.getRef());
689 Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
// Keep the first cause so that the follow-up failure can be compared against it.
691 final Throwable cause = failure.cause();
693 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
695 batched.setTotalMessagesSent(2);
697 shard.tell(batched, testKit.getRef());
699 failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
700 assertEquals("Failure cause", cause, failure.cause());
/**
 * Exercises two transactions that share one local history: a write readied through
 * BatchedModifications (transaction 0) and a READ_ONLY transaction (transaction 1).
 * The read transaction must return the container written by the first one; the write is then
 * taken through canCommit and commit, and the stored node is compared with the written one.
 */
704 public void testBatchedModificationsOnTransactionChain() {
705 final ShardTestKit testKit = new ShardTestKit(getSystem());
706 final TestActorRef<Shard> shard = actorFactory.createTestActor(
707 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
708 "testBatchedModificationsOnTransactionChain");
710 ShardTestKit.waitUntilLeader(shard);
// Both transaction ids are built on the same history id, with counters 0 and 1.
712 final LocalHistoryIdentifier historyId = nextHistoryId();
713 final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
714 final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
// Timeout used for the ready/canCommit/commit replies below.
716 final Duration duration = Duration.ofSeconds(5);
718 // Send a BatchedModifications to start a chained write
719 // transaction and ready it.
721 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
722 final YangInstanceIdentifier path = TestModel.TEST_PATH;
723 shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), testKit.getRef());
724 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
726 // Create a read Tx on the same chain.
728 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
729 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
731 final CreateTransactionReply createReply = testKit.expectMsgClass(Duration.ofSeconds(3),
732 CreateTransactionReply.class);
// Read TEST_PATH through the actor named in the create reply; the node written above must be returned.
734 getSystem().actorSelection(createReply.getTransactionPath())
735 .tell(new ReadData(path, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
736 final ReadDataReply readReply = testKit.expectMsgClass(Duration.ofSeconds(3), ReadDataReply.class);
737 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
739 // Commit the write transaction.
741 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
742 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
743 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
744 assertTrue("Can commit", canCommitReply.getCanCommit());
746 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
747 testKit.expectMsgClass(duration, CommitTransactionReply.class);
749 // Verify data in the data store.
751 final NormalizedNode actualNode = readStore(shard, path);
752 assertEquals("Stored node", containerNode, actualNode);
/**
 * Sends a BatchedModifications to a shard that reports itself as a non-leader.
 * The shard is created from an anonymous Shard subclass whose isLeader()/getLeader() are switched
 * by the overrideLeaderCalls flag: once set, isLeader() returns false and getLeader() resolves to
 * the test kit's actor. The test kit is then expected to receive a message equal to the one sent
 * to the shard.
 */
756 public void testOnBatchedModificationsWhenNotLeader() {
// Flag read by the overridden methods below; starts out false so the shard behaves normally.
757 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
758 final ShardTestKit testKit = new ShardTestKit(getSystem());
759 final Creator<Shard> creator = new Creator<>() {
760 private static final long serialVersionUID = 1L;
763 public Shard create() {
764 return new Shard(newShardBuilder()) {
// Reports "not leader" while the flag is set, otherwise defers to the real implementation.
766 protected boolean isLeader() {
767 return overrideLeaderCalls.get() ? false : super.isLeader();
// While the flag is set, the leader selection points at the test kit's actor path.
771 public ActorSelection getLeader() {
772 return overrideLeaderCalls.get() ? getSystem().actorSelection(testKit.getRef().path())
779 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
780 new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
781 "testOnBatchedModificationsWhenNotLeader");
783 ShardTestKit.waitUntilLeader(shard);
// From here on the overridden isLeader()/getLeader() take effect.
785 overrideLeaderCalls.set(true);
787 final BatchedModifications batched = new BatchedModifications(nextTransactionId(),
788 DataStoreVersions.CURRENT_VERSION);
// Sent with no sender; the test kit must nevertheless receive an equal message.
790 shard.tell(batched, ActorRef.noSender());
792 testKit.expectMsgEquals(batched);
/**
 * Asserts that transaction messages are rejected while the shard has no leader.
 * The shard is configured with DisableElectionsRaftPolicy, a 50 ms heartbeat interval and an
 * election timeout factor of 1. A BatchedModifications, the message built by
 * prepareForwardedReadyTransaction and a ReadyLocalTransaction must each be answered with a
 * Failure whose cause is exactly NoShardLeaderException.
 */
796 public void testTransactionMessagesWithNoLeader() {
797 final ShardTestKit testKit = new ShardTestKit(getSystem());
798 dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
799 .shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
800 final TestActorRef<Shard> shard = actorFactory.createTestActor(
801 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
802 "testTransactionMessagesWithNoLeader");
804 testKit.waitUntilNoLeader(shard);
// Case 1: BatchedModifications. All three cases reuse the same transaction id.
806 final TransactionIdentifier txId = nextTransactionId();
807 shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
808 Failure failure = testKit.expectMsgClass(Failure.class);
809 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
// Case 2: the message produced by prepareForwardedReadyTransaction.
811 shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
812 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
813 failure = testKit.expectMsgClass(Failure.class);
814 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
// Case 3: ReadyLocalTransaction wrapping a Mockito mock of DataTreeModification.
816 shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
818 failure = testKit.expectMsgClass(Failure.class);
819 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
/** Runs the shared testReadyWithImmediateCommit scenario with {@code readWrite} set to {@code true}. */
823 public void testReadyWithReadWriteImmediateCommit() {
824 testReadyWithImmediateCommit(true);
/** Runs the shared testReadyWithImmediateCommit scenario with {@code readWrite} set to {@code false}. */
828 public void testReadyWithWriteOnlyImmediateCommit() {
829 testReadyWithImmediateCommit(false);
/**
 * Shared scenario: readies a write of a TEST_QNAME container at TEST_PATH, expects a
 * CommitTransactionReply within 5 seconds and then checks that the store holds the written node.
 *
 * @param readWrite appended to the actor name; presumably also selects between the
 *     forwarded-ready message and the batched-modifications message sent below - TODO confirm,
 *     the selecting lines are not visible in this listing
 */
832 private void testReadyWithImmediateCommit(final boolean readWrite) {
833 final ShardTestKit testKit = new ShardTestKit(getSystem());
834 final TestActorRef<Shard> shard = actorFactory.createTestActor(
835 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
836 "testReadyWithImmediateCommit-" + readWrite);
838 ShardTestKit.waitUntilLeader(shard);
840 final TransactionIdentifier transactionID = nextTransactionId();
841 final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Two alternative ways of readying the same write: a forwarded ready transaction or batched modifications.
843 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH, containerNode, true),
846 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true),
// No separate canCommit/commit messages are sent; the commit reply is expected directly.
850 testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
// The node read back from TEST_PATH must equal the node that was written.
852 final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH);
853 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
/**
 * Readies a locally built DataTreeModification via ReadyLocalTransaction (third constructor
 * argument {@code true}) and expects a CommitTransactionReply without any further messages.
 * The modification writes a TEST_QNAME container and merges an outer-list map node; the test
 * finally checks that OUTER_LIST_PATH holds the merged map node.
 */
857 public void testReadyLocalTransactionWithImmediateCommit() {
858 final ShardTestKit testKit = new ShardTestKit(getSystem());
859 final TestActorRef<Shard> shard = actorFactory.createTestActor(
860 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
861 "testReadyLocalTransactionWithImmediateCommit");
863 ShardTestKit.waitUntilLeader(shard);
// The modification is obtained directly from the shard actor's own data store.
865 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
867 final DataTreeModification modification = dataStore.newModification();
// Apply a write of the test container, then a merge of the outer list, to the same modification.
869 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
870 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
871 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
872 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
874 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
// The modification is readied before it is wrapped into the ReadyLocalTransaction message.
876 final TransactionIdentifier txId = nextTransactionId();
877 modification.ready();
878 final ReadyLocalTransaction readyMessage =
879 new ReadyLocalTransaction(txId, modification, true, Optional.empty());
881 shard.tell(readyMessage, testKit.getRef());
883 testKit.expectMsgClass(CommitTransactionReply.class);
// The outer list read back from the store must equal the merged map node.
885 final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
886 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
/**
 * Readies a locally built DataTreeModification via ReadyLocalTransaction (third constructor
 * argument {@code false}), expects a ReadyTransactionReply, and then drives the transaction
 * through explicit CanCommitTransaction and CommitTransaction messages. Finally checks that
 * OUTER_LIST_PATH holds the merged map node.
 */
890 public void testReadyLocalTransactionWithThreePhaseCommit() {
891 final ShardTestKit testKit = new ShardTestKit(getSystem());
892 final TestActorRef<Shard> shard = actorFactory.createTestActor(
893 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
894 "testReadyLocalTransactionWithThreePhaseCommit");
896 ShardTestKit.waitUntilLeader(shard);
// The modification is obtained directly from the shard actor's own data store.
898 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
900 final DataTreeModification modification = dataStore.newModification();
// Apply a write of the test container, then a merge of the outer list, to the same modification.
902 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
903 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
904 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
905 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
907 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
909 final TransactionIdentifier txId = nextTransactionId();
910 modification.ready();
911 final ReadyLocalTransaction readyMessage =
912 new ReadyLocalTransaction(txId, modification, false, Optional.empty());
914 shard.tell(readyMessage, testKit.getRef());
// Only a ready reply is expected at this point; the commit is requested explicitly below.
916 testKit.expectMsgClass(ReadyTransactionReply.class);
918 // Send the CanCommitTransaction message.
920 shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
921 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
922 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
923 assertTrue("Can commit", canCommitReply.getCanCommit());
925 // Send the CommitTransaction message.
927 shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
928 testKit.expectMsgClass(CommitTransactionReply.class);
// The outer list read back from the store must equal the merged map node.
930 final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
931 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
/**
 * Runs a ready / canCommit / commit sequence against a shard whose datastore context was built
 * with {@code persistent(false)}, then checks that TEST_PATH holds the written container.
 */
935 public void testReadWriteCommitWithPersistenceDisabled() {
// Must be set before the shard is created below.
936 dataStoreContextBuilder.persistent(false);
937 final ShardTestKit testKit = new ShardTestKit(getSystem());
// NOTE(review): the actor name below omits the "ReadWrite" part of the method name.
938 final TestActorRef<Shard> shard = actorFactory.createTestActor(
939 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
940 "testCommitWithPersistenceDisabled");
942 ShardTestKit.waitUntilLeader(shard);
944 // Set up a transaction readied through batched modifications.
946 final Duration duration = Duration.ofSeconds(5);
948 final TransactionIdentifier transactionID = nextTransactionId();
949 final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
950 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false),
952 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
954 // Send the CanCommitTransaction message.
956 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
957 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
958 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
959 assertTrue("Can commit", canCommitReply.getCanCommit());
961 // Send the CommitTransaction message.
963 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
964 testKit.expectMsgClass(duration, CommitTransactionReply.class);
// The node read back from TEST_PATH must equal the node that was written.
966 final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH);
967 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
/** Runs the shared testCommitWhenTransactionHasModifications scenario with {@code readWrite} set to {@code true}. */
971 public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
972 testCommitWhenTransactionHasModifications(true);
/** Runs the shared testCommitWhenTransactionHasModifications scenario with {@code readWrite} set to {@code false}. */
976 public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
977 testCommitWhenTransactionHasModifications(false);
/**
 * Shared scenario: commits a transaction that writes a TEST_QNAME container, using a data tree
 * obtained from createDelegatingMockDataTree() so the calls on it can be verified.
 * After ready / canCommit / commit it verifies that validate, prepare and commit were invoked on
 * the data tree in that order, then queries the shard's stats and asserts a committed
 * transaction count of 1 and a commit index of 1.
 *
 * @param readWrite appended to the actor name; presumably also selects between the
 *     forwarded-ready message and the batched-modifications message sent below - TODO confirm,
 *     the selecting lines are not visible in this listing
 * @throws Exception declared for the calls on the data tree and for Thread.sleep
 */
980 private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
981 final ShardTestKit testKit = new ShardTestKit(getSystem());
982 final DataTree dataTree = createDelegatingMockDataTree();
983 final TestActorRef<Shard> shard = actorFactory.createTestActor(
984 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
985 "testCommitWhenTransactionHasModifications-" + readWrite);
987 ShardTestKit.waitUntilLeader(shard);
989 final Duration duration = Duration.ofSeconds(5);
990 final TransactionIdentifier transactionID = nextTransactionId();
// Two alternative ways of readying the same write: a forwarded ready transaction or batched modifications.
993 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
994 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
996 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
997 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1000 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1002 // Send the CanCommitTransaction message.
1004 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1005 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1006 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1007 assertTrue("Can commit", canCommitReply.getCanCommit());
1009 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1010 testKit.expectMsgClass(duration, CommitTransactionReply.class);
// The three phases must have reached the data tree in this relative order.
1012 final InOrder inOrder = inOrder(dataTree);
1013 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1014 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1015 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1017 // Purge request is scheduled as asynchronous; wait for two heartbeats to let it propagate.
1019 Thread.sleep(HEARTBEAT_MILLIS * 2);
1021 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, testKit.getRef());
1022 final ShardStats shardStats = testKit.expectMsgClass(duration, ShardStats.class);
1024 // Use MBean for verification
1025 // Committed transaction count should increase as usual
1026 assertEquals(1, shardStats.getCommittedTransactionsCount());
1028 // Commit index should advance 1 to account for disabling metadata
1029 assertEquals(1, shardStats.getCommitIndex());
/**
 * Checks shard behaviour when the data tree throws during commit.
 * The data tree comes from createDelegatingMockDataTree() and is stubbed so that commit() throws
 * a RuntimeException. Two transactions are readied; the first passes canCommit and its
 * CommitTransaction must be answered with a Failure. The canCommit of the second transaction is
 * issued through Patterns.ask and the test waits up to 5 seconds on a latch for it to complete.
 * Finally the order validate, prepare, commit is verified on the data tree.
 *
 * @throws Exception declared for the stubbed/verified data tree calls and the latch await
 */
1033 public void testCommitPhaseFailure() throws Exception {
1034 final ShardTestKit testKit = new ShardTestKit(getSystem());
1035 final DataTree dataTree = createDelegatingMockDataTree();
1036 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1037 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1038 "testCommitPhaseFailure");
1040 ShardTestKit.waitUntilLeader(shard);
// The same 5 seconds serve as the expectMsgClass timeout and as the ask timeout.
1042 final Duration duration = Duration.ofSeconds(5);
1043 final Timeout timeout = Timeout.create(duration);
1045 // Set up 2 transactions against the mocked data tree. The first one is expected to fail on commit.
1049 doThrow(new RuntimeException("mock commit failure")).when(dataTree)
1050 .commit(any(DataTreeCandidate.class));
1052 final TransactionIdentifier transactionID1 = nextTransactionId();
1053 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1054 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1055 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1057 final TransactionIdentifier transactionID2 = nextTransactionId();
1058 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1059 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1060 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1062 // Send the CanCommitTransaction message for the first Tx.
1064 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1065 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1066 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1067 assertTrue("Can commit", canCommitReply.getCanCommit());
1069 // Send the CanCommitTransaction message for the 2nd Tx. This
1070 // should get queued and
1071 // processed after the first Tx completes.
1073 final Future<Object> canCommitFuture = Patterns.ask(shard,
1074 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1076 // Send the CommitTransaction message for the first Tx. This
1077 // should send back an error
1078 // and trigger the 2nd Tx to proceed.
1080 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1081 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1083 // Wait for the 2nd Tx to complete the canCommit phase.
// NOTE(review): the latch is presumably counted down inside the callback; its body is not visible here.
1085 final CountDownLatch latch = new CountDownLatch(1);
1086 canCommitFuture.onComplete(new OnComplete<>() {
1088 public void onComplete(final Throwable failure, final Object resp) {
1091 }, getSystem().dispatcher());
1093 assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
// Verify the relative order of the calls that reached the data tree.
1095 final InOrder inOrder = inOrder(dataTree);
1096 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1097 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1099 // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
1100 // validate performs wrapping and we capture that mock
1101 // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1103 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
/**
 * Checks shard behaviour when the data tree throws during prepare.
 * The data tree comes from createDelegatingMockDataTree() and is stubbed so that prepare() throws
 * a RuntimeException. Two transactions are readied; the first passes canCommit and its
 * CommitTransaction must be answered with a Failure. The canCommit of the second transaction is
 * issued through Patterns.ask and the test waits up to 5 seconds on a latch for it to complete.
 * Finally the order validate, prepare, validate is verified on the data tree.
 *
 * @throws Exception declared for the stubbed/verified data tree calls and the latch await
 */
1107 public void testPreCommitPhaseFailure() throws Exception {
1108 final ShardTestKit testKit = new ShardTestKit(getSystem());
1109 final DataTree dataTree = createDelegatingMockDataTree();
1110 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1111 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1112 "testPreCommitPhaseFailure");
1114 ShardTestKit.waitUntilLeader(shard);
// The same 5 seconds serve as the expectMsgClass timeout and as the ask timeout.
1116 final Duration duration = Duration.ofSeconds(5);
1117 final Timeout timeout = Timeout.create(duration);
// Every prepare() call on the data tree throws from here on.
1119 doThrow(new RuntimeException("mock preCommit failure")).when(dataTree)
1120 .prepare(any(DataTreeModification.class));
1122 final TransactionIdentifier transactionID1 = nextTransactionId();
1123 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1124 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1125 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1127 final TransactionIdentifier transactionID2 = nextTransactionId();
1128 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1129 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1130 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1132 // Send the CanCommitTransaction message for the first Tx.
1134 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1135 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1136 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1137 assertTrue("Can commit", canCommitReply.getCanCommit());
1139 // Send the CanCommitTransaction message for the 2nd Tx. This
1140 // should get queued and
1141 // processed after the first Tx completes.
1143 final Future<Object> canCommitFuture = Patterns.ask(shard,
1144 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1146 // Send the CommitTransaction message for the first Tx. This
1147 // should send back an error
1148 // and trigger the 2nd Tx to proceed.
1150 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1151 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1153 // Wait for the 2nd Tx to complete the canCommit phase.
// NOTE(review): the latch is presumably counted down inside the callback; its body is not visible here.
1155 final CountDownLatch latch = new CountDownLatch(1);
1156 canCommitFuture.onComplete(new OnComplete<>() {
1158 public void onComplete(final Throwable failure, final Object resp) {
1161 }, getSystem().dispatcher());
1163 assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
// A second validate() must follow the failing prepare().
1165 final InOrder inOrder = inOrder(dataTree);
1166 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1167 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1168 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
/**
 * Checks shard behaviour when the data tree rejects a transaction during validation.
 * validate() is stubbed to throw DataValidationFailedException on its first call and to do
 * nothing afterwards. The first transaction's CanCommitTransaction must be answered with a
 * Failure; a second transaction readied afterwards must receive a positive canCommit reply.
 *
 * @throws Exception declared for the stubbed validate() call
 */
1172 public void testCanCommitPhaseFailure() throws Exception {
1173 final ShardTestKit testKit = new ShardTestKit(getSystem());
1174 final DataTree dataTree = createDelegatingMockDataTree();
1175 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1176 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1177 "testCanCommitPhaseFailure");
1179 ShardTestKit.waitUntilLeader(shard);
1181 final Duration duration = Duration.ofSeconds(5);
1182 final TransactionIdentifier transactionID1 = nextTransactionId();
// Chained stubbing: the first validate() throws, subsequent calls do nothing.
1184 doThrow(new DataValidationFailedException(YangInstanceIdentifier.empty(), "mock canCommit failure"))
1185 .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1187 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1188 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1189 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1191 // Send the CanCommitTransaction message.
1193 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1194 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1196 // Send another can commit to ensure the failed one got cleaned up.
1199 final TransactionIdentifier transactionID2 = nextTransactionId();
1200 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1201 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1202 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1204 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1205 final CanCommitTransactionReply reply = CanCommitTransactionReply
1206 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
1207 assertTrue("getCanCommit", reply.getCanCommit());
/**
 * Runs the parameterised scenario of the same name twice in sequence: first with
 * {@code readWrite} set to {@code true}, then with {@code false}.
 *
 * @throws Exception propagated from the parameterised scenario
 */
1211 public void testImmediateCommitWithCanCommitPhaseFailure() throws Exception {
1212 testImmediateCommitWithCanCommitPhaseFailure(true);
1213 testImmediateCommitWithCanCommitPhaseFailure(false);
/**
 * Shared scenario: validation failure for a transaction that is readied with immediate commit.
 * validate() is stubbed to throw DataValidationFailedException on its first call and to do
 * nothing afterwards. The first transaction must be answered with a Failure; a second one
 * readied the same way must be answered with a CommitTransactionReply.
 *
 * @param readWrite appended to the actor name; presumably also selects between the
 *     forwarded-ready message and the batched-modifications message sent below - TODO confirm,
 *     the selecting lines are not visible in this listing
 * @throws Exception declared for the stubbed validate() call
 */
1216 private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
1217 final ShardTestKit testKit = new ShardTestKit(getSystem());
1218 final DataTree dataTree = createDelegatingMockDataTree();
1219 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1220 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1221 "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1223 ShardTestKit.waitUntilLeader(shard);
// Chained stubbing: the first validate() throws, subsequent calls do nothing.
1225 doThrow(new DataValidationFailedException(YangInstanceIdentifier.empty(), "mock canCommit failure"))
1226 .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1228 final Duration duration = Duration.ofSeconds(5);
1230 final TransactionIdentifier transactionID1 = nextTransactionId();
// Two alternative ways of readying the first transaction.
1233 shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1234 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1236 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1237 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1240 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1242 // Send another can commit to ensure the failed one got cleaned up.
1245 final TransactionIdentifier transactionID2 = nextTransactionId();
1247 shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1248 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1250 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1251 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1254 testKit.expectMsgClass(duration, CommitTransactionReply.class);
/**
 * Simulates an abort arriving while a commit is in flight.
 * The shard is an anonymous Shard subclass whose persistPayload() first calls
 * doAbortTransaction(id, null) and then delegates to the superclass. The transaction is driven
 * through ready / canCommit / commit, a CommitTransactionReply is still expected, and the node
 * at TEST_PATH must be present afterwards.
 */
1258 public void testAbortWithCommitPending() {
1259 final ShardTestKit testKit = new ShardTestKit(getSystem());
1260 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1262 void persistPayload(final Identifier id, final Payload payload,
1263 final boolean batchHint) {
1264 // Simulate an AbortTransaction message occurring during
1265 // replication, after
1266 // persisting and before finishing the commit to the in-memory store.
1269 doAbortTransaction(id, null);
1270 super.persistPayload(id, payload, batchHint);
1274 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
1275 new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1276 "testAbortWithCommitPending");
1278 ShardTestKit.waitUntilLeader(shard);
1280 final Duration duration = Duration.ofSeconds(5);
1282 final TransactionIdentifier transactionID = nextTransactionId();
// Ready, canCommit and commit are sent as three separate messages.
1284 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1285 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1286 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1288 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1289 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1291 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1292 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1294 final NormalizedNode node = readStore(shard, TestModel.TEST_PATH);
1296 // Since we're simulating an abort occurring during replication
1297 // and before finish commit,
1298 // the data should still get written to the in-memory store
1299 // since we've gotten past
1300 // canCommit and preCommit and persisted the data.
1301 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
/**
 * Exercises the shard's transaction commit timeout, configured here as 1 second.
 * Two transactions, each writing one outer-list entry (ids 1 and 2), are readied. Both get a
 * canCommit reply; the first is never committed in time, so its later CommitTransaction must be
 * answered with a Failure, while the second commits successfully and its list entry must be
 * readable from the store.
 *
 * @throws Exception declared by the test; the throwing call is not identifiable from this listing
 */
1305 public void testTransactionCommitTimeout() throws Exception {
// Must be set before the shard is created below.
1306 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1307 final ShardTestKit testKit = new ShardTestKit(getSystem());
1308 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1309 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1310 "testTransactionCommitTimeout");
1312 ShardTestKit.waitUntilLeader(shard);
1314 final Duration duration = Duration.ofSeconds(5);
// Pre-populate the store with the test container and an empty outer list.
1316 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1317 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1318 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1320 // Ready 2 Tx's - the first will timeout
1322 final TransactionIdentifier transactionID1 = nextTransactionId();
1324 prepareBatchedModifications(transactionID1,
1325 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1326 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1327 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
1329 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1331 final TransactionIdentifier transactionID2 = nextTransactionId();
1332 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1333 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1335 prepareBatchedModifications(transactionID2, listNodePath,
1336 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), testKit.getRef());
1337 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1339 // canCommit 1st Tx. We don't send the commit so it should time out (commit timeout is 1 second).
1342 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1343 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1345 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1348 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1349 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1351 // Try to commit the 1st Tx - should fail (presumably because it has been timed out by now).
1354 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1355 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1357 // Commit the 2nd Tx.
1359 shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1360 testKit.expectMsgClass(duration, CommitTransactionReply.class);
// The list entry written by the 2nd Tx must be present in the store.
1362 final NormalizedNode node = readStore(shard, listNodePath);
1363 assertNotNull(listNodePath + " not found", node);
1368 // public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1369 // dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1371 // new ShardTestKit(getSystem()) {{
1372 // final TestActorRef<Shard> shard = actorFactory.createTestActor(
1373 // newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1374 // "testTransactionCommitQueueCapacityExceeded");
1376 // waitUntilLeader(shard);
1378 // final FiniteDuration duration = duration("5 seconds");
1380 // final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1382 // final TransactionIdentifier transactionID1 = nextTransactionId();
1383 // final MutableCompositeModification modification1 = new MutableCompositeModification();
1384 // final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1385 // TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1388 // final TransactionIdentifier transactionID2 = nextTransactionId();
1389 // final MutableCompositeModification modification2 = new MutableCompositeModification();
1390 // final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1391 // TestModel.OUTER_LIST_PATH,
1392 // ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1395 // final TransactionIdentifier transactionID3 = nextTransactionId();
1396 // final MutableCompositeModification modification3 = new MutableCompositeModification();
1397 // final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1398 // TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1401 // // Ready the Tx's
1403 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
1404 // modification1), getRef());
1405 // expectMsgClass(duration, ReadyTransactionReply.class);
1407 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
1408 // modification2), getRef());
1409 // expectMsgClass(duration, ReadyTransactionReply.class);
1411 // // The 3rd Tx should exceed queue capacity and fail.
1413 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
1414 // modification3), getRef());
1415 // expectMsgClass(duration, akka.actor.Status.Failure.class);
1417 // // canCommit 1st Tx.
1419 // shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1420 // expectMsgClass(duration, CanCommitTransactionReply.class);
1422 // // canCommit the 2nd Tx - it should get queued.
1424 // shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1426 // // canCommit the 3rd Tx - should exceed queue capacity and fail.
1428 // shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1429 // expectMsgClass(duration, akka.actor.Status.Failure.class);
/**
 * Readies three transactions on a shard with a 1 second transaction commit timeout and sends
 * CanCommitTransaction only for the third one. A CanCommitTransactionReply for it must arrive
 * within 5 seconds even though the two earlier transactions were never canCommitted.
 */
1434 public void testTransactionCommitWithPriorExpiredCohortEntries() {
// Must be set before the shard is created below.
1435 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1436 final ShardTestKit testKit = new ShardTestKit(getSystem());
1437 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1438 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1439 "testTransactionCommitWithPriorExpiredCohortEntries");
1441 ShardTestKit.waitUntilLeader(shard);
1443 final Duration duration = Duration.ofSeconds(5);
// Ready three transactions, each writing the same container to TEST_PATH.
1445 final TransactionIdentifier transactionID1 = nextTransactionId();
1446 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1447 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1448 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1450 final TransactionIdentifier transactionID2 = nextTransactionId();
1451 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1452 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1453 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1455 final TransactionIdentifier transactionID3 = nextTransactionId();
1456 shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1457 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1458 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1460 // All Tx's are readied. We'll send canCommit for the last one
1461 // but not the others. The others
1462 // should expire from the queue and the last one should still get its canCommit reply.
1465 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1466 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1470 public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
// Readies three transactions, fully commits the first, never canCommits the second, and
// expects the third (a ReadyLocalTransaction) to still be committed: a second
// CommitTransactionReply must arrive and TEST2_PATH must be readable afterwards.
// The shard transaction commit timeout is set to 1 (seconds, per the builder method name).
1471 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1472 final ShardTestKit testKit = new ShardTestKit(getSystem());
1473 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1474 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1475 "testTransactionCommitWithSubsequentExpiredCohortEntry");
1477 ShardTestKit.waitUntilLeader(shard);
// Upper bound for every expectMsgClass() wait below.
1479 final Duration duration = Duration.ofSeconds(5);
// The shard's data tree is needed below to build the third Tx's modification locally.
1481 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Ready the first Tx.
1483 final TransactionIdentifier transactionID1 = nextTransactionId();
1484 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1485 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1486 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1488 // CanCommit the first Tx so it's the current in-progress Tx.
1490 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1491 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1493 // Ready the second Tx.
1495 final TransactionIdentifier transactionID2 = nextTransactionId();
1496 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1497 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1498 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1500 // Ready the third Tx.
1502 final TransactionIdentifier transactionID3 = nextTransactionId();
1503 final DataTreeModification modification3 = dataStore.newModification();
1504 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1505 .apply(modification3);
1506 modification3.ready();
// NOTE(review): the 'true' argument presumably requests an immediate commit on ready, which
// would explain why no canCommit/commit is sent for the third Tx - confirm against
// ReadyLocalTransaction.
1507 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
1508 true, Optional.empty());
1509 shard.tell(readyMessage, testKit.getRef());
1511 // Commit the first Tx. After completing, the second should
1512 // expire from the queue and the third should be processed.
1515 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1516 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1518 // Expect commit reply from the third Tx.
1520 testKit.expectMsgClass(duration, CommitTransactionReply.class);
// The third Tx wrote TEST2_PATH, so it must now be present in the store.
1522 final NormalizedNode node = readStore(shard, TestModel.TEST2_PATH);
1523 assertNotNull(TestModel.TEST2_PATH + " not found", node);
1527 public void testCanCommitBeforeReadyFailure() {
// Sends a CanCommitTransaction for a freshly generated transaction ID that was never readied
// and expects the shard to answer with an akka Status.Failure within 5 seconds.
1528 final ShardTestKit testKit = new ShardTestKit(getSystem());
1529 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1530 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1531 "testCanCommitBeforeReadyFailure");
1533 shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), testKit.getRef());
1534 testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
1538 public void testAbortAfterCanCommit() throws Exception {
// Readies two transactions, canCommits the first, asks canCommit for the second, then aborts
// the first and verifies that the second's canCommit completes with getCanCommit() == true.
1539 final ShardTestKit testKit = new ShardTestKit(getSystem());
1540 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1541 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
1543 ShardTestKit.waitUntilLeader(shard);
// 'duration' bounds the expectMsgClass() waits; 'timeout' bounds the Patterns.ask() below.
1545 final Duration duration = Duration.ofSeconds(5);
1546 final Timeout timeout = Timeout.create(duration);
1548 // Ready 2 transactions - the first one will be aborted.
1550 final TransactionIdentifier transactionID1 = nextTransactionId();
1551 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1552 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1553 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1555 final TransactionIdentifier transactionID2 = nextTransactionId();
1556 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1557 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1558 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1560 // Send the CanCommitTransaction message for the first Tx.
1562 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1563 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1564 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1565 assertTrue("Can commit", canCommitReply.getCanCommit());
1567 // Send the CanCommitTransaction message for the 2nd Tx. This
1568 // should get queued and
1569 // processed after the first Tx completes.
// Patterns.ask() is used instead of tell() so the reply lands in a Future rather than in the
// test kit's mailbox, keeping it separate from the AbortTransactionReply expected next.
1571 final Future<Object> canCommitFuture = Patterns.ask(shard,
1572 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1574 // Send the AbortTransaction message for the first Tx. This
1575 // should trigger the 2nd Tx to proceed.
1578 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1579 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1581 // Wait for the 2nd Tx to complete the canCommit phase.
1583 canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture,
1584 FiniteDuration.create(5, TimeUnit.SECONDS));
1585 assertTrue("Can commit", canCommitReply.getCanCommit());
1589 public void testAbortAfterReady() {
// Readies a transaction and aborts it before any canCommit. Verifies that the pending commit
// queue is then empty, that a later canCommit for the aborted Tx fails with an
// IllegalStateException, and that a new Tx can still be readied and canCommitted.
// The shard transaction commit timeout is set to 1 (seconds, per the builder method name).
1590 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1591 final ShardTestKit testKit = new ShardTestKit(getSystem());
1592 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1593 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1595 ShardTestKit.waitUntilLeader(shard);
// Upper bound for every expectMsgClass() wait below.
1597 final Duration duration = Duration.ofSeconds(5);
// Ready a Tx.
1601 final TransactionIdentifier transactionID1 = nextTransactionId();
1602 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1603 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1604 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1606 // Send the AbortTransaction message.
1608 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1609 testKit.expectMsgClass(duration, AbortTransactionReply.class);
// The aborted Tx must no longer be counted in the pending commit queue.
1611 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1613 // Now send CanCommitTransaction - should fail.
1615 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1616 final Throwable failure = testKit.expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1617 assertTrue("Failure type", failure instanceof IllegalStateException);
1619 // Ready and CanCommit another and verify success.
1621 final TransactionIdentifier transactionID2 = nextTransactionId();
1622 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1623 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1624 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1626 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1627 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1631 public void testAbortQueuedTransaction() {
// Readies three transactions, aborts the second while none has been canCommitted, then
// commits the first and third and verifies that the pending commit queue ends up empty.
1632 final ShardTestKit testKit = new ShardTestKit(getSystem());
// The actor is named after this test method, matching the convention of the sibling tests
// (it previously reused the name "testAbortAfterReady" from a copy/paste).
1633 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1634 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
1636 ShardTestKit.waitUntilLeader(shard);
// Upper bound for every expectMsgClass() wait below.
1638 final Duration duration = Duration.ofSeconds(5);
// Ready 3 transactions.
1642 final TransactionIdentifier transactionID1 = nextTransactionId();
1643 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1644 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1645 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1647 final TransactionIdentifier transactionID2 = nextTransactionId();
1648 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1649 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1650 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
// The third Tx writes a different path (OUTER_LIST_PATH) than the first two.
1652 final TransactionIdentifier transactionID3 = nextTransactionId();
1653 shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1654 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), testKit.getRef());
1655 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1657 // Abort the second tx while it's queued.
1659 shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1660 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1662 // Commit the other 2.
1664 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1665 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1667 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1668 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1670 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1671 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1673 shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1674 testKit.expectMsgClass(duration, CommitTransactionReply.class);
// Nothing may remain queued once the two surviving transactions have committed.
1676 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1680 public void testCreateSnapshotWithNonPersistentData() throws Exception {
// Runs the shared snapshot scenario with persistence disabled (persistent == false).
1681 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1685 public void testCreateSnapshot() throws Exception {
// Runs the shared snapshot scenario with persistence enabled (persistent == true).
1686 testCreateSnapshot(true, "testCreateSnapshot");
1689 private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception {
// Shared scenario: creates a shard whose persistence provider records every saved snapshot,
// writes TEST_PATH, then triggers two snapshot captures and validates each saved snapshot
// against the store's root node.
//   persistent     - value passed to dataStoreContextBuilder.persistent()
//   shardActorName - name under which the test shard actor is created
// The latch sits in an AtomicReference because awaitAndValidateSnapshot() replaces it with a
// fresh CountDownLatch after each validation.
1690 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
// Holds the most recent object handed to saveSnapshot().
1692 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
// Persistence provider that records the snapshot object before delegating the save.
1693 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1694 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1699 public void saveSnapshot(final Object obj) {
1700 savedSnapshot.set(obj);
1701 super.saveSnapshot(obj);
1705 dataStoreContextBuilder.persistent(persistent);
// Shard subclass that installs the recording provider, counts the latch down when a snapshot
// save completes, and widens access to the RaftActorContext for the test.
1707 final class TestShard extends Shard {
1709 TestShard(final AbstractBuilder<?, ?> builder) {
1711 setPersistence(new TestPersistentDataProvider(super.persistence()));
1715 public void handleCommand(final Object message) {
1716 super.handleCommand(message);
1718 // XXX: commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
1719 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1720 latch.get().countDown();
1725 public RaftActorContext getRaftActorContext() {
1726 return super.getRaftActorContext();
1730 final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1732 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
1733 new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), shardActorName);
1735 ShardTestKit.waitUntilLeader(shard);
1736 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// The root node as currently stored is what every saved snapshot is expected to contain.
1738 final NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.empty());
1740 // Trigger creation of a snapshot by asking the SnapshotManager to capture directly.
1741 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1742 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1743 awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
// Capture a second time to verify that another snapshot can be taken after the first one.
1745 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1746 awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1749 private static void awaitAndValidateSnapshot(final AtomicReference<CountDownLatch> latch,
1750 final AtomicReference<Object> savedSnapshot, final NormalizedNode expectedRoot)
1751 throws InterruptedException {
// Waits up to 5 seconds for the current latch to reach zero, asserts that the saved object is
// a Snapshot whose root node equals expectedRoot, then resets both holders for the next round.
//   latch         - holder of the latch signalling that a snapshot was saved
//   savedSnapshot - holder of the object captured by the save
//   expectedRoot  - root node the snapshot must contain
// Throws InterruptedException if interrupted while waiting on the latch.
1752 assertTrue("Snapshot saved", latch.get().await(5, TimeUnit.SECONDS));
1754 assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
1756 verifySnapshot((Snapshot) savedSnapshot.get(), expectedRoot);
// Re-arm for a subsequent capture: fresh latch, cleared snapshot reference.
1758 latch.set(new CountDownLatch(1));
1759 savedSnapshot.set(null);
1762 private static void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) {
// Asserts that the root node carried in the snapshot's state equals expectedRoot.
// The state is cast to ShardSnapshotState, so any other state type fails with a
// ClassCastException, and get() is called on the root node without a presence check.
1763 final NormalizedNode actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot().getRootNode().get();
1764 assertEquals("Root node", expectedRoot, actual);
1768 * This test simply verifies that the applySnapShot logic will work.
1771 public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
// Works directly on an in-memory DataTree (no shard actor): writes a node, reads the root,
// then replaces the whole root with that same value and checks the root is unchanged.
1772 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
// Populate the tree with the TEST container.
1775 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1776 putTransaction.write(TestModel.TEST_PATH,
1777 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1778 commitTransaction(store, putTransaction);
// Remember the full root as it stands after the first commit.
1781 final NormalizedNode expected = readStore(store, YangInstanceIdentifier.empty());
1783 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
// Delete the root and write the remembered root back in a single modification.
1785 writeTransaction.delete(YangInstanceIdentifier.empty());
1786 writeTransaction.write(YangInstanceIdentifier.empty(), expected);
1788 commitTransaction(store, writeTransaction);
1790 final NormalizedNode actual = readStore(store, YangInstanceIdentifier.empty());
1792 assertEquals(expected, actual);
1796 public void testRecoveryApplicable() {
// Creates two shards from contexts that differ only in the persistent flag and checks that
// persistence().isRecoveryApplicable() is true for the persistent one and false for the other.
1798 final DatastoreContext persistentContext = DatastoreContext.newBuilder()
1799 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1801 final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
1802 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1804 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
1805 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1807 final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
1808 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
// Both shards are built with the same shardID but run under distinct actor names.
1810 final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
1812 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1814 final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
1816 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1820 public void testOnDatastoreContext() {
// Starts a persistent shard, then sends it DatastoreContext messages that toggle the
// persistent flag and checks that persistence().isRecoveryApplicable() follows each change.
1821 dataStoreContextBuilder.persistent(true);
// NOTE(review): unlike most tests here, the props carry no withDispatcher() override; the
// assertions placed directly after tell() presumably rely on TestActorRef processing the
// message on the calling thread - confirm.
1823 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
1825 assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1827 ShardTestKit.waitUntilLeader(shard);
// Switch persistence off.
1829 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1831 assertFalse("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
// Switch persistence back on.
1833 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1835 assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1839 public void testRegisterRoleChangeListener() {
// Registers a role-change listener on a leader shard and checks that it receives a
// registration reply plus a ShardLeaderStateChanged carrying the shard's local data tree.
// After a RequestVote is sent to the shard, the next ShardLeaderStateChanged must carry no
// local data tree.
1840 final ShardTestKit testKit = new ShardTestKit(getSystem());
1841 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1842 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1843 "testRegisterRoleChangeListener");
1845 ShardTestKit.waitUntilLeader(shard);
// The listener is a message-collecting actor so the received notifications can be inspected.
1847 final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
1849 shard.tell(new RegisterRoleChangeListener(), listener);
1851 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
1853 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1854 ShardLeaderStateChanged.class);
1855 assertTrue("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent());
1856 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
1857 leaderStateChanged.getLocalShardDataTree().get());
// Drop what was collected so far so the next expectFirstMatching() sees only new messages.
1859 MessageCollectorActor.clearMessages(listener);
1861 // Force a leader change
1863 shard.tell(new RequestVote(10000, "member2", 50, 50), testKit.getRef());
1865 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, ShardLeaderStateChanged.class);
1866 assertFalse("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent());
1870 public void testFollowerInitialSyncStatus() {
// Feeds FollowerInitialSyncUpStatus messages straight into the shard's handleNonRaftCommand()
// and checks that the shard MBean's getFollowerInitialSyncStatus() mirrors the flag each time.
1871 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1872 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1873 "testFollowerInitialSyncStatus");
// Status false -> the MBean must report false.
1875 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false,
1876 "member-1-shard-inventory-operational"));
1878 assertFalse(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
// Status true -> the MBean must report true.
1880 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true,
1881 "member-1-shard-inventory-operational"));
1883 assertTrue(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1887 public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
// Registers a data tree change listener while the shard has no leader, then removes the custom
// raft policy and waits for the listener to receive its expected change event.
1888 final ShardTestKit testKit = new ShardTestKit(getSystem());
1889 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
// Start with the DisableElectionsRaftPolicy installed so the shard comes up without a leader.
1890 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1891 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
// The listener is constructed with an expected count of 1 change event.
1893 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1894 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1895 TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1897 setupInMemorySnapshotStore();
1899 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1900 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1901 actorFactory.generateActorId(testName + "-shard"));
1903 testKit.waitUntilNoLeader(shard);
// Register while there is no leader; a reply with a registration path is still expected.
1905 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1906 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1907 RegisterDataTreeNotificationListenerReply.class);
1908 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
// NOTE(review): clearing the custom raft policy presumably re-enables elections so the shard
// becomes leader and the delayed registration takes effect - confirm.
1910 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1911 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1913 listener.waitForChangeEvents();
1917 public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
// Registers a data tree change listener while the shard has no leader, closes the registration,
// then removes the custom raft policy and verifies that the listener receives no changes.
1918 final ShardTestKit testKit = new ShardTestKit(getSystem());
1919 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
// Start with the DisableElectionsRaftPolicy installed so the shard comes up without a leader.
1920 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1921 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
// The listener is constructed with an expected count of 0 change events.
1923 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
1924 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1925 TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1927 setupInMemorySnapshotStore();
1929 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1930 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1931 actorFactory.generateActorId(testName + "-shard"));
1933 testKit.waitUntilNoLeader(shard);
// Register while there is no leader; a reply with a registration path is still expected.
1935 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1936 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1937 RegisterDataTreeNotificationListenerReply.class);
1938 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
// Close the registration through the actor found at the returned registration path.
1940 final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
1941 regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), testKit.getRef());
1942 testKit.expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
// NOTE(review): clearing the custom raft policy presumably re-enables elections; since the
// registration was already closed, no change event may reach the listener - confirm.
1944 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1945 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1947 listener.expectNoMoreChanges("Received unexpected change after close");
1951 public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
// Sets up two shards that are peers of each other, makes one the leader, registers a data tree
// change listener on the other (follower) shard, and verifies the listener is notified after
// a write to TEST_PATH.
1952 final ShardTestKit testKit = new ShardTestKit(getSystem());
1953 final String testName = "testClusteredDataTreeChangeListenerRegistration";
1954 final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
1955 MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
1957 final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
1958 MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
// The follower gets an election timeout factor of 1000 and knows the leader as its only peer.
1960 final TestActorRef<Shard> followerShard = actorFactory
1961 .createTestActor(Shard.builder().id(followerShardID)
1962 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
1963 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
1964 "akka://test/user/" + leaderShardID.toString()))
1965 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1966 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
// The leader uses newDatastoreContext() and knows the follower as its only peer.
1968 final TestActorRef<Shard> leaderShard = actorFactory
1969 .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
1970 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
1971 "akka://test/user/" + followerShardID.toString()))
1972 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1973 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
// NOTE(review): TimeoutNow presumably makes leaderShard start an election immediately - confirm.
// The follower must then report leaderShard's path as its leader.
1975 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1976 final String leaderPath = ShardTestKit.waitUntilLeader(followerShard);
1977 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
1979 final YangInstanceIdentifier path = TestModel.TEST_PATH;
// The listener is constructed with an expected count of 1 change event.
1980 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1981 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
1982 actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
// Register on the follower shard, not on the leader.
1984 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1985 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1986 RegisterDataTreeNotificationListenerReply.class);
1987 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
// Write through followerShard and wait for the listener to observe the change.
1989 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1991 listener.waitForChangeEvents();
1995 public void testServerRemoved() {
// Creates a shard as a child of a message-collecting parent actor, sends the shard a
// ServerRemoved message, and expects the parent to receive a ServerRemoved message.
1996 final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
1997 .withDispatcher(Dispatchers.DefaultDispatcherId()));
// The shard is created from the parent's actor context, which makes it a child of 'parent'.
1999 final ActorRef shard = parent.underlyingActor().context().actorOf(
2000 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2001 "testServerRemoved");
2003 shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2005 MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);