2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertSame;
14 import static org.junit.Assert.assertTrue;
15 import static org.junit.Assert.fail;
16 import static org.mockito.ArgumentMatchers.any;
17 import static org.mockito.Mockito.doThrow;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSelection;
24 import akka.actor.Props;
25 import akka.actor.Status.Failure;
26 import akka.dispatch.Dispatchers;
27 import akka.dispatch.OnComplete;
28 import akka.japi.Creator;
29 import akka.pattern.Patterns;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Stopwatch;
34 import com.google.common.base.Throwables;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.time.Duration;
37 import java.util.Collections;
38 import java.util.HashSet;
40 import java.util.Optional;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicReference;
46 import org.junit.Test;
47 import org.mockito.InOrder;
48 import org.opendaylight.controller.cluster.DataPersistenceProvider;
49 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
50 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
51 import org.opendaylight.controller.cluster.access.concepts.MemberName;
52 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
53 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
62 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
65 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
67 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
71 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
74 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
75 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
76 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
77 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
78 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
79 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
80 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
81 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
82 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
83 import org.opendaylight.controller.cluster.raft.RaftActorContext;
84 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
85 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
86 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
87 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
88 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
89 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
90 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
91 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
92 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
93 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
94 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
95 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
96 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
97 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
98 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
99 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
100 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
101 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
102 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
103 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
104 import org.opendaylight.yangtools.concepts.Identifier;
105 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
106 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
108 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
116 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
117 import scala.concurrent.Await;
118 import scala.concurrent.Future;
119 import scala.concurrent.duration.FiniteDuration;
121 public class ShardTest extends AbstractShardTest {
// Placeholder journal payload. testDataTreeCandidateRecovery stores it at journal sequence number 0; the
// text of the constant itself records the reason (snapshot sequence number is 0, recovery starts from 1).
122 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
123 + "InMemorySnapshotStore and journal recovery seq number will start from 1";
// Registers two data tree change listener actors for TestModel.TEST_PATH on a shard that has become
// leader: the first before TEST_PATH is written, the second after. Each registration must be answered
// with a RegisterDataTreeNotificationListenerReply within 3 seconds.
126 public void testRegisterDataTreeChangeListener() throws Exception {
127 final ShardTestKit testKit = new ShardTestKit(getSystem());
128 final TestActorRef<Shard> shard = actorFactory.createTestActor(
129 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
130 "testRegisterDataTreeChangeListener");
132 ShardTestKit.waitUntilLeader(shard);
134 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// The constructor argument 1 is presumably the number of change events to wait for - confirm against
// MockDataTreeChangeListener.
136 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
137 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
138 TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
140 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), testKit.getRef());
142 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
143 RegisterDataTreeNotificationListenerReply.class);
// The registration path must be a child of the shard actor created above whose name starts with a
// dollar sign (which looks like an Akka auto-generated actor name).
144 final String replyPath = reply.getListenerRegistrationPath().toString();
145 assertTrue("Incorrect reply path: " + replyPath,
146 replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
// Write the test container only after the first listener is registered.
148 final YangInstanceIdentifier path = TestModel.TEST_PATH;
149 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
151 listener.waitForChangeEvents();
152 listener.verifyOnInitialDataEvent();
// The second listener registers when TEST_PATH has already been written.
154 final MockDataTreeChangeListener listener2 = new MockDataTreeChangeListener(1);
155 final ActorRef dclActor2 = actorFactory.createActor(DataTreeChangeListenerActor.props(listener2,
156 TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener2");
158 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor2, false), testKit.getRef());
160 testKit.expectMsgClass(Duration.ofSeconds(3), RegisterDataTreeNotificationListenerReply.class);
162 listener2.waitForChangeEvents();
163 listener2.verifyNoOnInitialDataEvent();
// Checks that a data tree change listener registered while the shard reports no leader still receives
// a change event afterwards. The shard subclass below intercepts its first ElectionTimeout so that the
// test can register the listener before that timeout is delivered.
166 @SuppressWarnings("serial")
168 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
// onFirstElectionTimeout: released by the shard once it has seen its first ElectionTimeout.
// onChangeListenerRegistered: released by the test once the listener registration has been confirmed.
169 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
170 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
171 final Creator<Shard> creator = new Creator<>() {
172 boolean firstElectionTimeout = true;
175 public Shard create() {
176 return new Shard(newShardBuilder()) {
178 public void handleCommand(final Object message) {
179 if (message instanceof ElectionTimeout && firstElectionTimeout) {
180 firstElectionTimeout = false;
181 final ActorRef self = getSelf();
// The intercepted ElectionTimeout is re-sent to the shard itself only after the test has released
// onChangeListenerRegistered, or after 5 seconds at the latest.
183 Uninterruptibles.awaitUninterruptibly(
184 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
185 self.tell(message, self);
188 onFirstElectionTimeout.countDown();
190 super.handleCommand(message);
197 setupInMemorySnapshotStore();
199 final YangInstanceIdentifier path = TestModel.TEST_PATH;
200 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
201 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
202 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
204 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
205 new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
206 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
208 final ShardTestKit testKit = new ShardTestKit(getSystem());
209 assertTrue("Got first ElectionTimeout", onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
// Register while the first ElectionTimeout is still being held back.
211 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), testKit.getRef());
212 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
213 RegisterDataTreeNotificationListenerReply.class);
214 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
// Confirm that the registration really happened without a leader.
216 shard.tell(FindLeader.INSTANCE, testKit.getRef());
217 final FindLeaderReply findLeaderReply = testKit.expectMsgClass(Duration.ofSeconds(5), FindLeaderReply.class);
218 assertFalse("Expected the shard not to be the leader", findLeaderReply.getLeaderActor().isPresent());
// Let the held-back ElectionTimeout through.
220 onChangeListenerRegistered.countDown();
222 // TODO: investigate why we do not receive data change events
223 listener.waitForChangeEvents();
// Sends a READ_ONLY CreateTransaction request to a leader shard and checks that the transaction path
// in the CreateTransactionReply lies under the shard actor and carries the shard name and member name.
227 public void testCreateTransaction() {
228 final ShardTestKit testKit = new ShardTestKit(getSystem());
229 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
231 ShardTestKit.waitUntilLeader(shard);
233 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), testKit.getRef());
// The request is sent in its serializable form; the reply is expected within 3 seconds.
235 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
236 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
238 final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
239 CreateTransactionReply.class);
241 final String path = reply.getTransactionPath().toString();
242 assertTrue("Unexpected transaction path " + path, path.contains(String.format(
243 "/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
244 shardID.getShardName(), shardID.getMemberName().getName())));
// Sends a READ_ONLY CreateTransaction request to a leader shard and checks the resulting transaction
// actor path, mirroring testCreateTransaction but without sending an UpdateSchemaContext first.
// NOTE(review): nothing visible here places the transaction on a chain; whether the identifier belongs
// to one depends on nextTransactionId(), which is defined elsewhere - confirm the test matches its name.
248 public void testCreateTransactionOnChain() {
249 final ShardTestKit testKit = new ShardTestKit(getSystem());
250 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
252 ShardTestKit.waitUntilLeader(shard);
254 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
255 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
257 final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
258 CreateTransactionReply.class);
// The path must lie under the shard actor created above and contain the shard and member names.
260 final String path = reply.getTransactionPath().toString();
261 assertTrue("Unexpected transaction path " + path, path.contains(String.format(
262 "/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
263 shardID.getShardName(), shardID.getMemberName().getName())));
// Creates a shard that knows one peer with a null address, sends PeerAddressResolved for that peer and
// checks that the on-demand Raft state afterwards reports the resolved address.
267 public void testPeerAddressResolved() {
268 final ShardTestKit testKit = new ShardTestKit(getSystem());
269 final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
// The peer starts out with no address (null map value).
271 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder()
272 .peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null))
273 .props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
275 final String address = "akka://foobar";
276 shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
// Query the shard state and compare the address recorded for the peer.
278 shard.tell(GetOnDemandRaftState.INSTANCE, testKit.getRef());
279 final OnDemandRaftState state = testKit.expectMsgClass(OnDemandRaftState.class);
280 assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
// Builds a snapshot from a separate in-memory data tree, sends it to a leader shard as ApplySnapshot
// and polls the shard until its root node equals the snapshot contents; fails if that never happens.
284 public void testApplySnapshot() throws Exception {
286 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps()
287 .withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
289 ShardTestKit.waitUntilLeader(shard);
// A stand-alone data tree is used to produce the expected contents.
291 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
// Test container holding an outer list with a single entry whose id is 1.
294 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
295 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
296 .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
297 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
299 writeToStore(store, TestModel.TEST_PATH, container);
// The whole tree (empty path = root) is what the snapshot carries and what the shard must end up with.
301 final YangInstanceIdentifier root = YangInstanceIdentifier.empty();
302 final NormalizedNode expected = readStore(store, root);
304 final Snapshot snapshot = Snapshot.create(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
305 Collections.emptyList(), 1, 2, 3, 4, -1, null, null);
// The snapshot is delivered as a message, so the outcome is polled for: every 75 ms while no more than
// 5 whole seconds have elapsed.
307 shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
309 final Stopwatch sw = Stopwatch.createStarted();
310 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
311 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
314 assertEquals("Root node", expected, readStore(shard, root));
316 } catch (final AssertionError e) {
321 fail("Snapshot was not applied");
// Calls applyState directly on the shard actor with a payload that writes the test container, then
// polls the shard until TEST_PATH can be read back; fails if the node never appears.
325 public void testApplyState() throws Exception {
326 final TestActorRef<Shard> shard = actorFactory.createTestActor(
327 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
329 ShardTestKit.waitUntilLeader(shard);
// A stand-alone data tree is used only to build the modification that goes into the payload.
331 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
334 final DataTreeModification writeMod = store.takeSnapshot().newModification();
335 final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
336 writeMod.write(TestModel.TEST_PATH, node);
// applyState is invoked on the underlying actor object, with null for its first two arguments.
339 final TransactionIdentifier tx = nextTransactionId();
340 shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
// Poll every 75 ms while no more than 5 whole seconds have elapsed.
342 final Stopwatch sw = Stopwatch.createStarted();
343 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
344 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
346 final NormalizedNode actual = readStore(shard, TestModel.TEST_PATH);
347 if (actual != null) {
348 assertEquals("Applied state", node, actual);
353 fail("State was not applied");
// Pre-populates the in-memory snapshot store and journal and then runs the shared recovery check:
// journal sequence 0 holds dummy data, sequence 1 writes the outer list, sequences 2..17 each merge one
// list entry (ids 1..16), and the final entry is ApplyJournalEntries(16).
357 public void testDataTreeCandidateRecovery() throws Exception {
358 // Set up the InMemorySnapshotStore.
359 final DataTree source = setupInMemorySnapshotStore();
361 final DataTreeModification writeMod = source.takeSnapshot().newModification();
362 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
364 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
366 // Set up the InMemoryJournal.
// The first two SimpleReplicatedLogEntry arguments are presumably log index and term - confirm.
367 InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
368 payloadForModification(source, writeMod, nextTransactionId())));
370 final int nListEntries = 16;
371 final Set<Integer> listEntryKeys = new HashSet<>();
373 // Add some ModificationPayload entries
374 for (int i = 1; i <= nListEntries; i++) {
375 listEntryKeys.add(i);
377 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
378 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
380 final DataTreeModification mod = source.takeSnapshot().newModification();
381 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
// Journal sequence numbers are offset by one because sequence 0 holds DUMMY_DATA.
384 InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
385 payloadForModification(source, mod, nextTransactionId())));
388 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
389 new ApplyJournalEntries(nListEntries));
// Hand the expected list keys to the shared recovery check.
391 testRecovery(listEntryKeys, true);
// Readies three transactions on one leader shard and drives them through can-commit and commit. The
// first transaction is driven synchronously through the test kit; the other two are driven through ask
// futures whose callbacks send the commit. The test passes when both asynchronous commits complete
// within the timeout, no callback recorded an error, and the written data and last-applied index match.
395 @SuppressWarnings("checkstyle:IllegalCatch")
396 public void testConcurrentThreePhaseCommits() throws Exception {
// caughtEx collects a failure raised inside a future callback so the test thread can rethrow it.
// commitLatch counts the two asynchronous commits (second and third transaction).
397 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
398 final CountDownLatch commitLatch = new CountDownLatch(2);
400 final long timeoutSec = 5;
401 final Duration duration = Duration.ofSeconds(timeoutSec);
402 final Timeout timeout = Timeout.create(duration);
404 final TestActorRef<Shard> shard = actorFactory.createTestActor(
405 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
406 "testConcurrentThreePhaseCommits");
// Base callback: records a failed future in caughtEx and checks the class of a successful response.
408 class OnFutureComplete extends OnComplete<Object> {
409 private final Class<?> expRespType;
411 OnFutureComplete(final Class<?> expRespType) {
412 this.expRespType = expRespType;
416 public void onComplete(final Throwable error, final Object resp) {
418 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
421 assertEquals("Commit response type", expRespType, resp.getClass());
423 } catch (final Exception e) {
429 void onSuccess(final Object resp) {
// Commit callback: expects a CommitTransactionReply and counts down commitLatch on completion.
433 class OnCommitFutureComplete extends OnFutureComplete {
434 OnCommitFutureComplete() {
435 super(CommitTransactionReply.class);
439 public void onComplete(final Throwable error, final Object resp) {
440 super.onComplete(error, resp);
441 commitLatch.countDown();
// Can-commit callback: on a positive reply it asks the shard to commit the same transaction and
// attaches an OnCommitFutureComplete to that ask.
445 class OnCanCommitFutureComplete extends OnFutureComplete {
446 private final TransactionIdentifier transactionID;
448 OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
449 super(CanCommitTransactionReply.class);
450 this.transactionID = transactionID;
454 void onSuccess(final Object resp) {
455 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(resp);
456 assertTrue("Can commit", canCommitReply.getCanCommit());
458 final Future<Object> commitFuture = Patterns.ask(shard,
459 new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
460 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
464 final ShardTestKit testKit = new ShardTestKit(getSystem());
465 ShardTestKit.waitUntilLeader(shard);
467 final TransactionIdentifier transactionID1 = nextTransactionId();
468 final TransactionIdentifier transactionID2 = nextTransactionId();
469 final TransactionIdentifier transactionID3 = nextTransactionId();
// NOTE(review): cohort1, cohort2 and cohort3 are referenced only by the commented-out in-order
// verification near the end of this method.
471 final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
472 shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
473 final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
474 final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
475 final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
// Ready the first transaction, which writes the test container.
477 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
478 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
479 final ReadyTransactionReply readyReply = ReadyTransactionReply
480 .fromSerializable(testKit.expectMsgClass(duration, ReadyTransactionReply.class));
// The cohort path in the reply must end with the path of the shard actor (address prefix removed).
482 String pathSuffix = shard.path().toString().replaceFirst("akka://test", "");
483 assertTrue("Cohort path", readyReply.getCohortPath().endsWith(pathSuffix));
484 // Send the CanCommitTransaction message for the first Tx.
486 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
487 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
488 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
489 assertTrue("Can commit", canCommitReply.getCanCommit());
491 // Ready 2 more Tx's.
493 shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
494 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), testKit.getRef());
495 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
498 prepareBatchedModifications(transactionID3,
499 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
500 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
501 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), testKit.getRef());
502 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
504 // Send the CanCommitTransaction message for the next 2 Tx's.
505 // These should get queued and
506 // processed after the first Tx completes.
508 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
509 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
511 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
512 new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
514 // Send the CommitTransaction message for the first Tx. After it
515 // completes, it should
516 // trigger the 2nd Tx to proceed which should in turn then
519 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
520 testKit.expectMsgClass(duration, CommitTransactionReply.class);
522 // Wait for the next 2 Tx's to complete.
524 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher());
526 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher());
528 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
// Surface a failure recorded by a callback before judging the latch result.
530 final Throwable t = caughtEx.get();
532 Throwables.propagateIfPossible(t, Exception.class);
533 throw new RuntimeException(t);
536 assertTrue("Commits complete", done);
538 // final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
539 // cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
540 // cohort3.getPreCommit(), cohort3.getCommit());
541 // inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
542 // inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
543 // inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
544 // inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
545 // inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
546 // inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
547 // inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
548 // inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
549 // inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
551 // Verify data in the data store.
553 verifyOuterListEntry(shard, 1);
555 verifyLastApplied(shard, 3);
// Sends one transaction to a leader shard as three BatchedModifications messages, expects the third to
// be answered with a ReadyTransactionReply, then performs can-commit and commit explicitly and checks
// that outer list entry 1 is present.
// NOTE(review): the trailing (boolean, boolean, int) arguments of newBatchedModifications are presumably
// ready, doCommitOnReady and total messages sent - confirm against the helper.
559 public void testBatchedModificationsWithNoCommitOnReady() {
560 final ShardTestKit testKit = new ShardTestKit(getSystem());
561 final TestActorRef<Shard> shard = actorFactory.createTestActor(
562 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
563 "testBatchedModificationsWithNoCommitOnReady");
565 ShardTestKit.waitUntilLeader(shard);
567 final TransactionIdentifier transactionID = nextTransactionId();
568 final Duration duration = Duration.ofSeconds(5);
570 // Send a BatchedModifications to start a transaction.
572 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
573 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
574 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
576 // Send a couple more BatchedModifications.
578 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
579 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
581 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
// Third message: flags are (true, false), and the expected answer is a ReadyTransactionReply.
583 shard.tell(newBatchedModifications(transactionID,
584 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
585 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
586 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3),
588 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
590 // Send the CanCommitTransaction message.
592 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
593 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
594 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
595 assertTrue("Can commit", canCommitReply.getCanCommit());
597 // Send the CommitTransaction message.
599 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
600 testKit.expectMsgClass(duration, CommitTransactionReply.class);
602 // Verify data in the data store.
604 verifyOuterListEntry(shard, 1);
// Sends one transaction to a leader shard as three BatchedModifications messages where the third one
// carries the flags (true, true); that message is expected to be answered directly with a
// CommitTransactionReply, with no separate can-commit or commit message from the test.
// NOTE(review): the trailing (boolean, boolean, int) arguments of newBatchedModifications are presumably
// ready, doCommitOnReady and total messages sent - confirm against the helper.
608 public void testBatchedModificationsWithCommitOnReady() {
609 final ShardTestKit testKit = new ShardTestKit(getSystem());
610 final TestActorRef<Shard> shard = actorFactory.createTestActor(
611 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
612 "testBatchedModificationsWithCommitOnReady");
614 ShardTestKit.waitUntilLeader(shard);
616 final TransactionIdentifier transactionID = nextTransactionId();
617 final Duration duration = Duration.ofSeconds(5);
619 // Send a BatchedModifications to start a transaction.
621 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
622 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
623 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
625 // Send a couple more BatchedModifications.
627 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
628 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
630 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
// Final message of the transaction; the commit reply is awaited right after it.
632 shard.tell(newBatchedModifications(transactionID,
633 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
634 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
635 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3),
638 testKit.expectMsgClass(duration, CommitTransactionReply.class);
640 // Verify data in the data store.
641 verifyOuterListEntry(shard, 1);
// Sends a BatchedModifications message that declares a total of 2 messages sent and expects the shard
// to answer with a Failure. The failure cause is rethrown so that the expected-exception check of the
// test annotation (IllegalStateException) sees it.
644 @Test(expected = IllegalStateException.class)
645 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
646 final ShardTestKit testKit = new ShardTestKit(getSystem());
647 final TestActorRef<Shard> shard = actorFactory.createTestActor(
648 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
649 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
651 ShardTestKit.waitUntilLeader(shard);
653 final TransactionIdentifier transactionID = nextTransactionId();
654 final BatchedModifications batched = new BatchedModifications(transactionID,
655 DataStoreVersions.CURRENT_VERSION);
// Only one message is sent for this transaction in the visible code, yet the declared total is 2.
657 batched.setTotalMessagesSent(2);
659 shard.tell(batched, testKit.getRef());
661 final Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
// Rethrow the cause as-is when it is an Exception, RuntimeException or Error; otherwise wrap it.
// NOTE(review): expectMsgClass presumably never returns null, so this check looks redundant - confirm.
663 if (failure != null) {
664 Throwables.propagateIfPossible(failure.cause(), Exception.class);
665 throw new RuntimeException(failure.cause());
// Sends a merge of invalid data (a test container with a junk leaf child) and expects a Failure reply,
// then sends a further BatchedModifications for the same transaction and expects a Failure whose cause
// equals the cause of the first failure.
670 public void testBatchedModificationsWithOperationFailure() {
671 final ShardTestKit testKit = new ShardTestKit(getSystem());
672 final TestActorRef<Shard> shard = actorFactory.createTestActor(
673 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
674 "testBatchedModificationsWithOperationFailure");
676 ShardTestKit.waitUntilLeader(shard);
678 // Test merge with invalid data. An exception should occur when
679 // the merge is applied. Note that
680 // write will not validate the children for performance reasons.
682 final TransactionIdentifier transactionID = nextTransactionId();
684 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
685 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
686 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
688 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
689 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
690 shard.tell(batched, testKit.getRef());
691 Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
// Keep the first cause for comparison with the failure of the follow-up message.
693 final Throwable cause = failure.cause();
// Follow-up message for the same transaction, declaring 2 messages sent in total.
695 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
697 batched.setTotalMessagesSent(2);
699 shard.tell(batched, testKit.getRef());
701 failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
702 assertEquals("Failure cause", cause, failure.cause());
/**
 * Verifies that a read-only transaction created on the same local history as a readied write
 * transaction reads the node written by that transaction before it is committed, and that
 * committing the write transaction afterwards leaves the node in the data store.
 */
706 public void testBatchedModificationsOnTransactionChain() {
707 final ShardTestKit testKit = new ShardTestKit(getSystem());
708 final TestActorRef<Shard> shard = actorFactory.createTestActor(
709 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
710 "testBatchedModificationsOnTransactionChain");
712 ShardTestKit.waitUntilLeader(shard);
// Both transaction IDs share one history ID, which is what places them on the same chain.
714 final LocalHistoryIdentifier historyId = nextHistoryId();
715 final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
716 final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
718 final Duration duration = Duration.ofSeconds(5);
720 // Send a BatchedModifications to start a chained write
721 // transaction and ready it.
723 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
724 final YangInstanceIdentifier path = TestModel.TEST_PATH;
725 shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), testKit.getRef());
726 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
728 // Create a read Tx on the same chain.
730 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
731 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
733 final CreateTransactionReply createReply = testKit.expectMsgClass(Duration.ofSeconds(3),
734 CreateTransactionReply.class);
// Read through the transaction actor named in the reply; the uncommitted write must be visible.
736 getSystem().actorSelection(createReply.getTransactionPath())
737 .tell(new ReadData(path, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
738 final ReadDataReply readReply = testKit.expectMsgClass(Duration.ofSeconds(3), ReadDataReply.class);
739 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
741 // Commit the write transaction.
743 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
744 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
745 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
746 assertTrue("Can commit", canCommitReply.getCanCommit());
748 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
749 testKit.expectMsgClass(duration, CommitTransactionReply.class);
751 // Verify data in the data store.
753 final NormalizedNode actualNode = readStore(shard, path);
754 assertEquals("Stored node", containerNode, actualNode);
/**
 * Verifies that a shard reporting that it is not the leader passes a {@code BatchedModifications}
 * message on to the actor returned by {@code getLeader()}. The shard subclass used here points
 * {@code getLeader()} at the test kit, which must then receive the very same message.
 */
758 public void testOnBatchedModificationsWhenNotLeader() {
// Flag flipped by the test once the real shard has become leader; it switches on both overrides.
759 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
760 final ShardTestKit testKit = new ShardTestKit(getSystem());
761 final Creator<Shard> creator = new Creator<>() {
762 private static final long serialVersionUID = 1L;
765 public Shard create() {
766 return new Shard(newShardBuilder()) {
// Report "not leader" while the flag is set, otherwise defer to the real implementation.
768 protected boolean isLeader() {
769 return overrideLeaderCalls.get() ? false : super.isLeader();
// While the flag is set, name the test kit as the leader.
773 public ActorSelection getLeader() {
774 return overrideLeaderCalls.get() ? getSystem().actorSelection(testKit.getRef().path())
781 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
782 new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
783 "testOnBatchedModificationsWhenNotLeader");
785 ShardTestKit.waitUntilLeader(shard);
// From here on the shard pretends to be a non-leader whose leader is the test kit.
787 overrideLeaderCalls.set(true);
789 final BatchedModifications batched = new BatchedModifications(nextTransactionId(),
790 DataStoreVersions.CURRENT_VERSION);
792 shard.tell(batched, ActorRef.noSender());
// The test kit must receive a message equal to the one that was sent to the shard.
794 testKit.expectMsgEquals(batched);
/**
 * Verifies that, while the shard has no leader, each of three transaction messages
 * ({@code BatchedModifications}, the forwarded-ready message and {@code ReadyLocalTransaction})
 * is answered with a {@code Failure} whose cause is a {@code NoShardLeaderException}.
 */
798 public void testTransactionMessagesWithNoLeader() {
799 final ShardTestKit testKit = new ShardTestKit(getSystem());
// Configure the shard with the DisableElectionsRaftPolicy and short heartbeat/election settings.
800 dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
801 .shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
802 final TestActorRef<Shard> shard = actorFactory.createTestActor(
803 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
804 "testTransactionMessagesWithNoLeader");
806 testKit.waitUntilNoLeader(shard);
// Case 1: BatchedModifications.
808 final TransactionIdentifier txId = nextTransactionId();
809 shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
810 Failure failure = testKit.expectMsgClass(Failure.class);
811 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
// Case 2: the message built by prepareForwardedReadyTransaction.
813 shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
814 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
815 failure = testKit.expectMsgClass(Failure.class);
816 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
// Case 3: ReadyLocalTransaction wrapping a mocked modification.
818 shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
820 failure = testKit.expectMsgClass(Failure.class);
821 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
/** Runs the shared immediate-commit scenario with {@code readWrite} set to {@code true}. */
825 public void testReadyWithReadWriteImmediateCommit() {
826 testReadyWithImmediateCommit(true);
/** Runs the shared immediate-commit scenario with {@code readWrite} set to {@code false}. */
830 public void testReadyWithWriteOnlyImmediateCommit() {
831 testReadyWithImmediateCommit(false);
/**
 * Shared scenario: readies a transaction that writes the TEST container, expects a
 * {@code CommitTransactionReply} in response, and checks that the container is in the store.
 *
 * @param readWrite also used as the suffix of the test actor's name.
 *        NOTE(review): presumably selects which of the two ready messages below is sent - confirm.
 */
834 private void testReadyWithImmediateCommit(final boolean readWrite) {
835 final ShardTestKit testKit = new ShardTestKit(getSystem());
836 final TestActorRef<Shard> shard = actorFactory.createTestActor(
837 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
838 "testReadyWithImmediateCommit-" + readWrite);
840 ShardTestKit.waitUntilLeader(shard);
842 final TransactionIdentifier transactionID = nextTransactionId();
843 final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
845 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH, containerNode, true),
848 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true),
// The commit reply is expected directly; no separate canCommit/commit messages are sent here.
852 testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
854 final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH);
855 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
/**
 * Verifies that a {@code ReadyLocalTransaction} built with its third constructor argument set to
 * {@code true} is answered directly with a {@code CommitTransactionReply}, and that the merged
 * outer-list data can then be read back from the store.
 */
859 public void testReadyLocalTransactionWithImmediateCommit() {
860 final ShardTestKit testKit = new ShardTestKit(getSystem());
861 final TestActorRef<Shard> shard = actorFactory.createTestActor(
862 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
863 "testReadyLocalTransactionWithImmediateCommit");
865 ShardTestKit.waitUntilLeader(shard);
// Build the modification locally against the shard's own data tree.
867 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
869 final DataTreeModification modification = dataStore.newModification();
// Write the TEST container, then merge a one-entry outer list into it.
871 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
872 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
873 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
874 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
876 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
878 final TransactionIdentifier txId = nextTransactionId();
879 modification.ready();
880 final ReadyLocalTransaction readyMessage =
881 new ReadyLocalTransaction(txId, modification, true, Optional.empty());
883 shard.tell(readyMessage, testKit.getRef());
885 testKit.expectMsgClass(CommitTransactionReply.class);
887 final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
888 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
/**
 * Verifies that a {@code ReadyLocalTransaction} built with its third constructor argument set to
 * {@code false} is answered with a {@code ReadyTransactionReply}, after which explicit
 * {@code CanCommitTransaction} and {@code CommitTransaction} messages complete the commit and
 * the merged outer-list data can be read back from the store.
 */
892 public void testReadyLocalTransactionWithThreePhaseCommit() {
893 final ShardTestKit testKit = new ShardTestKit(getSystem());
894 final TestActorRef<Shard> shard = actorFactory.createTestActor(
895 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
896 "testReadyLocalTransactionWithThreePhaseCommit");
898 ShardTestKit.waitUntilLeader(shard);
// Build the modification locally against the shard's own data tree.
900 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
902 final DataTreeModification modification = dataStore.newModification();
// Write the TEST container, then merge a one-entry outer list into it.
904 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
905 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
906 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
907 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
909 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
911 final TransactionIdentifier txId = nextTransactionId();
912 modification.ready();
913 final ReadyLocalTransaction readyMessage =
914 new ReadyLocalTransaction(txId, modification, false, Optional.empty());
916 shard.tell(readyMessage, testKit.getRef());
918 testKit.expectMsgClass(ReadyTransactionReply.class);
920 // Send the CanCommitTransaction message.
922 shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
923 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
924 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
925 assertTrue("Can commit", canCommitReply.getCanCommit());
927 // Send the CommitTransaction message.
929 shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
930 testKit.expectMsgClass(CommitTransactionReply.class);
932 final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
933 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
/**
 * Verifies that a transaction goes through ready, canCommit and commit, and that its data ends
 * up in the store, when the data store context is built with {@code persistent(false)}.
 */
937 public void testReadWriteCommitWithPersistenceDisabled() {
938 dataStoreContextBuilder.persistent(false);
939 final ShardTestKit testKit = new ShardTestKit(getSystem());
940 final TestActorRef<Shard> shard = actorFactory.createTestActor(
941 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
942 "testCommitWithPersistenceDisabled");
944 ShardTestKit.waitUntilLeader(shard);
946 // Set up a simulated transaction.
948 final Duration duration = Duration.ofSeconds(5);
950 final TransactionIdentifier transactionID = nextTransactionId();
951 final NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
952 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false),
954 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
956 // Send the CanCommitTransaction message.
958 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
959 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
960 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
961 assertTrue("Can commit", canCommitReply.getCanCommit());
963 // Send the CommitTransaction message.
965 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
966 testKit.expectMsgClass(duration, CommitTransactionReply.class);
968 final NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH);
969 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
/** Runs the shared commit-with-modifications scenario with {@code readWrite} set to {@code true}. */
973 public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
974 testCommitWhenTransactionHasModifications(true);
/** Runs the shared commit-with-modifications scenario with {@code readWrite} set to {@code false}. */
978 public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
979 testCommitWhenTransactionHasModifications(false);
/**
 * Shared scenario: commits a transaction that carries a modification and checks that the data
 * tree saw validate, prepare and commit in that order, and that the shard's MBean reports one
 * committed transaction and a commit index of 1.
 *
 * @param readWrite also used as the suffix of the test actor's name.
 *        NOTE(review): presumably selects which of the two ready messages below is sent - confirm.
 * @throws Exception declared so that the {@code Thread.sleep} below may propagate interruption
 */
982 private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
983 final ShardTestKit testKit = new ShardTestKit(getSystem());
// The data tree comes from createDelegatingMockDataTree() so calls on it can be verified below.
984 final DataTree dataTree = createDelegatingMockDataTree();
985 final TestActorRef<Shard> shard = actorFactory.createTestActor(
986 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
987 "testCommitWhenTransactionHasModifications-" + readWrite);
989 ShardTestKit.waitUntilLeader(shard);
991 final Duration duration = Duration.ofSeconds(5);
992 final TransactionIdentifier transactionID = nextTransactionId();
995 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
996 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
998 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
999 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1002 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1004 // Send the CanCommitTransaction message.
1006 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1007 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1008 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1009 assertTrue("Can commit", canCommitReply.getCanCommit());
// Send the CommitTransaction message.
1011 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1012 testKit.expectMsgClass(duration, CommitTransactionReply.class);
// The data tree must have seen validate, prepare and commit, in that order.
1014 final InOrder inOrder = inOrder(dataTree);
1015 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1016 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1017 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1019 // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
1021 Thread.sleep(HEARTBEAT_MILLIS * 2);
1023 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, testKit.getRef());
1024 final ShardStats shardStats = testKit.expectMsgClass(duration, ShardStats.class);
1026 // Use MBean for verification
1027 // Committed transaction count should increase as usual
1028 assertEquals(1, shardStats.getCommittedTransactionsCount());
1030 // Commit index should advance 1 to account for disabling metadata
1031 assertEquals(1, shardStats.getCommitIndex());
/**
 * Verifies behaviour when the data tree's {@code commit} throws: committing the first of two
 * readied transactions is answered with a {@code Failure}, and the queued canCommit request of
 * the second transaction still completes within the timeout.
 *
 * @throws Exception declared so that the latch wait may propagate interruption
 */
1035 public void testCommitPhaseFailure() throws Exception {
1036 final ShardTestKit testKit = new ShardTestKit(getSystem());
1037 final DataTree dataTree = createDelegatingMockDataTree();
1038 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1039 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1040 "testCommitPhaseFailure");
1042 ShardTestKit.waitUntilLeader(shard);
1044 final Duration duration = Duration.ofSeconds(5);
1045 final Timeout timeout = Timeout.create(duration);
1047 // Set up 2 simulated transactions. The mocked data tree's commit is rigged to throw.
1051 doThrow(new RuntimeException("mock commit failure")).when(dataTree)
1052 .commit(any(DataTreeCandidate.class));
1054 final TransactionIdentifier transactionID1 = nextTransactionId();
1055 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1056 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1057 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1059 final TransactionIdentifier transactionID2 = nextTransactionId();
1060 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1061 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1062 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1064 // Send the CanCommitTransaction message for the first Tx.
1066 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1067 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1068 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1069 assertTrue("Can commit", canCommitReply.getCanCommit());
1071 // Send the CanCommitTransaction message for the 2nd Tx. This
1072 // should get queued and
1073 // processed after the first Tx completes.
1075 final Future<Object> canCommitFuture = Patterns.ask(shard,
1076 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1078 // Send the CommitTransaction message for the first Tx. This
1079 // should send back an error
1080 // and trigger the 2nd Tx to proceed.
1082 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1083 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1085 // Wait for the 2nd Tx to complete the canCommit phase.
1087 final CountDownLatch latch = new CountDownLatch(1);
1088 canCommitFuture.onComplete(new OnComplete<>() {
1090 public void onComplete(final Throwable failure, final Object resp) {
1093 }, getSystem().dispatcher());
1095 assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
// Check the order of calls on the data tree: validate, prepare, then the (failing) commit.
1097 final InOrder inOrder = inOrder(dataTree);
1098 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1099 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1101 // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
1102 // validate performs wrapping and we capture that mock
1103 // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1105 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
/**
 * Verifies behaviour when the data tree's {@code prepare} throws: committing the first of two
 * readied transactions is answered with a {@code Failure}, the queued canCommit request of the
 * second transaction still completes within the timeout, and the data tree saw validate,
 * prepare, validate in that order.
 *
 * @throws Exception declared so that the latch wait may propagate interruption
 */
1109 public void testPreCommitPhaseFailure() throws Exception {
1110 final ShardTestKit testKit = new ShardTestKit(getSystem());
1111 final DataTree dataTree = createDelegatingMockDataTree();
1112 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1113 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1114 "testPreCommitPhaseFailure");
1116 ShardTestKit.waitUntilLeader(shard);
1118 final Duration duration = Duration.ofSeconds(5);
1119 final Timeout timeout = Timeout.create(duration);
// Rig the mocked data tree so that prepare throws.
1121 doThrow(new RuntimeException("mock preCommit failure")).when(dataTree)
1122 .prepare(any(DataTreeModification.class));
// Ready two transactions.
1124 final TransactionIdentifier transactionID1 = nextTransactionId();
1125 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1126 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1127 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1129 final TransactionIdentifier transactionID2 = nextTransactionId();
1130 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1131 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1132 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1134 // Send the CanCommitTransaction message for the first Tx.
1136 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1137 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1138 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1139 assertTrue("Can commit", canCommitReply.getCanCommit());
1141 // Send the CanCommitTransaction message for the 2nd Tx. This
1142 // should get queued and
1143 // processed after the first Tx completes.
1145 final Future<Object> canCommitFuture = Patterns.ask(shard,
1146 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1148 // Send the CommitTransaction message for the first Tx. This
1149 // should send back an error
1150 // and trigger the 2nd Tx to proceed.
1152 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1153 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1155 // Wait for the 2nd Tx to complete the canCommit phase.
1157 final CountDownLatch latch = new CountDownLatch(1);
1158 canCommitFuture.onComplete(new OnComplete<>() {
1160 public void onComplete(final Throwable failure, final Object resp) {
1163 }, getSystem().dispatcher());
1165 assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
// Expected call order on the data tree: validate, the failing prepare, then a second validate.
1167 final InOrder inOrder = inOrder(dataTree);
1168 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1169 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1170 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
/**
 * Verifies behaviour when the data tree's {@code validate} throws on its first call only: the
 * first transaction's canCommit is answered with a {@code Failure}, and a second transaction's
 * canCommit afterwards succeeds.
 *
 * @throws Exception declared because stubbing {@code validate} names a method with a checked exception
 */
1174 public void testCanCommitPhaseFailure() throws Exception {
1175 final ShardTestKit testKit = new ShardTestKit(getSystem());
1176 final DataTree dataTree = createDelegatingMockDataTree();
1177 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1178 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1179 "testCanCommitPhaseFailure");
1181 ShardTestKit.waitUntilLeader(shard);
1183 final Duration duration = Duration.ofSeconds(5);
1184 final TransactionIdentifier transactionID1 = nextTransactionId();
// Chained stubbing: the first validate call throws, later calls do nothing.
1186 doThrow(new DataValidationFailedException(YangInstanceIdentifier.empty(), "mock canCommit failure"))
1187 .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1189 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1190 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1191 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1193 // Send the CanCommitTransaction message.
1195 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1196 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1198 // Send another can commit to ensure the failed one got cleaned up.
1201 final TransactionIdentifier transactionID2 = nextTransactionId();
1202 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1203 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1204 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1206 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1207 final CanCommitTransactionReply reply = CanCommitTransactionReply
1208 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
1209 assertTrue("getCanCommit", reply.getCanCommit());
/** Runs the shared immediate-commit canCommit-failure scenario twice: with {@code true}, then {@code false}. */
1213 public void testImmediateCommitWithCanCommitPhaseFailure() throws Exception {
1214 testImmediateCommitWithCanCommitPhaseFailure(true);
1215 testImmediateCommitWithCanCommitPhaseFailure(false);
/**
 * Shared scenario: with the data tree's {@code validate} throwing on its first call only, the
 * first ready message is answered with a {@code Failure} and a second ready message for a new
 * transaction is answered with a {@code CommitTransactionReply}.
 *
 * @param readWrite also used as the suffix of the test actor's name.
 *        NOTE(review): presumably selects which of the two ready messages is sent each time - confirm.
 * @throws Exception declared because stubbing {@code validate} names a method with a checked exception
 */
1218 private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
1219 final ShardTestKit testKit = new ShardTestKit(getSystem());
1220 final DataTree dataTree = createDelegatingMockDataTree();
1221 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1222 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1223 "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1225 ShardTestKit.waitUntilLeader(shard);
// Chained stubbing: the first validate call throws, later calls do nothing.
1227 doThrow(new DataValidationFailedException(YangInstanceIdentifier.empty(), "mock canCommit failure"))
1228 .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1230 final Duration duration = Duration.ofSeconds(5);
1232 final TransactionIdentifier transactionID1 = nextTransactionId();
1235 shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1236 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1238 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1239 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1242 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1244 // Send another can commit to ensure the failed one got cleaned up.
1247 final TransactionIdentifier transactionID2 = nextTransactionId();
1249 shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1250 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1252 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1253 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1256 testKit.expectMsgClass(duration, CommitTransactionReply.class);
/**
 * Verifies that a transaction still commits, and its data is found in the store, when an abort
 * for it is issued from inside {@code persistPayload} just before the payload is persisted.
 */
1260 public void testAbortWithCommitPending() {
1261 final ShardTestKit testKit = new ShardTestKit(getSystem());
// Shard subclass that injects the abort at the start of persistPayload, then delegates to super.
1262 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1264 void persistPayload(final Identifier id, final Payload payload,
1265 final boolean batchHint) {
1266 // Simulate an AbortTransaction message occurring during
1267 // replication, after
1268 // persisting and before finishing the commit to the
1271 doAbortTransaction(id, null);
1272 super.persistPayload(id, payload, batchHint);
1276 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
1277 new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1278 "testAbortWithCommitPending");
1280 ShardTestKit.waitUntilLeader(shard);
1282 final Duration duration = Duration.ofSeconds(5);
1284 final TransactionIdentifier transactionID = nextTransactionId();
// Ready, canCommit and commit the transaction; each step must get its normal reply.
1286 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1287 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1288 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1290 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1291 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1293 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1294 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1296 final NormalizedNode node = readStore(shard, TestModel.TEST_PATH);
1298 // Since we're simulating an abort occurring during replication
1299 // and before finish commit,
1300 // the data should still get written to the in-memory store
1301 // since we've gotten past
1302 // canCommit and preCommit and persisted the data.
1303 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
/**
 * Exercises the transaction commit timeout, configured here as 1 second. Two transactions are
 * readied; the first gets a canCommit reply but its commit is only sent after the second
 * transaction's canCommit has been answered, and that late commit must be answered with a
 * {@code Failure}. The second transaction then commits and its list entry is found in the store.
 */
1307 public void testTransactionCommitTimeout() throws Exception {
1308 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1309 final ShardTestKit testKit = new ShardTestKit(getSystem());
1310 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1311 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1312 "testTransactionCommitTimeout");
1314 ShardTestKit.waitUntilLeader(shard);
1316 final Duration duration = Duration.ofSeconds(5);
// Seed the store with the TEST container and an empty outer list for the entries written below.
1318 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1319 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1320 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1322 // Ready 2 Tx's - the first will timeout
1324 final TransactionIdentifier transactionID1 = nextTransactionId();
1326 prepareBatchedModifications(transactionID1,
1327 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1328 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1329 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
1331 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1333 final TransactionIdentifier transactionID2 = nextTransactionId();
1334 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1335 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1337 prepareBatchedModifications(transactionID2, listNodePath,
1338 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), testKit.getRef());
1339 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1341 // canCommit 1st Tx. We don't send the commit so it should
1344 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1345 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1347 // canCommit the 2nd Tx - it should complete after the 1st Tx
1350 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1351 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1353 // Try to commit the 1st Tx - should fail as it's not the
1356 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1357 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1359 // Commit the 2nd Tx.
1361 shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1362 testKit.expectMsgClass(duration, CommitTransactionReply.class);
// The second transaction's list entry must now be readable from the store.
1364 final NormalizedNode node = readStore(shard, listNodePath);
1365 assertNotNull(listNodePath + " not found", node);
1370 // public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1371 // dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1373 // new ShardTestKit(getSystem()) {{
1374 // final TestActorRef<Shard> shard = actorFactory.createTestActor(
1375 // newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1376 // "testTransactionCommitQueueCapacityExceeded");
1378 // waitUntilLeader(shard);
1380 // final FiniteDuration duration = duration("5 seconds");
1382 // final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1384 // final TransactionIdentifier transactionID1 = nextTransactionId();
1385 // final MutableCompositeModification modification1 = new MutableCompositeModification();
1386 // final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1387 // TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1390 // final TransactionIdentifier transactionID2 = nextTransactionId();
1391 // final MutableCompositeModification modification2 = new MutableCompositeModification();
1392 // final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1393 // TestModel.OUTER_LIST_PATH,
1394 // ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1397 // final TransactionIdentifier transactionID3 = nextTransactionId();
1398 // final MutableCompositeModification modification3 = new MutableCompositeModification();
1399 // final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1400 // TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1403 // // Ready the Tx's
1405 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
1406 // modification1), getRef());
1407 // expectMsgClass(duration, ReadyTransactionReply.class);
1409 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
1410 // modification2), getRef());
1411 // expectMsgClass(duration, ReadyTransactionReply.class);
1413 // // The 3rd Tx should exceed queue capacity and fail.
1415 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
1416 // modification3), getRef());
1417 // expectMsgClass(duration, akka.actor.Status.Failure.class);
1419 // // canCommit 1st Tx.
1421 // shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1422 // expectMsgClass(duration, CanCommitTransactionReply.class);
1424 // // canCommit the 2nd Tx - it should get queued.
1426 // shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1428 // // canCommit the 3rd Tx - should exceed queue capacity and fail.
1430 // shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1431 // expectMsgClass(duration, akka.actor.Status.Failure.class);
/**
 * Verifies that, with the transaction commit timeout set to 1 second, a canCommit sent only for
 * the last of three readied transactions is answered with a {@code CanCommitTransactionReply}
 * even though the two transactions readied before it never receive a canCommit.
 */
1436 public void testTransactionCommitWithPriorExpiredCohortEntries() {
1437 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1438 final ShardTestKit testKit = new ShardTestKit(getSystem());
1439 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1440 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1441 "testTransactionCommitWithPriorExpiredCohortEntries");
1443 ShardTestKit.waitUntilLeader(shard);
1445 final Duration duration = Duration.ofSeconds(5);
// Ready three transactions that each write the TEST container.
1447 final TransactionIdentifier transactionID1 = nextTransactionId();
1448 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1449 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1450 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1452 final TransactionIdentifier transactionID2 = nextTransactionId();
1453 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1454 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1455 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1457 final TransactionIdentifier transactionID3 = nextTransactionId();
1458 shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1459 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1460 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1462 // All Tx's are readied. We'll send canCommit for the last one
1463 // but not the others. The others
1464 // should expire from the queue and the last one should be
1467 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1468 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1472 public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
1473 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1474 final ShardTestKit testKit = new ShardTestKit(getSystem());
1475 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1476 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1477 "testTransactionCommitWithSubsequentExpiredCohortEntry");
1479 ShardTestKit.waitUntilLeader(shard);
1481 final Duration duration = Duration.ofSeconds(5);
1483 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1485 final TransactionIdentifier transactionID1 = nextTransactionId();
1486 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1487 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1488 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1490 // CanCommit the first Tx so it's the current in-progress Tx.
1492 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1493 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1495 // Ready the second Tx.
1497 final TransactionIdentifier transactionID2 = nextTransactionId();
1498 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1499 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1500 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1502 // Ready the third Tx.
1504 final TransactionIdentifier transactionID3 = nextTransactionId();
1505 final DataTreeModification modification3 = dataStore.newModification();
1506 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1507 .apply(modification3);
1508 modification3.ready();
1509 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
1510 true, Optional.empty());
1511 shard.tell(readyMessage, testKit.getRef());
1513 // Commit the first Tx. After completing, the second should
1514 // expire from the queue and the third
1517 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1518 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1520 // Expect commit reply from the third Tx.
1522 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1524 final NormalizedNode node = readStore(shard, TestModel.TEST2_PATH);
1525 assertNotNull(TestModel.TEST2_PATH + " not found", node);
1529 public void testCanCommitBeforeReadyFailure() {
1530 final ShardTestKit testKit = new ShardTestKit(getSystem());
1531 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1532 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1533 "testCanCommitBeforeReadyFailure");
1535 shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), testKit.getRef());
1536 testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
1540 public void testAbortAfterCanCommit() throws Exception {
1541 final ShardTestKit testKit = new ShardTestKit(getSystem());
1542 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1543 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
1545 ShardTestKit.waitUntilLeader(shard);
1547 final Duration duration = Duration.ofSeconds(5);
1548 final Timeout timeout = Timeout.create(duration);
1550 // Ready 2 transactions - the first one will be aborted.
1552 final TransactionIdentifier transactionID1 = nextTransactionId();
1553 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1554 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1555 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1557 final TransactionIdentifier transactionID2 = nextTransactionId();
1558 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1559 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1560 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1562 // Send the CanCommitTransaction message for the first Tx.
1564 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1565 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1566 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1567 assertTrue("Can commit", canCommitReply.getCanCommit());
1569 // Send the CanCommitTransaction message for the 2nd Tx. This
1570 // should get queued and
1571 // processed after the first Tx completes.
1573 final Future<Object> canCommitFuture = Patterns.ask(shard,
1574 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1576 // Send the AbortTransaction message for the first Tx. This
1577 // should trigger the 2nd
1580 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1581 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1583 // Wait for the 2nd Tx to complete the canCommit phase.
1585 canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture,
1586 FiniteDuration.create(5, TimeUnit.SECONDS));
1587 assertTrue("Can commit", canCommitReply.getCanCommit());
1591 public void testAbortAfterReady() {
1592 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1593 final ShardTestKit testKit = new ShardTestKit(getSystem());
1594 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1595 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1597 ShardTestKit.waitUntilLeader(shard);
1599 final Duration duration = Duration.ofSeconds(5);
1603 final TransactionIdentifier transactionID1 = nextTransactionId();
1604 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1605 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1606 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1608 // Send the AbortTransaction message.
1610 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1611 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1613 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1615 // Now send CanCommitTransaction - should fail.
1617 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1618 final Throwable failure = testKit.expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1619 assertTrue("Failure type", failure instanceof IllegalStateException);
1621 // Ready and CanCommit another and verify success.
1623 final TransactionIdentifier transactionID2 = nextTransactionId();
1624 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1625 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1626 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1628 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1629 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1633 public void testAbortQueuedTransaction() {
1634 final ShardTestKit testKit = new ShardTestKit(getSystem());
1635 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1636 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1638 ShardTestKit.waitUntilLeader(shard);
1640 final Duration duration = Duration.ofSeconds(5);
1644 final TransactionIdentifier transactionID1 = nextTransactionId();
1645 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1646 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1647 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1649 final TransactionIdentifier transactionID2 = nextTransactionId();
1650 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1651 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1652 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1654 final TransactionIdentifier transactionID3 = nextTransactionId();
1655 shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1656 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), testKit.getRef());
1657 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1659 // Abort the second tx while it's queued.
1661 shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1662 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1664 // Commit the other 2.
1666 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1667 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1669 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1670 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1672 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1673 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1675 shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1676 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1678 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1682 public void testCreateSnapshotWithNonPersistentData() throws Exception {
1683 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1687 public void testCreateSnapshot() throws Exception {
1688 testCreateSnapshot(true, "testCreateSnapshot");
// Shared scenario behind the snapshot tests: starts a Shard subclass that records every object
// passed to saveSnapshot(), writes the test container, then triggers two snapshot captures
// and validates each saved snapshot against the data tree root read beforehand.
//
// persistent     - value applied to dataStoreContextBuilder.persistent(...) before the shard starts
// shardActorName - name under which the shard test actor is created
1691 private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception {
// Counted down by TestShard.handleCommand() once a snapshot-completion message was handled.
// Held in an AtomicReference so that awaitAndValidateSnapshot() can install a fresh latch.
1692 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
// Receives the most recent object handed to saveSnapshot().
1694 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
// Persistence provider wrapper that remembers the snapshot object before delegating.
1695 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1696 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1701 public void saveSnapshot(final Object obj) {
1702 savedSnapshot.set(obj);
1703 super.saveSnapshot(obj);
1707 dataStoreContextBuilder.persistent(persistent);
// Shard subclass that installs the recording persistence provider and signals the latch.
1709 final class TestShard extends Shard {
1711 TestShard(final AbstractBuilder<?, ?> builder) {
// Wrap whatever persistence provider the shard already has.
1713 setPersistence(new TestPersistentDataProvider(super.persistence()));
1717 public void handleCommand(final Object message) {
1718 super.handleCommand(message);
1720 // XXX: the "commit_snapshot" literal mirrors RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
// The message is matched either by type or by its toString() value.
1721 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1722 latch.get().countDown();
// Re-declared as public so the test can reach the RaftActorContext
// (presumably widening the inherited visibility - confirm against Shard).
1727 public RaftActorContext getRaftActorContext() {
1728 return super.getRaftActorContext();
1732 final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1734 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props.create(Shard.class,
1735 new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()), shardActorName);
1737 ShardTestKit.waitUntilLeader(shard);
1738 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// The whole tree as it looks after the write; every snapshot must contain exactly this root.
1740 final NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.empty());
1742 // Trigger creation of a snapshot by asking the snapshot manager to capture one
1743 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1744 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1745 awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
// Capture a second time; awaitAndValidateSnapshot() reset the latch and the saved reference.
1747 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1748 awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1751 private static void awaitAndValidateSnapshot(final AtomicReference<CountDownLatch> latch,
1752 final AtomicReference<Object> savedSnapshot, final NormalizedNode expectedRoot)
1753 throws InterruptedException {
1754 assertTrue("Snapshot saved", latch.get().await(5, TimeUnit.SECONDS));
1756 assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
1758 verifySnapshot((Snapshot) savedSnapshot.get(), expectedRoot);
1760 latch.set(new CountDownLatch(1));
1761 savedSnapshot.set(null);
1764 private static void verifySnapshot(final Snapshot snapshot, final NormalizedNode expectedRoot) {
1765 final NormalizedNode actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot().getRootNode().get();
1766 assertEquals("Root node", expectedRoot, actual);
1770 * This test simply verifies that the applySnapShot logic will work.
1773 public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
// Stand-alone in-memory data tree created with the operational configuration; no shard actor
// is involved in this test.
1774 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
// Populate the tree with the test container and commit.
1777 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1778 putTransaction.write(TestModel.TEST_PATH,
1779 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1780 commitTransaction(store, putTransaction);
// Remember the complete tree (read at the empty, i.e. root, identifier).
1783 final NormalizedNode expected = readStore(store, YangInstanceIdentifier.empty());
// Replace the whole tree: delete the root, then write the previously read root back.
1785 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
1787 writeTransaction.delete(YangInstanceIdentifier.empty());
1788 writeTransaction.write(YangInstanceIdentifier.empty(), expected);
1790 commitTransaction(store, writeTransaction);
// After the delete-and-rewrite the tree must read back equal to what was there before.
1792 final NormalizedNode actual = readStore(store, YangInstanceIdentifier.empty());
1794 assertEquals(expected, actual);
1798 public void testRecoveryApplicable() {
1800 final DatastoreContext persistentContext = DatastoreContext.newBuilder()
1801 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1803 final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
1804 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1806 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
1807 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1809 final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
1810 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1812 final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
1814 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1816 final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
1818 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1822 public void testOnDatastoreContext() {
1823 dataStoreContextBuilder.persistent(true);
1825 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
1827 assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1829 ShardTestKit.waitUntilLeader(shard);
1831 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1833 assertFalse("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1835 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1837 assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1841 public void testRegisterRoleChangeListener() {
1842 final ShardTestKit testKit = new ShardTestKit(getSystem());
1843 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1844 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1845 "testRegisterRoleChangeListener");
1847 ShardTestKit.waitUntilLeader(shard);
1849 final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
1851 shard.tell(new RegisterRoleChangeListener(), listener);
1853 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
1855 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1856 ShardLeaderStateChanged.class);
1857 assertTrue("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent());
1858 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
1859 leaderStateChanged.getLocalShardDataTree().get());
1861 MessageCollectorActor.clearMessages(listener);
1863 // Force a leader change
1865 shard.tell(new RequestVote(10000, "member2", 50, 50), testKit.getRef());
1867 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, ShardLeaderStateChanged.class);
1868 assertFalse("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent());
1872 public void testFollowerInitialSyncStatus() {
1873 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1874 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1875 "testFollowerInitialSyncStatus");
1877 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false,
1878 "member-1-shard-inventory-operational"));
1880 assertFalse(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1882 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true,
1883 "member-1-shard-inventory-operational"));
1885 assertTrue(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1889 public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
1890 final ShardTestKit testKit = new ShardTestKit(getSystem());
1891 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
1892 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1893 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1895 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1896 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1897 TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1899 setupInMemorySnapshotStore();
1901 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1902 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1903 actorFactory.generateActorId(testName + "-shard"));
1905 testKit.waitUntilNoLeader(shard);
1907 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1908 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1909 RegisterDataTreeNotificationListenerReply.class);
1910 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1912 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1913 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1915 listener.waitForChangeEvents();
1919 public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
1920 final ShardTestKit testKit = new ShardTestKit(getSystem());
1921 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
1922 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1923 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1925 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
1926 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1927 TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1929 setupInMemorySnapshotStore();
1931 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1932 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1933 actorFactory.generateActorId(testName + "-shard"));
1935 testKit.waitUntilNoLeader(shard);
1937 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1938 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1939 RegisterDataTreeNotificationListenerReply.class);
1940 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1942 final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
1943 regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), testKit.getRef());
1944 testKit.expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
1946 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1947 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1949 listener.expectNoMoreChanges("Received unexpected change after close");
1953 public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
1954 final ShardTestKit testKit = new ShardTestKit(getSystem());
1955 final String testName = "testClusteredDataTreeChangeListenerRegistration";
1956 final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
1957 MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
1959 final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
1960 MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
1962 final TestActorRef<Shard> followerShard = actorFactory
1963 .createTestActor(Shard.builder().id(followerShardID)
1964 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
1965 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
1966 "akka://test/user/" + leaderShardID.toString()))
1967 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1968 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
1970 final TestActorRef<Shard> leaderShard = actorFactory
1971 .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
1972 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
1973 "akka://test/user/" + followerShardID.toString()))
1974 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1975 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
1977 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1978 final String leaderPath = ShardTestKit.waitUntilLeader(followerShard);
1979 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
1981 final YangInstanceIdentifier path = TestModel.TEST_PATH;
1982 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1983 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
1984 actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1986 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1987 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1988 RegisterDataTreeNotificationListenerReply.class);
1989 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1991 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1993 listener.waitForChangeEvents();
1997 public void testServerRemoved() {
1998 final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
1999 .withDispatcher(Dispatchers.DefaultDispatcherId()));
2001 final ActorRef shard = parent.underlyingActor().context().actorOf(
2002 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2003 "testServerRemoved");
2005 shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2007 MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);