2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertSame;
14 import static org.junit.Assert.assertTrue;
15 import static org.junit.Assert.fail;
16 import static org.mockito.Matchers.any;
17 import static org.mockito.Mockito.doThrow;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSelection;
24 import akka.actor.Props;
25 import akka.actor.Status.Failure;
26 import akka.dispatch.Dispatchers;
27 import akka.dispatch.OnComplete;
28 import akka.japi.Creator;
29 import akka.pattern.Patterns;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Stopwatch;
34 import com.google.common.base.Throwables;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.time.Duration;
37 import java.util.Collections;
38 import java.util.HashSet;
40 import java.util.Optional;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicReference;
46 import org.junit.Test;
47 import org.mockito.InOrder;
48 import org.opendaylight.controller.cluster.DataPersistenceProvider;
49 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
50 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
51 import org.opendaylight.controller.cluster.access.concepts.MemberName;
52 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
53 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
63 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
64 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
65 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
66 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
67 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
68 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
71 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
72 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
74 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
75 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
76 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
77 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
78 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
79 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
80 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
81 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
82 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
83 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
84 import org.opendaylight.controller.cluster.raft.RaftActorContext;
85 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
86 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
87 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
88 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
89 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
90 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
91 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
92 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
93 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
94 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
95 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
96 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
97 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
98 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
99 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
100 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
101 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
102 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
103 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
104 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
105 import org.opendaylight.yangtools.concepts.Identifier;
106 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
107 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
108 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
115 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
116 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
117 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
118 import scala.concurrent.Await;
119 import scala.concurrent.Future;
120 import scala.concurrent.duration.FiniteDuration;
122 public class ShardTest extends AbstractShardTest {
// Filler payload stored in the in-memory journal at sequence number 0 by the recovery test in this
// class; the literal itself explains why the filler entry is needed.
123 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
124 + "InMemorySnapshotStore and journal recovery seq number will start from 1";
/**
 * Registers a data tree change listener for {@code TestModel.TEST_PATH} on a shard, checks the path
 * reported in the registration reply, then writes to that path and waits for the listener's change
 * events.
 */
127 public void testRegisterDataTreeChangeListener() throws Exception {
128 final ShardTestKit testKit = new ShardTestKit(getSystem());
129 final TestActorRef<Shard> shard = actorFactory.createTestActor(
130 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
131 "testRegisterDataTreeChangeListener");
// Do not send anything to the shard until waitUntilLeader has returned for it.
133 ShardTestKit.waitUntilLeader(shard);
// Sent without a sender, so no reply is delivered to the test kit for this message.
135 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// NOTE(review): the constructor argument 1 is presumably the number of expected change events - confirm.
137 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
138 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
139 TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
// Register the listener actor; the reply goes to the test kit, which waits up to 3 seconds for it.
141 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), testKit.getRef());
143 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
144 RegisterDataTreeNotificationListenerReply.class);
145 final String replyPath = reply.getListenerRegistrationPath().toString();
// The registration path must be a child of the shard actor whose name starts with '$'.
146 assertTrue("Incorrect reply path: " + replyPath,
147 replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
// Write a container node at the listened path, then block on the mock listener.
149 final YangInstanceIdentifier path = TestModel.TEST_PATH;
150 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
152 listener.waitForChangeEvents();
155 @SuppressWarnings("serial")
157 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
158 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
159 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
160 final Creator<Shard> creator = new Creator<Shard>() {
161 boolean firstElectionTimeout = true;
164 public Shard create() {
165 return new Shard(newShardBuilder()) {
167 public void handleCommand(final Object message) {
168 if (message instanceof ElectionTimeout && firstElectionTimeout) {
169 firstElectionTimeout = false;
170 final ActorRef self = getSelf();
172 Uninterruptibles.awaitUninterruptibly(
173 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
174 self.tell(message, self);
177 onFirstElectionTimeout.countDown();
179 super.handleCommand(message);
186 setupInMemorySnapshotStore();
188 final YangInstanceIdentifier path = TestModel.TEST_PATH;
189 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
190 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
191 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
193 final TestActorRef<Shard> shard = actorFactory.createTestActor(
194 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
195 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
197 final ShardTestKit testKit = new ShardTestKit(getSystem());
198 assertTrue("Got first ElectionTimeout", onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
200 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), testKit.getRef());
201 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
202 RegisterDataTreeNotificationListenerReply.class);
203 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
205 shard.tell(FindLeader.INSTANCE, testKit.getRef());
206 final FindLeaderReply findLeadeReply = testKit.expectMsgClass(Duration.ofSeconds(5), FindLeaderReply.class);
207 assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
209 onChangeListenerRegistered.countDown();
211 // TODO: investigate why we do not receive data chage events
212 listener.waitForChangeEvents();
/**
 * Sends a serialized read-only {@code CreateTransaction} to a shard and checks that the transaction
 * path in the reply starts with the expected prefix under the shard actor.
 */
216 public void testCreateTransaction() {
217 final ShardTestKit testKit = new ShardTestKit(getSystem());
218 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
220 ShardTestKit.waitUntilLeader(shard);
// The schema context is sent first; the only reply awaited below is the CreateTransactionReply.
222 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), testKit.getRef());
// The transaction type is passed as the enum ordinal and the message is sent in serializable form.
224 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
225 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
227 final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
228 CreateTransactionReply.class);
// Only the prefix of the path is checked; it is built from the shard name and member name of shardID.
230 final String path = reply.getTransactionPath().toString();
231 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
232 "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
233 shardID.getShardName(), shardID.getMemberName().getName())));
/**
 * Sends a serialized read-only {@code CreateTransaction} to a shard and checks the prefix of the
 * transaction path in the reply.
 *
 * <p>NOTE(review): the transaction id comes from {@code nextTransactionId()}, exactly as in
 * {@code testCreateTransaction}; nothing visible here sets up a transaction chain - confirm that the
 * "OnChain" aspect is actually exercised.
 */
237 public void testCreateTransactionOnChain() {
238 final ShardTestKit testKit = new ShardTestKit(getSystem());
239 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
241 ShardTestKit.waitUntilLeader(shard);
// The transaction type is passed as the enum ordinal and the message is sent in serializable form.
243 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
244 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
246 final CreateTransactionReply reply = testKit.expectMsgClass(Duration.ofSeconds(3),
247 CreateTransactionReply.class);
// Only the prefix of the path is checked; it is built from the shard name and member name of shardID.
249 final String path = reply.getTransactionPath().toString();
250 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
251 "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
252 shardID.getShardName(), shardID.getMemberName().getName())));
/**
 * Starts a shard with one peer whose address is initially {@code null}, sends
 * {@code PeerAddressResolved} for that peer, and checks via {@code GetOnDemandRaftState} that the
 * shard now reports the resolved address.
 */
256 public void testPeerAddressResolved() {
257 final ShardTestKit testKit = new ShardTestKit(getSystem());
// NOTE(review): the create(...) call below ends on a comma; its final argument line is missing from
// this copy of the file and has to be restored from upstream.
258 final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
// The peer is registered with a null address so that it has to be resolved later.
260 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder()
261 .peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null))
262 .props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
264 final String address = "akka://foobar";
265 shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
// Query the shard's on-demand state and compare the address it holds for the peer.
267 shard.tell(GetOnDemandRaftState.INSTANCE, testKit.getRef());
268 final OnDemandRaftState state = testKit.expectMsgClass(OnDemandRaftState.class);
269 assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
/**
 * Builds a snapshot from a separately populated data tree, sends it to the shard as
 * {@code ApplySnapshot}, and polls the shard's store for up to about 5 seconds until its root node
 * equals the snapshot's root node.
 */
