2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertSame;
14 import static org.junit.Assert.assertTrue;
15 import static org.junit.Assert.fail;
16 import static org.mockito.Matchers.any;
17 import static org.mockito.Mockito.doThrow;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSelection;
24 import akka.actor.Props;
25 import akka.actor.Status.Failure;
26 import akka.dispatch.Dispatchers;
27 import akka.dispatch.OnComplete;
28 import akka.japi.Creator;
29 import akka.pattern.Patterns;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Stopwatch;
34 import com.google.common.base.Throwables;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.util.Collections;
37 import java.util.HashSet;
39 import java.util.Optional;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicReference;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.opendaylight.controller.cluster.DataPersistenceProvider;
48 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
49 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
50 import org.opendaylight.controller.cluster.access.concepts.MemberName;
51 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
52 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
62 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
65 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
67 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
71 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
74 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
75 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
76 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
77 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
78 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
79 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
80 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
81 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
82 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
83 import org.opendaylight.controller.cluster.raft.RaftActorContext;
84 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
85 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
86 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
87 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
88 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
89 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
90 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
91 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
92 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
93 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
94 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
95 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
96 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
97 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
98 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
99 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
100 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
101 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
102 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
103 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
104 import org.opendaylight.yangtools.concepts.Identifier;
105 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
106 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
108 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
116 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
117 import scala.concurrent.Await;
118 import scala.concurrent.Future;
119 import scala.concurrent.duration.FiniteDuration;
121 public class ShardTest extends AbstractShardTest {
122 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
123 + "InMemorySnapshotStore and journal recovery seq number will start from 1";
126 public void testRegisterDataTreeChangeListener() throws Exception {
127 final ShardTestKit testKit = new ShardTestKit(getSystem());
128 final TestActorRef<Shard> shard = actorFactory.createTestActor(
129 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
130 "testRegisterDataTreeChangeListener");
132 ShardTestKit.waitUntilLeader(shard);
134 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
136 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
137 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
138 TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
140 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), testKit.getRef());
142 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("3 seconds"),
143 RegisterDataTreeNotificationListenerReply.class);
144 final String replyPath = reply.getListenerRegistrationPath().toString();
145 assertTrue("Incorrect reply path: " + replyPath,
146 replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
148 final YangInstanceIdentifier path = TestModel.TEST_PATH;
149 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
151 listener.waitForChangeEvents();
154 @SuppressWarnings("serial")
156 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
157 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
158 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
159 final Creator<Shard> creator = new Creator<Shard>() {
160 boolean firstElectionTimeout = true;
163 public Shard create() {
164 return new Shard(newShardBuilder()) {
166 public void handleCommand(final Object message) {
167 if (message instanceof ElectionTimeout && firstElectionTimeout) {
168 firstElectionTimeout = false;
169 final ActorRef self = getSelf();
171 Uninterruptibles.awaitUninterruptibly(
172 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
173 self.tell(message, self);
176 onFirstElectionTimeout.countDown();
178 super.handleCommand(message);
185 setupInMemorySnapshotStore();
187 final YangInstanceIdentifier path = TestModel.TEST_PATH;
188 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
189 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
190 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
192 final TestActorRef<Shard> shard = actorFactory.createTestActor(
193 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
194 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
196 final ShardTestKit testKit = new ShardTestKit(getSystem());
197 assertTrue("Got first ElectionTimeout", onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
199 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), testKit.getRef());
200 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
201 RegisterDataTreeNotificationListenerReply.class);
202 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
204 shard.tell(FindLeader.INSTANCE, testKit.getRef());
205 final FindLeaderReply findLeadeReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
206 FindLeaderReply.class);
207 assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
209 onChangeListenerRegistered.countDown();
211 // TODO: investigate why we do not receive data chage events
212 listener.waitForChangeEvents();
216 public void testCreateTransaction() {
217 final ShardTestKit testKit = new ShardTestKit(getSystem());
218 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
220 ShardTestKit.waitUntilLeader(shard);
222 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), testKit.getRef());
224 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
225 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
227 final CreateTransactionReply reply = testKit.expectMsgClass(testKit.duration("3 seconds"),
228 CreateTransactionReply.class);
230 final String path = reply.getTransactionPath().toString();
231 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
232 "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
233 shardID.getShardName(), shardID.getMemberName().getName())));
237 public void testCreateTransactionOnChain() {
238 final ShardTestKit testKit = new ShardTestKit(getSystem());
239 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
241 ShardTestKit.waitUntilLeader(shard);
243 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
244 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
246 final CreateTransactionReply reply = testKit.expectMsgClass(testKit.duration("3 seconds"),
247 CreateTransactionReply.class);
249 final String path = reply.getTransactionPath().toString();
250 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
251 "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
252 shardID.getShardName(), shardID.getMemberName().getName())));
256 public void testPeerAddressResolved() {
257 final ShardTestKit testKit = new ShardTestKit(getSystem());
258 final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
260 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder()
261 .peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null))
262 .props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
264 final String address = "akka://foobar";
265 shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
267 shard.tell(GetOnDemandRaftState.INSTANCE, testKit.getRef());
268 final OnDemandRaftState state = testKit.expectMsgClass(OnDemandRaftState.class);
269 assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
273 public void testApplySnapshot() throws Exception {
275 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps()
276 .withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
278 ShardTestKit.waitUntilLeader(shard);
280 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
283 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
284 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
285 .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
286 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
288 writeToStore(store, TestModel.TEST_PATH, container);
290 final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
291 final NormalizedNode<?,?> expected = readStore(store, root);
293 final Snapshot snapshot = Snapshot.create(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
294 Collections.emptyList(), 1, 2, 3, 4, -1, null, null);
296 shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
298 final Stopwatch sw = Stopwatch.createStarted();
299 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
300 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
303 assertEquals("Root node", expected, readStore(shard, root));
305 } catch (final AssertionError e) {
310 fail("Snapshot was not applied");
314 public void testApplyState() throws Exception {
315 final TestActorRef<Shard> shard = actorFactory.createTestActor(
316 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
318 ShardTestKit.waitUntilLeader(shard);
320 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
323 final DataTreeModification writeMod = store.takeSnapshot().newModification();
324 final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
325 writeMod.write(TestModel.TEST_PATH, node);
328 final TransactionIdentifier tx = nextTransactionId();
329 shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
331 final Stopwatch sw = Stopwatch.createStarted();
332 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
333 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
335 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
336 if (actual != null) {
337 assertEquals("Applied state", node, actual);
342 fail("State was not applied");
346 public void testDataTreeCandidateRecovery() throws Exception {
347 // Set up the InMemorySnapshotStore.
348 final DataTree source = setupInMemorySnapshotStore();
350 final DataTreeModification writeMod = source.takeSnapshot().newModification();
351 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
353 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
355 // Set up the InMemoryJournal.
356 InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
357 payloadForModification(source, writeMod, nextTransactionId())));
359 final int nListEntries = 16;
360 final Set<Integer> listEntryKeys = new HashSet<>();
362 // Add some ModificationPayload entries
363 for (int i = 1; i <= nListEntries; i++) {
364 listEntryKeys.add(Integer.valueOf(i));
366 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
367 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
369 final DataTreeModification mod = source.takeSnapshot().newModification();
370 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
373 InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
374 payloadForModification(source, mod, nextTransactionId())));
377 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
378 new ApplyJournalEntries(nListEntries));
380 testRecovery(listEntryKeys);
384 @SuppressWarnings("checkstyle:IllegalCatch")
385 public void testConcurrentThreePhaseCommits() throws Exception {
386 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
387 final CountDownLatch commitLatch = new CountDownLatch(2);
389 final long timeoutSec = 5;
390 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
391 final Timeout timeout = new Timeout(duration);
393 final TestActorRef<Shard> shard = actorFactory.createTestActor(
394 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
395 "testConcurrentThreePhaseCommits");
397 class OnFutureComplete extends OnComplete<Object> {
398 private final Class<?> expRespType;
400 OnFutureComplete(final Class<?> expRespType) {
401 this.expRespType = expRespType;
405 public void onComplete(final Throwable error, final Object resp) {
407 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
410 assertEquals("Commit response type", expRespType, resp.getClass());
412 } catch (final Exception e) {
418 void onSuccess(final Object resp) {
422 class OnCommitFutureComplete extends OnFutureComplete {
423 OnCommitFutureComplete() {
424 super(CommitTransactionReply.class);
428 public void onComplete(final Throwable error, final Object resp) {
429 super.onComplete(error, resp);
430 commitLatch.countDown();
434 class OnCanCommitFutureComplete extends OnFutureComplete {
435 private final TransactionIdentifier transactionID;
437 OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
438 super(CanCommitTransactionReply.class);
439 this.transactionID = transactionID;
443 void onSuccess(final Object resp) {
444 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(resp);
445 assertTrue("Can commit", canCommitReply.getCanCommit());
447 final Future<Object> commitFuture = Patterns.ask(shard,
448 new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
449 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
453 final ShardTestKit testKit = new ShardTestKit(getSystem());
454 ShardTestKit.waitUntilLeader(shard);
456 final TransactionIdentifier transactionID1 = nextTransactionId();
457 final TransactionIdentifier transactionID2 = nextTransactionId();
458 final TransactionIdentifier transactionID3 = nextTransactionId();
460 final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
461 shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
462 final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
463 final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
464 final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
466 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
467 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
468 final ReadyTransactionReply readyReply = ReadyTransactionReply
469 .fromSerializable(testKit.expectMsgClass(duration, ReadyTransactionReply.class));
470 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
471 // Send the CanCommitTransaction message for the first Tx.
473 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
474 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
475 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
476 assertTrue("Can commit", canCommitReply.getCanCommit());
478 // Ready 2 more Tx's.
480 shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
481 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), testKit.getRef());
482 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
485 prepareBatchedModifications(transactionID3,
486 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
487 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
488 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), testKit.getRef());
489 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
491 // Send the CanCommitTransaction message for the next 2 Tx's.
492 // These should get queued and
493 // processed after the first Tx completes.
495 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
496 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
498 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
499 new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
501 // Send the CommitTransaction message for the first Tx. After it
502 // completes, it should
503 // trigger the 2nd Tx to proceed which should in turn then
506 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
507 testKit.expectMsgClass(duration, CommitTransactionReply.class);
509 // Wait for the next 2 Tx's to complete.
511 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher());
513 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher());
515 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
517 final Throwable t = caughtEx.get();
519 Throwables.propagateIfPossible(t, Exception.class);
520 throw new RuntimeException(t);
523 assertTrue("Commits complete", done);
525 // final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
526 // cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
527 // cohort3.getPreCommit(), cohort3.getCommit());
528 // inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
529 // inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
530 // inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
531 // inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
532 // inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
533 // inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
534 // inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
535 // inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
536 // inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
538 // Verify data in the data store.
540 verifyOuterListEntry(shard, 1);
542 verifyLastApplied(shard, 5);
546 public void testBatchedModificationsWithNoCommitOnReady() {
547 final ShardTestKit testKit = new ShardTestKit(getSystem());
548 final TestActorRef<Shard> shard = actorFactory.createTestActor(
549 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
550 "testBatchedModificationsWithNoCommitOnReady");
552 ShardTestKit.waitUntilLeader(shard);
554 final TransactionIdentifier transactionID = nextTransactionId();
555 final FiniteDuration duration = testKit.duration("5 seconds");
557 // Send a BatchedModifications to start a transaction.
559 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
560 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
561 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
563 // Send a couple more BatchedModifications.
565 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
566 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
568 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
570 shard.tell(newBatchedModifications(transactionID,
571 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
572 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
573 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3),
575 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
577 // Send the CanCommitTransaction message.
579 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
580 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
581 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
582 assertTrue("Can commit", canCommitReply.getCanCommit());
584 // Send the CommitTransaction message.
586 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
587 testKit.expectMsgClass(duration, CommitTransactionReply.class);
589 // Verify data in the data store.
591 verifyOuterListEntry(shard, 1);
595 public void testBatchedModificationsWithCommitOnReady() {
596 final ShardTestKit testKit = new ShardTestKit(getSystem());
597 final TestActorRef<Shard> shard = actorFactory.createTestActor(
598 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
599 "testBatchedModificationsWithCommitOnReady");
601 ShardTestKit.waitUntilLeader(shard);
603 final TransactionIdentifier transactionID = nextTransactionId();
604 final FiniteDuration duration = testKit.duration("5 seconds");
606 // Send a BatchedModifications to start a transaction.
608 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
609 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
610 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
612 // Send a couple more BatchedModifications.
614 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
615 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
617 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
619 shard.tell(newBatchedModifications(transactionID,
620 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
621 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
622 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3),
625 testKit.expectMsgClass(duration, CommitTransactionReply.class);
627 // Verify data in the data store.
628 verifyOuterListEntry(shard, 1);
631 @Test(expected = IllegalStateException.class)
632 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
633 final ShardTestKit testKit = new ShardTestKit(getSystem());
634 final TestActorRef<Shard> shard = actorFactory.createTestActor(
635 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
636 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
638 ShardTestKit.waitUntilLeader(shard);
640 final TransactionIdentifier transactionID = nextTransactionId();
641 final BatchedModifications batched = new BatchedModifications(transactionID,
642 DataStoreVersions.CURRENT_VERSION);
644 batched.setTotalMessagesSent(2);
646 shard.tell(batched, testKit.getRef());
648 final Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), Failure.class);
650 if (failure != null) {
651 Throwables.propagateIfPossible(failure.cause(), Exception.class);
652 throw new RuntimeException(failure.cause());
657 public void testBatchedModificationsWithOperationFailure() {
658 final ShardTestKit testKit = new ShardTestKit(getSystem());
659 final TestActorRef<Shard> shard = actorFactory.createTestActor(
660 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
661 "testBatchedModificationsWithOperationFailure");
663 ShardTestKit.waitUntilLeader(shard);
665 // Test merge with invalid data. An exception should occur when
666 // the merge is applied. Note that
667 // write will not validate the children for performance reasons.
669 final TransactionIdentifier transactionID = nextTransactionId();
671 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
672 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
673 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
675 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
676 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
677 shard.tell(batched, testKit.getRef());
678 Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
680 final Throwable cause = failure.cause();
682 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
684 batched.setTotalMessagesSent(2);
686 shard.tell(batched, testKit.getRef());
688 failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
689 assertEquals("Failure cause", cause, failure.cause());
693 public void testBatchedModificationsOnTransactionChain() {
694 final ShardTestKit testKit = new ShardTestKit(getSystem());
695 final TestActorRef<Shard> shard = actorFactory.createTestActor(
696 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
697 "testBatchedModificationsOnTransactionChain");
699 ShardTestKit.waitUntilLeader(shard);
701 final LocalHistoryIdentifier historyId = nextHistoryId();
702 final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
703 final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
705 final FiniteDuration duration = testKit.duration("5 seconds");
707 // Send a BatchedModifications to start a chained write
708 // transaction and ready it.
710 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
711 final YangInstanceIdentifier path = TestModel.TEST_PATH;
712 shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), testKit.getRef());
713 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
715 // Create a read Tx on the same chain.
717 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
718 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
720 final CreateTransactionReply createReply = testKit.expectMsgClass(testKit.duration("3 seconds"),
721 CreateTransactionReply.class);
723 getSystem().actorSelection(createReply.getTransactionPath())
724 .tell(new ReadData(path, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
725 final ReadDataReply readReply = testKit.expectMsgClass(testKit.duration("3 seconds"), ReadDataReply.class);
726 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
728 // Commit the write transaction.
730 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
731 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
732 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
733 assertTrue("Can commit", canCommitReply.getCanCommit());
735 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
736 testKit.expectMsgClass(duration, CommitTransactionReply.class);
738 // Verify data in the data store.
740 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
741 assertEquals("Stored node", containerNode, actualNode);
745 public void testOnBatchedModificationsWhenNotLeader() {
746 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
747 final ShardTestKit testKit = new ShardTestKit(getSystem());
748 final Creator<Shard> creator = new Creator<Shard>() {
749 private static final long serialVersionUID = 1L;
752 public Shard create() {
753 return new Shard(newShardBuilder()) {
755 protected boolean isLeader() {
756 return overrideLeaderCalls.get() ? false : super.isLeader();
760 public ActorSelection getLeader() {
761 return overrideLeaderCalls.get() ? getSystem().actorSelection(testKit.getRef().path())
768 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
769 .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
770 "testOnBatchedModificationsWhenNotLeader");
772 ShardTestKit.waitUntilLeader(shard);
774 overrideLeaderCalls.set(true);
776 final BatchedModifications batched = new BatchedModifications(nextTransactionId(),
777 DataStoreVersions.CURRENT_VERSION);
779 shard.tell(batched, ActorRef.noSender());
781 testKit.expectMsgEquals(batched);
785 public void testTransactionMessagesWithNoLeader() {
786 final ShardTestKit testKit = new ShardTestKit(getSystem());
787 dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
788 .shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
789 final TestActorRef<Shard> shard = actorFactory.createTestActor(
790 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
791 "testTransactionMessagesWithNoLeader");
793 testKit.waitUntilNoLeader(shard);
795 final TransactionIdentifier txId = nextTransactionId();
796 shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
797 Failure failure = testKit.expectMsgClass(Failure.class);
798 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
800 shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
801 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
802 failure = testKit.expectMsgClass(Failure.class);
803 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
805 shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
807 failure = testKit.expectMsgClass(Failure.class);
808 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
812 public void testReadyWithReadWriteImmediateCommit() {
813 testReadyWithImmediateCommit(true);
817 public void testReadyWithWriteOnlyImmediateCommit() {
818 testReadyWithImmediateCommit(false);
821 private void testReadyWithImmediateCommit(final boolean readWrite) {
822 final ShardTestKit testKit = new ShardTestKit(getSystem());
823 final TestActorRef<Shard> shard = actorFactory.createTestActor(
824 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
825 "testReadyWithImmediateCommit-" + readWrite);
827 ShardTestKit.waitUntilLeader(shard);
829 final TransactionIdentifier transactionID = nextTransactionId();
830 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
832 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH, containerNode, true),
835 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true),
839 testKit.expectMsgClass(testKit.duration("5 seconds"), CommitTransactionReply.class);
841 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
842 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
846 public void testReadyLocalTransactionWithImmediateCommit() {
847 final ShardTestKit testKit = new ShardTestKit(getSystem());
848 final TestActorRef<Shard> shard = actorFactory.createTestActor(
849 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
850 "testReadyLocalTransactionWithImmediateCommit");
852 ShardTestKit.waitUntilLeader(shard);
854 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
856 final DataTreeModification modification = dataStore.newModification();
858 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
859 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
860 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
861 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
863 final TransactionIdentifier txId = nextTransactionId();
864 modification.ready();
865 final ReadyLocalTransaction readyMessage =
866 new ReadyLocalTransaction(txId, modification, true, Optional.empty());
868 shard.tell(readyMessage, testKit.getRef());
870 testKit.expectMsgClass(CommitTransactionReply.class);
872 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
873 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
877 public void testReadyLocalTransactionWithThreePhaseCommit() {
878 final ShardTestKit testKit = new ShardTestKit(getSystem());
879 final TestActorRef<Shard> shard = actorFactory.createTestActor(
880 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
881 "testReadyLocalTransactionWithThreePhaseCommit");
883 ShardTestKit.waitUntilLeader(shard);
885 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
887 final DataTreeModification modification = dataStore.newModification();
889 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
890 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
891 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
892 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
894 final TransactionIdentifier txId = nextTransactionId();
895 modification.ready();
896 final ReadyLocalTransaction readyMessage =
897 new ReadyLocalTransaction(txId, modification, false, Optional.empty());
899 shard.tell(readyMessage, testKit.getRef());
901 testKit.expectMsgClass(ReadyTransactionReply.class);
903 // Send the CanCommitTransaction message.
905 shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
906 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
907 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
908 assertTrue("Can commit", canCommitReply.getCanCommit());
910 // Send the CanCommitTransaction message.
912 shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
913 testKit.expectMsgClass(CommitTransactionReply.class);
915 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
916 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
920 public void testReadWriteCommitWithPersistenceDisabled() {
921 dataStoreContextBuilder.persistent(false);
922 final ShardTestKit testKit = new ShardTestKit(getSystem());
923 final TestActorRef<Shard> shard = actorFactory.createTestActor(
924 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
925 "testCommitWithPersistenceDisabled");
927 ShardTestKit.waitUntilLeader(shard);
929 // Setup a simulated transactions with a mock cohort.
931 final FiniteDuration duration = testKit.duration("5 seconds");
933 final TransactionIdentifier transactionID = nextTransactionId();
934 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
935 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false),
937 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
939 // Send the CanCommitTransaction message.
941 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
942 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
943 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
944 assertTrue("Can commit", canCommitReply.getCanCommit());
946 // Send the CanCommitTransaction message.
948 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
949 testKit.expectMsgClass(duration, CommitTransactionReply.class);
951 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
952 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
956 public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
957 testCommitWhenTransactionHasModifications(true);
961 public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
962 testCommitWhenTransactionHasModifications(false);
965 private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
966 final ShardTestKit testKit = new ShardTestKit(getSystem());
967 final DataTree dataTree = createDelegatingMockDataTree();
968 final TestActorRef<Shard> shard = actorFactory.createTestActor(
969 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
970 "testCommitWhenTransactionHasModifications-" + readWrite);
972 ShardTestKit.waitUntilLeader(shard);
974 final FiniteDuration duration = testKit.duration("5 seconds");
975 final TransactionIdentifier transactionID = nextTransactionId();
978 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
979 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
981 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
982 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
985 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
987 // Send the CanCommitTransaction message.
989 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
990 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
991 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
992 assertTrue("Can commit", canCommitReply.getCanCommit());
994 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
995 testKit.expectMsgClass(duration, CommitTransactionReply.class);
997 final InOrder inOrder = inOrder(dataTree);
998 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
999 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1000 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1002 // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
1004 Thread.sleep(HEARTBEAT_MILLIS * 2);
1006 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, testKit.getRef());
1007 final ShardStats shardStats = testKit.expectMsgClass(duration, ShardStats.class);
1009 // Use MBean for verification
1010 // Committed transaction count should increase as usual
1011 assertEquals(1, shardStats.getCommittedTransactionsCount());
1013 // Commit index should advance as we do not have an empty
1015 assertEquals(1, shardStats.getCommitIndex());
1019 public void testCommitPhaseFailure() throws Exception {
1020 final ShardTestKit testKit = new ShardTestKit(getSystem());
1021 final DataTree dataTree = createDelegatingMockDataTree();
1022 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1023 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1024 "testCommitPhaseFailure");
1026 ShardTestKit.waitUntilLeader(shard);
1028 final FiniteDuration duration = testKit.duration("5 seconds");
1029 final Timeout timeout = new Timeout(duration);
1031 // Setup 2 simulated transactions with mock cohorts. The first
1035 doThrow(new RuntimeException("mock commit failure")).when(dataTree)
1036 .commit(any(DataTreeCandidate.class));
1038 final TransactionIdentifier transactionID1 = nextTransactionId();
1039 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1040 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1041 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1043 final TransactionIdentifier transactionID2 = nextTransactionId();
1044 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1045 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1046 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1048 // Send the CanCommitTransaction message for the first Tx.
1050 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1051 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1052 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1053 assertTrue("Can commit", canCommitReply.getCanCommit());
1055 // Send the CanCommitTransaction message for the 2nd Tx. This
1056 // should get queued and
1057 // processed after the first Tx completes.
1059 final Future<Object> canCommitFuture = Patterns.ask(shard,
1060 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1062 // Send the CommitTransaction message for the first Tx. This
1063 // should send back an error
1064 // and trigger the 2nd Tx to proceed.
1066 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1067 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1069 // Wait for the 2nd Tx to complete the canCommit phase.
1071 final CountDownLatch latch = new CountDownLatch(1);
1072 canCommitFuture.onComplete(new OnComplete<Object>() {
1074 public void onComplete(final Throwable failure, final Object resp) {
1077 }, getSystem().dispatcher());
1079 assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
1081 final InOrder inOrder = inOrder(dataTree);
1082 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1083 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1085 // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
1086 // validate performs wrapping and we capture that mock
1087 // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1089 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1093 public void testPreCommitPhaseFailure() throws Exception {
1094 final ShardTestKit testKit = new ShardTestKit(getSystem());
1095 final DataTree dataTree = createDelegatingMockDataTree();
1096 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1097 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1098 "testPreCommitPhaseFailure");
1100 ShardTestKit.waitUntilLeader(shard);
1102 final FiniteDuration duration = testKit.duration("5 seconds");
1103 final Timeout timeout = new Timeout(duration);
1105 doThrow(new RuntimeException("mock preCommit failure")).when(dataTree)
1106 .prepare(any(DataTreeModification.class));
1108 final TransactionIdentifier transactionID1 = nextTransactionId();
1109 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1110 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1111 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1113 final TransactionIdentifier transactionID2 = nextTransactionId();
1114 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1115 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1116 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1118 // Send the CanCommitTransaction message for the first Tx.
1120 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1121 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1122 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1123 assertTrue("Can commit", canCommitReply.getCanCommit());
1125 // Send the CanCommitTransaction message for the 2nd Tx. This
1126 // should get queued and
1127 // processed after the first Tx completes.
1129 final Future<Object> canCommitFuture = Patterns.ask(shard,
1130 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1132 // Send the CommitTransaction message for the first Tx. This
1133 // should send back an error
1134 // and trigger the 2nd Tx to proceed.
1136 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1137 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1139 // Wait for the 2nd Tx to complete the canCommit phase.
1141 final CountDownLatch latch = new CountDownLatch(1);
1142 canCommitFuture.onComplete(new OnComplete<Object>() {
1144 public void onComplete(final Throwable failure, final Object resp) {
1147 }, getSystem().dispatcher());
1149 assertTrue("2nd CanCommit complete", latch.await(5, TimeUnit.SECONDS));
1151 final InOrder inOrder = inOrder(dataTree);
1152 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1153 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1154 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1158 public void testCanCommitPhaseFailure() throws Exception {
1159 final ShardTestKit testKit = new ShardTestKit(getSystem());
1160 final DataTree dataTree = createDelegatingMockDataTree();
1161 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1162 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1163 "testCanCommitPhaseFailure");
1165 ShardTestKit.waitUntilLeader(shard);
1167 final FiniteDuration duration = testKit.duration("5 seconds");
1168 final TransactionIdentifier transactionID1 = nextTransactionId();
1170 doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
1171 .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1173 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1174 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1175 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1177 // Send the CanCommitTransaction message.
1179 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1180 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1182 // Send another can commit to ensure the failed one got cleaned
1185 final TransactionIdentifier transactionID2 = nextTransactionId();
1186 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1187 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1188 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1190 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1191 final CanCommitTransactionReply reply = CanCommitTransactionReply
1192 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
1193 assertTrue("getCanCommit", reply.getCanCommit());
1197 public void testImmediateCommitWithCanCommitPhaseFailure() throws Exception {
1198 testImmediateCommitWithCanCommitPhaseFailure(true);
1199 testImmediateCommitWithCanCommitPhaseFailure(false);
1202 private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
1203 final ShardTestKit testKit = new ShardTestKit(getSystem());
1204 final DataTree dataTree = createDelegatingMockDataTree();
1205 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1206 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1207 "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1209 ShardTestKit.waitUntilLeader(shard);
1211 doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
1212 .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1214 final FiniteDuration duration = testKit.duration("5 seconds");
1216 final TransactionIdentifier transactionID1 = nextTransactionId();
1219 shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1220 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1222 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1223 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1226 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1228 // Send another can commit to ensure the failed one got cleaned
1231 final TransactionIdentifier transactionID2 = nextTransactionId();
1233 shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1234 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1236 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1237 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1240 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1244 public void testAbortWithCommitPending() {
1245 final ShardTestKit testKit = new ShardTestKit(getSystem());
1246 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1248 void persistPayload(final Identifier id, final Payload payload,
1249 final boolean batchHint) {
1250 // Simulate an AbortTransaction message occurring during
1251 // replication, after
1252 // persisting and before finishing the commit to the
1255 doAbortTransaction(id, null);
1256 super.persistPayload(id, payload, batchHint);
1260 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
1261 .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1262 "testAbortWithCommitPending");
1264 ShardTestKit.waitUntilLeader(shard);
1266 final FiniteDuration duration = testKit.duration("5 seconds");
1268 final TransactionIdentifier transactionID = nextTransactionId();
1270 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1271 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1272 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1274 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1275 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1277 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1278 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1280 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1282 // Since we're simulating an abort occurring during replication
1283 // and before finish commit,
1284 // the data should still get written to the in-memory store
1285 // since we've gotten past
1286 // canCommit and preCommit and persisted the data.
1287 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1291 public void testTransactionCommitTimeout() throws Exception {
1292 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1293 final ShardTestKit testKit = new ShardTestKit(getSystem());
1294 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1295 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1296 "testTransactionCommitTimeout");
1298 ShardTestKit.waitUntilLeader(shard);
1300 final FiniteDuration duration = testKit.duration("5 seconds");
1302 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1303 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1304 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1306 // Ready 2 Tx's - the first will timeout
1308 final TransactionIdentifier transactionID1 = nextTransactionId();
1310 prepareBatchedModifications(transactionID1,
1311 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1312 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1313 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
1315 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1317 final TransactionIdentifier transactionID2 = nextTransactionId();
1318 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1319 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1321 prepareBatchedModifications(transactionID2, listNodePath,
1322 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), testKit.getRef());
1323 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1325 // canCommit 1st Tx. We don't send the commit so it should
1328 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1329 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1331 // canCommit the 2nd Tx - it should complete after the 1st Tx
1334 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1335 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1337 // Try to commit the 1st Tx - should fail as it's not the
1340 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1341 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1343 // Commit the 2nd Tx.
1345 shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1346 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1348 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1349 assertNotNull(listNodePath + " not found", node);
1354 // public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1355 // dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1357 // new ShardTestKit(getSystem()) {{
1358 // final TestActorRef<Shard> shard = actorFactory.createTestActor(
1359 // newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1360 // "testTransactionCommitQueueCapacityExceeded");
1362 // waitUntilLeader(shard);
1364 // final FiniteDuration duration = duration("5 seconds");
1366 // final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1368 // final TransactionIdentifier transactionID1 = nextTransactionId();
1369 // final MutableCompositeModification modification1 = new MutableCompositeModification();
1370 // final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1371 // TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1374 // final TransactionIdentifier transactionID2 = nextTransactionId();
1375 // final MutableCompositeModification modification2 = new MutableCompositeModification();
1376 // final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1377 // TestModel.OUTER_LIST_PATH,
1378 // ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1381 // final TransactionIdentifier transactionID3 = nextTransactionId();
1382 // final MutableCompositeModification modification3 = new MutableCompositeModification();
1383 // final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1384 // TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1387 // // Ready the Tx's
1389 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
1390 // modification1), getRef());
1391 // expectMsgClass(duration, ReadyTransactionReply.class);
1393 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
1394 // modification2), getRef());
1395 // expectMsgClass(duration, ReadyTransactionReply.class);
1397 // // The 3rd Tx should exceed queue capacity and fail.
1399 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
1400 // modification3), getRef());
1401 // expectMsgClass(duration, akka.actor.Status.Failure.class);
1403 // // canCommit 1st Tx.
1405 // shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1406 // expectMsgClass(duration, CanCommitTransactionReply.class);
1408 // // canCommit the 2nd Tx - it should get queued.
1410 // shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1412 // // canCommit the 3rd Tx - should exceed queue capacity and fail.
1414 // shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1415 // expectMsgClass(duration, akka.actor.Status.Failure.class);
1420 public void testTransactionCommitWithPriorExpiredCohortEntries() {
1421 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1422 final ShardTestKit testKit = new ShardTestKit(getSystem());
1423 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1424 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1425 "testTransactionCommitWithPriorExpiredCohortEntries");
1427 ShardTestKit.waitUntilLeader(shard);
1429 final FiniteDuration duration = testKit.duration("5 seconds");
1431 final TransactionIdentifier transactionID1 = nextTransactionId();
1432 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1433 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1434 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1436 final TransactionIdentifier transactionID2 = nextTransactionId();
1437 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1438 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1439 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1441 final TransactionIdentifier transactionID3 = nextTransactionId();
1442 shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1443 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1444 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1446 // All Tx's are readied. We'll send canCommit for the last one
1447 // but not the others. The others
1448 // should expire from the queue and the last one should be
1451 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1452 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1456 public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
1457 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1458 final ShardTestKit testKit = new ShardTestKit(getSystem());
1459 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1460 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1461 "testTransactionCommitWithSubsequentExpiredCohortEntry");
1463 ShardTestKit.waitUntilLeader(shard);
1465 final FiniteDuration duration = testKit.duration("5 seconds");
1467 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1469 final TransactionIdentifier transactionID1 = nextTransactionId();
1470 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1471 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1472 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1474 // CanCommit the first Tx so it's the current in-progress Tx.
1476 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1477 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1479 // Ready the second Tx.
1481 final TransactionIdentifier transactionID2 = nextTransactionId();
1482 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1483 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1484 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1486 // Ready the third Tx.
1488 final TransactionIdentifier transactionID3 = nextTransactionId();
1489 final DataTreeModification modification3 = dataStore.newModification();
1490 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1491 .apply(modification3);
1492 modification3.ready();
1493 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
1494 true, Optional.empty());
1495 shard.tell(readyMessage, testKit.getRef());
1497 // Commit the first Tx. After completing, the second should
1498 // expire from the queue and the third
1501 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1502 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1504 // Expect commit reply from the third Tx.
1506 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1508 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1509 assertNotNull(TestModel.TEST2_PATH + " not found", node);
1513 public void testCanCommitBeforeReadyFailure() {
1514 final ShardTestKit testKit = new ShardTestKit(getSystem());
1515 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1516 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1517 "testCanCommitBeforeReadyFailure");
1519 shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), testKit.getRef());
1520 testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
1524 public void testAbortAfterCanCommit() throws Exception {
1525 final ShardTestKit testKit = new ShardTestKit(getSystem());
1526 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1527 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
1529 ShardTestKit.waitUntilLeader(shard);
1531 final FiniteDuration duration = testKit.duration("5 seconds");
1532 final Timeout timeout = new Timeout(duration);
1534 // Ready 2 transactions - the first one will be aborted.
1536 final TransactionIdentifier transactionID1 = nextTransactionId();
1537 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1538 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1539 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1541 final TransactionIdentifier transactionID2 = nextTransactionId();
1542 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1543 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1544 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1546 // Send the CanCommitTransaction message for the first Tx.
1548 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1549 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1550 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1551 assertTrue("Can commit", canCommitReply.getCanCommit());
1553 // Send the CanCommitTransaction message for the 2nd Tx. This
1554 // should get queued and
1555 // processed after the first Tx completes.
1557 final Future<Object> canCommitFuture = Patterns.ask(shard,
1558 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1560 // Send the AbortTransaction message for the first Tx. This
1561 // should trigger the 2nd
1564 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1565 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1567 // Wait for the 2nd Tx to complete the canCommit phase.
1569 canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
1570 assertTrue("Can commit", canCommitReply.getCanCommit());
1574 public void testAbortAfterReady() {
1575 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1576 final ShardTestKit testKit = new ShardTestKit(getSystem());
1577 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1578 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1580 ShardTestKit.waitUntilLeader(shard);
1582 final FiniteDuration duration = testKit.duration("5 seconds");
1586 final TransactionIdentifier transactionID1 = nextTransactionId();
1587 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1588 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1589 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1591 // Send the AbortTransaction message.
1593 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1594 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1596 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1598 // Now send CanCommitTransaction - should fail.
1600 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1601 final Throwable failure = testKit.expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1602 assertTrue("Failure type", failure instanceof IllegalStateException);
1604 // Ready and CanCommit another and verify success.
1606 final TransactionIdentifier transactionID2 = nextTransactionId();
1607 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1608 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1609 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1611 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1612 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1616 public void testAbortQueuedTransaction() {
1617 final ShardTestKit testKit = new ShardTestKit(getSystem());
1618 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1619 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1621 ShardTestKit.waitUntilLeader(shard);
1623 final FiniteDuration duration = testKit.duration("5 seconds");
1627 final TransactionIdentifier transactionID1 = nextTransactionId();
1628 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1629 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1630 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1632 final TransactionIdentifier transactionID2 = nextTransactionId();
1633 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1634 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1635 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1637 final TransactionIdentifier transactionID3 = nextTransactionId();
1638 shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1639 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), testKit.getRef());
1640 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1642 // Abort the second tx while it's queued.
1644 shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1645 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1647 // Commit the other 2.
1649 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1650 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1652 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1653 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1655 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1656 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1658 shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1659 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1661 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1665 public void testCreateSnapshotWithNonPersistentData() throws Exception {
1666 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1670 public void testCreateSnapshot() throws Exception {
1671 testCreateSnapshot(true, "testCreateSnapshot");
1674 private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception {
1675 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1677 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1678 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1679 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1684 public void saveSnapshot(final Object obj) {
1685 savedSnapshot.set(obj);
1686 super.saveSnapshot(obj);
1690 dataStoreContextBuilder.persistent(persistent);
1692 class TestShard extends Shard {
1694 protected TestShard(final AbstractBuilder<?, ?> builder) {
1696 setPersistence(new TestPersistentDataProvider(super.persistence()));
1700 public void handleCommand(final Object message) {
1701 super.handleCommand(message);
1703 // XXX: commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
1704 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1705 latch.get().countDown();
1710 public RaftActorContext getRaftActorContext() {
1711 return super.getRaftActorContext();
1715 final ShardTestKit testKit = new ShardTestKit(getSystem());
1717 final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1719 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
1720 .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1723 ShardTestKit.waitUntilLeader(shard);
1724 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1726 final NormalizedNode<?, ?> expectedRoot = readStore(shard, YangInstanceIdentifier.EMPTY);
1728 // Trigger creation of a snapshot by ensuring
1729 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1730 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1731 awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1733 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1734 awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1737 private static void awaitAndValidateSnapshot(final AtomicReference<CountDownLatch> latch,
1738 final AtomicReference<Object> savedSnapshot, final NormalizedNode<?, ?> expectedRoot)
1739 throws InterruptedException {
1740 assertTrue("Snapshot saved", latch.get().await(5, TimeUnit.SECONDS));
1742 assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
1744 verifySnapshot((Snapshot) savedSnapshot.get(), expectedRoot);
1746 latch.set(new CountDownLatch(1));
1747 savedSnapshot.set(null);
1750 private static void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot) {
1751 final NormalizedNode<?, ?> actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot().getRootNode().get();
1752 assertEquals("Root node", expectedRoot, actual);
1756 * This test simply verifies that the applySnapShot logic will work.
1759 public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
1760 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
1763 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1764 putTransaction.write(TestModel.TEST_PATH,
1765 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1766 commitTransaction(store, putTransaction);
1769 final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.EMPTY);
1771 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
1773 writeTransaction.delete(YangInstanceIdentifier.EMPTY);
1774 writeTransaction.write(YangInstanceIdentifier.EMPTY, expected);
1776 commitTransaction(store, writeTransaction);
1778 final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.EMPTY);
1780 assertEquals(expected, actual);
1784 public void testRecoveryApplicable() {
1786 final DatastoreContext persistentContext = DatastoreContext.newBuilder()
1787 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1789 final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
1790 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1792 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
1793 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1795 final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
1796 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1798 final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
1800 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1802 final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
1804 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1808 public void testOnDatastoreContext() {
1809 dataStoreContextBuilder.persistent(true);
1811 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
1813 assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1815 ShardTestKit.waitUntilLeader(shard);
1817 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1819 assertFalse("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1821 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1823 assertTrue("isRecoveryApplicable", shard.underlyingActor().persistence().isRecoveryApplicable());
1827 public void testRegisterRoleChangeListener() {
1828 final ShardTestKit testKit = new ShardTestKit(getSystem());
1829 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1830 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1831 "testRegisterRoleChangeListener");
1833 ShardTestKit.waitUntilLeader(shard);
1835 final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
1837 shard.tell(new RegisterRoleChangeListener(), listener);
1839 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
1841 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1842 ShardLeaderStateChanged.class);
1843 assertTrue("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent());
1844 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
1845 leaderStateChanged.getLocalShardDataTree().get());
1847 MessageCollectorActor.clearMessages(listener);
1849 // Force a leader change
1851 shard.tell(new RequestVote(10000, "member2", 50, 50), testKit.getRef());
1853 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener, ShardLeaderStateChanged.class);
1854 assertFalse("getLocalShardDataTree present", leaderStateChanged.getLocalShardDataTree().isPresent());
1858 public void testFollowerInitialSyncStatus() {
1859 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1860 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1861 "testFollowerInitialSyncStatus");
1863 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false,
1864 "member-1-shard-inventory-operational"));
1866 assertFalse(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1868 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true,
1869 "member-1-shard-inventory-operational"));
1871 assertTrue(shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1875 public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
1876 final ShardTestKit testKit = new ShardTestKit(getSystem());
1877 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
1878 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1879 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1881 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1882 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1883 TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1885 setupInMemorySnapshotStore();
1887 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1888 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1889 actorFactory.generateActorId(testName + "-shard"));
1891 testKit.waitUntilNoLeader(shard);
1893 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1894 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
1895 RegisterDataTreeNotificationListenerReply.class);
1896 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1898 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1899 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1901 listener.waitForChangeEvents();
1905 public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
1906 final ShardTestKit testKit = new ShardTestKit(getSystem());
1907 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
1908 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1909 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1911 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
1912 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1913 TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1915 setupInMemorySnapshotStore();
1917 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1918 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1919 actorFactory.generateActorId(testName + "-shard"));
1921 testKit.waitUntilNoLeader(shard);
1923 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1924 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
1925 RegisterDataTreeNotificationListenerReply.class);
1926 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1928 final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
1929 regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), testKit.getRef());
1930 testKit.expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
1932 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1933 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1935 listener.expectNoMoreChanges("Received unexpected change after close");
1939 public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
1940 final ShardTestKit testKit = new ShardTestKit(getSystem());
1941 final String testName = "testClusteredDataTreeChangeListenerRegistration";
1942 final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
1943 MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
1945 final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
1946 MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
1948 final TestActorRef<Shard> followerShard = actorFactory
1949 .createTestActor(Shard.builder().id(followerShardID)
1950 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
1951 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
1952 "akka://test/user/" + leaderShardID.toString()))
1953 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1954 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
1956 final TestActorRef<Shard> leaderShard = actorFactory
1957 .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
1958 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
1959 "akka://test/user/" + followerShardID.toString()))
1960 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1961 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
1963 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1964 final String leaderPath = ShardTestKit.waitUntilLeader(followerShard);
1965 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
1967 final YangInstanceIdentifier path = TestModel.TEST_PATH;
1968 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1969 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
1970 actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1972 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1973 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
1974 RegisterDataTreeNotificationListenerReply.class);
1975 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1977 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1979 listener.waitForChangeEvents();
1983 public void testServerRemoved() {
1984 final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
1985 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1987 final ActorRef shard = parent.underlyingActor().context().actorOf(
1988 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1989 "testServerRemoved");
1991 shard.tell(new ServerRemoved("test"), ActorRef.noSender());
1993 MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);