2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertSame;
14 import static org.junit.Assert.assertTrue;
15 import static org.junit.Assert.fail;
16 import static org.mockito.Matchers.any;
17 import static org.mockito.Mockito.doThrow;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSelection;
24 import akka.actor.Props;
25 import akka.actor.Status.Failure;
26 import akka.dispatch.Dispatchers;
27 import akka.dispatch.OnComplete;
28 import akka.japi.Creator;
29 import akka.pattern.Patterns;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Stopwatch;
34 import com.google.common.base.Throwables;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.util.Collections;
37 import java.util.HashSet;
39 import java.util.Optional;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicReference;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.opendaylight.controller.cluster.DataPersistenceProvider;
48 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
49 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
50 import org.opendaylight.controller.cluster.access.concepts.MemberName;
51 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
52 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
62 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
65 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
67 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
71 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
74 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
75 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
76 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
77 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
78 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
79 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
80 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
81 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
82 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
83 import org.opendaylight.controller.cluster.raft.RaftActorContext;
84 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
85 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
86 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
87 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
88 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
89 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
90 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
91 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
92 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
93 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
94 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
95 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
96 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
97 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
98 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
99 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
100 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
101 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
102 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
103 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
104 import org.opendaylight.yangtools.concepts.Identifier;
105 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
106 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
108 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
116 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
117 import scala.concurrent.Await;
118 import scala.concurrent.Future;
119 import scala.concurrent.duration.FiniteDuration;
121 public class ShardTest extends AbstractShardTest {
122 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
123 + "InMemorySnapshotStore and journal recovery seq number will start from 1";
126 public void testRegisterDataTreeChangeListener() throws Exception {
127 final ShardTestKit testKit = new ShardTestKit(getSystem());
128 final TestActorRef<Shard> shard = actorFactory.createTestActor(
129 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
130 "testRegisterDataTreeChangeListener");
132 ShardTestKit.waitUntilLeader(shard);
134 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
136 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
137 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
138 TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
140 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), testKit.getRef());
142 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("3 seconds"),
143 RegisterDataTreeNotificationListenerReply.class);
144 final String replyPath = reply.getListenerRegistrationPath().toString();
145 assertTrue("Incorrect reply path: " + replyPath,
146 replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
148 final YangInstanceIdentifier path = TestModel.TEST_PATH;
149 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
151 listener.waitForChangeEvents();
154 @SuppressWarnings("serial")
156 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
157 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
158 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
159 final Creator<Shard> creator = new Creator<Shard>() {
160 boolean firstElectionTimeout = true;
163 public Shard create() {
164 return new Shard(newShardBuilder()) {
166 public void handleCommand(final Object message) {
167 if (message instanceof ElectionTimeout && firstElectionTimeout) {
168 firstElectionTimeout = false;
169 final ActorRef self = getSelf();
171 Uninterruptibles.awaitUninterruptibly(
172 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
173 self.tell(message, self);
176 onFirstElectionTimeout.countDown();
178 super.handleCommand(message);
185 setupInMemorySnapshotStore();
187 final YangInstanceIdentifier path = TestModel.TEST_PATH;
188 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
189 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
190 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
192 final TestActorRef<Shard> shard = actorFactory.createTestActor(
193 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
194 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
196 final ShardTestKit testKit = new ShardTestKit(getSystem());
197 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
199 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), testKit.getRef());
200 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
201 RegisterDataTreeNotificationListenerReply.class);
202 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
204 shard.tell(FindLeader.INSTANCE, testKit.getRef());
205 final FindLeaderReply findLeadeReply = testKit.expectMsgClass(testKit.duration("5 seconds"),
206 FindLeaderReply.class);
207 assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
209 onChangeListenerRegistered.countDown();
211 // TODO: investigate why we do not receive data chage events
212 listener.waitForChangeEvents();
216 public void testCreateTransaction() {
217 final ShardTestKit testKit = new ShardTestKit(getSystem());
218 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
220 ShardTestKit.waitUntilLeader(shard);
222 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), testKit.getRef());
224 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
225 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
227 final CreateTransactionReply reply = testKit.expectMsgClass(testKit.duration("3 seconds"),
228 CreateTransactionReply.class);
230 final String path = reply.getTransactionPath().toString();
231 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
232 "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
233 shardID.getShardName(), shardID.getMemberName().getName())));
237 public void testCreateTransactionOnChain() {
238 final ShardTestKit testKit = new ShardTestKit(getSystem());
239 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
241 ShardTestKit.waitUntilLeader(shard);
243 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
244 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
246 final CreateTransactionReply reply = testKit.expectMsgClass(testKit.duration("3 seconds"),
247 CreateTransactionReply.class);
249 final String path = reply.getTransactionPath().toString();
250 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
251 "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
252 shardID.getShardName(), shardID.getMemberName().getName())));
256 public void testPeerAddressResolved() {
257 final ShardTestKit testKit = new ShardTestKit(getSystem());
258 final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
260 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder()
261 .peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null))
262 .props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
264 final String address = "akka://foobar";
265 shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
267 shard.tell(GetOnDemandRaftState.INSTANCE, testKit.getRef());
268 final OnDemandRaftState state = testKit.expectMsgClass(OnDemandRaftState.class);
269 assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
273 public void testApplySnapshot() throws Exception {
275 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps()
276 .withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
278 ShardTestKit.waitUntilLeader(shard);
280 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
283 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
284 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
285 .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
286 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
288 writeToStore(store, TestModel.TEST_PATH, container);
290 final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
291 final NormalizedNode<?,?> expected = readStore(store, root);
293 final Snapshot snapshot = Snapshot.create(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
294 Collections.emptyList(), 1, 2, 3, 4, -1, null, null);
296 shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
298 final Stopwatch sw = Stopwatch.createStarted();
299 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
300 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
303 assertEquals("Root node", expected, readStore(shard, root));
305 } catch (final AssertionError e) {
310 fail("Snapshot was not applied");
314 public void testApplyState() throws Exception {
315 final TestActorRef<Shard> shard = actorFactory.createTestActor(
316 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
318 ShardTestKit.waitUntilLeader(shard);
320 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
323 final DataTreeModification writeMod = store.takeSnapshot().newModification();
324 final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
325 writeMod.write(TestModel.TEST_PATH, node);
328 final TransactionIdentifier tx = nextTransactionId();
329 shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
331 final Stopwatch sw = Stopwatch.createStarted();
332 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
333 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
335 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
336 if (actual != null) {
337 assertEquals("Applied state", node, actual);
342 fail("State was not applied");
346 public void testDataTreeCandidateRecovery() throws Exception {
347 // Set up the InMemorySnapshotStore.
348 final DataTree source = setupInMemorySnapshotStore();
350 final DataTreeModification writeMod = source.takeSnapshot().newModification();
351 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
353 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
355 // Set up the InMemoryJournal.
356 InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
357 payloadForModification(source, writeMod, nextTransactionId())));
359 final int nListEntries = 16;
360 final Set<Integer> listEntryKeys = new HashSet<>();
362 // Add some ModificationPayload entries
363 for (int i = 1; i <= nListEntries; i++) {
364 listEntryKeys.add(Integer.valueOf(i));
366 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
367 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
369 final DataTreeModification mod = source.takeSnapshot().newModification();
370 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
373 InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
374 payloadForModification(source, mod, nextTransactionId())));
377 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
378 new ApplyJournalEntries(nListEntries));
380 testRecovery(listEntryKeys);
384 @SuppressWarnings("checkstyle:IllegalCatch")
385 public void testConcurrentThreePhaseCommits() throws Exception {
386 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
387 final CountDownLatch commitLatch = new CountDownLatch(2);
389 final long timeoutSec = 5;
390 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
391 final Timeout timeout = new Timeout(duration);
393 final TestActorRef<Shard> shard = actorFactory.createTestActor(
394 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
395 "testConcurrentThreePhaseCommits");
397 class OnFutureComplete extends OnComplete<Object> {
398 private final Class<?> expRespType;
400 OnFutureComplete(final Class<?> expRespType) {
401 this.expRespType = expRespType;
405 public void onComplete(final Throwable error, final Object resp) {
407 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
410 assertEquals("Commit response type", expRespType, resp.getClass());
412 } catch (final Exception e) {
418 void onSuccess(final Object resp) {
422 class OnCommitFutureComplete extends OnFutureComplete {
423 OnCommitFutureComplete() {
424 super(CommitTransactionReply.class);
428 public void onComplete(final Throwable error, final Object resp) {
429 super.onComplete(error, resp);
430 commitLatch.countDown();
434 class OnCanCommitFutureComplete extends OnFutureComplete {
435 private final TransactionIdentifier transactionID;
437 OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
438 super(CanCommitTransactionReply.class);
439 this.transactionID = transactionID;
443 void onSuccess(final Object resp) {
444 final CanCommitTransactionReply canCommitReply =
445 CanCommitTransactionReply.fromSerializable(resp);
446 assertEquals("Can commit", true, canCommitReply.getCanCommit());
448 final Future<Object> commitFuture = Patterns.ask(shard,
449 new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
450 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
454 final ShardTestKit testKit = new ShardTestKit(getSystem());
455 ShardTestKit.waitUntilLeader(shard);
457 final TransactionIdentifier transactionID1 = nextTransactionId();
458 final TransactionIdentifier transactionID2 = nextTransactionId();
459 final TransactionIdentifier transactionID3 = nextTransactionId();
461 final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
462 shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
463 final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
464 final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
465 final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
467 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
468 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
469 final ReadyTransactionReply readyReply = ReadyTransactionReply
470 .fromSerializable(testKit.expectMsgClass(duration, ReadyTransactionReply.class));
471 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
472 // Send the CanCommitTransaction message for the first Tx.
474 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
475 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
476 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
477 assertEquals("Can commit", true, canCommitReply.getCanCommit());
479 // Ready 2 more Tx's.
481 shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
482 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), testKit.getRef());
483 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
486 prepareBatchedModifications(transactionID3,
487 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
488 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
489 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), testKit.getRef());
490 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
492 // Send the CanCommitTransaction message for the next 2 Tx's.
493 // These should get queued and
494 // processed after the first Tx completes.
496 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
497 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
499 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
500 new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
502 // Send the CommitTransaction message for the first Tx. After it
503 // completes, it should
504 // trigger the 2nd Tx to proceed which should in turn then
507 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
508 testKit.expectMsgClass(duration, CommitTransactionReply.class);
510 // Wait for the next 2 Tx's to complete.
512 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher());
514 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher());
516 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
518 final Throwable t = caughtEx.get();
520 Throwables.propagateIfPossible(t, Exception.class);
521 throw new RuntimeException(t);
524 assertEquals("Commits complete", true, done);
526 // final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
527 // cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
528 // cohort3.getPreCommit(), cohort3.getCommit());
529 // inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
530 // inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
531 // inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
532 // inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
533 // inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
534 // inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
535 // inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
536 // inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
537 // inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
539 // Verify data in the data store.
541 verifyOuterListEntry(shard, 1);
543 verifyLastApplied(shard, 5);
547 public void testBatchedModificationsWithNoCommitOnReady() {
548 final ShardTestKit testKit = new ShardTestKit(getSystem());
549 final TestActorRef<Shard> shard = actorFactory.createTestActor(
550 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
551 "testBatchedModificationsWithNoCommitOnReady");
553 ShardTestKit.waitUntilLeader(shard);
555 final TransactionIdentifier transactionID = nextTransactionId();
556 final FiniteDuration duration = testKit.duration("5 seconds");
558 // Send a BatchedModifications to start a transaction.
560 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
561 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
562 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
564 // Send a couple more BatchedModifications.
566 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
567 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
569 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
571 shard.tell(newBatchedModifications(transactionID,
572 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
573 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
574 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3),
576 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
578 // Send the CanCommitTransaction message.
580 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
581 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
582 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
583 assertEquals("Can commit", true, canCommitReply.getCanCommit());
585 // Send the CommitTransaction message.
587 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
588 testKit.expectMsgClass(duration, CommitTransactionReply.class);
590 // Verify data in the data store.
592 verifyOuterListEntry(shard, 1);
596 public void testBatchedModificationsWithCommitOnReady() {
597 final ShardTestKit testKit = new ShardTestKit(getSystem());
598 final TestActorRef<Shard> shard = actorFactory.createTestActor(
599 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
600 "testBatchedModificationsWithCommitOnReady");
602 ShardTestKit.waitUntilLeader(shard);
604 final TransactionIdentifier transactionID = nextTransactionId();
605 final FiniteDuration duration = testKit.duration("5 seconds");
607 // Send a BatchedModifications to start a transaction.
609 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
610 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), testKit.getRef());
611 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
613 // Send a couple more BatchedModifications.
615 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
616 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
618 testKit.expectMsgClass(duration, BatchedModificationsReply.class);
620 shard.tell(newBatchedModifications(transactionID,
621 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
622 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
623 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3),
626 testKit.expectMsgClass(duration, CommitTransactionReply.class);
628 // Verify data in the data store.
629 verifyOuterListEntry(shard, 1);
632 @Test(expected = IllegalStateException.class)
633 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
634 final ShardTestKit testKit = new ShardTestKit(getSystem());
635 final TestActorRef<Shard> shard = actorFactory.createTestActor(
636 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
637 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
639 ShardTestKit.waitUntilLeader(shard);
641 final TransactionIdentifier transactionID = nextTransactionId();
642 final BatchedModifications batched = new BatchedModifications(transactionID,
643 DataStoreVersions.CURRENT_VERSION);
645 batched.setTotalMessagesSent(2);
647 shard.tell(batched, testKit.getRef());
649 final Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), Failure.class);
651 if (failure != null) {
652 Throwables.propagateIfPossible(failure.cause(), Exception.class);
653 throw new RuntimeException(failure.cause());
658 public void testBatchedModificationsWithOperationFailure() {
659 final ShardTestKit testKit = new ShardTestKit(getSystem());
660 final TestActorRef<Shard> shard = actorFactory.createTestActor(
661 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
662 "testBatchedModificationsWithOperationFailure");
664 ShardTestKit.waitUntilLeader(shard);
666 // Test merge with invalid data. An exception should occur when
667 // the merge is applied. Note that
668 // write will not validate the children for performance reasons.
670 final TransactionIdentifier transactionID = nextTransactionId();
672 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
673 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
674 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
676 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
677 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
678 shard.tell(batched, testKit.getRef());
679 Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
681 final Throwable cause = failure.cause();
683 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
685 batched.setTotalMessagesSent(2);
687 shard.tell(batched, testKit.getRef());
689 failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
690 assertEquals("Failure cause", cause, failure.cause());
694 public void testBatchedModificationsOnTransactionChain() {
695 final ShardTestKit testKit = new ShardTestKit(getSystem());
696 final TestActorRef<Shard> shard = actorFactory.createTestActor(
697 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
698 "testBatchedModificationsOnTransactionChain");
700 ShardTestKit.waitUntilLeader(shard);
702 final LocalHistoryIdentifier historyId = nextHistoryId();
703 final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
704 final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
706 final FiniteDuration duration = testKit.duration("5 seconds");
708 // Send a BatchedModifications to start a chained write
709 // transaction and ready it.
711 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
712 final YangInstanceIdentifier path = TestModel.TEST_PATH;
713 shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), testKit.getRef());
714 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
716 // Create a read Tx on the same chain.
718 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
719 DataStoreVersions.CURRENT_VERSION).toSerializable(), testKit.getRef());
721 final CreateTransactionReply createReply = testKit.expectMsgClass(testKit.duration("3 seconds"),
722 CreateTransactionReply.class);
724 getSystem().actorSelection(createReply.getTransactionPath())
725 .tell(new ReadData(path, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
726 final ReadDataReply readReply = testKit.expectMsgClass(testKit.duration("3 seconds"), ReadDataReply.class);
727 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
729 // Commit the write transaction.
731 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
732 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
733 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
734 assertEquals("Can commit", true, canCommitReply.getCanCommit());
736 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
737 testKit.expectMsgClass(duration, CommitTransactionReply.class);
739 // Verify data in the data store.
741 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
742 assertEquals("Stored node", containerNode, actualNode);
746 public void testOnBatchedModificationsWhenNotLeader() {
747 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
748 final ShardTestKit testKit = new ShardTestKit(getSystem());
749 final Creator<Shard> creator = new Creator<Shard>() {
750 private static final long serialVersionUID = 1L;
753 public Shard create() {
754 return new Shard(newShardBuilder()) {
756 protected boolean isLeader() {
757 return overrideLeaderCalls.get() ? false : super.isLeader();
761 public ActorSelection getLeader() {
762 return overrideLeaderCalls.get() ? getSystem().actorSelection(testKit.getRef().path())
769 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
770 .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
771 "testOnBatchedModificationsWhenNotLeader");
773 ShardTestKit.waitUntilLeader(shard);
775 overrideLeaderCalls.set(true);
777 final BatchedModifications batched = new BatchedModifications(nextTransactionId(),
778 DataStoreVersions.CURRENT_VERSION);
780 shard.tell(batched, ActorRef.noSender());
782 testKit.expectMsgEquals(batched);
786 public void testTransactionMessagesWithNoLeader() {
787 final ShardTestKit testKit = new ShardTestKit(getSystem());
788 dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
789 .shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
790 final TestActorRef<Shard> shard = actorFactory.createTestActor(
791 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
792 "testTransactionMessagesWithNoLeader");
794 testKit.waitUntilNoLeader(shard);
796 final TransactionIdentifier txId = nextTransactionId();
797 shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
798 Failure failure = testKit.expectMsgClass(Failure.class);
799 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
801 shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
802 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
803 failure = testKit.expectMsgClass(Failure.class);
804 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
806 shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
808 failure = testKit.expectMsgClass(Failure.class);
809 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
813 public void testReadyWithReadWriteImmediateCommit() {
814 testReadyWithImmediateCommit(true);
818 public void testReadyWithWriteOnlyImmediateCommit() {
819 testReadyWithImmediateCommit(false);
822 private void testReadyWithImmediateCommit(final boolean readWrite) {
823 final ShardTestKit testKit = new ShardTestKit(getSystem());
824 final TestActorRef<Shard> shard = actorFactory.createTestActor(
825 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
826 "testReadyWithImmediateCommit-" + readWrite);
828 ShardTestKit.waitUntilLeader(shard);
830 final TransactionIdentifier transactionID = nextTransactionId();
831 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
833 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH, containerNode, true),
836 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true),
840 testKit.expectMsgClass(testKit.duration("5 seconds"), CommitTransactionReply.class);
842 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
843 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
847 public void testReadyLocalTransactionWithImmediateCommit() {
848 final ShardTestKit testKit = new ShardTestKit(getSystem());
849 final TestActorRef<Shard> shard = actorFactory.createTestActor(
850 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
851 "testReadyLocalTransactionWithImmediateCommit");
853 ShardTestKit.waitUntilLeader(shard);
855 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
857 final DataTreeModification modification = dataStore.newModification();
859 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
860 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
861 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
862 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
864 final TransactionIdentifier txId = nextTransactionId();
865 modification.ready();
866 final ReadyLocalTransaction readyMessage =
867 new ReadyLocalTransaction(txId, modification, true, Optional.empty());
869 shard.tell(readyMessage, testKit.getRef());
871 testKit.expectMsgClass(CommitTransactionReply.class);
873 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
874 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
878 public void testReadyLocalTransactionWithThreePhaseCommit() {
879 final ShardTestKit testKit = new ShardTestKit(getSystem());
880 final TestActorRef<Shard> shard = actorFactory.createTestActor(
881 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
882 "testReadyLocalTransactionWithThreePhaseCommit");
884 ShardTestKit.waitUntilLeader(shard);
886 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
888 final DataTreeModification modification = dataStore.newModification();
890 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
891 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
892 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
893 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
895 final TransactionIdentifier txId = nextTransactionId();
896 modification.ready();
897 final ReadyLocalTransaction readyMessage =
898 new ReadyLocalTransaction(txId, modification, false, Optional.empty());
900 shard.tell(readyMessage, testKit.getRef());
902 testKit.expectMsgClass(ReadyTransactionReply.class);
904 // Send the CanCommitTransaction message.
906 shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
907 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
908 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
909 assertEquals("Can commit", true, canCommitReply.getCanCommit());
911 // Send the CanCommitTransaction message.
913 shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), testKit.getRef());
914 testKit.expectMsgClass(CommitTransactionReply.class);
916 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
917 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
921 public void testReadWriteCommitWithPersistenceDisabled() {
922 dataStoreContextBuilder.persistent(false);
923 final ShardTestKit testKit = new ShardTestKit(getSystem());
924 final TestActorRef<Shard> shard = actorFactory.createTestActor(
925 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
926 "testCommitWithPersistenceDisabled");
928 ShardTestKit.waitUntilLeader(shard);
930 // Setup a simulated transactions with a mock cohort.
932 final FiniteDuration duration = testKit.duration("5 seconds");
934 final TransactionIdentifier transactionID = nextTransactionId();
935 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
936 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false),
938 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
940 // Send the CanCommitTransaction message.
942 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
943 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
944 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
945 assertEquals("Can commit", true, canCommitReply.getCanCommit());
947 // Send the CanCommitTransaction message.
949 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
950 testKit.expectMsgClass(duration, CommitTransactionReply.class);
952 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
953 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
957 public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
958 testCommitWhenTransactionHasModifications(true);
962 public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
963 testCommitWhenTransactionHasModifications(false);
966 private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
967 final ShardTestKit testKit = new ShardTestKit(getSystem());
968 final DataTree dataTree = createDelegatingMockDataTree();
969 final TestActorRef<Shard> shard = actorFactory.createTestActor(
970 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
971 "testCommitWhenTransactionHasModifications-" + readWrite);
973 ShardTestKit.waitUntilLeader(shard);
975 final FiniteDuration duration = testKit.duration("5 seconds");
976 final TransactionIdentifier transactionID = nextTransactionId();
979 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
980 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
982 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
983 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
986 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
988 // Send the CanCommitTransaction message.
990 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
991 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
992 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
993 assertEquals("Can commit", true, canCommitReply.getCanCommit());
995 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
996 testKit.expectMsgClass(duration, CommitTransactionReply.class);
998 final InOrder inOrder = inOrder(dataTree);
999 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1000 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1001 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1003 // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
1005 Thread.sleep(HEARTBEAT_MILLIS * 2);
1007 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, testKit.getRef());
1008 final ShardStats shardStats = testKit.expectMsgClass(duration, ShardStats.class);
1010 // Use MBean for verification
1011 // Committed transaction count should increase as usual
1012 assertEquals(1, shardStats.getCommittedTransactionsCount());
1014 // Commit index should advance as we do not have an empty
1016 assertEquals(1, shardStats.getCommitIndex());
1020 public void testCommitPhaseFailure() throws Exception {
1021 final ShardTestKit testKit = new ShardTestKit(getSystem());
1022 final DataTree dataTree = createDelegatingMockDataTree();
1023 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1024 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1025 "testCommitPhaseFailure");
1027 ShardTestKit.waitUntilLeader(shard);
1029 final FiniteDuration duration = testKit.duration("5 seconds");
1030 final Timeout timeout = new Timeout(duration);
1032 // Setup 2 simulated transactions with mock cohorts. The first
1036 doThrow(new RuntimeException("mock commit failure")).when(dataTree)
1037 .commit(any(DataTreeCandidate.class));
1039 final TransactionIdentifier transactionID1 = nextTransactionId();
1040 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1041 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1042 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1044 final TransactionIdentifier transactionID2 = nextTransactionId();
1045 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1046 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1047 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1049 // Send the CanCommitTransaction message for the first Tx.
1051 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1052 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1053 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1054 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1056 // Send the CanCommitTransaction message for the 2nd Tx. This
1057 // should get queued and
1058 // processed after the first Tx completes.
1060 final Future<Object> canCommitFuture = Patterns.ask(shard,
1061 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1063 // Send the CommitTransaction message for the first Tx. This
1064 // should send back an error
1065 // and trigger the 2nd Tx to proceed.
1067 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1068 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1070 // Wait for the 2nd Tx to complete the canCommit phase.
1072 final CountDownLatch latch = new CountDownLatch(1);
1073 canCommitFuture.onComplete(new OnComplete<Object>() {
1075 public void onComplete(final Throwable failure, final Object resp) {
1078 }, getSystem().dispatcher());
1080 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1082 final InOrder inOrder = inOrder(dataTree);
1083 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1084 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1086 // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
1087 // validate performs wrapping and we capture that mock
1088 // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1090 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1094 public void testPreCommitPhaseFailure() throws Exception {
1095 final ShardTestKit testKit = new ShardTestKit(getSystem());
1096 final DataTree dataTree = createDelegatingMockDataTree();
1097 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1098 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1099 "testPreCommitPhaseFailure");
1101 ShardTestKit.waitUntilLeader(shard);
1103 final FiniteDuration duration = testKit.duration("5 seconds");
1104 final Timeout timeout = new Timeout(duration);
1106 doThrow(new RuntimeException("mock preCommit failure")).when(dataTree)
1107 .prepare(any(DataTreeModification.class));
1109 final TransactionIdentifier transactionID1 = nextTransactionId();
1110 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1111 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1112 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1114 final TransactionIdentifier transactionID2 = nextTransactionId();
1115 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1116 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1117 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1119 // Send the CanCommitTransaction message for the first Tx.
1121 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1122 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1123 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1124 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1126 // Send the CanCommitTransaction message for the 2nd Tx. This
1127 // should get queued and
1128 // processed after the first Tx completes.
1130 final Future<Object> canCommitFuture = Patterns.ask(shard,
1131 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1133 // Send the CommitTransaction message for the first Tx. This
1134 // should send back an error
1135 // and trigger the 2nd Tx to proceed.
1137 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1138 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1140 // Wait for the 2nd Tx to complete the canCommit phase.
1142 final CountDownLatch latch = new CountDownLatch(1);
1143 canCommitFuture.onComplete(new OnComplete<Object>() {
1145 public void onComplete(final Throwable failure, final Object resp) {
1148 }, getSystem().dispatcher());
1150 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1152 final InOrder inOrder = inOrder(dataTree);
1153 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1154 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1155 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1159 public void testCanCommitPhaseFailure() throws Exception {
1160 final ShardTestKit testKit = new ShardTestKit(getSystem());
1161 final DataTree dataTree = createDelegatingMockDataTree();
1162 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1163 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1164 "testCanCommitPhaseFailure");
1166 ShardTestKit.waitUntilLeader(shard);
1168 final FiniteDuration duration = testKit.duration("5 seconds");
1169 final TransactionIdentifier transactionID1 = nextTransactionId();
1171 doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
1172 .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1174 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1175 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1176 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1178 // Send the CanCommitTransaction message.
1180 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1181 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1183 // Send another can commit to ensure the failed one got cleaned
1186 final TransactionIdentifier transactionID2 = nextTransactionId();
1187 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1188 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1189 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1191 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1192 final CanCommitTransactionReply reply = CanCommitTransactionReply
1193 .fromSerializable(testKit.expectMsgClass(CanCommitTransactionReply.class));
1194 assertEquals("getCanCommit", true, reply.getCanCommit());
1198 public void testImmediateCommitWithCanCommitPhaseFailure() throws Exception {
1199 testImmediateCommitWithCanCommitPhaseFailure(true);
1200 testImmediateCommitWithCanCommitPhaseFailure(false);
1203 private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
1204 final ShardTestKit testKit = new ShardTestKit(getSystem());
1205 final DataTree dataTree = createDelegatingMockDataTree();
1206 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1207 newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1208 "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1210 ShardTestKit.waitUntilLeader(shard);
1212 doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
1213 .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1215 final FiniteDuration duration = testKit.duration("5 seconds");
1217 final TransactionIdentifier transactionID1 = nextTransactionId();
1220 shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1221 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1223 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1224 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1227 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1229 // Send another can commit to ensure the failed one got cleaned
1232 final TransactionIdentifier transactionID2 = nextTransactionId();
1234 shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1235 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1237 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1238 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), testKit.getRef());
1241 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1245 public void testAbortWithCommitPending() {
1246 final ShardTestKit testKit = new ShardTestKit(getSystem());
1247 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1249 void persistPayload(final Identifier id, final Payload payload,
1250 final boolean batchHint) {
1251 // Simulate an AbortTransaction message occurring during
1252 // replication, after
1253 // persisting and before finishing the commit to the
1256 doAbortTransaction(id, null);
1257 super.persistPayload(id, payload, batchHint);
1261 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
1262 .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1263 "testAbortWithCommitPending");
1265 ShardTestKit.waitUntilLeader(shard);
1267 final FiniteDuration duration = testKit.duration("5 seconds");
1269 final TransactionIdentifier transactionID = nextTransactionId();
1271 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1272 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1273 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1275 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1276 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1278 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), testKit.getRef());
1279 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1281 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1283 // Since we're simulating an abort occurring during replication
1284 // and before finish commit,
1285 // the data should still get written to the in-memory store
1286 // since we've gotten past
1287 // canCommit and preCommit and persisted the data.
1288 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1292 public void testTransactionCommitTimeout() throws Exception {
1293 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1294 final ShardTestKit testKit = new ShardTestKit(getSystem());
1295 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1296 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1297 "testTransactionCommitTimeout");
1299 ShardTestKit.waitUntilLeader(shard);
1301 final FiniteDuration duration = testKit.duration("5 seconds");
1303 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1304 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1305 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1307 // Ready 2 Tx's - the first will timeout
1309 final TransactionIdentifier transactionID1 = nextTransactionId();
1311 prepareBatchedModifications(transactionID1,
1312 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1313 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1314 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
1316 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1318 final TransactionIdentifier transactionID2 = nextTransactionId();
1319 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1320 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1322 prepareBatchedModifications(transactionID2, listNodePath,
1323 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), testKit.getRef());
1324 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1326 // canCommit 1st Tx. We don't send the commit so it should
1329 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1330 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1332 // canCommit the 2nd Tx - it should complete after the 1st Tx
1335 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1336 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1338 // Try to commit the 1st Tx - should fail as it's not the
1341 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1342 testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
1344 // Commit the 2nd Tx.
1346 shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1347 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1349 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1350 assertNotNull(listNodePath + " not found", node);
1355 // public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1356 // dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1358 // new ShardTestKit(getSystem()) {{
1359 // final TestActorRef<Shard> shard = actorFactory.createTestActor(
1360 // newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1361 // "testTransactionCommitQueueCapacityExceeded");
1363 // waitUntilLeader(shard);
1365 // final FiniteDuration duration = duration("5 seconds");
1367 // final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1369 // final TransactionIdentifier transactionID1 = nextTransactionId();
1370 // final MutableCompositeModification modification1 = new MutableCompositeModification();
1371 // final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1372 // TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1375 // final TransactionIdentifier transactionID2 = nextTransactionId();
1376 // final MutableCompositeModification modification2 = new MutableCompositeModification();
1377 // final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1378 // TestModel.OUTER_LIST_PATH,
1379 // ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1382 // final TransactionIdentifier transactionID3 = nextTransactionId();
1383 // final MutableCompositeModification modification3 = new MutableCompositeModification();
1384 // final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1385 // TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1388 // // Ready the Tx's
1390 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
1391 // modification1), getRef());
1392 // expectMsgClass(duration, ReadyTransactionReply.class);
1394 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
1395 // modification2), getRef());
1396 // expectMsgClass(duration, ReadyTransactionReply.class);
1398 // // The 3rd Tx should exceed queue capacity and fail.
1400 // shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
1401 // modification3), getRef());
1402 // expectMsgClass(duration, akka.actor.Status.Failure.class);
1404 // // canCommit 1st Tx.
1406 // shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1407 // expectMsgClass(duration, CanCommitTransactionReply.class);
1409 // // canCommit the 2nd Tx - it should get queued.
1411 // shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1413 // // canCommit the 3rd Tx - should exceed queue capacity and fail.
1415 // shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1416 // expectMsgClass(duration, akka.actor.Status.Failure.class);
1421 public void testTransactionCommitWithPriorExpiredCohortEntries() {
1422 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1423 final ShardTestKit testKit = new ShardTestKit(getSystem());
1424 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1425 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1426 "testTransactionCommitWithPriorExpiredCohortEntries");
1428 ShardTestKit.waitUntilLeader(shard);
1430 final FiniteDuration duration = testKit.duration("5 seconds");
1432 final TransactionIdentifier transactionID1 = nextTransactionId();
1433 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1434 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1435 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1437 final TransactionIdentifier transactionID2 = nextTransactionId();
1438 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1439 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1440 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1442 final TransactionIdentifier transactionID3 = nextTransactionId();
1443 shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1444 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1445 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1447 // All Tx's are readied. We'll send canCommit for the last one
1448 // but not the others. The others
1449 // should expire from the queue and the last one should be
1452 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1453 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1457 public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
1458 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1459 final ShardTestKit testKit = new ShardTestKit(getSystem());
1460 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1461 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1462 "testTransactionCommitWithSubsequentExpiredCohortEntry");
1464 ShardTestKit.waitUntilLeader(shard);
1466 final FiniteDuration duration = testKit.duration("5 seconds");
1468 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1470 final TransactionIdentifier transactionID1 = nextTransactionId();
1471 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1472 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1473 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1475 // CanCommit the first Tx so it's the current in-progress Tx.
1477 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1478 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1480 // Ready the second Tx.
1482 final TransactionIdentifier transactionID2 = nextTransactionId();
1483 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1484 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
1485 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1487 // Ready the third Tx.
1489 final TransactionIdentifier transactionID3 = nextTransactionId();
1490 final DataTreeModification modification3 = dataStore.newModification();
1491 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1492 .apply(modification3);
1493 modification3.ready();
1494 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
1495 true, Optional.empty());
1496 shard.tell(readyMessage, testKit.getRef());
1498 // Commit the first Tx. After completing, the second should
1499 // expire from the queue and the third
1502 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1503 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1505 // Expect commit reply from the third Tx.
1507 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1509 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1510 assertNotNull(TestModel.TEST2_PATH + " not found", node);
1514 public void testCanCommitBeforeReadyFailure() {
1515 final ShardTestKit testKit = new ShardTestKit(getSystem());
1516 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1517 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1518 "testCanCommitBeforeReadyFailure");
1520 shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), testKit.getRef());
1521 testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
1525 public void testAbortAfterCanCommit() throws Exception {
1526 final ShardTestKit testKit = new ShardTestKit(getSystem());
1527 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1528 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
1530 ShardTestKit.waitUntilLeader(shard);
1532 final FiniteDuration duration = testKit.duration("5 seconds");
1533 final Timeout timeout = new Timeout(duration);
1535 // Ready 2 transactions - the first one will be aborted.
1537 final TransactionIdentifier transactionID1 = nextTransactionId();
1538 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1539 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1540 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1542 final TransactionIdentifier transactionID2 = nextTransactionId();
1543 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1544 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1545 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1547 // Send the CanCommitTransaction message for the first Tx.
1549 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1550 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1551 .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
1552 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1554 // Send the CanCommitTransaction message for the 2nd Tx. This
1555 // should get queued and
1556 // processed after the first Tx completes.
1558 final Future<Object> canCommitFuture = Patterns.ask(shard,
1559 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1561 // Send the AbortTransaction message for the first Tx. This
1562 // should trigger the 2nd
1565 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1566 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1568 // Wait for the 2nd Tx to complete the canCommit phase.
1570 canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
1571 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1575 public void testAbortAfterReady() {
1576 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1577 final ShardTestKit testKit = new ShardTestKit(getSystem());
1578 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1579 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1581 ShardTestKit.waitUntilLeader(shard);
1583 final FiniteDuration duration = testKit.duration("5 seconds");
1587 final TransactionIdentifier transactionID1 = nextTransactionId();
1588 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1589 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1590 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1592 // Send the AbortTransaction message.
1594 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1595 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1597 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1599 // Now send CanCommitTransaction - should fail.
1601 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1602 final Throwable failure = testKit.expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1603 assertTrue("Failure type", failure instanceof IllegalStateException);
1605 // Ready and CanCommit another and verify success.
1607 final TransactionIdentifier transactionID2 = nextTransactionId();
1608 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1609 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1610 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1612 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1613 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1617 public void testAbortQueuedTransaction() {
1618 final ShardTestKit testKit = new ShardTestKit(getSystem());
1619 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1620 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1622 ShardTestKit.waitUntilLeader(shard);
1624 final FiniteDuration duration = testKit.duration("5 seconds");
1628 final TransactionIdentifier transactionID1 = nextTransactionId();
1629 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1630 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1631 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1633 final TransactionIdentifier transactionID2 = nextTransactionId();
1634 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1635 ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), testKit.getRef());
1636 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1638 final TransactionIdentifier transactionID3 = nextTransactionId();
1639 shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1640 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), testKit.getRef());
1641 testKit.expectMsgClass(duration, ReadyTransactionReply.class);
1643 // Abort the second tx while it's queued.
1645 shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
1646 testKit.expectMsgClass(duration, AbortTransactionReply.class);
1648 // Commit the other 2.
1650 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1651 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1653 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
1654 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1656 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1657 testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
1659 shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), testKit.getRef());
1660 testKit.expectMsgClass(duration, CommitTransactionReply.class);
1662 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1666 public void testCreateSnapshotWithNonPersistentData() throws Exception {
1667 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1671 public void testCreateSnapshot() throws Exception {
1672 testCreateSnapshot(true, "testCreateSnapshot");
1675 private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception {
1676 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1678 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1679 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1680 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1685 public void saveSnapshot(final Object obj) {
1686 savedSnapshot.set(obj);
1687 super.saveSnapshot(obj);
1691 dataStoreContextBuilder.persistent(persistent);
1693 class TestShard extends Shard {
1695 protected TestShard(final AbstractBuilder<?, ?> builder) {
1697 setPersistence(new TestPersistentDataProvider(super.persistence()));
1701 public void handleCommand(final Object message) {
1702 super.handleCommand(message);
1704 // XXX: commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
1705 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1706 latch.get().countDown();
1711 public RaftActorContext getRaftActorContext() {
1712 return super.getRaftActorContext();
1716 final ShardTestKit testKit = new ShardTestKit(getSystem());
1718 final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1720 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
1721 .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1724 ShardTestKit.waitUntilLeader(shard);
1725 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1727 final NormalizedNode<?, ?> expectedRoot = readStore(shard, YangInstanceIdentifier.EMPTY);
1729 // Trigger creation of a snapshot by ensuring
1730 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1731 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1732 awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1734 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1735 awaitAndValidateSnapshot(latch, savedSnapshot, expectedRoot);
1738 private static void awaitAndValidateSnapshot(final AtomicReference<CountDownLatch> latch,
1739 final AtomicReference<Object> savedSnapshot, final NormalizedNode<?, ?> expectedRoot)
1740 throws InterruptedException {
1741 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1743 assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
1745 verifySnapshot((Snapshot) savedSnapshot.get(), expectedRoot);
1747 latch.set(new CountDownLatch(1));
1748 savedSnapshot.set(null);
1751 private static void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot) {
1752 final NormalizedNode<?, ?> actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot().getRootNode().get();
1753 assertEquals("Root node", expectedRoot, actual);
1757 * This test simply verifies that the applySnapShot logic will work.
1760 public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
1761 final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
1764 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1765 putTransaction.write(TestModel.TEST_PATH,
1766 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1767 commitTransaction(store, putTransaction);
1770 final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.EMPTY);
1772 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
1774 writeTransaction.delete(YangInstanceIdentifier.EMPTY);
1775 writeTransaction.write(YangInstanceIdentifier.EMPTY, expected);
1777 commitTransaction(store, writeTransaction);
1779 final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.EMPTY);
1781 assertEquals(expected, actual);
1785 public void testRecoveryApplicable() {
1787 final DatastoreContext persistentContext = DatastoreContext.newBuilder()
1788 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1790 final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
1791 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1793 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
1794 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1796 final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
1797 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1799 final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
1801 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1803 final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
1805 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1809 public void testOnDatastoreContext() {
1810 dataStoreContextBuilder.persistent(true);
1812 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
1814 assertEquals("isRecoveryApplicable", true, shard.underlyingActor().persistence().isRecoveryApplicable());
1816 ShardTestKit.waitUntilLeader(shard);
1818 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1820 assertEquals("isRecoveryApplicable", false, shard.underlyingActor().persistence().isRecoveryApplicable());
1822 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1824 assertEquals("isRecoveryApplicable", true, shard.underlyingActor().persistence().isRecoveryApplicable());
1828 public void testRegisterRoleChangeListener() {
1829 final ShardTestKit testKit = new ShardTestKit(getSystem());
1830 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1831 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1832 "testRegisterRoleChangeListener");
1834 ShardTestKit.waitUntilLeader(shard);
1836 final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
1838 shard.tell(new RegisterRoleChangeListener(), listener);
1840 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
1842 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1843 ShardLeaderStateChanged.class);
1844 assertEquals("getLocalShardDataTree present", true,
1845 leaderStateChanged.getLocalShardDataTree().isPresent());
1846 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
1847 leaderStateChanged.getLocalShardDataTree().get());
1849 MessageCollectorActor.clearMessages(listener);
1851 // Force a leader change
1853 shard.tell(new RequestVote(10000, "member2", 50, 50), testKit.getRef());
1855 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1856 ShardLeaderStateChanged.class);
1857 assertEquals("getLocalShardDataTree present", false, leaderStateChanged.getLocalShardDataTree().isPresent());
1861 public void testFollowerInitialSyncStatus() {
1862 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1863 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1864 "testFollowerInitialSyncStatus");
1866 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false,
1867 "member-1-shard-inventory-operational"));
1869 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1871 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true,
1872 "member-1-shard-inventory-operational"));
1874 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1878 public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
1879 final ShardTestKit testKit = new ShardTestKit(getSystem());
1880 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
1881 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1882 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1884 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1885 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1886 TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1888 setupInMemorySnapshotStore();
1890 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1891 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1892 actorFactory.generateActorId(testName + "-shard"));
1894 testKit.waitUntilNoLeader(shard);
1896 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1897 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
1898 RegisterDataTreeNotificationListenerReply.class);
1899 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1901 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1902 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1904 listener.waitForChangeEvents();
1908 public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
1909 final ShardTestKit testKit = new ShardTestKit(getSystem());
1910 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
1911 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1912 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1914 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
1915 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
1916 TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1918 setupInMemorySnapshotStore();
1920 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1921 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1922 actorFactory.generateActorId(testName + "-shard"));
1924 testKit.waitUntilNoLeader(shard);
1926 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1927 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
1928 RegisterDataTreeNotificationListenerReply.class);
1929 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1931 final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
1932 regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), testKit.getRef());
1933 testKit.expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
1935 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
1936 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
1938 listener.expectNoMoreChanges("Received unexpected change after close");
1942 public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
1943 final ShardTestKit testKit = new ShardTestKit(getSystem());
1944 final String testName = "testClusteredDataTreeChangeListenerRegistration";
1945 final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
1946 MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
1948 final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
1949 MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
1951 final TestActorRef<Shard> followerShard = actorFactory
1952 .createTestActor(Shard.builder().id(followerShardID)
1953 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
1954 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
1955 "akka://test/user/" + leaderShardID.toString()))
1956 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1957 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
1959 final TestActorRef<Shard> leaderShard = actorFactory
1960 .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
1961 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
1962 "akka://test/user/" + followerShardID.toString()))
1963 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
1964 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
1966 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1967 final String leaderPath = ShardTestKit.waitUntilLeader(followerShard);
1968 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
1970 final YangInstanceIdentifier path = TestModel.TEST_PATH;
1971 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
1972 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
1973 actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
1975 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), testKit.getRef());
1976 final RegisterDataTreeNotificationListenerReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
1977 RegisterDataTreeNotificationListenerReply.class);
1978 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
1980 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1982 listener.waitForChangeEvents();
1986 public void testServerRemoved() {
1987 final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
1988 .withDispatcher(Dispatchers.DefaultDispatcherId()));
1990 final ActorRef shard = parent.underlyingActor().context().actorOf(
1991 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1992 "testServerRemoved");
1994 shard.tell(new ServerRemoved("test"), ActorRef.noSender());
1996 MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);