273 public void testApplySnapshot() throws Exception {
275 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps()
276 .withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
278 ShardTestKit.waitUntilLeader(shard);
// NOTE(review): the create(...) call below ends on a comma; its final argument line is missing from
// this copy of the file and has to be restored from upstream.
280 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
// Test container holding an outer list with a single entry whose id is 1.
283 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
284 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
285 .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
286 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
288 writeToStore(store, TestModel.TEST_PATH, container);
// The whole tree (root) of the local store is what the snapshot carries and what is compared later.
290 final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
291 final NormalizedNode<?,?> expected = readStore(store, root);
293 final Snapshot snapshot = Snapshot.create(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
294 Collections.emptyList(), 1, 2, 3, 4, -1, null, null);
296 shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
// Poll every 75 ms; an AssertionError from the comparison is caught so the loop can try again.
// NOTE(review): the try/return/closing-brace lines of this loop are missing from this copy of the file.
298 final Stopwatch sw = Stopwatch.createStarted();
299 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
300 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
303 assertEquals("Root node", expected, readStore(shard, root));
305 } catch (final AssertionError e) {
// Reached only when the comparison never succeeded within the polling window.
310 fail("Snapshot was not applied");
/**
 * Calls {@code applyState} directly on the shard actor with a payload carrying a write of the test
 * container, then polls the shard's store for up to about 5 seconds until the node can be read back.
 */
314 public void testApplyState() throws Exception {
315 final TestActorRef<Shard> shard = actorFactory.createTestActor(
316 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
318 ShardTestKit.waitUntilLeader(shard);
// NOTE(review): the create(...) call below ends on a comma; its final argument line is missing from
// this copy of the file and has to be restored from upstream.
320 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
// Prepare the modification on a local data tree; it is turned into a payload further down.
323 final DataTreeModification writeMod = store.takeSnapshot().newModification();
324 final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
325 writeMod.write(TestModel.TEST_PATH, node);
// applyState is invoked on the underlying actor instance, not via a message; the first two arguments are null.
328 final TransactionIdentifier tx = nextTransactionId();
329 shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
// Poll every 75 ms until a non-null node is read, then compare it with the written node.
331 final Stopwatch sw = Stopwatch.createStarted();
332 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
333 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
335 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
336 if (actual != null) {
337 assertEquals("Applied state", node, actual);
// Reached only when the node never became readable within the polling window.
342 fail("State was not applied");
/**
 * Seeds the in-memory snapshot store and journal with an outer-list write followed by 16 list-entry
 * merges and an {@code ApplyJournalEntries} marker, then delegates to {@code testRecovery} with the
 * set of list entry keys.
 */
346 public void testDataTreeCandidateRecovery() throws Exception {
347 // Set up the InMemorySnapshotStore.
348 final DataTree source = setupInMemorySnapshotStore();
350 final DataTreeModification writeMod = source.takeSnapshot().newModification();
351 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
// Sequence number 0 is occupied by a filler entry (see the text of DUMMY_DATA for the reason).
353 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
355 // Set up the InMemoryJournal.
356 InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
357 payloadForModification(source, writeMod, nextTransactionId())));
359 final int nListEntries = 16;
360 final Set<Integer> listEntryKeys = new HashSet<>();
362 // Add some ModificationPayload entries
// Entry i is added to the journal at sequence number i + 1 as a log entry created with arguments (i, 1, payload).
363 for (int i = 1; i <= nListEntries; i++) {
364 listEntryKeys.add(Integer.valueOf(i));
366 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
367 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
369 final DataTreeModification mod = source.takeSnapshot().newModification();
370 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
373 InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
374 payloadForModification(source, mod, nextTransactionId())));
// Final journal record: an ApplyJournalEntries constructed with nListEntries.
377 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
378 new ApplyJournalEntries(nListEntries));
380 testRecovery(listEntryKeys);
384 @SuppressWarnings("checkstyle:IllegalCatch")
385 public void testConcurrentThreePhaseCommits() throws Exception {
386 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
387 final CountDownLatch commitLatch = new CountDownLatch(2);
389 final long timeoutSec = 5;
390 final Duration duration = Duration.ofSeconds(timeoutSec);
391 final Timeout timeout = Timeout.create(duration);
393 final TestActorRef<Shard> shard = actorFactory.createTestActor(
394 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
395 "testConcurrentThreePhaseCommits");
397 class OnFutureComplete extends OnComplete<Object> {
398 private final Class<?> expRespType;
400 OnFutureComplete(final Class<?> expRespType) {
401 this.expRespType = expRespType;
405 public void onComplete(final Throwable error, final Object resp) {
407 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
410 assertEquals("Commit response type", expRespType, resp.getClass());
412 } catch (final Exception e) {
418 void onSuccess(final Object resp) {
422 class OnCommitFutureComplete extends OnFutureComplete {
423 OnCommitFutureComplete() {
424 super(CommitTransactionReply.class);
428 public void onComplete(final Throwable error, final Object resp) {
429 super.onComplete(error, resp);
430 commitLatch.countDown();
434 class OnCanCommitFutureComplete extends OnFutureComplete {
435 private final TransactionIdentifier transactionID;
437 OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
438 super(CanCommitTransactionReply.class);
439 this.transactionID = transactionID;
443 void onSuccess(final Object resp) {
444 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(resp);
445 assertTrue("Can commit", canCommitReply.getCanCommit());
447 final Future<Object> commitFuture = Patterns.ask(shard,
448 new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
449 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
453 final ShardTestKit testKit = new ShardTestKit(getSystem());
454 ShardTestKit.waitUntilLeader(shard);
456 final TransactionIdentifier transactionID1 = nextTransactionId();
457 final TransactionIdentifier transactionID2 = nextTransactionId();
458 final TransactionIdentifier transactionID3 = nextTransactionId();
460 final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
461 shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
462 final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
463 final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
464 final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
466 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
467 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
468 final ReadyTransactionReply readyReply = ReadyTransactionReply
469 .fromSerializable(testKit.expectMsgClass(duration, ReadyTransactionReply.class));
470 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
471 // Send the CanCommitTransaction message for the first Tx.
473 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
474 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
475 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
476 assertTrue("Can commit", canCommitReply.getCanCommit());
478 // Ready 2 more Tx's.
480 shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
481 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), testKit.getRef());
482 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
485 prepareBatchedModifications(transactionID3,
486 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
487 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
488 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), testKit.getRef());
489 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
491 // Send the CanCommitTransaction message for the next 2 Tx's.
492 // These should get queued and
493 // processed after the first Tx completes.
495 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
496 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
498 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
499 new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
501 // Send the CommitTransaction message for the first Tx. After it
502 // completes, it should
503 // trigger the 2nd Tx to proceed which should in turn then
506 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
507 testKit.expectMsgClass(duration, CommitTransactionReply.class);
509 // Wait for the next 2 Tx's to complete.
511 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher());
513 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher());
515 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
517 final Throwable t = caughtEx.get();
519 Throwables.propagateIfPossible(t, Exception.class);
520 throw new RuntimeException(t);
523 assertTrue("Commits complete", done);
525 // final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
526 // cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
527 // cohort3.getPreCommit(), cohort3.getCommit());
528 // inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
529 // inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
530 // inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
531 // inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
532 // inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
533 // inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
534 // inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
535 // inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
536 // inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
538 // Verify data in the data store.
540 verifyOuterListEntry(shard, 1);
542 verifyLastApplied(shard, 5);
/**
 * Sends three {@code BatchedModifications} messages for one transaction, the last of which yields a
 * {@code ReadyTransactionReply}, then drives can-commit and commit explicitly and verifies the
 * outer list entry in the store.
 */
546 public void testBatchedModificationsWithNoCommitOnReady() {
547 final ShardTestKit testKit = new ShardTestKit(getSystem());
548 final TestActorRef<Shard> shard = actorFactory.createTestActor(
549 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
550 "testBatchedModificationsWithNoCommitOnReady");
552 ShardTestKit.waitUntilLeader(shard);
554 final TransactionIdentifier transactionID = nextTransactionId();
555 final Duration duration = Duration.ofSeconds(5);
557 // Send a BatchedModifications to start a transaction.
// NOTE(review): the trailing arguments of newBatchedModifications are presumably
// (ready, doCommitOnReady, messagesSent) - confirm against the helper's declaration.
559 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
560 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
561 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
563 // Send a couple more BatchedModifications.
// NOTE(review): the closing line of each of the next two tell(...) calls is missing from this copy of the file.
565 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
566 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
568 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
570 shard.tell(newBatchedModifications(transactionID,
571 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
572 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
573 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3),
// The third batch is answered with a ReadyTransactionReply instead of a BatchedModificationsReply.
575 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
577 // Send the CanCommitTransaction message.
579 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
580 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
581 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
582 assertTrue("Can commit", canCommitReply.getCanCommit());
584 // Send the CommitTransaction message.
586 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
587 testKit.expectMsgClass(duration, CommitTransactionReply.class);
589 // Verify data in the data store.
591 verifyOuterListEntry(shard, 1);
/**
 * Sends three {@code BatchedModifications} messages for one transaction and expects the last one to
 * be answered directly with a {@code CommitTransactionReply}, without separate can-commit or commit
 * messages, then verifies the outer list entry in the store.
 */
595 public void testBatchedModificationsWithCommitOnReady() {
596 final ShardTestKit testKit = new ShardTestKit(getSystem());
597 final TestActorRef<Shard> shard = actorFactory.createTestActor(
598 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
599 "testBatchedModificationsWithCommitOnReady");
601 ShardTestKit.waitUntilLeader(shard);
603 final TransactionIdentifier transactionID = nextTransactionId();
604 final Duration duration = Duration.ofSeconds(5);
606 // Send a BatchedModifications to start a transaction.
// NOTE(review): the trailing arguments of newBatchedModifications are presumably
// (ready, doCommitOnReady, messagesSent) - confirm against the helper's declaration.
608 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
609 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
610 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
612 // Send a couple more BatchedModifications.
// NOTE(review): the closing line of each of the next two tell(...) calls is missing from this copy of the file.
614 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
615 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
617 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
619 shard.tell(newBatchedModifications(transactionID,
620 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
621 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
622 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3),
// With both boolean flags true on the last batch, the reply awaited is the commit reply itself.
625 testKit.expectMsgClass(duration, CommitTransactionReply.class);
627 // Verify data in the data store.
628 verifyOuterListEntry(shard, 1);
/**
 * Sends a {@code BatchedModifications} whose total-messages-sent count is set to 2 and expects the
 * shard to answer with a {@code Failure}; the failure's cause is rethrown so that the
 * {@code expected = IllegalStateException.class} condition of the annotation can be met.
 */
631 @Test(expected = IllegalStateException.class)
632 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
633 final ShardTestKit testKit = new ShardTestKit(getSystem());
634 final TestActorRef<Shard> shard = actorFactory.createTestActor(
635 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
636 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
638 ShardTestKit.waitUntilLeader(shard);
640 final TransactionIdentifier transactionID = nextTransactionId();
641 final BatchedModifications batched = new BatchedModifications(transactionID,
642 DataStoreVersions.CURRENT_VERSION);
// NOTE(review): a short statement between the construction above and the call below appears to be
// missing from this copy of the file (the test name suggests the batch is marked ready) - confirm.
// Only this one message is sent, while the batch claims that two were.
644 batched.setTotalMessagesSent(2);
646 shard.tell(batched, testKit.getRef());
648 final Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
// Rethrow the cause as-is when it is an Exception or unchecked; otherwise wrap it in a RuntimeException.
650 if (failure != null) {
651 Throwables.propagateIfPossible(failure.cause(), Exception.class);
652 throw new RuntimeException(failure.cause());
/**
 * Sends a merge of invalid data for a transaction and expects a {@code Failure}, then sends a second
 * batch for the same transaction and expects a {@code Failure} carrying the same cause.
 */
657 public void testBatchedModificationsWithOperationFailure() {
658 final ShardTestKit testKit = new ShardTestKit(getSystem());
659 final TestActorRef<Shard> shard = actorFactory.createTestActor(
660 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
661 "testBatchedModificationsWithOperationFailure");
663 ShardTestKit.waitUntilLeader(shard);
665 // Test merge with invalid data. An exception should occur when
666 // the merge is applied. Note that
667 // write will not validate the children for performance reasons.
669 final TransactionIdentifier transactionID = nextTransactionId();
// The invalid data is the test container with a leaf named by TestModel.JUNK_QNAME as its child.
671 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
672 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
673 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
675 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
676 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
677 shard.tell(batched, testKit.getRef());
678 Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
// Keep the first failure's cause for comparison with the second reply.
680 final Throwable cause = failure.cause();
// Second batch for the same transaction id, carrying no modifications of its own.
// NOTE(review): a short statement between the construction and setTotalMessagesSent appears to be
// missing from this copy of the file - confirm against upstream.
682 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
684 batched.setTotalMessagesSent(2);
686 shard.tell(batched, testKit.getRef());
688 failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
689 assertEquals("Failure cause", cause, failure.cause());
/**
 * Readies a write transaction on a local history, creates a read-only transaction on the same
 * history and checks that it reads the not-yet-committed node, then commits the write transaction
 * and checks the node in the shard's store.
 */
693 public void testBatchedModificationsOnTransactionChain() {
694 final ShardTestKit testKit = new ShardTestKit(getSystem());
695 final TestActorRef<Shard> shard = actorFactory.createTestActor(
696 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
697 "testBatchedModificationsOnTransactionChain");
699 ShardTestKit.waitUntilLeader(shard);
// Both transaction ids share one history id, which is what places them on the same chain.
701 final LocalHistoryIdentifier historyId = nextHistoryId();
702 final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
703 final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
705 final Duration duration = Duration.ofSeconds(5);
707 // Send a BatchedModifications to start a chained write
708 // transaction and ready it.
710 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
711 final YangInstanceIdentifier path = TestModel.TEST_PATH;
712 shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), testKit.getRef());
713 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
715 // Create a read Tx on the same chain.
717 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
718 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
720 final CreateTransactionReply createReply = testKit.expectMsgClass(Duration.ofSeconds(3),
721 CreateTransactionReply.class);
// The read is sent to the transaction actor named in the reply, before the write has been committed.
723 getSystem().actorSelection(createReply.getTransactionPath())
724 .tell(new ReadData(path, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
725 final ReadDataReply readReply = testKit.expectMsgClass(Duration.ofSeconds(3), ReadDataReply.class);
726 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
728 // Commit the write transaction.
730 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
731 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
732 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
733 assertTrue("Can commit", canCommitReply.getCanCommit());
735 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
736 testKit.expectMsgClass(duration, CommitTransactionReply.class);
738 // Verify data in the data store.
740 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
741 assertEquals("Stored node", containerNode, actualNode);
/**
 * Checks what a shard does with a BatchedModifications message once it no longer reports itself
 * as leader. The Shard subclass created here answers isLeader() with false and points getLeader()
 * at the test kit's actor as soon as overrideLeaderCalls is set; the test then expects an equal
 * message to arrive at the test kit.
 */
745 public void testOnBatchedModificationsWhenNotLeader() {
746 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
747 final ShardTestKit testKit = new ShardTestKit(getSystem());
748 final Creator<Shard> creator = new Creator<Shard>() {
749 private static final long serialVersionUID = 1L;
752 public Shard create() {
753 return new Shard(newShardBuilder()) {
// Reports the real leadership state until the flag is set, and false afterwards.
755 protected boolean isLeader() {
756 return overrideLeaderCalls.get() ? false : super.isLeader();
// Once the flag is set, the "leader" handed out is a selection of the test kit's own actor.
760 public ActorSelection getLeader() {
761 return overrideLeaderCalls.get() ? getSystem().actorSelection(testKit.getRef().path())
768 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
769 .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
770 "testOnBatchedModificationsWhenNotLeader");
772 ShardTestKit.waitUntilLeader(shard);
// Flip the overrides only after the shard has become leader.
774 overrideLeaderCalls.set(true);
776 final BatchedModifications batched = new BatchedModifications(nextTransactionId(),
777 DataStoreVersions.CURRENT_VERSION);
// Sent without a sender; the assertion below is on the message reaching the test kit.
779 shard.tell(batched, ActorRef.noSender());
781 testKit.expectMsgEquals(batched);
/**
 * Verifies that transaction messages sent to a shard without a leader are answered with a Failure
 * whose cause is a NoShardLeaderException. Three message kinds are tried in turn:
 * BatchedModifications, the message built by prepareForwardedReadyTransaction, and
 * ReadyLocalTransaction.
 */
785 public void testTransactionMessagesWithNoLeader() {
786 final ShardTestKit testKit = new ShardTestKit(getSystem());
// Configure the shard with DisableElectionsRaftPolicy and short heartbeat/election settings
// before it is created; the test then waits for the no-leader state below.
787 dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
788 .shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
789 final TestActorRef<Shard> shard = actorFactory.createTestActor(
790 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
791 "testTransactionMessagesWithNoLeader");
793 testKit.waitUntilNoLeader(shard);
// The same transaction identifier is reused for all three messages.
795 final TransactionIdentifier txId = nextTransactionId();
796 shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
797 Failure failure = testKit.expectMsgClass(Failure.class);
798 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
800 shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
801 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
802 failure = testKit.expectMsgClass(Failure.class);
803 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
// A mocked DataTreeModification is passed in this ReadyLocalTransaction.
805 shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
807 failure = testKit.expectMsgClass(Failure.class);
808 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
/**
 * Runs the shared immediate-commit scenario, testReadyWithImmediateCommit, with readWrite set to
 * true.
 */
812 public void testReadyWithReadWriteImmediateCommit() {
813 testReadyWithImmediateCommit(true);
/**
 * Runs the shared immediate-commit scenario, testReadyWithImmediateCommit, with readWrite set to
 * false.
 */
817 public void testReadyWithWriteOnlyImmediateCommit() {
818 testReadyWithImmediateCommit(false);
/**
 * Shared body of the two immediate-commit ready tests. A ready message carrying a write of the
 * test container is sent to the shard, a CommitTransactionReply is awaited, and the node is then
 * read back from the store and compared with what was written.
 *
 * @param readWrite variant under test; its value is also appended to the actor name
 */
821 private void testReadyWithImmediateCommit(final boolean readWrite) {
822 final ShardTestKit testKit = new ShardTestKit(getSystem());
823 final TestActorRef<Shard> shard = actorFactory.createTestActor(
824 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
825 "testReadyWithImmediateCommit-" + readWrite);
827 ShardTestKit.waitUntilLeader(shard);
829 final TransactionIdentifier transactionID = nextTransactionId();
830 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// NOTE(review): two different ready messages are built below, one via
// prepareForwardedReadyTransaction and one via prepareBatchedModifications; presumably readWrite
// selects which of them is sent - confirm.
832 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH, containerNode, true),
835 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true),
// Only a CommitTransactionReply is awaited; no CanCommitTransaction/CommitTransaction is sent.
839 testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
841 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
842 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
/**
 * Readies a locally built DataTreeModification through a ReadyLocalTransaction whose third
 * constructor argument is true and expects a CommitTransactionReply in response, without sending
 * separate canCommit/commit messages. The merged outer list is then read back from the store.
 */
846 public void testReadyLocalTransactionWithImmediateCommit() {
847 final ShardTestKit testKit = new ShardTestKit(getSystem());
848 final TestActorRef<Shard> shard = actorFactory.createTestActor(
849 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
850 "testReadyLocalTransactionWithImmediateCommit");
852 ShardTestKit.waitUntilLeader(shard);
// The modification is obtained directly from the shard actor's data store.
854 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
856 final DataTreeModification modification = dataStore.newModification();
// Apply a write of the test container, then a merge of an outer list holding one entry (id 42).
858 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
859 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
860 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
861 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
863 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
865 final TransactionIdentifier txId = nextTransactionId();
// ready() is called on the modification before it is wrapped in the message.
866 modification.ready();
867 final ReadyLocalTransaction readyMessage =
868 new ReadyLocalTransaction(txId, modification, true, Optional.empty());
870 shard.tell(readyMessage, testKit.getRef());
872 testKit.expectMsgClass(CommitTransactionReply.class);
874 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
875 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
/**
 * Readies a locally built DataTreeModification through a ReadyLocalTransaction whose third
 * constructor argument is false. The shard is expected to answer with a ReadyTransactionReply,
 * after which the test drives the transaction with explicit CanCommitTransaction and
 * CommitTransaction messages and checks the merged outer list in the store.
 */
879 public void testReadyLocalTransactionWithThreePhaseCommit() {
880 final ShardTestKit testKit = new ShardTestKit(getSystem());
881 final TestActorRef<Shard> shard = actorFactory.createTestActor(
882 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
883 "testReadyLocalTransactionWithThreePhaseCommit");
885 ShardTestKit.waitUntilLeader(shard);
// The modification is obtained directly from the shard actor's data store.
887 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
889 final DataTreeModification modification = dataStore.newModification();
// Apply a write of the test container, then a merge of an outer list holding one entry (id 42).
891 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
892 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
893 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
894 .addChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
896 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
898 final TransactionIdentifier txId = nextTransactionId();
// ready() is called on the modification before it is wrapped in the message.
899 modification.ready();
900 final ReadyLocalTransaction readyMessage =
901 new ReadyLocalTransaction(txId, modification, false, Optional.empty());
903 shard.tell(readyMessage, testKit.getRef());
905 testKit.expectMsgClass(ReadyTransactionReply.class);
907 // Send the CanCommitTransaction message.
909 shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
910 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
911 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
912 assertTrue("Can commit", canCommitReply.getCanCommit());
914 // Send the CommitTransaction message.
916 shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
917 testKit.expectMsgClass(CommitTransactionReply.class);
919 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
920 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
/**
 * Runs a ready / canCommit / commit sequence against a shard whose datastore context was built
 * with persistent(false), and checks that the written container can be read back from the store.
 */
924 public void testReadWriteCommitWithPersistenceDisabled() {
// Must be set before the shard is created so that newShardProps() sees it.
925 dataStoreContextBuilder.persistent(false);
926 final ShardTestKit testKit = new ShardTestKit(getSystem());
927 final TestActorRef<Shard> shard = actorFactory.createTestActor(
928 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
// NOTE(review): the actor name below does not match the test method name - confirm this is intentional.
929 "testCommitWithPersistenceDisabled");
931 ShardTestKit.waitUntilLeader(shard);
933 // Send the modifications for a single transaction and wait for the ready reply.
935 final Duration duration = Duration.ofSeconds(5);
937 final TransactionIdentifier transactionID = nextTransactionId();
938 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
939 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false),
941 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
943 // Send the CanCommitTransaction message.
945 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
946 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
947 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
948 assertTrue("Can commit", canCommitReply.getCanCommit());
950 // Send the CommitTransaction message.
952 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
953 testKit.expectMsgClass(duration, CommitTransactionReply.class);
955 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
956 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
/**
 * Runs the shared scenario testCommitWhenTransactionHasModifications with readWrite set to true.
 *
 * @throws Exception propagated from the shared scenario
 */
960 public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
961 testCommitWhenTransactionHasModifications(true);
/**
 * Runs the shared scenario testCommitWhenTransactionHasModifications with readWrite set to false.
 *
 * @throws Exception propagated from the shared scenario
 */
965 public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
966 testCommitWhenTransactionHasModifications(false);
/**
 * Shared body of the two "commit when transaction has modifications" tests. A transaction writing
 * the test container is readied, taken through canCommit and commit, and the test then verifies
 * the order of validate/prepare/commit calls on the data tree as well as the committed transaction
 * count and commit index reported by the shard's statistics bean.
 *
 * @param readWrite variant under test; its value is also appended to the actor name
 * @throws Exception declared for the calls made in the body (for example Thread.sleep and the
 *     data tree methods referenced during verification)
 */
969 private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
970 final ShardTestKit testKit = new ShardTestKit(getSystem());
// The shard is built on top of the data tree from createDelegatingMockDataTree() so that calls
// on it can be verified with Mockito further down.
971 final DataTree dataTree = createDelegatingMockDataTree();
972 final TestActorRef<Shard> shard = actorFactory.createTestActor(
973 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
974 "testCommitWhenTransactionHasModifications-" + readWrite);
976 ShardTestKit.waitUntilLeader(shard);
978 final Duration duration = Duration.ofSeconds(5);
979 final TransactionIdentifier transactionID = nextTransactionId();
// NOTE(review): two different ready messages are built below, one via
// prepareForwardedReadyTransaction and one via prepareBatchedModifications; presumably readWrite
// selects which of them is sent - confirm.
982 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
983 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
985 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
986 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
989 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
991 // Send the CanCommitTransaction message.
993 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
994 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
995 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
996 assertTrue("Can commit", canCommitReply.getCanCommit());
998 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
999 testKit.expectMsgClass(duration, CommitTransactionReply.class);
// The data tree must have seen validate, prepare and commit in exactly this relative order.
1001 final InOrder inOrder = inOrder(dataTree);
1002 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1003 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1004 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1006 // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
1008 Thread.sleep(HEARTBEAT_MILLIS * 2);
// Ask the shard for its statistics bean and assert on the counters it reports.
1010 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, testKit.getRef());
1011 final ShardStats shardStats = testKit.expectMsgClass(duration, ShardStats.class);
1013 // Use MBean for verification
1014 // Committed transaction count should increase as usual
1015 assertEquals(1, shardStats.getCommittedTransactionsCount());
1017 // Commit index should advance as we do not have an empty
1019 assertEquals(1, shardStats.getCommitIndex());
/**
 * Verifies shard behaviour when committing to the data tree fails. The data tree's commit() is
 * stubbed to throw, two transactions are readied, the first is driven to commit and is expected
 * to answer with a Failure, and the test then waits for the canCommit request of the second
 * transaction to complete. Finally the order of validate/prepare/commit calls on the data tree is
 * verified.
 *
 * @throws Exception declared for the calls made in the body (for example the latch await and the
 *     data tree methods referenced during stubbing and verification)
 */
1023 public void testCommitPhaseFailure() throws Exception {
1024 final ShardTestKit testKit = new ShardTestKit(getSystem());
1025 final DataTree dataTree = createDelegatingMockDataTree();
1026 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1027 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1028 "testCommitPhaseFailure");
1030 ShardTestKit.waitUntilLeader(shard);
1032 final Duration duration = Duration.ofSeconds(5);
// The same five seconds, expressed as an Akka Timeout for Patterns.ask() below.
1033 final Timeout timeout = Timeout.create(duration);
1035 // Setup 2 simulated transactions with mock cohorts. The first
// Every commit() on the data tree throws from here on.
1039 doThrow(new RuntimeException("mock commit failure")).when(dataTree)
1040 .commit(any(DataTreeCandidate.class));
1042 final TransactionIdentifier transactionID1 = nextTransactionId();
1043 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1044 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1045 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1047 final TransactionIdentifier transactionID2 = nextTransactionId();
1048 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1049 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1050 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1052 // Send the CanCommitTransaction message for the first Tx.
1054 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1055 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1056 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1057 assertTrue("Can commit", canCommitReply.getCanCommit());
1059 // Send the CanCommitTransaction message for the 2nd Tx. This
1060 // should get queued and
1061 // processed after the first Tx completes.
// ask() is used instead of tell() so that this reply does not land in the test kit's mailbox,
// where the Failure for the first Tx is awaited next.
1063 final Future<Object> canCommitFuture = Patterns.ask(shard,
1064 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1066 // Send the CommitTransaction message for the first Tx. This
1067 // should send back an error
1068 // and trigger the 2nd Tx to proceed.
1070 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1071 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1073 // Wait for the 2nd Tx to complete the canCommit phase.
// NOTE(review): the completion callback presumably releases the latch awaited below - confirm.
1075 final CountDownLatch latch = new CountDownLatch(1);
1076 canCommitFuture.onComplete(new OnComplete<Object>() {
1078 public void onComplete(final Throwable failure, final Object resp) {
1081 }, getSystem().dispatcher());
1083 assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
1085 final InOrder inOrder = inOrder(dataTree);
1086 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1087 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1089 // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
1090 // validate performs wrapping and we capture that mock
1091 // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1093 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
/**
 * Verifies shard behaviour when the prepare step on the data tree fails. The data tree's
 * prepare() is stubbed to throw, two transactions are readied, the first passes canCommit and is
 * then sent a CommitTransaction that is expected to be answered with a Failure, and the test waits
 * for the canCommit request of the second transaction to complete. Finally the order of calls on
 * the data tree is verified.
 *
 * @throws Exception declared for the calls made in the body (for example the latch await and the
 *     data tree methods referenced during stubbing and verification)
 */
1097 public void testPreCommitPhaseFailure() throws Exception {
1098 final ShardTestKit testKit = new ShardTestKit(getSystem());
1099 final DataTree dataTree = createDelegatingMockDataTree();
1100 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1101 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1102 "testPreCommitPhaseFailure");
1104 ShardTestKit.waitUntilLeader(shard);
1106 final Duration duration = Duration.ofSeconds(5);
// The same five seconds, expressed as an Akka Timeout for Patterns.ask() below.
1107 final Timeout timeout = Timeout.create(duration);
// Every prepare() on the data tree throws from here on.
1109 doThrow(new RuntimeException("mock preCommit failure")).when(dataTree)
1110 .prepare(any(DataTreeModification.class));
1112 final TransactionIdentifier transactionID1 = nextTransactionId();
1113 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1114 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1115 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1117 final TransactionIdentifier transactionID2 = nextTransactionId();
1118 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1119 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1120 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1122 // Send the CanCommitTransaction message for the first Tx.
1124 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1125 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1126 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1127 assertTrue("Can commit", canCommitReply.getCanCommit());
1129 // Send the CanCommitTransaction message for the 2nd Tx. This
1130 // should get queued and
1131 // processed after the first Tx completes.
// ask() is used instead of tell() so that this reply does not land in the test kit's mailbox,
// where the Failure for the first Tx is awaited next.
1133 final Future<Object> canCommitFuture = Patterns.ask(shard,
1134 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1136 // Send the CommitTransaction message for the first Tx. This
1137 // should send back an error
1138 // and trigger the 2nd Tx to proceed.
1140 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1141 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1143 // Wait for the 2nd Tx to complete the canCommit phase.
// NOTE(review): the completion callback presumably releases the latch awaited below - confirm.
1145 final CountDownLatch latch = new CountDownLatch(1);
1146 canCommitFuture.onComplete(new OnComplete<Object>() {
1148 public void onComplete(final Throwable failure, final Object resp) {
1151 }, getSystem().dispatcher());
1153 assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
// Expected call order: validate, the failing prepare, then another validate (presumably the one
// triggered by the 2nd Tx's canCommit - confirm).
1155 final InOrder inOrder = inOrder(dataTree);
1156 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1157 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1158 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
/**
 * Verifies shard behaviour when validation fails during canCommit. The data tree's validate() is
 * stubbed to throw a DataValidationFailedException on its first invocation and to do nothing on
 * later ones. The first transaction's CanCommitTransaction is expected to be answered with a
 * Failure; a second transaction is then readied and its canCommit is expected to succeed.
 *
 * @throws Exception declared for the data tree methods referenced during stubbing
 */
1162 public void testCanCommitPhaseFailure() throws Exception {
1163 final ShardTestKit testKit = new ShardTestKit(getSystem());
1164 final DataTree dataTree = createDelegatingMockDataTree();
1165 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1166 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1167 "testCanCommitPhaseFailure");
1169 ShardTestKit.waitUntilLeader(shard);
1171 final Duration duration = Duration.ofSeconds(5);
1172 final TransactionIdentifier transactionID1 = nextTransactionId();
// Chained stubbing: the first validate() call throws, subsequent calls do nothing.
1174 doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
1175 .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1177 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1178 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1179 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1181 // Send the CanCommitTransaction message.
1183 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1184 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1186 // Send another can commit to ensure the failed one got cleaned
1189 final TransactionIdentifier transactionID2 = nextTransactionId();
1190 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1191 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1192 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1194 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1195 final CanCommitTransactionReply reply = CanCommitTransactionReply
1196 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
1197 assertTrue("getCanCommit", reply.getCanCommit());
/**
 * Runs the parameterised scenario of the same name twice within one test method, first with
 * readWrite set to true and then with false.
 *
 * @throws Exception propagated from the parameterised scenario
 */
1201 public void testImmediateCommitWithCanCommitPhaseFailure() throws Exception {
1202 testImmediateCommitWithCanCommitPhaseFailure(true);
1203 testImmediateCommitWithCanCommitPhaseFailure(false);
/**
 * Parameterised scenario for a validation failure while readying a transaction for immediate
 * commit. The data tree's validate() is stubbed to throw a DataValidationFailedException on its
 * first invocation and to do nothing on later ones. The first ready message is expected to be
 * answered with a Failure; a second transaction is then readied the same way and is expected to
 * be answered with a CommitTransactionReply.
 *
 * @param readWrite variant under test; its value is also appended to the actor name
 * @throws Exception declared for the data tree methods referenced during stubbing
 */
1206 private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
1207 final ShardTestKit testKit = new ShardTestKit(getSystem());
1208 final DataTree dataTree = createDelegatingMockDataTree();
1209 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1210 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1211 "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1213 ShardTestKit.waitUntilLeader(shard);
// Chained stubbing: the first validate() call throws, subsequent calls do nothing.
1215 doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
1216 .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1218 final Duration duration = Duration.ofSeconds(5);
1220 final TransactionIdentifier transactionID1 = nextTransactionId();
// NOTE(review): two different ready messages are built below, one via
// prepareForwardedReadyTransaction and one via prepareBatchedModifications; presumably readWrite
// selects which of them is sent - confirm.
1223 shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1224 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1226 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1227 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1230 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1232 // Send another can commit to ensure the failed one got cleaned
1235 final TransactionIdentifier transactionID2 = nextTransactionId();
// NOTE(review): same pair of alternative ready messages as for the first transaction - confirm
// the selection logic.
1237 shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1238 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1240 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1241 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1244 testKit.expectMsgClass(duration, CommitTransactionReply.class);
/**
 * Simulates an abort arriving while a commit is in flight. The Shard subclass created here calls
 * doAbortTransaction() from within persistPayload() before delegating to the real implementation.
 * The test then runs ready / canCommit / commit and expects both a CommitTransactionReply and the
 * written node to be present in the store.
 */
1248 public void testAbortWithCommitPending() {
1249 final ShardTestKit testKit = new ShardTestKit(getSystem());
1250 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1252 void persistPayload(final Identifier id, final Payload payload,
1253 final boolean batchHint) {
1254 // Simulate an AbortTransaction message occurring during
1255 // replication, after
1256 // persisting and before finishing the commit to the
// The abort is issued first, with a null second argument, and only then is the payload
// handed to the superclass.
1259 doAbortTransaction(id, null);
1260 super.persistPayload(id, payload, batchHint);
1264 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
1265 .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1266 "testAbortWithCommitPending");
1268 ShardTestKit.waitUntilLeader(shard);
1270 final Duration duration = Duration.ofSeconds(5);
1272 final TransactionIdentifier transactionID = nextTransactionId();
1274 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1275 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1276 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1278 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1279 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1281 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1282 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1284 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1286 // Since we're simulating an abort occurring during replication
1287 // and before finish commit,
1288 // the data should still get written to the in-memory store
1289 // since we've gotten past
1290 // canCommit and preCommit and persisted the data.
1291 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
/**
 * Exercises the shard's transaction commit timeout, configured here to one second. Two
 * transactions writing different outer-list entries are readied. The first is sent canCommit but
 * no commit; the second's canCommit is then expected to complete, a late commit for the first is
 * expected to be answered with a Failure, and the second is committed and checked in the store.
 *
 * @throws Exception declared by the method signature; the checked calls are not identifiable
 *     from the statements shown here
 */
1295 public void testTransactionCommitTimeout() throws Exception {
// Must be set before the shard is created so that newShardProps() sees it.
1296 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1297 final ShardTestKit testKit = new ShardTestKit(getSystem());
1298 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1299 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1300 "testTransactionCommitTimeout");
1302 ShardTestKit.waitUntilLeader(shard);
1304 final Duration duration = Duration.ofSeconds(5);
// Seed the store with the test container and an empty outer list; the entries written by the
// two transactions below are addressed underneath that list.
1306 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1307 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1308 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1310 // Ready 2 Tx's - the first will timeout
1312 final TransactionIdentifier transactionID1 = nextTransactionId();
// NOTE(review): the message built here is presumably sent to the shard, given the
// ReadyTransactionReply awaited right after - confirm.
1314 prepareBatchedModifications(transactionID1,
1315 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1316 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1317 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
1319 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1321 final TransactionIdentifier transactionID2 = nextTransactionId();
// Path of the list entry with id 2; kept in a local because it is read back at the end.
1322 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1323 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1325 prepareBatchedModifications(transactionID2, listNodePath,
1326 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), testKit.getRef());
1327 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1329 // canCommit 1st Tx. We don't send the commit so it should
1332 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1333 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1335 // canCommit the 2nd Tx - it should complete after the 1st Tx
// The five-second wait used for this reply is longer than the one-second commit timeout set above.
1338 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1339 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1341 // Try to commit the 1st Tx - should fail as it's not the
1344 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1345 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1347 // Commit the 2nd Tx.
1349 shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1350 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1352 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1353 assertNotNull(listNodePath + " not found", node);
1358 // public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1359 // dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1361 // new ShardTestKit(getSystem()) {{
1362 // final TestActorRef<Shard> shard = actorFactory.createTestActor(
1363 // newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1364 // "testTransactionCommitQueueCapacityExceeded");
1366 // waitUntilLeader(shard);
1368 // final FiniteDuration duration = duration("5 seconds");
1370 // final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1372 // final TransactionIdentifier transactionID1 = nextTransactionId();
1373 // final MutableCompositeModification modification1 = new MutableCompositeModification();
1374 // final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1375 // TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1378 // final TransactionIdentifier transactionID2 = nextTransactionId();
1379 // final MutableCompositeModification modification2 = new MutableCompositeModification();
1380 // final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1381 // TestModel.OUTER_LIST_PATH,
1382 // ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1385 // final TransactionIdentifier transactionID3 = nextTransactionId();
1386 // final MutableCompositeModification modification3 = new MutableCompositeModification();
1387 // final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1388 // TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1391 // // Ready the Tx's
1393 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
1394 // modification1), getRef());
1395 // expectMsgClass(duration, ReadyTransactionReply.class);
1397 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
1398 // modification2), getRef());
1399 // expectMsgClass(duration, ReadyTransactionReply.class);
1401 // // The 3rd Tx should exceed queue capacity and fail.
1403 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
1404 // modification3), getRef());
1405 // expectMsgClass(duration, akka.actor.Status.Failure.class);
1407 // // canCommit 1st Tx.
1409 // shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1410 // expectMsgClass(duration, CanCommitTransactionReply.class);
1412 // // canCommit the 2nd Tx - it should get queued.
1414 // shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1416 // // canCommit the 3rd Tx - should exceed queue capacity and fail.
1418 // shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1419 // expectMsgClass(duration, akka.actor.Status.Failure.class);
/**
 * Readies three transactions against a shard configured with a one-second transaction commit
 * timeout and then sends CanCommitTransaction only for the third one. The test passes when a
 * CanCommitTransactionReply for it arrives within five seconds.
 */
1424 public void testTransactionCommitWithPriorExpiredCohortEntries() {
// Must be set before the shard is created so that newShardProps() sees it.
1425 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1426 final ShardTestKit testKit = new ShardTestKit(getSystem());
1427 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1428 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1429 "testTransactionCommitWithPriorExpiredCohortEntries");
1431 ShardTestKit.waitUntilLeader(shard);
1433 final Duration duration = Duration.ofSeconds(5);
// All three transactions carry the same write of the test container.
1435 final TransactionIdentifier transactionID1 = nextTransactionId();
1436 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1437 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1438 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1440 final TransactionIdentifier transactionID2 = nextTransactionId();
1441 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1442 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1443 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1445 final TransactionIdentifier transactionID3 = nextTransactionId();
1446 shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1447 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1448 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1450 // All Tx's are readied. We'll send canCommit for the last one
1451 // but not the others. The others
1452 // should expire from the queue and the last one should be
1455 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1456 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1460 public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
1461 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1462 final ShardTestKit testKit = new ShardTestKit(getSystem());
1463 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1464 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1465 "testTransactionCommitWithSubsequentExpiredCohortEntry");
1467 ShardTestKit.waitUntilLeader(shard);
1469 final Duration duration = Duration.ofSeconds(5);
1471 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1473 final TransactionIdentifier transactionID1 = nextTransactionId();
1474 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1475 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1476 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1478 // CanCommit the first Tx so it's the current in-progress Tx.
1480 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1481 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1483 // Ready the second Tx.
1485 final TransactionIdentifier transactionID2 = nextTransactionId();
1486 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1487 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1488 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1490 // Ready the third Tx.
1492 final TransactionIdentifier transactionID3 = nextTransactionId();
1493 final DataTreeModification modification3 = dataStore.newModification();
1494 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1495 .apply(modification3);
1496 modification3.ready();
1497 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
1498 true, Optional.empty());
1499 shard.tell(readyMessage, testKit.getRef());
1501 // Commit the first Tx. After completing, the second should
1502 // expire from the queue and the third
1505 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1506 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1508 // Expect commit reply from the third Tx.
1510 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1512 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1513 assertNotNull(TestModel.TEST2_PATH + " not found", node);
1517 public void testCanCommitBeforeReadyFailure() {
1518 final ShardTestKit testKit = new ShardTestKit(getSystem());
1519 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1520 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1521 "testCanCommitBeforeReadyFailure");
1523 shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), testKit.getRef());
1524 testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
1528 public void testAbortAfterCanCommit() throws Exception {
1529 final ShardTestKit testKit = new ShardTestKit(getSystem());
1530 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1531 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
1533 ShardTestKit.waitUntilLeader(shard);
1535 final Duration duration = Duration.ofSeconds(5);
1536 final Timeout timeout = Timeout.create(duration);
1538 // Ready 2 transactions - the first one will be aborted.
1540 final TransactionIdentifier transactionID1 = nextTransactionId();
1541 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1542 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1543 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1545 final TransactionIdentifier transactionID2 = nextTransactionId();
1546 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1547 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1548 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1550 // Send the CanCommitTransaction message for the first Tx.
1552 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1553 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1554 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1555 assertTrue("Can commit", canCommitReply.getCanCommit());
1557 // Send the CanCommitTransaction message for the 2nd Tx. This
1558 // should get queued and
1559 // processed after the first Tx completes.
1561 final Future<Object> canCommitFuture = Patterns.ask(shard,
1562 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1564 // Send the AbortTransaction message for the first Tx. This
1565 // should trigger the 2nd
1568 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1569 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1571 // Wait for the 2nd Tx to complete the canCommit phase.
1573 canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture,
1574 FiniteDuration.create(5, TimeUnit.SECONDS));
1575 assertTrue("Can commit", canCommitReply.getCanCommit());
1579 public void testAbortAfterReady() {
1580 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1581 final ShardTestKit testKit = new ShardTestKit(getSystem());
1582 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1583 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1585 ShardTestKit.waitUntilLeader(shard);
1587 final Duration duration = Duration.ofSeconds(5);
1591 final TransactionIdentifier transactionID1 = nextTransactionId();
1592 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1593 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1594 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1596 // Send the AbortTransaction message.
1598 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1599 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1601 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1603 // Now send CanCommitTransaction - should fail.
1605 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1606 final Throwable failure = testKit.expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1607 assertTrue("Failure type", failure instanceof IllegalStateException);
1609 // Ready and CanCommit another and verify success.
1611 final TransactionIdentifier transactionID2 = nextTransactionId();
1612 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1613 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1614 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1616 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1617 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1621 public void testAbortQueuedTransaction() {
1622 final ShardTestKit testKit = new ShardTestKit(getSystem());
1623 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1624 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1626 ShardTestKit.waitUntilLeader(shard);
1628 final Duration duration = Duration.ofSeconds(5);
1632 final TransactionIdentifier transactionID1 = nextTransactionId();
1633 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1634 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1635 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1637 final TransactionIdentifier transactionID2 = nextTransactionId();
1638 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1639 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1640 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1642 final TransactionIdentifier transactionID3 = nextTransactionId();
1643 shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1644 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), testKit.getRef());
1645 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1647 // Abort the second tx while it's queued.
1649 shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1650 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1652 // Commit the other 2.
1654 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1655 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1657 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1658 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1660 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1661 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1663 shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1664 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1666 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1670 public void testCreateSnapshotWithNonPersistentData() throws Exception {
1671 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1675 public void testCreateSnapshot() throws Exception {
1676 testCreateSnapshot(true, "testCreateSnapshot");
1679 private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception {
1680 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1682 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1683 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1684 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1689 public void saveSnapshot(final Object obj) {
1690 savedSnapshot.set(obj);
1691 super.saveSnapshot(obj);
1695 dataStoreContextBuilder.persistent(persistent);
1697 class TestShard extends Shard {
1699 protected TestShard(final AbstractBuilder<?, ?> builder) {
1701 setPersistence(new TestPersistentDataProvider(super.persistence()));
1705 public void handleCommand(final Object message) {
1706 super.handleCommand(message);
1708 // XXX: commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
1709 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1710 latch.get().countDown();
1715 public RaftActorContext getRaftActorContext() {
1716 return super.getRaftActorContext();
1720 final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1722 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
1723 .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1726 ShardTestKit.waitUntilLeader(shard);
1727 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1729 final NormalizedNode<?, ?> expectedRoot = readStore(shard, YangInstanceIdentifier.EMPTY);
1731 // Trigger creation of a snapshot by ensuring
1732 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1733 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1734 awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1736 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1737 awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1740 private static void awaitAndValidateSnapshot(final AtomicReference<CountDownLatch> latch,
1741 final AtomicReference<Object> savedSnapshot, final NormalizedNode<?, ?> expectedRoot)
1742 throws InterruptedException {
1743 assertTrue("Snapshot saved", latch.get().await(5, TimeUnit.SECONDS));
1745 assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
1747 verifySnapshot((Snapshot) savedSnapshot.get(), expectedRoot);
1749 latch.set(new CountDownLatch(1));
1750 savedSnapshot.set(null);
1753 private static void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot) {
1754 final NormalizedNode<?, ?> actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot().getRootNode().get();
1755 assertEquals("Root node", expectedRoot, actual);
1759 * This test simply verifies that the applySnapShot logic will work.
1762 public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
1763 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
1766 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1767 putTransaction.write(TestModel.TEST_PATH,
1768 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1769 commitTransaction(store, putTransaction);
1772 final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.EMPTY);
1774 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
1776 writeTransaction.delete(YangInstanceIdentifier.EMPTY);
1777 writeTransaction.write(YangInstanceIdentifier.EMPTY, expected);
1779 commitTransaction(store, writeTransaction);
1781 final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.EMPTY);
1783 assertEquals(expected, actual);
1787 public void testRecoveryApplicable() {
1789 final DatastoreContext persistentContext = DatastoreContext.newBuilder()
1790 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1792 final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
1793 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1795 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
1796 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1798 final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
1799 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1801 final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
1803 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1805 final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
1807 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1811 public void testOnDatastoreContext() {
1812 dataStoreContextBuilder.persistent(true);
1814 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
1816 assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1818 ShardTestKit.waitUntilLeader(shard);
1820 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1822 assertFalse("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1824 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1826 assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1830 public void testRegisterRoleChangeListener() {
1831 final ShardTestKit testKit = new ShardTestKit(getSystem());
1832 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1833 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1834 "testRegisterRoleChangeListener");
1836 ShardTestKit.waitUntilLeader(shard);
1838 final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
1840 shard.tell(new RegisterRoleChangeListener(), listener);
1842 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
1844 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1845 ShardLeaderStateChanged.class);
1846 assertTrue("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent());
1847 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
1848 leaderStateChanged.getLocalShardDataTree().get());
1850 MessageCollectorActor.clearMessages(listener);
1852 // Force a leader change
1854 shard.tell(new RequestVote(10000, "member2", 50, 50), testKit.getRef());
1856 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, ShardLeaderStateChanged.class);
1857 assertFalse("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent());
1861 public void testFollowerInitialSyncStatus() {
1862 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1863 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1864 "testFollowerInitialSyncStatus");
1866 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false,
1867 "member-1-shard-inventory-operational"));
1869 assertFalse(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1871 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true,
1872 "member-1-shard-inventory-operational"));
1874 assertTrue(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1878 public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
1879 final ShardTestKit testKit = new ShardTestKit(getSystem());
1880 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
1881 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1882 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1884 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1885 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1886 TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1888 setupInMemorySnapshotStore();
1890 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1891 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1892 actorFactory.generateActorId(testName + "-shard"));
1894 testKit.waitUntilNoLeader(shard);
1896 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1897 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1898 RegisterDataTreeNotificationListenerReply.class);
1899 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1901 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1902 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1904 listener.waitForChangeEvents();
1908 public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
1909 final ShardTestKit testKit = new ShardTestKit(getSystem());
1910 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
1911 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1912 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1914 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
1915 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1916 TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1918 setupInMemorySnapshotStore();
1920 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1921 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1922 actorFactory.generateActorId(testName + "-shard"));
1924 testKit.waitUntilNoLeader(shard);
1926 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1927 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1928 RegisterDataTreeNotificationListenerReply.class);
1929 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1931 final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
1932 regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), testKit.getRef());
1933 testKit.expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
1935 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1936 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1938 listener.expectNoMoreChanges("Received unexpected change after close");
1942 public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
1943 final ShardTestKit testKit = new ShardTestKit(getSystem());
1944 final String testName = "testClusteredDataTreeChangeListenerRegistration";
1945 final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
1946 MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
1948 final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
1949 MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
1951 final TestActorRef<Shard> followerShard = actorFactory
1952 .createTestActor(Shard.builder().id(followerShardID)
1953 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
1954 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
1955 "akka://test/user/" + leaderShardID.toString()))
1956 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1957 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
1959 final TestActorRef<Shard> leaderShard = actorFactory
1960 .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
1961 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
1962 "akka://test/user/" + followerShardID.toString()))
1963 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1964 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
1966 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1967 final String leaderPath = ShardTestKit.waitUntilLeader(followerShard);
1968 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
1970 final YangInstanceIdentifier path = TestModel.TEST_PATH;
1971 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1972 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
1973 actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1975 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1976 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
1977 RegisterDataTreeNotificationListenerReply.class);
1978 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1980 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1982 listener.waitForChangeEvents();
1986 public void testServerRemoved() {
1987 final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
1988 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1990 final ActorRef shard = parent.underlyingActor().context().actorOf(
1991 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1992 "testServerRemoved");
1994 shard.tell(new ServerRemoved("test"), ActorRef.noSender());
1996 MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);