2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.Props;
26 import akka.actor.Status.Failure;
27 import akka.dispatch.Dispatchers;
28 import akka.dispatch.OnComplete;
29 import akka.japi.Creator;
30 import akka.pattern.Patterns;
31 import akka.persistence.SaveSnapshotSuccess;
32 import akka.testkit.TestActorRef;
33 import akka.util.Timeout;
34 import com.google.common.base.Function;
35 import com.google.common.base.Stopwatch;
36 import com.google.common.util.concurrent.Futures;
37 import com.google.common.util.concurrent.ListenableFuture;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import java.util.Collections;
40 import java.util.HashSet;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicReference;
46 import org.junit.Test;
47 import org.mockito.InOrder;
48 import org.opendaylight.controller.cluster.DataPersistenceProvider;
49 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
50 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
51 import org.opendaylight.controller.cluster.access.concepts.MemberName;
52 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
53 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
63 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
64 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
65 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
66 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
74 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
75 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
76 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
77 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
78 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
79 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
80 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
81 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
82 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
83 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
84 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
86 import org.opendaylight.controller.cluster.raft.RaftActorContext;
87 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
89 import org.opendaylight.controller.cluster.raft.Snapshot;
90 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
91 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
92 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
93 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
94 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
95 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
96 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
97 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
98 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
99 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
100 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
101 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
102 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
103 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
104 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
105 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
106 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
107 import org.opendaylight.yangtools.util.StringIdentifier;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
111 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
119 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
120 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
121 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
122 import scala.concurrent.Await;
123 import scala.concurrent.Future;
124 import scala.concurrent.duration.FiniteDuration;
126 public class ShardTest extends AbstractShardTest {
127 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
// Registers a data change listener on a shard that is already the leader, checks that the
// reply's registration path lies under the shard actor, then writes to TestModel.TEST_PATH
// and waits for the listener to observe change events for that path.
130 public void testRegisterChangeListener() throws Exception {
131 new ShardTestKit(getSystem()) {{
132 final TestActorRef<Shard> shard = actorFactory.createTestActor(
133 newShardProps(), "testRegisterChangeListener");
135 waitUntilLeader(shard);
137 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// The mock listener is constructed with 1 - presumably the number of expected events; verify
// against MockDataChangeListener.
139 final MockDataChangeListener listener = new MockDataChangeListener(1);
140 final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
141 "testRegisterChangeListener-DataChangeListener");
143 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
144 dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
146 final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
147 RegisterChangeListenerReply.class);
// The registration path must be a child of the shard actor created above.
148 final String replyPath = reply.getListenerRegistrationPath().toString();
149 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
150 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// Write after registration so that the listener has something to be notified about.
152 final YangInstanceIdentifier path = TestModel.TEST_PATH;
153 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
155 listener.waitForChangeEvents(path);
159 @SuppressWarnings("serial")
161 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
162 // This test tests the timing window in which a change listener is registered before the
163 // shard becomes the leader. We verify that the listener is registered and notified of the
164 // existing data when the shard becomes the leader.
165 new ShardTestKit(getSystem()) {{
166 // For this test, we want to send the RegisterChangeListener message after the shard
167 // has recovered from persistence and before it becomes the leader. So we subclass
168 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
169 // we know that the shard has been initialized to a follower and has started the
170 // election process. The following 2 CountDownLatches are used to coordinate the
171 // ElectionTimeout with the sending of the RegisterChangeListener message.
172 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
173 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
174 final Creator<Shard> creator = new Creator<Shard>() {
175 boolean firstElectionTimeout = true;
178 public Shard create() throws Exception {
179 // Use a non-persistent provider because this test actually invokes persist on the journal,
180 // which would cause all subsequent messages to not be queued properly.
181 // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
182 // it actually does a persist).
183 return new Shard(newShardBuilder()) {
185 public void handleCommand(final Object message) {
186 if(message instanceof ElectionTimeout && firstElectionTimeout) {
187 // Got the first ElectionTimeout. We don't forward it to the
188 // base Shard yet until we've sent the RegisterChangeListener
189 // message. So we signal the onFirstElectionTimeout latch to tell
190 // the main thread to send the RegisterChangeListener message and
191 // start a thread to wait on the onChangeListenerRegistered latch,
192 // which the main thread signals after it has sent the message.
193 // After the onChangeListenerRegistered is triggered, we send the
194 // original ElectionTimeout message to proceed with the election.
195 firstElectionTimeout = false;
196 final ActorRef self = getSelf();
// Wait at most 5 seconds for the main thread's signal, then re-deliver the withheld
// ElectionTimeout to this actor.
200 Uninterruptibles.awaitUninterruptibly(
201 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
202 self.tell(message, self);
206 onFirstElectionTimeout.countDown();
208 super.handleCommand(message);
215 setupInMemorySnapshotStore();
217 final MockDataChangeListener listener = new MockDataChangeListener(1);
218 final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
219 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
221 final TestActorRef<Shard> shard = actorFactory.createTestActor(
222 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
223 "testRegisterChangeListenerWhenNotLeaderInitially");
225 final YangInstanceIdentifier path = TestModel.TEST_PATH;
227 // Wait until the shard receives the first ElectionTimeout message.
228 assertEquals("Got first ElectionTimeout", true,
229 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
231 // Now send the RegisterChangeListener and wait for the reply.
232 shard.tell(new RegisterChangeListener(path, dclActor,
233 AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
235 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
236 RegisterChangeListenerReply.class);
237 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
239 // Sanity check - verify the shard is not the leader yet.
240 shard.tell(FindLeader.INSTANCE, getRef());
241 final FindLeaderReply findLeadeReply =
242 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
243 assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
245 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
246 // with the election process.
247 onChangeListenerRegistered.countDown();
249 // Wait for the shard to become the leader and notify our listener with the existing
250 // data in the store.
251 listener.waitForChangeEvents(path);
// Registers a data tree change listener on a shard that is already the leader, checks that
// the reply's registration path lies under the shard actor, then writes to
// TestModel.TEST_PATH and waits for the listener to observe change events.
256 public void testRegisterDataTreeChangeListener() throws Exception {
257 new ShardTestKit(getSystem()) {{
258 final TestActorRef<Shard> shard = actorFactory.createTestActor(
259 newShardProps(), "testRegisterDataTreeChangeListener");
261 waitUntilLeader(shard);
263 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
265 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
266 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
267 "testRegisterDataTreeChangeListener-DataTreeChangeListener");
269 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
271 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
272 RegisterDataTreeChangeListenerReply.class);
// The registration path must be a child of the shard actor created above.
273 final String replyPath = reply.getListenerRegistrationPath().toString();
274 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
275 "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
// Write after registration so that the listener has something to be notified about.
277 final YangInstanceIdentifier path = TestModel.TEST_PATH;
278 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
280 listener.waitForChangeEvents();
// Data-tree-listener counterpart of the "not the leader on registration" scenario: the
// listener is registered while the first ElectionTimeout is being withheld from the shard,
// the test asserts that no leader is known at that point, and only then is the election
// allowed to proceed.
284 @SuppressWarnings("serial")
286 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
287 new ShardTestKit(getSystem()) {{
// Latches coordinating the shard's first ElectionTimeout with the registration sent below.
288 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
289 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
290 final Creator<Shard> creator = new Creator<Shard>() {
291 boolean firstElectionTimeout = true;
294 public Shard create() throws Exception {
295 return new Shard(newShardBuilder()) {
297 public void handleCommand(final Object message) {
298 if(message instanceof ElectionTimeout && firstElectionTimeout) {
299 firstElectionTimeout = false;
300 final ActorRef self = getSelf();
// Wait at most 5 seconds for the main thread's signal, then re-deliver the withheld
// ElectionTimeout to this actor.
304 Uninterruptibles.awaitUninterruptibly(
305 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
306 self.tell(message, self);
310 onFirstElectionTimeout.countDown();
312 super.handleCommand(message);
319 setupInMemorySnapshotStore();
321 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
322 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
323 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
325 final TestActorRef<Shard> shard = actorFactory.createTestActor(
326 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
327 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
329 final YangInstanceIdentifier path = TestModel.TEST_PATH;
// Wait until the shard has received its first ElectionTimeout.
331 assertEquals("Got first ElectionTimeout", true,
332 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
// Register the listener while the election is still being held back.
334 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
335 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
336 RegisterDataTreeChangeListenerReply.class);
337 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
// Sanity check - the shard must not report a leader yet.
339 shard.tell(FindLeader.INSTANCE, getRef());
340 final FindLeaderReply findLeadeReply =
341 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
342 assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
// Release the withheld ElectionTimeout so that the election can proceed.
345 onChangeListenerRegistered.countDown();
347 // TODO: investigate why we do not receive data change events
348 listener.waitForChangeEvents();
// Sends a serialized read-only CreateTransaction to a leader shard (after updating its
// schema context) and checks that the returned transaction actor path starts with the
// expected prefix under the shard actor.
353 public void testCreateTransaction(){
354 new ShardTestKit(getSystem()) {{
355 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
357 waitUntilLeader(shard);
359 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
361 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
362 DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
364 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
365 CreateTransactionReply.class);
// Only the prefix is checked; whatever follows the final ':' is not asserted here.
367 final String path = reply.getTransactionPath().toString();
368 assertTrue("Unexpected transaction path " + path,
369 path.startsWith("akka://test/user/testCreateTransaction/shard-member-1:ShardTransactionTest@0:"));
// Sends a serialized read-only CreateTransaction to a leader shard and checks that the
// returned transaction actor path starts with the expected prefix under the shard actor.
// NOTE(review): the visible code builds the transaction id with nextTransactionId() exactly
// as testCreateTransaction does - confirm how this exercises a transaction chain.
374 public void testCreateTransactionOnChain(){
375 new ShardTestKit(getSystem()) {{
376 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
378 waitUntilLeader(shard);
380 shard.tell(new CreateTransaction(nextTransactionId(),TransactionType.READ_ONLY.ordinal(),
381 DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
383 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
384 CreateTransactionReply.class);
// Only the prefix is checked; whatever follows the final ':' is not asserted here.
386 final String path = reply.getTransactionPath().toString();
387 assertTrue("Unexpected transaction path " + path,
388 path.startsWith("akka://test/user/testCreateTransactionOnChain/shard-member-1:ShardTransactionTest@0:"));
// Creates a shard whose single peer initially has a null address, sends PeerAddressResolved
// for that peer, and verifies through GetOnDemandRaftState that the reported peer address is
// the resolved one.
393 public void testPeerAddressResolved() throws Exception {
394 new ShardTestKit(getSystem()) {{
395 ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"), "config");
// The peer is known by id only; its address is deliberately left null.
396 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder().
397 peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null)).props().
398 withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
400 final String address = "akka://foobar";
401 shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
// Read the peer address back from the shard's on-demand Raft state.
403 shard.tell(GetOnDemandRaftState.INSTANCE, getRef());
404 OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
405 assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
// Builds the expected data in a standalone DataTree, wraps its serialized root in a Snapshot,
// applies the snapshot state through the shard's RaftActorSnapshotCohort, and asserts that
// the shard's root node equals the expected root.
410 public void testApplySnapshot() throws Exception {
412 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot");
414 ShardTestKit.waitUntilLeader(shard);
// Separate in-memory tree used only to produce the expected root node.
416 final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
417 store.setSchemaContext(SCHEMA_CONTEXT);
// Test container holding an outer list with a single entry keyed by id 1.
419 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
420 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
421 withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
422 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
424 writeToStore(store, TestModel.TEST_PATH, container);
426 final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
427 final NormalizedNode<?,?> expected = readStore(store, root);
// Only the snapshot's state is applied below; the trailing numeric arguments are not
// inspected by this test.
429 final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
430 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
432 shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
434 final NormalizedNode<?,?> actual = readStore(shard, root);
436 assertEquals("Root node", expected, actual);
// Sends an ApplyState message carrying a write of TestModel.TEST_PATH to the shard, then
// polls the shard's store until the written node is visible, failing with
// "State was not applied" otherwise.
440 public void testApplyState() throws Exception {
441 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplyState");
443 ShardTestKit.waitUntilLeader(shard);
// The modification is built against a separate source tree and wrapped into a payload.
445 final DataTree source = setupInMemorySnapshotStore();
446 final DataTreeModification writeMod = source.takeSnapshot().newModification();
447 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
448 writeMod.write(TestModel.TEST_PATH, node);
451 final ApplyState applyState = new ApplyState(null, new StringIdentifier("test"),
452 new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod)));
454 shard.tell(applyState, shard);
// The message is handled asynchronously, so re-read the store every 75 ms while no more
// than 5 seconds have elapsed.
456 Stopwatch sw = Stopwatch.createStarted();
457 while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
458 Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
460 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
462 assertEquals("Applied state", node, actual);
467 fail("State was not applied");
// Seeds the in-memory snapshot store and journal with an outer-list write followed by 16
// list-entry merges, then delegates to testRecovery with the set of expected list keys.
471 public void testDataTreeCandidateRecovery() throws Exception {
472 // Set up the InMemorySnapshotStore.
473 final DataTree source = setupInMemorySnapshotStore();
475 final DataTreeModification writeMod = source.takeSnapshot().newModification();
476 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
// Sequence number 0 holds a placeholder entry; see the DUMMY_DATA text for the reason.
478 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
480 // Set up the InMemoryJournal.
481 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
483 final int nListEntries = 16;
484 final Set<Integer> listEntryKeys = new HashSet<>();
486 // Add some ModificationPayload entries
487 for (int i = 1; i <= nListEntries; i++) {
488 listEntryKeys.add(Integer.valueOf(i));
490 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
491 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
493 final DataTreeModification mod = source.takeSnapshot().newModification();
494 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
// Journal sequence number i+1 carries the log entry with index i.
496 InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
497 payloadForModification(source, mod)));
// The final journal entry is an ApplyJournalEntries for the last list-entry index.
500 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
501 new ApplyJournalEntries(nListEntries));
503 testRecovery(listEntryKeys);
// Readies three write transactions, can-commits the first, issues CanCommit for the other
// two while the first is still uncommitted, then commits the first and verifies that all
// three cohorts go through canCommit, preCommit and commit strictly one after another.
507 public void testConcurrentThreePhaseCommits() throws Throwable {
508 new ShardTestKit(getSystem()) {{
509 final TestActorRef<Shard> shard = actorFactory.createTestActor(
510 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
511 "testConcurrentThreePhaseCommits");
513 waitUntilLeader(shard);
515 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
517 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Tx 1 writes the test container.
519 final TransactionIdentifier transactionID1 = nextTransactionId();
520 final MutableCompositeModification modification1 = new MutableCompositeModification();
521 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
522 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
// Tx 2 writes an empty outer list.
524 final TransactionIdentifier transactionID2 = nextTransactionId();
525 final MutableCompositeModification modification2 = new MutableCompositeModification();
526 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
527 TestModel.OUTER_LIST_PATH,
528 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
// Tx 3 writes the outer list entry with id 1.
531 final TransactionIdentifier transactionID3 = nextTransactionId();
532 final MutableCompositeModification modification3 = new MutableCompositeModification();
533 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
534 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
535 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
536 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// A single 5 second budget is shared by expectMsgClass, the ask timeouts and the latch wait.
539 final long timeoutSec = 5;
540 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
541 final Timeout timeout = new Timeout(duration);
// Ready the first Tx; the reply's cohort path must be the shard itself.
543 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
544 final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
545 expectMsgClass(duration, ReadyTransactionReply.class));
546 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
548 // Send the CanCommitTransaction message for the first Tx.
550 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
551 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
552 expectMsgClass(duration, CanCommitTransactionReply.class));
553 assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Ready the second and third Tx's.
555 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
556 expectMsgClass(duration, ReadyTransactionReply.class);
558 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
559 expectMsgClass(duration, ReadyTransactionReply.class);
561 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
562 // processed after the first Tx completes.
564 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
565 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
567 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
568 new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
570 // Send the CommitTransaction message for the first Tx. After it completes, it should
571 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
573 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
574 expectMsgClass(duration, CommitTransactionReply.class);
576 // Wait for the next 2 Tx's to complete.
// caughtEx carries a failure from the future callbacks back to the test thread;
// commitLatch is counted down once per completed commit future.
578 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
579 final CountDownLatch commitLatch = new CountDownLatch(2);
// Base callback: records failures in caughtEx and checks the response type.
581 class OnFutureComplete extends OnComplete<Object> {
582 private final Class<?> expRespType;
584 OnFutureComplete(final Class<?> expRespType) {
585 this.expRespType = expRespType;
589 public void onComplete(final Throwable error, final Object resp) {
591 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
594 assertEquals("Commit response type", expRespType, resp.getClass());
596 } catch (final Exception e) {
602 void onSuccess(final Object resp) throws Exception {
// Commit callback: expects a CommitTransactionReply and counts down the latch.
606 class OnCommitFutureComplete extends OnFutureComplete {
607 OnCommitFutureComplete() {
608 super(CommitTransactionReply.class);
612 public void onComplete(final Throwable error, final Object resp) {
613 super.onComplete(error, resp);
614 commitLatch.countDown();
// CanCommit callback: on a positive reply, asks the shard to commit the same transaction.
618 class OnCanCommitFutureComplete extends OnFutureComplete {
619 private final TransactionIdentifier transactionID;
621 OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
622 super(CanCommitTransactionReply.class);
623 this.transactionID = transactionID;
627 void onSuccess(final Object resp) throws Exception {
628 final CanCommitTransactionReply canCommitReply =
629 CanCommitTransactionReply.fromSerializable(resp);
630 assertEquals("Can commit", true, canCommitReply.getCanCommit());
632 final Future<Object> commitFuture = Patterns.ask(shard,
633 new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
634 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
638 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
639 getSystem().dispatcher());
641 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
642 getSystem().dispatcher());
644 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
// Rethrow a callback failure before checking the latch result, so that the underlying
// cause is reported rather than a bare timeout.
646 if(caughtEx.get() != null) {
647 throw caughtEx.get();
650 assertEquals("Commits complete", true, done);
// The three transactions must have run their commit phases one after another.
652 final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
653 inOrder.verify(cohort1).canCommit();
654 inOrder.verify(cohort1).preCommit();
655 inOrder.verify(cohort1).commit();
656 inOrder.verify(cohort2).canCommit();
657 inOrder.verify(cohort2).preCommit();
658 inOrder.verify(cohort2).commit();
659 inOrder.verify(cohort3).canCommit();
660 inOrder.verify(cohort3).preCommit();
661 inOrder.verify(cohort3).commit();
663 // Verify data in the data store.
665 verifyOuterListEntry(shard, 1);
667 verifyLastApplied(shard, 2);
// Sends three BatchedModifications for one transaction, the third of which is answered with
// a ReadyTransactionReply, then drives CanCommit and Commit explicitly and verifies that the
// decorated cohort saw canCommit, preCommit and commit in that order.
672 public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
673 new ShardTestKit(getSystem()) {{
674 final TestActorRef<Shard> shard = actorFactory.createTestActor(
675 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
676 "testBatchedModificationsWithNoCommitOnReady");
678 waitUntilLeader(shard);
680 final TransactionIdentifier transactionID = nextTransactionId();
681 final FiniteDuration duration = duration("5 seconds");
// Wrap the first real cohort handed to the decorator in a delegating mock and return that
// same mock on every later call, so that its invocations can be verified below.
683 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
684 final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
685 if(mockCohort.get() == null) {
686 mockCohort.set(createDelegatingMockCohort("cohort", actual));
689 return mockCohort.get();
692 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
694 // Send a BatchedModifications to start a transaction.
696 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
697 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
698 expectMsgClass(duration, BatchedModificationsReply.class);
700 // Send a couple more BatchedModifications.
702 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
703 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
704 expectMsgClass(duration, BatchedModificationsReply.class);
// The last batch passes (true, false, 3) - presumably ready=true, doCommitOnReady=false and
// the total message count; verify against newBatchedModifications.
706 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
707 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
708 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
709 expectMsgClass(duration, ReadyTransactionReply.class);
711 // Send the CanCommitTransaction message.
713 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
714 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
715 expectMsgClass(duration, CanCommitTransactionReply.class));
716 assertEquals("Can commit", true, canCommitReply.getCanCommit());
718 // Send the CommitTransaction message.
720 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
721 expectMsgClass(duration, CommitTransactionReply.class);
723 final InOrder inOrder = inOrder(mockCohort.get());
724 inOrder.verify(mockCohort.get()).canCommit();
725 inOrder.verify(mockCohort.get()).preCommit();
726 inOrder.verify(mockCohort.get()).commit();
728 // Verify data in the data store.
730 verifyOuterListEntry(shard, 1);
735 public void testBatchedModificationsWithCommitOnReady() throws Throwable {
736 new ShardTestKit(getSystem()) {{
737 final TestActorRef<Shard> shard = actorFactory.createTestActor(
738 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
739 "testBatchedModificationsWithCommitOnReady");
741 waitUntilLeader(shard);
743 final TransactionIdentifier transactionID = nextTransactionId();
744 final FiniteDuration duration = duration("5 seconds");
746 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
747 final ShardCommitCoordinator.CohortDecorator cohortDecorator = (txID, actual) -> {
748 if(mockCohort.get() == null) {
749 mockCohort.set(createDelegatingMockCohort("cohort", actual));
752 return mockCohort.get();
755 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
757 // Send a BatchedModifications to start a transaction.
759 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
760 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
761 expectMsgClass(duration, BatchedModificationsReply.class);
763 // Send a couple more BatchedModifications.
765 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
766 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
767 expectMsgClass(duration, BatchedModificationsReply.class);
769 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
770 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
771 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
773 expectMsgClass(duration, CommitTransactionReply.class);
775 final InOrder inOrder = inOrder(mockCohort.get());
776 inOrder.verify(mockCohort.get()).canCommit();
777 inOrder.verify(mockCohort.get()).preCommit();
778 inOrder.verify(mockCohort.get()).commit();
780 // Verify data in the data store.
782 verifyOuterListEntry(shard, 1);
786 @Test(expected=IllegalStateException.class)
787 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
788 new ShardTestKit(getSystem()) {{
789 final TestActorRef<Shard> shard = actorFactory.createTestActor(
790 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
791 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
793 waitUntilLeader(shard);
795 final TransactionIdentifier transactionID = nextTransactionId();
796 final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
797 batched.setReady(true);
798 batched.setTotalMessagesSent(2);
800 shard.tell(batched, getRef());
802 final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
804 if(failure != null) {
805 throw failure.cause();
811 public void testBatchedModificationsWithOperationFailure() throws Throwable {
812 new ShardTestKit(getSystem()) {{
813 final TestActorRef<Shard> shard = actorFactory.createTestActor(
814 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
815 "testBatchedModificationsWithOperationFailure");
817 waitUntilLeader(shard);
819 // Test merge with invalid data. An exception should occur when the merge is applied. Note that
820 // write will not validate the children for performance reasons.
822 TransactionIdentifier transactionID = nextTransactionId();
824 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
825 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
826 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
828 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
829 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
830 shard.tell(batched, getRef());
831 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
833 Throwable cause = failure.cause();
835 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
836 batched.setReady(true);
837 batched.setTotalMessagesSent(2);
839 shard.tell(batched, getRef());
841 failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
842 assertEquals("Failure cause", cause, failure.cause());
847 public void testBatchedModificationsOnTransactionChain() throws Throwable {
848 new ShardTestKit(getSystem()) {{
849 final TestActorRef<Shard> shard = actorFactory.createTestActor(
850 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
851 "testBatchedModificationsOnTransactionChain");
853 waitUntilLeader(shard);
855 final LocalHistoryIdentifier historyId = nextHistoryId();
856 final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
857 final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
859 final FiniteDuration duration = duration("5 seconds");
861 // Send a BatchedModifications to start a chained write transaction and ready it.
863 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
864 final YangInstanceIdentifier path = TestModel.TEST_PATH;
865 shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), getRef());
866 expectMsgClass(duration, ReadyTransactionReply.class);
868 // Create a read Tx on the same chain.
870 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
871 DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
873 final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
875 getSystem().actorSelection(createReply.getTransactionPath()).tell(
876 new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
877 final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
878 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
880 // Commit the write transaction.
882 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
883 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
884 expectMsgClass(duration, CanCommitTransactionReply.class));
885 assertEquals("Can commit", true, canCommitReply.getCanCommit());
887 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
888 expectMsgClass(duration, CommitTransactionReply.class);
890 // Verify data in the data store.
892 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
893 assertEquals("Stored node", containerNode, actualNode);
898 public void testOnBatchedModificationsWhenNotLeader() {
899 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
900 new ShardTestKit(getSystem()) {{
901 final Creator<Shard> creator = new Creator<Shard>() {
902 private static final long serialVersionUID = 1L;
905 public Shard create() throws Exception {
906 return new Shard(newShardBuilder()) {
908 protected boolean isLeader() {
909 return overrideLeaderCalls.get() ? false : super.isLeader();
913 protected ActorSelection getLeader() {
914 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
921 final TestActorRef<Shard> shard = actorFactory.createTestActor(
922 Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
924 waitUntilLeader(shard);
926 overrideLeaderCalls.set(true);
928 final BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
930 shard.tell(batched, ActorRef.noSender());
932 expectMsgEquals(batched);
937 public void testTransactionMessagesWithNoLeader() {
938 new ShardTestKit(getSystem()) {{
939 dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
940 shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
941 final TestActorRef<Shard> shard = actorFactory.createTestActor(
942 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
943 "testTransactionMessagesWithNoLeader");
945 waitUntilNoLeader(shard);
947 final TransactionIdentifier txId = nextTransactionId();
948 shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), getRef());
949 Failure failure = expectMsgClass(Failure.class);
950 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
952 shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), txId,
953 DataStoreVersions.CURRENT_VERSION, true), getRef());
954 failure = expectMsgClass(Failure.class);
955 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
957 shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef());
958 failure = expectMsgClass(Failure.class);
959 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
964 public void testReadyWithReadWriteImmediateCommit() throws Exception{
965 testReadyWithImmediateCommit(true);
969 public void testReadyWithWriteOnlyImmediateCommit() throws Exception{
970 testReadyWithImmediateCommit(false);
973 private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
974 new ShardTestKit(getSystem()) {{
975 final TestActorRef<Shard> shard = actorFactory.createTestActor(
976 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
977 "testReadyWithImmediateCommit-" + readWrite);
979 waitUntilLeader(shard);
981 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
983 final TransactionIdentifier transactionID = nextTransactionId();
984 final MutableCompositeModification modification = new MutableCompositeModification();
985 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
986 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
987 TestModel.TEST_PATH, containerNode, modification);
989 final FiniteDuration duration = duration("5 seconds");
991 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
993 expectMsgClass(duration, CommitTransactionReply.class);
995 final InOrder inOrder = inOrder(cohort);
996 inOrder.verify(cohort).canCommit();
997 inOrder.verify(cohort).preCommit();
998 inOrder.verify(cohort).commit();
1000 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1001 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1006 public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1007 new ShardTestKit(getSystem()) {{
1008 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1009 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1010 "testReadyLocalTransactionWithImmediateCommit");
1012 waitUntilLeader(shard);
1014 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1016 final DataTreeModification modification = dataStore.newModification();
1018 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1019 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1020 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1021 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1023 final TransactionIdentifier txId = nextTransactionId();
1024 modification.ready();
1025 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1027 shard.tell(readyMessage, getRef());
1029 expectMsgClass(CommitTransactionReply.class);
1031 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1032 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1037 public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1038 new ShardTestKit(getSystem()) {{
1039 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1040 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1041 "testReadyLocalTransactionWithThreePhaseCommit");
1043 waitUntilLeader(shard);
1045 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1047 final DataTreeModification modification = dataStore.newModification();
1049 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1050 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1051 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1052 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1054 final TransactionIdentifier txId = nextTransactionId();
1055 modification.ready();
1056 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1058 shard.tell(readyMessage, getRef());
1060 expectMsgClass(ReadyTransactionReply.class);
1062 // Send the CanCommitTransaction message.
1064 shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1065 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1066 expectMsgClass(CanCommitTransactionReply.class));
1067 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1069 // Send the CanCommitTransaction message.
1071 shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1072 expectMsgClass(CommitTransactionReply.class);
1074 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1075 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1080 public void testReadWriteCommitWithPersistenceDisabled() throws Throwable {
1081 testCommitWithPersistenceDisabled(true);
1085 public void testWriteOnlyCommitWithPersistenceDisabled() throws Throwable {
1086 testCommitWithPersistenceDisabled(true);
1089 private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
1090 dataStoreContextBuilder.persistent(false);
1091 new ShardTestKit(getSystem()) {{
1092 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1093 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1094 "testCommitWithPersistenceDisabled-" + readWrite);
1096 waitUntilLeader(shard);
1098 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1100 // Setup a simulated transactions with a mock cohort.
1102 final TransactionIdentifier transactionID = nextTransactionId();
1103 final MutableCompositeModification modification = new MutableCompositeModification();
1104 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1105 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1106 TestModel.TEST_PATH, containerNode, modification);
1108 final FiniteDuration duration = duration("5 seconds");
1110 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1111 expectMsgClass(duration, ReadyTransactionReply.class);
1113 // Send the CanCommitTransaction message.
1115 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1116 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1117 expectMsgClass(duration, CanCommitTransactionReply.class));
1118 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1120 // Send the CanCommitTransaction message.
1122 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1123 expectMsgClass(duration, CommitTransactionReply.class);
1125 final InOrder inOrder = inOrder(cohort);
1126 inOrder.verify(cohort).canCommit();
1127 inOrder.verify(cohort).preCommit();
1128 inOrder.verify(cohort).commit();
1130 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1131 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1136 public void testReadWriteCommitWhenTransactionHasNoModifications() {
1137 testCommitWhenTransactionHasNoModifications(true);
1141 public void testWriteOnlyCommitWhenTransactionHasNoModifications() {
1142 testCommitWhenTransactionHasNoModifications(false);
1145 private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
1146 // Note that persistence is enabled which would normally result in the entry getting written to the journal
1147 // but here that need not happen
1148 new ShardTestKit(getSystem()) {
1150 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1151 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1152 "testCommitWhenTransactionHasNoModifications-" + readWrite);
1154 waitUntilLeader(shard);
1156 final TransactionIdentifier transactionID = nextTransactionId();
1157 final MutableCompositeModification modification = new MutableCompositeModification();
1158 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1159 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1160 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1161 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1162 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1164 final FiniteDuration duration = duration("5 seconds");
1166 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1167 expectMsgClass(duration, ReadyTransactionReply.class);
1169 // Send the CanCommitTransaction message.
1171 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1172 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1173 expectMsgClass(duration, CanCommitTransactionReply.class));
1174 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1176 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1177 expectMsgClass(duration, CommitTransactionReply.class);
1179 final InOrder inOrder = inOrder(cohort);
1180 inOrder.verify(cohort).canCommit();
1181 inOrder.verify(cohort).preCommit();
1182 inOrder.verify(cohort).commit();
1184 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1185 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1187 // Use MBean for verification
1188 // Committed transaction count should increase as usual
1189 assertEquals(1,shardStats.getCommittedTransactionsCount());
1191 // Commit index should not advance because this does not go into the journal
1192 assertEquals(-1, shardStats.getCommitIndex());
1198 public void testReadWriteCommitWhenTransactionHasModifications() {
1199 testCommitWhenTransactionHasModifications(true);
1203 public void testWriteOnlyCommitWhenTransactionHasModifications() {
1204 testCommitWhenTransactionHasModifications(false);
1207 private void testCommitWhenTransactionHasModifications(final boolean readWrite){
1208 new ShardTestKit(getSystem()) {
1210 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1211 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1212 "testCommitWhenTransactionHasModifications-" + readWrite);
1214 waitUntilLeader(shard);
1216 final TransactionIdentifier transactionID = nextTransactionId();
1217 final MutableCompositeModification modification = new MutableCompositeModification();
1218 modification.addModification(new DeleteModification(YangInstanceIdentifier.EMPTY));
1219 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1220 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1221 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1222 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1223 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1225 final FiniteDuration duration = duration("5 seconds");
1227 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1228 expectMsgClass(duration, ReadyTransactionReply.class);
1230 // Send the CanCommitTransaction message.
1232 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1233 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1234 expectMsgClass(duration, CanCommitTransactionReply.class));
1235 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1237 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1238 expectMsgClass(duration, CommitTransactionReply.class);
1240 final InOrder inOrder = inOrder(cohort);
1241 inOrder.verify(cohort).canCommit();
1242 inOrder.verify(cohort).preCommit();
1243 inOrder.verify(cohort).commit();
1245 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1246 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1248 // Use MBean for verification
1249 // Committed transaction count should increase as usual
1250 assertEquals(1, shardStats.getCommittedTransactionsCount());
1252 // Commit index should advance as we do not have an empty modification
1253 assertEquals(0, shardStats.getCommitIndex());
1259 public void testCommitPhaseFailure() throws Throwable {
1260 testCommitPhaseFailure(true);
1261 testCommitPhaseFailure(false);
1264 private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
1265 new ShardTestKit(getSystem()) {{
1266 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1267 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1268 "testCommitPhaseFailure-" + readWrite);
1270 waitUntilLeader(shard);
1272 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1275 final TransactionIdentifier transactionID1 = nextTransactionId();
1276 final MutableCompositeModification modification1 = new MutableCompositeModification();
1277 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1278 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1279 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1280 doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit();
1281 doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1283 final TransactionIdentifier transactionID2 = nextTransactionId();
1284 final MutableCompositeModification modification2 = new MutableCompositeModification();
1285 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1286 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1288 final FiniteDuration duration = duration("5 seconds");
1289 final Timeout timeout = new Timeout(duration);
1291 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1292 expectMsgClass(duration, ReadyTransactionReply.class);
1294 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1295 expectMsgClass(duration, ReadyTransactionReply.class);
1297 // Send the CanCommitTransaction message for the first Tx.
1299 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1300 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1301 expectMsgClass(duration, CanCommitTransactionReply.class));
1302 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1304 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1305 // processed after the first Tx completes.
1307 final Future<Object> canCommitFuture = Patterns.ask(shard,
1308 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1310 // Send the CommitTransaction message for the first Tx. This should send back an error
1311 // and trigger the 2nd Tx to proceed.
1313 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1314 expectMsgClass(duration, akka.actor.Status.Failure.class);
1316 // Wait for the 2nd Tx to complete the canCommit phase.
1318 final CountDownLatch latch = new CountDownLatch(1);
1319 canCommitFuture.onComplete(new OnComplete<Object>() {
1321 public void onComplete(final Throwable t, final Object resp) {
1324 }, getSystem().dispatcher());
1326 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1328 final InOrder inOrder = inOrder(cohort1, cohort2);
1329 inOrder.verify(cohort1).canCommit();
1330 inOrder.verify(cohort1).preCommit();
1331 inOrder.verify(cohort1).commit();
1332 inOrder.verify(cohort2).canCommit();
1337 public void testPreCommitPhaseFailure() throws Throwable {
1338 testPreCommitPhaseFailure(true);
1339 testPreCommitPhaseFailure(false);
1342 private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
1343 new ShardTestKit(getSystem()) {{
1344 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1345 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1346 "testPreCommitPhaseFailure-" + readWrite);
1348 waitUntilLeader(shard);
1350 final TransactionIdentifier transactionID1 = nextTransactionId();
1351 final MutableCompositeModification modification1 = new MutableCompositeModification();
1352 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1353 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1354 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1356 final TransactionIdentifier transactionID2 = nextTransactionId();
1357 final MutableCompositeModification modification2 = new MutableCompositeModification();
1358 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1359 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1361 final FiniteDuration duration = duration("5 seconds");
1362 final Timeout timeout = new Timeout(duration);
1364 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1365 expectMsgClass(duration, ReadyTransactionReply.class);
1367 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1368 expectMsgClass(duration, ReadyTransactionReply.class);
1370 // Send the CanCommitTransaction message for the first Tx.
1372 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1373 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1374 expectMsgClass(duration, CanCommitTransactionReply.class));
1375 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1377 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1378 // processed after the first Tx completes.
1380 final Future<Object> canCommitFuture = Patterns.ask(shard,
1381 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1383 // Send the CommitTransaction message for the first Tx. This should send back an error
1384 // and trigger the 2nd Tx to proceed.
1386 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1387 expectMsgClass(duration, akka.actor.Status.Failure.class);
1389 // Wait for the 2nd Tx to complete the canCommit phase.
1391 final CountDownLatch latch = new CountDownLatch(1);
1392 canCommitFuture.onComplete(new OnComplete<Object>() {
1394 public void onComplete(final Throwable t, final Object resp) {
1397 }, getSystem().dispatcher());
1399 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1401 final InOrder inOrder = inOrder(cohort1, cohort2);
1402 inOrder.verify(cohort1).canCommit();
1403 inOrder.verify(cohort1).preCommit();
1404 inOrder.verify(cohort2).canCommit();
1409 public void testCanCommitPhaseFailure() throws Throwable {
1410 testCanCommitPhaseFailure(true);
1411 testCanCommitPhaseFailure(false);
1414 private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1415 new ShardTestKit(getSystem()) {{
1416 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1417 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1418 "testCanCommitPhaseFailure-" + readWrite);
1420 waitUntilLeader(shard);
1422 final FiniteDuration duration = duration("5 seconds");
1424 final TransactionIdentifier transactionID1 = nextTransactionId();
1425 final MutableCompositeModification modification = new MutableCompositeModification();
1426 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1427 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1429 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1430 expectMsgClass(duration, ReadyTransactionReply.class);
1432 // Send the CanCommitTransaction message.
1434 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1435 expectMsgClass(duration, akka.actor.Status.Failure.class);
1437 // Send another can commit to ensure the failed one got cleaned up.
1441 final TransactionIdentifier transactionID2 = nextTransactionId();
1442 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1444 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1445 expectMsgClass(duration, ReadyTransactionReply.class);
1447 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1448 final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1449 expectMsgClass(CanCommitTransactionReply.class));
1450 assertEquals("getCanCommit", true, reply.getCanCommit());
1455 public void testCanCommitPhaseFalseResponse() throws Throwable {
1456 testCanCommitPhaseFalseResponse(true);
1457 testCanCommitPhaseFalseResponse(false);
1460 private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
// Checks that a canCommit answered with FALSE by the cohort is reported as
// getCanCommit() == false, and that a second transaction readied afterwards
// on the same shard can still obtain a TRUE canCommit reply.
// readWrite is forwarded unchanged to prepareReadyTransactionMessage.
1461 new ShardTestKit(getSystem()) {{
// The actor name carries the readWrite value, so the two invocations use different names.
1462 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1463 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1464 "testCanCommitPhaseFalseResponse-" + readWrite);
1466 waitUntilLeader(shard);
1468 final FiniteDuration duration = duration("5 seconds");
1470 final TransactionIdentifier transactionID1 = nextTransactionId();
1471 final MutableCompositeModification modification = new MutableCompositeModification();
1472 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
// First transaction: the mocked cohort answers canCommit with FALSE.
1473 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1475 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1476 expectMsgClass(duration, ReadyTransactionReply.class);
1478 // Send the CanCommitTransaction message.
1480 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
// NOTE(review): this expectMsgClass call passes no explicit duration, unlike the others here.
1481 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1482 expectMsgClass(CanCommitTransactionReply.class));
1483 assertEquals("getCanCommit", false, reply.getCanCommit());
1485 // Send another can commit to ensure the failed one got cleaned up.
1489 final TransactionIdentifier transactionID2 = nextTransactionId();
// The same mock is re-stubbed so that canCommit now answers TRUE.
1490 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1492 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1493 expectMsgClass(duration, ReadyTransactionReply.class);
1495 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1496 reply = CanCommitTransactionReply.fromSerializable(
1497 expectMsgClass(CanCommitTransactionReply.class));
1498 assertEquals("getCanCommit", true, reply.getCanCommit());
1503 public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
// Runs the parameterized scenario twice: once with readWrite=true, once with readWrite=false.
1504 testImmediateCommitWithCanCommitPhaseFailure(true);
1505 testImmediateCommitWithCanCommitPhaseFailure(false);
1508 private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
// Checks that a ready message built with the extra trailing `true` argument is answered
// with Status.Failure when the cohort's canCommit future fails, and that a second
// transaction readied the same way is then answered with CommitTransactionReply.
// NOTE(review): the trailing `true` is presumably the "commit immediately" flag, going by
// the test name — confirm against prepareReadyTransactionMessage.
1509 new ShardTestKit(getSystem()) {{
1510 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1511 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1512 "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1514 waitUntilLeader(shard);
1516 final FiniteDuration duration = duration("5 seconds");
1518 final TransactionIdentifier transactionID1 = nextTransactionId();
1519 final MutableCompositeModification modification = new MutableCompositeModification();
1520 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
// First transaction: canCommit returns a future that has already failed with IllegalStateException("mock").
1521 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1523 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
// No ReadyTransactionReply is awaited here; the only expected reply is the failure.
1525 expectMsgClass(duration, akka.actor.Status.Failure.class);
1527 // Send another can commit to ensure the failed one got cleaned up.
1531 final TransactionIdentifier transactionID2 = nextTransactionId();
// Second transaction: re-stub the same mock so canCommit, preCommit and commit all succeed.
1532 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1533 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1534 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
// The cohort also hands back a mocked candidate whose root node reports UNMODIFIED
// and whose root path is the empty identifier.
1535 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1536 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1537 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1538 doReturn(candidateRoot).when(candidate).getRootNode();
1539 doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
1540 doReturn(candidate).when(cohort).getCandidate();
1542 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1544 expectMsgClass(duration, CommitTransactionReply.class);
1549 public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
// Runs the parameterized scenario twice: once with readWrite=true, once with readWrite=false.
1550 testImmediateCommitWithCanCommitPhaseFalseResponse(true);
1551 testImmediateCommitWithCanCommitPhaseFalseResponse(false);
1554 private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
// Checks that a ready message built with the extra trailing `true` argument is answered
// with Status.Failure when the cohort's canCommit completes with FALSE, and that a second
// transaction readied the same way is then answered with CommitTransactionReply.
// NOTE(review): the trailing `true` is presumably the "commit immediately" flag, going by
// the test name — confirm against prepareReadyTransactionMessage.
1555 new ShardTestKit(getSystem()) {{
1556 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1557 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1558 "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
1560 waitUntilLeader(shard);
1562 final FiniteDuration duration = duration("5 seconds");
1564 final TransactionIdentifier transactionID1 = nextTransactionId();
1565 final MutableCompositeModification modification = new MutableCompositeModification();
1566 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
// First transaction: canCommit completes normally but with a FALSE result.
1567 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1569 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
// A FALSE canCommit is expected to come back as a Status.Failure in this mode.
1571 expectMsgClass(duration, akka.actor.Status.Failure.class);
1573 // Send another can commit to ensure the failed one got cleaned up.
1577 final TransactionIdentifier transactionID2 = nextTransactionId();
// Second transaction: re-stub the same mock so canCommit, preCommit and commit all succeed.
1578 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1579 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1580 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
// The cohort also hands back a mocked candidate whose root node reports UNMODIFIED
// and whose root path is the empty identifier.
1581 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1582 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1583 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1584 doReturn(candidateRoot).when(candidate).getRootNode();
1585 doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
1586 doReturn(candidate).when(cohort).getCandidate();
1588 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1590 expectMsgClass(duration, CommitTransactionReply.class);
1595 public void testAbortBeforeFinishCommit() throws Throwable {
// Runs the parameterized scenario twice: once with readWrite=true, once with readWrite=false.
1596 testAbortBeforeFinishCommit(true);
1597 testAbortBeforeFinishCommit(false);
1600 private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
// Checks that the written node can still be read from the shard after doAbortTransaction
// has been invoked from within the cohort's preCommit hook, i.e. the commit still completes.
1601 new ShardTestKit(getSystem()) {{
1602 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1603 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1604 "testAbortBeforeFinishCommit-" + readWrite);
1606 waitUntilLeader(shard);
1608 final FiniteDuration duration = duration("5 seconds");
1609 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1611 final TransactionIdentifier transactionID = nextTransactionId();
// Hook handed to setupMockWriteTransaction below: it calls preCommit() on the cohort it is
// given, aborts the transaction directly on the shard, then returns the preCommit future.
1612 final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1614 final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1616 // Simulate an AbortTransaction message occurring during replication, after
1617 // persisting and before finishing the commit to the in-memory store.
1618 // We have no followers so due to optimizations in the RaftActor, it does not
1619 // attempt replication and thus we can't send an AbortTransaction message b/c
1620 // it would be processed too late after CommitTransaction completes. So we'll
1621 // simulate an AbortTransaction message occurring during replication by calling
1622 // the shard directly.
1624 shard.underlyingActor().doAbortTransaction(transactionID, null);
1626 return preCommitFuture;
1629 final MutableCompositeModification modification = new MutableCompositeModification();
1630 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1631 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1632 modification, preCommit);
// Drive the transaction through ready, canCommit and commit, awaiting each reply in turn.
1634 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1635 expectMsgClass(duration, ReadyTransactionReply.class);
1637 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1638 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1639 expectMsgClass(duration, CanCommitTransactionReply.class));
1640 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1642 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1643 expectMsgClass(duration, CommitTransactionReply.class);
1645 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1647 // Since we're simulating an abort occurring during replication and before finish commit,
1648 // the data should still get written to the in-memory store since we've gotten past
1649 // canCommit and preCommit and persisted the data.
1650 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1655 public void testTransactionCommitTimeout() throws Throwable {
// Runs the parameterized scenario twice: once with readWrite=true, once with readWrite=false.
1656 testTransactionCommitTimeout(true);
1657 testTransactionCommitTimeout(false);
1660 private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
// Checks that a transaction which passed canCommit but is never committed does not block
// the next one: the 2nd Tx completes canCommit and commit, a late commit of the 1st Tx is
// answered with Status.Failure, and the 2nd Tx's list entry is readable afterwards.
// The commit timeout is lowered to 1 second on the shared context builder.
1661 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1663 new ShardTestKit(getSystem()) {{
1664 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1665 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1666 "testTransactionCommitTimeout-" + readWrite);
1668 waitUntilLeader(shard);
1670 final FiniteDuration duration = duration("5 seconds");
1672 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Seed the store with the test container and an empty outer list before the transactions run.
1674 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1675 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1676 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1678 // Create 1st Tx - will timeout
1680 final TransactionIdentifier transactionID1 = nextTransactionId();
1681 final MutableCompositeModification modification1 = new MutableCompositeModification();
1682 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1683 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1684 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1685 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// 2nd Tx writes the outer-list entry with id 2; its path is kept for the final read.
1690 final TransactionIdentifier transactionID2 = nextTransactionId();
1691 final MutableCompositeModification modification2 = new MutableCompositeModification();
1692 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1693 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
// NOTE(review): the label passed for cohort2 is "cohort3" — confirm this is intentional.
1694 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1696 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
// Ready both transactions before any canCommit is sent.
1701 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1702 expectMsgClass(duration, ReadyTransactionReply.class);
1704 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1705 expectMsgClass(duration, ReadyTransactionReply.class);
1707 // canCommit 1st Tx. We don't send the commit so it should timeout.
1709 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1710 expectMsgClass(duration, CanCommitTransactionReply.class);
1712 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1714 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1715 expectMsgClass(duration, CanCommitTransactionReply.class);
1717 // Try to commit the 1st Tx - should fail as it's not the current Tx.
1719 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1720 expectMsgClass(duration, akka.actor.Status.Failure.class);
1722 // Commit the 2nd Tx.
1724 shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1725 expectMsgClass(duration, CommitTransactionReply.class);
1727 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1728 assertNotNull(listNodePath + " not found", node);
1733 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
// With the commit queue capacity set to 2, checks that readying a 3rd transaction is
// answered with Status.Failure, and that a later canCommit for that 3rd transaction
// is answered with Status.Failure as well.
1734 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1736 new ShardTestKit(getSystem()) {{
1737 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1738 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1739 "testTransactionCommitQueueCapacityExceeded");
1741 waitUntilLeader(shard);
1743 final FiniteDuration duration = duration("5 seconds");
1745 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Prepare three write transactions up front; each has its own modification and cohort.
1747 final TransactionIdentifier transactionID1 = nextTransactionId();
1748 final MutableCompositeModification modification1 = new MutableCompositeModification();
1749 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1750 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1752 final TransactionIdentifier transactionID2 = nextTransactionId();
1753 final MutableCompositeModification modification2 = new MutableCompositeModification();
1754 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1755 TestModel.OUTER_LIST_PATH,
1756 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1759 final TransactionIdentifier transactionID3 = nextTransactionId();
1760 final MutableCompositeModification modification3 = new MutableCompositeModification();
1761 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1762 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
// The first two ready messages fit within the capacity of 2 and are acknowledged.
1766 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1767 expectMsgClass(duration, ReadyTransactionReply.class);
1769 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1770 expectMsgClass(duration, ReadyTransactionReply.class);
1772 // The 3rd Tx should exceed queue capacity and fail.
1774 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1775 expectMsgClass(duration, akka.actor.Status.Failure.class);
1777 // canCommit 1st Tx.
1779 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1780 expectMsgClass(duration, CanCommitTransactionReply.class);
1782 // canCommit the 2nd Tx - it should get queued.
// No reply is awaited for this message; the next expected message is the Failure for the 3rd Tx.
1784 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1786 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1788 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1789 expectMsgClass(duration, akka.actor.Status.Failure.class);
1794 public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
// Readies three transactions but sends canCommit only for the last one, and checks that it
// still gets a CanCommitTransactionReply within the 5 second wait.
// The queue expiry is set to 1300 ms and the commit timeout to 1 second for this test.
1795 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1797 new ShardTestKit(getSystem()) {{
1798 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1799 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1800 "testTransactionCommitWithPriorExpiredCohortEntries");
1802 waitUntilLeader(shard);
1804 final FiniteDuration duration = duration("5 seconds");
1806 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Tx 1 and Tx 2 both write TEST_PATH; neither is ever sent a canCommit.
1808 final TransactionIdentifier transactionID1 = nextTransactionId();
1809 final MutableCompositeModification modification1 = new MutableCompositeModification();
1810 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1811 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1813 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1814 expectMsgClass(duration, ReadyTransactionReply.class);
1816 final TransactionIdentifier transactionID2 = nextTransactionId();
1817 final MutableCompositeModification modification2 = new MutableCompositeModification();
1818 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1819 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1821 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1822 expectMsgClass(duration, ReadyTransactionReply.class);
// Tx 3 writes TEST2_PATH and is the only one driven to canCommit.
1824 final TransactionIdentifier transactionID3 = nextTransactionId();
1825 final MutableCompositeModification modification3 = new MutableCompositeModification();
1826 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1827 TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1829 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1830 expectMsgClass(duration, ReadyTransactionReply.class);
1832 // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1833 // should expire from the queue and the last one should be processed.
1835 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1836 expectMsgClass(duration, CanCommitTransactionReply.class);
1841 public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
// Commits Tx 1 while Tx 2 sits readied without a canCommit, and checks that Tx 3 (sent as a
// ReadyLocalTransaction) still produces a CommitTransactionReply and that its data is readable.
// The queue expiry is set to 1300 ms and the commit timeout to 1 second for this test.
1842 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1844 new ShardTestKit(getSystem()) {{
1845 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1846 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1847 "testTransactionCommitWithSubsequentExpiredCohortEntry");
1849 waitUntilLeader(shard);
1851 final FiniteDuration duration = duration("5 seconds");
1853 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1855 final TransactionIdentifier transactionID1 = nextTransactionId();
1856 final MutableCompositeModification modification1 = new MutableCompositeModification();
1857 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1858 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1860 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1861 expectMsgClass(duration, ReadyTransactionReply.class);
1863 // CanCommit the first one so it's the current in-progress CohortEntry.
1865 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1866 expectMsgClass(duration, CanCommitTransactionReply.class);
1868 // Ready the second Tx.
1870 final TransactionIdentifier transactionID2 = nextTransactionId();
1871 final MutableCompositeModification modification2 = new MutableCompositeModification();
1872 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1873 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1875 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1876 expectMsgClass(duration, ReadyTransactionReply.class);
1878 // Ready the third Tx.
// Unlike the first two, this one is built from a DataTreeModification taken from the shard's
// data store and sent as a ReadyLocalTransaction rather than via prepareReadyTransactionMessage.
1880 final TransactionIdentifier transactionID3 = nextTransactionId();
1881 final DataTreeModification modification3 = dataStore.newModification();
1882 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1883 .apply(modification3);
1884 modification3.ready();
// NOTE(review): the trailing `true` is presumably the "commit immediately" flag, which would
// explain why no ready reply is awaited below — confirm against ReadyLocalTransaction.
1885 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
1887 shard.tell(readyMessage, getRef());
1889 // Commit the first Tx. After completing, the second should expire from the queue and the third
1892 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1893 expectMsgClass(duration, CommitTransactionReply.class);
1895 // Expect commit reply from the third Tx.
1897 expectMsgClass(duration, CommitTransactionReply.class);
1899 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1900 assertNotNull(TestModel.TEST2_PATH + " not found", node);
1905 public void testCanCommitBeforeReadyFailure() throws Throwable {
// Checks that a CanCommitTransaction for a freshly generated transaction id, for which no
// ready message was sent in this test, is answered with Status.Failure.
// Note that this test does not call waitUntilLeader before sending the message.
1906 new ShardTestKit(getSystem()) {{
1907 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1908 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1909 "testCanCommitBeforeReadyFailure");
1911 shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), getRef());
1912 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1917 public void testAbortCurrentTransaction() throws Throwable {
// Runs the parameterized scenario twice: once with readWrite=true, once with readWrite=false.
1918 testAbortCurrentTransaction(true);
1919 testAbortCurrentTransaction(false);
1922 private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
// Checks that aborting the transaction that currently holds canCommit lets a second
// transaction's canCommit complete, and that canCommit was invoked on the two mocked
// cohorts in order (cohort1 first, then cohort2).
1923 new ShardTestKit(getSystem()) {{
1924 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1925 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1926 "testAbortCurrentTransaction-" + readWrite);
1928 waitUntilLeader(shard);
1930 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1932 final TransactionIdentifier transactionID1 = nextTransactionId();
1933 final MutableCompositeModification modification1 = new MutableCompositeModification();
1934 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1935 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1936 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
// cohort2 only has canCommit stubbed; abort is stubbed on cohort1 alone.
1938 final TransactionIdentifier transactionID2 = nextTransactionId();
1939 final MutableCompositeModification modification2 = new MutableCompositeModification();
1940 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1941 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1943 final FiniteDuration duration = duration("5 seconds");
1944 final Timeout timeout = new Timeout(duration);
1946 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1947 expectMsgClass(duration, ReadyTransactionReply.class);
1949 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1950 expectMsgClass(duration, ReadyTransactionReply.class);
1952 // Send the CanCommitTransaction message for the first Tx.
1954 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1955 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1956 expectMsgClass(duration, CanCommitTransactionReply.class));
1957 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1959 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1960 // processed after the first Tx completes.
// Patterns.ask is used here instead of tell, so this reply completes the returned future
// rather than being sent to the test kit's getRef().
1962 final Future<Object> canCommitFuture = Patterns.ask(shard,
1963 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1965 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1968 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1969 expectMsgClass(duration, AbortTransactionReply.class);
1971 // Wait for the 2nd Tx to complete the canCommit phase.
1973 Await.ready(canCommitFuture, duration);
// Ordering check across both mocks: cohort1.canCommit() must have happened before cohort2.canCommit().
1975 final InOrder inOrder = inOrder(cohort1, cohort2);
1976 inOrder.verify(cohort1).canCommit();
1977 inOrder.verify(cohort2).canCommit();
1982 public void testAbortQueuedTransaction() throws Throwable {
// Runs the parameterized scenario twice: once with readWrite=true, once with readWrite=false.
1983 testAbortQueuedTransaction(true);
1984 testAbortQueuedTransaction(false);
1987 private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
// Checks that aborting a readied transaction that has not been sent a canCommit calls
// abort() on its cohort, that the pending commit queue size drops from 1 to 0 once a
// TX_COMMIT_TIMEOUT_CHECK_MESSAGE has been handled, and that a later canCommit for the
// same id fails with an IllegalStateException.
1988 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1989 new ShardTestKit(getSystem()) {{
// Latch slot; it stays null until the test arms it further down, so timeout-check
// messages handled before that point are not counted.
1990 final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
1991 @SuppressWarnings("serial")
1992 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1994 public void handleCommand(final Object message) {
// Delegate first, so the latch is released only after the shard has handled the message.
1995 super.handleCommand(message);
1996 if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
1997 if(cleaupCheckLatch.get() != null) {
1998 cleaupCheckLatch.get().countDown();
2004 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2005 Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2006 Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
2008 waitUntilLeader(shard);
2010 final TransactionIdentifier transactionID = nextTransactionId();
2011 final MutableCompositeModification modification = new MutableCompositeModification();
2012 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
// Only abort() is stubbed on the cohort; canCommit is never stubbed in this test.
2013 doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2015 final FiniteDuration duration = duration("5 seconds");
2019 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
2020 expectMsgClass(duration, ReadyTransactionReply.class);
2022 assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2024 // Send the AbortTransaction message.
2026 shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
2027 expectMsgClass(duration, AbortTransactionReply.class);
2029 verify(cohort).abort();
2031 // Verify the tx cohort is removed from queue at the cleanup check interval.
// Arm the latch now and wait up to 5 seconds for the next timeout-check message.
2033 cleaupCheckLatch.set(new CountDownLatch(1));
2034 assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2035 cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2037 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2039 // Now send CanCommitTransaction - should fail.
2041 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
2043 Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2044 assertTrue("Failure type", failure instanceof IllegalStateException);
2049 public void testCreateSnapshot() throws Exception {
// Runs the shared snapshot scenario with persistent=true, using this test's name as the actor name.
2050 testCreateSnapshot(true, "testCreateSnapshot");
2054 public void testCreateSnapshotWithNonPersistentData() throws Exception {
// Runs the shared snapshot scenario with persistent=false, using this test's name as the actor name.
2055 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2058 @SuppressWarnings("serial")
2059 private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
// Shared scenario: writes one container to the shard, triggers a snapshot capture twice
// through the snapshot manager, and after each capture checks that the saved snapshot's
// state deserializes to the root node that was read before the captures.
// `persistent` is applied to the shared context builder; `shardActorName` names the test actor.
// The latch is released by the test shard's handleCommand override (see below).
2061 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
// Holds the most recent object passed to saveSnapshot.
2063 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
// Persistence wrapper that records each snapshot before forwarding it to super.saveSnapshot.
2064 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2065 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2070 public void saveSnapshot(final Object o) {
2071 savedSnapshot.set(o);
2072 super.saveSnapshot(o);
2076 dataStoreContextBuilder.persistent(persistent);
2078 new ShardTestKit(getSystem()) {{
// Shard subclass that installs the recording persistence provider and counts the latch
// down once a snapshot-related message has been handled.
2079 class TestShard extends Shard {
2081 protected TestShard(AbstractBuilder<?, ?> builder) {
2083 setPersistence(new TestPersistentDataProvider(super.persistence()));
2087 public void handleCommand(final Object message) {
2088 super.handleCommand(message);
2090 // XXX: commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
2091 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
2092 latch.get().countDown();
// NOTE(review): this override only delegates to super; presumably it exists to make the
// method callable from the test code below — confirm.
2097 public RaftActorContext getRaftActorContext() {
2098 return super.getRaftActorContext();
2102 final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
2104 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2105 Props.create(new DelegatingShardCreator(creator)), shardActorName);
2107 waitUntilLeader(shard);
2108 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Read the root once up front; both snapshots are compared against this value.
2110 final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.EMPTY);
2112 // Trigger creation of a snapshot by asking the snapshot manager to capture directly.
2113 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2114 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2115 awaitAndValidateSnapshot(expectedRoot);
// Capture a second time to check that the sequence works again after the first snapshot.
2117 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2118 awaitAndValidateSnapshot(expectedRoot);
2121 private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException {
// Waits up to 5 seconds for the latch, checks that a Snapshot was saved and that it matches
// expectedRoot, then installs a fresh latch and clears the saved snapshot for the next round.
2122 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2124 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2125 savedSnapshot.get() instanceof Snapshot);
2127 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2129 latch.set(new CountDownLatch(1));
2130 savedSnapshot.set(null);
2133 private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
// Deserializes the snapshot's state bytes into a NormalizedNode and compares it with expectedRoot.
2135 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2136 assertEquals("Root node", expectedRoot, actual);
2142 * This test simply verifies that the applySnapShot logic will work
2143 * @throws ReadFailedException
2144 * @throws DataValidationFailedException
2147 public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
// Works directly on an in-memory DataTree (no shard actor): writes a node, reads the root,
// then deletes the root and writes the same root back, and checks the tree reads back equal.
2148 final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
2149 store.setSchemaContext(SCHEMA_CONTEXT);
// Step 1: populate the tree with the test container and commit.
2151 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2152 putTransaction.write(TestModel.TEST_PATH,
2153 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2154 commitTransaction(store, putTransaction);
// Step 2: capture the whole tree from the root as the expected state.
2157 final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.EMPTY);
// Step 3: in a single modification, delete the root and write the captured root back.
2159 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2161 writeTransaction.delete(YangInstanceIdentifier.EMPTY);
2162 writeTransaction.write(YangInstanceIdentifier.EMPTY, expected);
2164 commitTransaction(store, writeTransaction);
// Step 4: the root read after the rewrite must equal the captured root.
2166 final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.EMPTY);
2168 assertEquals(expected, actual);
2172 public void testRecoveryApplicable(){
// Checks that persistence().isRecoveryApplicable() follows the context's persistent flag:
// true for a shard built with persistent(true), false for one built with persistent(false).
// The two contexts below are otherwise configured identically.
2174 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2175 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2177 final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2178 schemaContext(SCHEMA_CONTEXT).props();
2180 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2181 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2183 final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2184 schemaContext(SCHEMA_CONTEXT).props();
2186 new ShardTestKit(getSystem()) {{
// Both props use the same shardID; the two actors are created under different names.
2187 final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
2189 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2191 final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
2193 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2198 public void testOnDatastoreContext() {
// Checks that sending a new DatastoreContext to a running shard switches
// persistence().isRecoveryApplicable(): true -> false -> true as the persistent flag changes.
2199 new ShardTestKit(getSystem()) {{
2200 dataStoreContextBuilder.persistent(true);
2202 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
2204 assertEquals("isRecoveryApplicable", true,
2205 shard.underlyingActor().persistence().isRecoveryApplicable());
2207 waitUntilLeader(shard);
// NOTE(review): the assertions right after each tell() presumably rely on the TestActorRef
// processing the message synchronously — confirm the dispatcher used by newShardProps().
2209 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2211 assertEquals("isRecoveryApplicable", false,
2212 shard.underlyingActor().persistence().isRecoveryApplicable());
2214 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2216 assertEquals("isRecoveryApplicable", true,
2217 shard.underlyingActor().persistence().isRecoveryApplicable());
2222 public void testRegisterRoleChangeListener() throws Exception {
2223 new ShardTestKit(getSystem()) {
2225 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2226 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2227 "testRegisterRoleChangeListener");
2229 waitUntilLeader(shard);
2231 final TestActorRef<MessageCollectorActor> listener =
2232 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2234 shard.tell(new RegisterRoleChangeListener(), listener);
2236 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2238 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2239 ShardLeaderStateChanged.class);
2240 assertEquals("getLocalShardDataTree present", true,
2241 leaderStateChanged.getLocalShardDataTree().isPresent());
2242 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2243 leaderStateChanged.getLocalShardDataTree().get());
2245 MessageCollectorActor.clearMessages(listener);
2247 // Force a leader change
2249 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2251 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2252 ShardLeaderStateChanged.class);
2253 assertEquals("getLocalShardDataTree present", false,
2254 leaderStateChanged.getLocalShardDataTree().isPresent());
2260 public void testFollowerInitialSyncStatus() throws Exception {
2261 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2262 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2263 "testFollowerInitialSyncStatus");
2265 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2267 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2269 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2271 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2275 public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
2276 new ShardTestKit(getSystem()) {{
2277 String testName = "testClusteredDataChangeListenerDelayedRegistration";
2278 dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
2279 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2281 final MockDataChangeListener listener = new MockDataChangeListener(1);
2282 final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2283 actorFactory.generateActorId(testName + "-DataChangeListener"));
2285 setupInMemorySnapshotStore();
2287 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2288 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2289 actorFactory.generateActorId(testName + "-shard"));
2291 waitUntilNoLeader(shard);
2293 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2295 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2296 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2297 RegisterChangeListenerReply.class);
2298 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2300 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
2301 customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2303 listener.waitForChangeEvents();
2308 public void testClusteredDataChangeListenerRegistration() throws Exception {
2309 new ShardTestKit(getSystem()) {{
2310 String testName = "testClusteredDataChangeListenerRegistration";
2311 final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
2312 MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
2314 final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
2315 MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
2317 final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2318 Shard.builder().id(followerShardID).
2319 datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2320 peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2321 "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2322 withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2324 final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2325 Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2326 peerAddresses(Collections.singletonMap(followerShardID.toString(),
2327 "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2328 withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2330 leaderShard.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
2331 String leaderPath = waitUntilLeader(followerShard);
2332 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2334 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2335 final MockDataChangeListener listener = new MockDataChangeListener(1);
2336 final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2337 actorFactory.generateActorId(testName + "-DataChangeListener"));
2339 followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2340 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2341 RegisterChangeListenerReply.class);
2342 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2344 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2346 listener.waitForChangeEvents();
2351 public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
2352 new ShardTestKit(getSystem()) {{
2353 String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
2354 dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
2355 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2357 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2358 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2359 actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2361 setupInMemorySnapshotStore();
2363 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2364 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2365 actorFactory.generateActorId(testName + "-shard"));
2367 waitUntilNoLeader(shard);
2369 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2371 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2372 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2373 RegisterDataTreeChangeListenerReply.class);
2374 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2376 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
2377 customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2379 listener.waitForChangeEvents();
2384 public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2385 new ShardTestKit(getSystem()) {{
2386 String testName = "testClusteredDataTreeChangeListenerRegistration";
2387 final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
2388 MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
2390 final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
2391 MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
2393 final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2394 Shard.builder().id(followerShardID).
2395 datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2396 peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2397 "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2398 withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2400 final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2401 Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2402 peerAddresses(Collections.singletonMap(followerShardID.toString(),
2403 "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2404 withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2406 leaderShard.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
2407 String leaderPath = waitUntilLeader(followerShard);
2408 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2410 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2411 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2412 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2413 actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2415 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2416 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2417 RegisterDataTreeChangeListenerReply.class);
2418 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2420 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2422 listener.waitForChangeEvents();
2427 public void testServerRemoved() throws Exception {
2428 final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
2430 final ActorRef shard = parent.underlyingActor().context().actorOf(
2431 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2432 "testServerRemoved");
2434 shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2436 MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);