2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.inOrder;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.reset;
20 import static org.mockito.Mockito.verify;
21 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.Props;
26 import akka.actor.Status.Failure;
27 import akka.dispatch.Dispatchers;
28 import akka.dispatch.OnComplete;
29 import akka.japi.Creator;
30 import akka.pattern.Patterns;
31 import akka.persistence.SaveSnapshotSuccess;
32 import akka.testkit.TestActorRef;
33 import akka.util.Timeout;
34 import com.google.common.base.Function;
35 import com.google.common.util.concurrent.Futures;
36 import com.google.common.util.concurrent.ListenableFuture;
37 import com.google.common.util.concurrent.Uninterruptibles;
38 import java.util.Collections;
39 import java.util.HashSet;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicReference;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.opendaylight.controller.cluster.DataPersistenceProvider;
48 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
50 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
51 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
52 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
56 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
63 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
67 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
68 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
69 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
71 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
72 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
73 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
74 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
75 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
76 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
77 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
78 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
79 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
80 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
81 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
82 import org.opendaylight.controller.cluster.raft.RaftActorContext;
83 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
84 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
85 import org.opendaylight.controller.cluster.raft.Snapshot;
86 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
87 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
88 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
89 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
90 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
91 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
92 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
93 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
94 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
95 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
96 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
97 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
98 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
99 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
100 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
101 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
102 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
103 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
112 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
113 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
114 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
115 import scala.concurrent.Await;
116 import scala.concurrent.Future;
117 import scala.concurrent.duration.FiniteDuration;
119 public class ShardTest extends AbstractShardTest {
// Placeholder payload that testDataTreeCandidateRecovery stores at journal sequence number 0;
// the text of the constant itself states why such an entry is needed.
120 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
/**
 * Registers a data change listener with a leader shard, checks that the reply carries a
 * registration path located under the shard actor, then writes to {@code TestModel.TEST_PATH}
 * and waits for the listener to observe a change event for that path.
 */
123 public void testRegisterChangeListener() throws Exception {
124 new ShardTestKit(getSystem()) {{
125 final TestActorRef<Shard> shard = actorFactory.createTestActor(
126 newShardProps(), "testRegisterChangeListener");
// Do not send any messages until the shard has become the leader.
128 waitUntilLeader(shard);
130 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// NOTE(review): the constructor argument is presumably the number of expected change events - confirm.
132 final MockDataChangeListener listener = new MockDataChangeListener(1);
133 final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
134 "testRegisterChangeListener-DataChangeListener");
// The test kit's own ref is the sender, so the registration reply comes back to this test.
136 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
137 dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
139 final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
140 RegisterChangeListenerReply.class);
// The registration path must be a child of the shard actor created above.
141 final String replyPath = reply.getListenerRegistrationPath().toString();
142 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
143 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// Write to the registered path and wait for the listener to be notified of it.
145 final YangInstanceIdentifier path = TestModel.TEST_PATH;
146 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
148 listener.waitForChangeEvents(path);
/**
 * Registers a data change listener while the shard has not yet become the leader and verifies
 * that the registration is acknowledged, that the shard reports no leader at that point, and
 * that the listener is notified once the election is allowed to proceed.
 */
152 @SuppressWarnings("serial")
154 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
155 // This test tests the timing window in which a change listener is registered before the
156 // shard becomes the leader. We verify that the listener is registered and notified of the
157 // existing data when the shard becomes the leader.
158 new ShardTestKit(getSystem()) {{
159 // For this test, we want to send the RegisterChangeListener message after the shard
160 // has recovered from persistence and before it becomes the leader. So we subclass
161 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
162 // we know that the shard has been initialized to a follower and has started the
163 // election process. The following 2 CountDownLatches are used to coordinate the
164 // ElectionTimeout with the sending of the RegisterChangeListener message.
// NOTE(review): the override visible below is handleCommand, not onReceiveCommand as the comment above says.
165 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
166 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
167 final Creator<Shard> creator = new Creator<Shard>() {
// Ensures only the first ElectionTimeout is intercepted; later ones go straight to the base Shard.
168 boolean firstElectionTimeout = true;
171 public Shard create() throws Exception {
172 // Use a non persistent provider because this test actually invokes persist on the journal
173 // this will cause all other messages to not be queued properly after that.
174 // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
175 // it does do a persist)
176 return new Shard(newShardBuilder()) {
178 public void handleCommand(final Object message) {
179 if(message instanceof ElectionTimeout && firstElectionTimeout) {
180 // Got the first ElectionTimeout. We don't forward it to the
181 // base Shard yet until we've sent the RegisterChangeListener
182 // message. So we signal the onFirstElectionTimeout latch to tell
183 // the main thread to send the RegisterChangeListener message and
184 // start a thread to wait on the onChangeListenerRegistered latch,
185 // which the main thread signals after it has sent the message.
186 // After the onChangeListenerRegistered is triggered, we send the
187 // original ElectionTimeout message to proceed with the election.
188 firstElectionTimeout = false;
189 final ActorRef self = getSelf();
// Wait at most 5 seconds for the main thread, then re-send the withheld ElectionTimeout to this actor.
193 Uninterruptibles.awaitUninterruptibly(
194 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
195 self.tell(message, self);
199 onFirstElectionTimeout.countDown();
201 super.handleCommand(message);
208 setupInMemorySnapshotStore();
210 final MockDataChangeListener listener = new MockDataChangeListener(1);
// NOTE(review): the actor names below use "testRegisterChangeListenerWhenNotLeaderInitially",
// which differs from this test's method name.
211 final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
212 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
214 final TestActorRef<Shard> shard = actorFactory.createTestActor(
215 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
216 "testRegisterChangeListenerWhenNotLeaderInitially");
218 final YangInstanceIdentifier path = TestModel.TEST_PATH;
220 // Wait until the shard receives the first ElectionTimeout message.
221 assertEquals("Got first ElectionTimeout", true,
222 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
224 // Now send the RegisterChangeListener and wait for the reply.
225 shard.tell(new RegisterChangeListener(path, dclActor,
226 AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
228 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
229 RegisterChangeListenerReply.class);
230 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
232 // Sanity check - verify the shard is not the leader yet.
233 shard.tell(FindLeader.INSTANCE, getRef());
234 final FindLeaderReply findLeadeReply =
235 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
236 assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
238 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
239 // with the election process.
240 onChangeListenerRegistered.countDown();
242 // Wait for the shard to become the leader and notify our listener with the existing
243 // data in the store.
244 listener.waitForChangeEvents(path);
/**
 * Registers a data tree change listener with a leader shard, checks that the reply carries a
 * registration path located under the shard actor, then writes to {@code TestModel.TEST_PATH}
 * and waits for the listener to observe change events.
 */
249 public void testRegisterDataTreeChangeListener() throws Exception {
250 new ShardTestKit(getSystem()) {{
251 final TestActorRef<Shard> shard = actorFactory.createTestActor(
252 newShardProps(), "testRegisterDataTreeChangeListener");
// Do not send any messages until the shard has become the leader.
254 waitUntilLeader(shard);
256 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// NOTE(review): the constructor argument is presumably the number of expected change events - confirm.
258 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
259 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
260 "testRegisterDataTreeChangeListener-DataTreeChangeListener");
// The test kit's own ref is the sender, so the registration reply comes back to this test.
262 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
264 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
265 RegisterDataTreeChangeListenerReply.class);
// The registration path must be a child of the shard actor created above.
266 final String replyPath = reply.getListenerRegistrationPath().toString();
267 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
268 "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
// Write to the registered path and wait for the listener to be notified.
270 final YangInstanceIdentifier path = TestModel.TEST_PATH;
271 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
273 listener.waitForChangeEvents();
/**
 * Data tree change listener counterpart of the "not the leader on registration" scenario: the
 * first ElectionTimeout is withheld from the shard, the listener is registered while the shard
 * reports no leader, and the listener is expected to be notified after the election is
 * allowed to proceed.
 */
277 @SuppressWarnings("serial")
279 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
280 new ShardTestKit(getSystem()) {{
// Latches coordinating the intercepted ElectionTimeout with the registration sent by the test.
281 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
282 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
283 final Creator<Shard> creator = new Creator<Shard>() {
// Ensures only the first ElectionTimeout is intercepted.
284 boolean firstElectionTimeout = true;
287 public Shard create() throws Exception {
288 return new Shard(newShardBuilder()) {
290 public void handleCommand(final Object message) {
291 if(message instanceof ElectionTimeout && firstElectionTimeout) {
292 firstElectionTimeout = false;
293 final ActorRef self = getSelf();
// Wait at most 5 seconds for the test to register the listener, then re-send the
// withheld ElectionTimeout to this actor so the election can proceed.
297 Uninterruptibles.awaitUninterruptibly(
298 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
299 self.tell(message, self);
// Tell the test that the first ElectionTimeout has arrived.
303 onFirstElectionTimeout.countDown();
305 super.handleCommand(message);
312 setupInMemorySnapshotStore();
314 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
315 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
316 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
318 final TestActorRef<Shard> shard = actorFactory.createTestActor(
319 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
320 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
322 final YangInstanceIdentifier path = TestModel.TEST_PATH;
// Wait until the shard has received its first ElectionTimeout.
324 assertEquals("Got first ElectionTimeout", true,
325 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
// Register the listener while the election is still being held back.
327 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
328 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
329 RegisterDataTreeChangeListenerReply.class);
// NOTE(review): the assertion message below misspells "getListenerRegistrationPath".
330 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
// Sanity check - the shard must not report a leader yet.
332 shard.tell(FindLeader.INSTANCE, getRef());
333 final FindLeaderReply findLeadeReply =
334 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
335 assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
// Release the withheld ElectionTimeout so the election can proceed.
338 onChangeListenerRegistered.countDown();
340 // TODO: investigate why we do not receive data change events
341 listener.waitForChangeEvents();
/**
 * Sends a serialized read-only {@code CreateTransaction} for "txn-1" to a leader shard and
 * checks that the returned transaction path lies under the shard actor.
 */
346 public void testCreateTransaction(){
347 new ShardTestKit(getSystem()) {{
348 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
350 waitUntilLeader(shard);
352 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// The third constructor argument is null here; testCreateTransactionOnChain passes "foobar" instead.
354 shard.tell(new CreateTransaction("txn-1", TransactionType.READ_ONLY.ordinal(), null,
355 DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
357 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
358 CreateTransactionReply.class);
// The transaction actor path is expected to contain the shard path followed by "shard-txn-1".
360 final String path = reply.getTransactionPath().toString();
361 assertTrue("Unexpected transaction path " + path,
362 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
/**
 * Same as {@code testCreateTransaction} except that the {@code CreateTransaction} message
 * carries "foobar" as its third argument (presumably the transaction chain ID - confirm).
 * Checks that the returned transaction path lies under the shard actor.
 */
367 public void testCreateTransactionOnChain(){
368 new ShardTestKit(getSystem()) {{
369 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
371 waitUntilLeader(shard);
373 shard.tell(new CreateTransaction("txn-1",TransactionType.READ_ONLY.ordinal(), "foobar",
374 DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
376 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
377 CreateTransactionReply.class);
// The transaction actor path is expected to contain the shard path followed by "shard-txn-1".
379 final String path = reply.getTransactionPath().toString();
380 assertTrue("Unexpected transaction path " + path,
381 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
/**
 * Creates a shard whose peer address map holds a single entry with a null address, delivers a
 * {@code PeerAddressResolved} message for that peer once recovery has completed, and checks
 * that the Raft actor context then returns the resolved address.
 */
385 @SuppressWarnings("serial")
387 public void testPeerAddressResolved() throws Exception {
388 new ShardTestKit(getSystem()) {{
// Counted down from onRecoveryComplete so the test knows when the shard can take the message.
389 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Shard subclass that exposes the peer address lookup and signals recovery completion.
390 class TestShard extends Shard {
// NOTE(review): the peer is registered under this shard's own ID with a null address.
392 super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
393 peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
394 schemaContext(SCHEMA_CONTEXT));
397 String getPeerAddress(String id) {
398 return getRaftActorContext().getPeerAddress(id);
402 protected void onRecoveryComplete() {
404 super.onRecoveryComplete();
406 recoveryComplete.countDown();
411 final TestActorRef<Shard> shard = actorFactory.createTestActor(
412 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
414 public TestShard create() throws Exception {
415 return new TestShard();
417 })), "testPeerAddressResolved");
// Wait at most 5 seconds for recovery to complete.
419 assertEquals("Recovery complete", true,
420 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// The message is passed directly to the underlying actor instead of going through its mailbox.
422 final String address = "akka://foobar";
423 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
425 assertEquals("getPeerAddress", address,
426 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
/**
 * Builds a standalone data tree containing one outer-list entry, serializes its root node into
 * a snapshot, applies that snapshot through the shard's snapshot cohort, and checks that the
 * shard's root node equals the source tree's root node.
 */
431 public void testApplySnapshot() throws Exception {
433 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot");
435 ShardTestKit.waitUntilLeader(shard);
// Independent in-memory tree used only to produce the expected snapshot content.
437 final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
438 store.setSchemaContext(SCHEMA_CONTEXT);
// Test container holding an outer list with a single entry whose ID is 1.
440 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
441 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
442 withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
443 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
445 writeToStore(store, TestModel.TEST_PATH, container);
// An identifier built with no path arguments is used to read the root of each tree.
447 final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
448 final NormalizedNode<?,?> expected = readStore(store, root);
450 final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
451 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
// Only the snapshot's state is handed to the shard's snapshot cohort.
453 shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
455 final NormalizedNode<?,?> actual = readStore(shard, root);
457 assertEquals("Root node", expected, actual);
/**
 * Delivers an {@code ApplyState} message wrapping a write of the test container directly to the
 * shard and checks that the container can afterwards be read from the shard at
 * {@code TestModel.TEST_PATH}.
 */
461 public void testApplyState() throws Exception {
462 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplyState");
464 ShardTestKit.waitUntilLeader(shard);
// The source tree is used to build the modification carried by the log entry payload.
466 final DataTree source = setupInMemorySnapshotStore();
467 final DataTreeModification writeMod = source.takeSnapshot().newModification();
468 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
469 writeMod.write(TestModel.TEST_PATH, node);
// NOTE(review): the entry arguments 1 and 2 are presumably the log index and term - confirm.
472 final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
473 payloadForModification(source, writeMod)));
// The message is passed directly to the underlying actor instead of going through its mailbox.
475 shard.underlyingActor().onReceiveCommand(applyState);
477 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
478 assertEquals("Applied state", node, actual);
/**
 * Seeds the in-memory snapshot store and journal with a write of the outer list followed by
 * 16 merged list entries, then delegates to {@code testRecovery} with the set of expected
 * list entry keys.
 */
482 public void testDataTreeCandidateRecovery() throws Exception {
483 // Set up the InMemorySnapshotStore.
484 final DataTree source = setupInMemorySnapshotStore();
486 final DataTreeModification writeMod = source.takeSnapshot().newModification();
487 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
// Placeholder entry at sequence number 0; the DUMMY_DATA text gives the reason.
// NOTE(review): this call is also a journal entry although it precedes the "Set up the InMemoryJournal" comment.
489 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
491 // Set up the InMemoryJournal.
492 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
494 final int nListEntries = 16;
495 final Set<Integer> listEntryKeys = new HashSet<>();
497 // Add some ModificationPayload entries
498 for (int i = 1; i <= nListEntries; i++) {
499 listEntryKeys.add(Integer.valueOf(i));
501 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
502 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
504 final DataTreeModification mod = source.takeSnapshot().newModification();
505 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
// Journal sequence numbers are offset by one because sequence 1 holds the outer-list write.
507 InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
508 payloadForModification(source, mod)));
// Final journal entry: ApplyJournalEntries constructed with the number of list entries.
511 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
512 new ApplyJournalEntries(nListEntries));
514 testRecovery(listEntryKeys);
/**
 * Readies three write transactions on one shard, starts the commit of the first, queues
 * CanCommit requests for the other two, and verifies that canCommit/preCommit/commit run on
 * the three cohorts strictly in transaction order and that the written data is in the store.
 *
 * @throws Throwable any failure captured by the asynchronous callbacks is rethrown here
 */
518 public void testConcurrentThreePhaseCommits() throws Throwable {
519 new ShardTestKit(getSystem()) {{
520 final TestActorRef<Shard> shard = actorFactory.createTestActor(
521 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
522 "testConcurrentThreePhaseCommits");
524 waitUntilLeader(shard);
526 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
528 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// tx1 writes the test container.
530 final String transactionID1 = "tx1";
531 final MutableCompositeModification modification1 = new MutableCompositeModification();
532 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
533 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
// tx2 writes an empty outer list.
535 final String transactionID2 = "tx2";
536 final MutableCompositeModification modification2 = new MutableCompositeModification();
537 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
538 TestModel.OUTER_LIST_PATH,
539 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
// tx3 writes the outer list entry whose ID is 1.
542 final String transactionID3 = "tx3";
543 final MutableCompositeModification modification3 = new MutableCompositeModification();
544 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
545 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
546 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
547 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// One 5 second timeout is shared by expectMsgClass, the ask futures and the final latch wait.
550 final long timeoutSec = 5;
551 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
552 final Timeout timeout = new Timeout(duration);
// Ready the first Tx; the reply must name the shard itself as the cohort path.
554 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
555 final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
556 expectMsgClass(duration, ReadyTransactionReply.class));
557 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
559 // Send the CanCommitTransaction message for the first Tx.
561 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
562 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
563 expectMsgClass(duration, CanCommitTransactionReply.class));
564 assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Ready the 2nd and 3rd Tx while the first has not yet been committed.
566 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
567 expectMsgClass(duration, ReadyTransactionReply.class);
569 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
570 expectMsgClass(duration, ReadyTransactionReply.class);
572 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
573 // processed after the first Tx completes.
575 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
576 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
578 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
579 new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
581 // Send the CommitTransaction message for the first Tx. After it completes, it should
582 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
584 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
585 expectMsgClass(duration, CommitTransactionReply.class);
587 // Wait for the next 2 Tx's to complete.
// caughtEx carries a failure from the callbacks back to the test thread; commitLatch is
// counted down once per completed commit future, hence the count of 2.
589 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
590 final CountDownLatch commitLatch = new CountDownLatch(2);
// Base callback: records a failure in caughtEx and checks the type of the response.
592 class OnFutureComplete extends OnComplete<Object> {
593 private final Class<?> expRespType;
595 OnFutureComplete(final Class<?> expRespType) {
596 this.expRespType = expRespType;
600 public void onComplete(final Throwable error, final Object resp) {
602 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
605 assertEquals("Commit response type", expRespType, resp.getClass());
607 } catch (final Exception e) {
// Hook for subclasses that need to act on the response.
613 void onSuccess(final Object resp) throws Exception {
// Callback for a commit future: expects a CommitTransactionReply and counts down commitLatch.
617 class OnCommitFutureComplete extends OnFutureComplete {
618 OnCommitFutureComplete() {
619 super(CommitTransactionReply.class);
623 public void onComplete(final Throwable error, final Object resp) {
624 super.onComplete(error, resp);
625 commitLatch.countDown();
// Callback for a canCommit future: asserts the reply allows the commit, then asks the shard to
// commit the same transaction.
629 class OnCanCommitFutureComplete extends OnFutureComplete {
630 private final String transactionID;
632 OnCanCommitFutureComplete(final String transactionID) {
633 super(CanCommitTransactionReply.class);
634 this.transactionID = transactionID;
638 void onSuccess(final Object resp) throws Exception {
639 final CanCommitTransactionReply canCommitReply =
640 CanCommitTransactionReply.fromSerializable(resp);
641 assertEquals("Can commit", true, canCommitReply.getCanCommit());
643 final Future<Object> commitFuture = Patterns.ask(shard,
644 new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
645 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
// Attach the callbacks to the two queued canCommit futures.
649 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
650 getSystem().dispatcher());
652 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
653 getSystem().dispatcher());
// Wait up to timeoutSec for both commits to finish.
655 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
// Rethrow a callback failure first so it is reported rather than a plain latch timeout.
657 if(caughtEx.get() != null) {
658 throw caughtEx.get();
661 assertEquals("Commits complete", true, done);
// Each cohort must finish all three phases before the next cohort starts.
663 final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
664 inOrder.verify(cohort1).canCommit();
665 inOrder.verify(cohort1).preCommit();
666 inOrder.verify(cohort1).commit();
667 inOrder.verify(cohort2).canCommit();
668 inOrder.verify(cohort2).preCommit();
669 inOrder.verify(cohort2).commit();
670 inOrder.verify(cohort3).canCommit();
671 inOrder.verify(cohort3).preCommit();
672 inOrder.verify(cohort3).commit();
674 // Verify data in the data store.
676 verifyOuterListEntry(shard, 1);
678 verifyLastApplied(shard, 2);
/**
 * Builds one transaction from three {@code BatchedModifications} messages, the last of which
 * readies it without committing, then drives CanCommit and Commit explicitly and verifies that
 * the cohort phases ran in order and that the data is in the store.
 */
683 public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
684 new ShardTestKit(getSystem()) {{
685 final TestActorRef<Shard> shard = actorFactory.createTestActor(
686 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
687 "testBatchedModificationsWithNoCommitOnReady");
689 waitUntilLeader(shard);
691 final String transactionID = "tx";
692 final FiniteDuration duration = duration("5 seconds");
// The decorator wraps the real cohort in a delegating mock on first use and returns that same
// mock afterwards, so the phase calls can be verified at the end of the test.
694 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
695 final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
697 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
698 if(mockCohort.get() == null) {
699 mockCohort.set(createDelegatingMockCohort("cohort", actual));
702 return mockCohort.get();
706 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
708 // Send a BatchedModifications to start a transaction.
710 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
711 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
712 expectMsgClass(duration, BatchedModificationsReply.class);
714 // Send a couple more BatchedModifications.
716 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
717 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
718 expectMsgClass(duration, BatchedModificationsReply.class);
// NOTE(review): the two boolean arguments are presumably "ready" and "commit on ready" - confirm.
// This last batch passes (true, false) and is answered with a ReadyTransactionReply.
720 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
721 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
722 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
723 expectMsgClass(duration, ReadyTransactionReply.class);
725 // Send the CanCommitTransaction message.
727 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
728 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
729 expectMsgClass(duration, CanCommitTransactionReply.class));
730 assertEquals("Can commit", true, canCommitReply.getCanCommit());
732 // Send the CommitTransaction message.
734 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
735 expectMsgClass(duration, CommitTransactionReply.class);
// The three phases must have been invoked on the decorated cohort in this order.
737 final InOrder inOrder = inOrder(mockCohort.get());
738 inOrder.verify(mockCohort.get()).canCommit();
739 inOrder.verify(mockCohort.get()).preCommit();
740 inOrder.verify(mockCohort.get()).commit();
742 // Verify data in the data store.
744 verifyOuterListEntry(shard, 1);
/**
 * Sends three BatchedModifications for one transaction, the last with both trailing boolean
 * flags set to true (presumably "ready" and "commit on ready" - confirm against
 * newBatchedModifications), and expects a CommitTransactionReply directly, i.e. without the
 * test sending CanCommitTransaction/CommitTransaction itself. Afterwards it verifies the call
 * order on the decorated cohort and that outer-list entry 1 is present in the store.
 *
 * @throws Throwable declared so any failure inside the test kit body propagates to the runner
 */
749 public void testBatchedModificationsWithCommitOnReady() throws Throwable {
750 new ShardTestKit(getSystem()) {{
751 final TestActorRef<Shard> shard = actorFactory.createTestActor(
752 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
753 "testBatchedModificationsWithCommitOnReady");
755 waitUntilLeader(shard);
757 final String transactionID = "tx";
758 final FiniteDuration duration = duration("5 seconds");
// Holds the mock that wraps the shard's real cohort so it can be verified at the end.
760 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
// The decorator creates the delegating mock on its first invocation only and returns that
// same instance on every call.
761 final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
763 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
764 if(mockCohort.get() == null) {
765 mockCohort.set(createDelegatingMockCohort("cohort", actual));
768 return mockCohort.get();
772 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
774 // Send a BatchedModifications to start a transaction.
776 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
777 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
778 expectMsgClass(duration, BatchedModificationsReply.class);
780 // Send a couple more BatchedModifications.
782 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
783 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
784 expectMsgClass(duration, BatchedModificationsReply.class);
// Final batch: both flags are true and the message count is 3, matching the three messages sent.
786 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
787 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
788 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
// The reply to the final batch is the commit reply itself.
790 expectMsgClass(duration, CommitTransactionReply.class);
// All three commit phases must have been invoked on the cohort, in this order.
792 final InOrder inOrder = inOrder(mockCohort.get());
793 inOrder.verify(mockCohort.get()).canCommit();
794 inOrder.verify(mockCohort.get()).preCommit();
795 inOrder.verify(mockCohort.get()).commit();
797 // Verify data in the data store.
799 verifyOuterListEntry(shard, 1);
/**
 * Sends a single BatchedModifications that is marked ready but reports a total of 2 messages
 * sent, expects a Status.Failure reply, and rethrows the failure's cause so that the
 * expected-exception declaration on the annotation (IllegalStateException) decides the outcome.
 *
 * @throws Throwable the cause carried by the Failure reply
 */
803 @Test(expected=IllegalStateException.class)
804 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
805 new ShardTestKit(getSystem()) {{
806 final TestActorRef<Shard> shard = actorFactory.createTestActor(
807 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
808 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
810 waitUntilLeader(shard);
812 final String transactionID = "tx1";
813 final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
// Only one message is actually sent, yet the batch claims two were.
814 batched.setReady(true);
815 batched.setTotalMessagesSent(2);
817 shard.tell(batched, getRef());
819 final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// Surface the remote cause as this test's own exception.
821 if(failure != null) {
822 throw failure.cause();
/**
 * Sends a BatchedModifications containing a merge of invalid data and expects a Status.Failure
 * reply; then sends a ready BatchedModifications for the same transaction ID and expects a
 * second Failure whose cause equals the first one.
 *
 * @throws Throwable declared so any failure inside the test kit body propagates to the runner
 */
828 public void testBatchedModificationsWithOperationFailure() throws Throwable {
829 new ShardTestKit(getSystem()) {{
830 final TestActorRef<Shard> shard = actorFactory.createTestActor(
831 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
832 "testBatchedModificationsWithOperationFailure");
834 waitUntilLeader(shard);
836 // Test merge with invalid data. An exception should occur when the merge is applied. Note that
837 // write will not validate the children for performance reasons.
839 String transactionID = "tx1";
// A TEST container holding a JUNK leaf child; this is the "invalid data" referred to above.
841 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
842 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
843 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
845 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
846 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
847 shard.tell(batched, getRef());
848 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// Remember the first failure's cause for comparison with the second reply.
850 Throwable cause = failure.cause();
// Second message for the same transaction: empty, marked ready, total of 2 messages.
852 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
853 batched.setReady(true);
854 batched.setTotalMessagesSent(2);
856 shard.tell(batched, getRef());
858 failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
859 assertEquals("Failure cause", cause, failure.cause());
/**
 * Readies a write transaction on a transaction chain via BatchedModifications, then creates a
 * read-only transaction on the same chain and checks that it reads the container written by
 * the first transaction before that transaction has been committed. Finally commits the write
 * transaction and checks the container is in the store.
 *
 * @throws Throwable declared so any failure inside the test kit body propagates to the runner
 */
864 public void testBatchedModificationsOnTransactionChain() throws Throwable {
865 new ShardTestKit(getSystem()) {{
866 final TestActorRef<Shard> shard = actorFactory.createTestActor(
867 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
868 "testBatchedModificationsOnTransactionChain");
870 waitUntilLeader(shard);
872 final String transactionChainID = "txChain";
873 final String transactionID1 = "tx1";
874 final String transactionID2 = "tx2";
876 final FiniteDuration duration = duration("5 seconds");
878 // Send a BatchedModifications to start a chained write transaction and ready it.
880 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
881 final YangInstanceIdentifier path = TestModel.TEST_PATH;
882 shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
883 containerNode, true, false, 1), getRef());
884 expectMsgClass(duration, ReadyTransactionReply.class);
886 // Create a read Tx on the same chain.
888 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
889 transactionChainID, DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
891 final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
// Read through the transaction actor named in the reply; tx1 is readied but not yet committed.
893 getSystem().actorSelection(createReply.getTransactionPath()).tell(
894 new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
895 final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
896 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
898 // Commit the write transaction.
900 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
901 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
902 expectMsgClass(duration, CanCommitTransactionReply.class));
903 assertEquals("Can commit", true, canCommitReply.getCanCommit());
905 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
906 expectMsgClass(duration, CommitTransactionReply.class);
908 // Verify data in the data store.
910 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
911 assertEquals("Stored node", containerNode, actualNode);
/**
 * Creates a Shard subclass whose isLeader()/getLeader() results can be switched by a flag.
 * Once the shard has become leader the flag is set, so isLeader() returns false and
 * getLeader() returns a selection of the test kit's own actor. A BatchedModifications sent to
 * the shard is then expected to arrive, unchanged, at the test kit.
 */
916 public void testOnBatchedModificationsWhenNotLeader() {
// false: use the real leader logic; true: pretend the test kit actor is the leader.
917 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
918 new ShardTestKit(getSystem()) {{
919 final Creator<Shard> creator = new Creator<Shard>() {
920 private static final long serialVersionUID = 1L;
923 public Shard create() throws Exception {
924 return new Shard(newShardBuilder()) {
926 protected boolean isLeader() {
927 return overrideLeaderCalls.get() ? false : super.isLeader();
931 protected ActorSelection getLeader() {
// NOTE(review): the else-branch of this conditional is not visible in this listing;
// presumably it delegates to super.getLeader() - confirm.
932 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
939 final TestActorRef<Shard> shard = actorFactory.createTestActor(
940 Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
// Let the shard really become leader first, then switch to the overridden answers.
942 waitUntilLeader(shard);
944 overrideLeaderCalls.set(true);
946 final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
// Sent with no sender; the only way the test kit can receive it is from the shard.
948 shard.tell(batched, ActorRef.noSender());
950 expectMsgEquals(batched);
/**
 * Configures the shard with DisableElectionsRaftPolicy, waits until it reports no leader, and
 * checks that BatchedModifications, a forwarded ready transaction and ReadyLocalTransaction
 * are each answered with a Failure whose cause is a NoShardLeaderException.
 */
955 public void testTransactionMessagesWithNoLeader() {
956 new ShardTestKit(getSystem()) {{
// Custom raft policy plus a 50 ms heartbeat interval and an election timeout factor of 1.
957 dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
958 shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
959 final TestActorRef<Shard> shard = actorFactory.createTestActor(
960 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
961 "testTransactionMessagesWithNoLeader");
963 waitUntilNoLeader(shard);
// 1) BatchedModifications
965 shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef());
966 Failure failure = expectMsgClass(Failure.class);
967 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
// 2) Forwarded ready transaction built by the prepareForwardedReadyTransaction helper
969 shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx",
970 DataStoreVersions.CURRENT_VERSION, true), getRef());
971 failure = expectMsgClass(Failure.class);
972 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
// 3) ReadyLocalTransaction
974 shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef());
975 failure = expectMsgClass(Failure.class);
976 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
/** Runs the shared immediate-commit scenario with readWrite = true. */
981 public void testReadyWithReadWriteImmediateCommit() throws Exception{
982 testReadyWithImmediateCommit(true);
/** Runs the shared immediate-commit scenario with readWrite = false. */
986 public void testReadyWithWriteOnlyImmediateCommit() throws Exception{
987 testReadyWithImmediateCommit(false);
/**
 * Shared scenario: sends a ready-transaction message built with a trailing {@code true}
 * argument (presumably "commit immediately" - confirm against prepareReadyTransactionMessage)
 * and expects a CommitTransactionReply as the only reply. Then verifies the cohort's phases
 * ran in order and that the written container is in the store.
 *
 * @param readWrite passed through to prepareReadyTransactionMessage and used in the actor name
 * @throws Exception declared so any failure inside the test kit body propagates to the caller
 */
990 private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
991 new ShardTestKit(getSystem()) {{
992 final TestActorRef<Shard> shard = actorFactory.createTestActor(
993 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
994 "testReadyWithImmediateCommit-" + readWrite);
996 waitUntilLeader(shard);
998 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1000 final String transactionID = "tx1";
1001 final MutableCompositeModification modification = new MutableCompositeModification();
1002 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Cohort prepared by the setupMockWriteTransaction helper for a write of containerNode at TEST_PATH.
1003 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1004 TestModel.TEST_PATH, containerNode, modification);
1006 final FiniteDuration duration = duration("5 seconds");
1008 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
// No separate CanCommit/Commit messages are sent; the commit reply is expected directly.
1010 expectMsgClass(duration, CommitTransactionReply.class);
1012 final InOrder inOrder = inOrder(cohort);
1013 inOrder.verify(cohort).canCommit();
1014 inOrder.verify(cohort).preCommit();
1015 inOrder.verify(cohort).commit();
1017 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1018 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
/**
 * Builds a DataTreeModification directly on the shard's data store (a write of the TEST
 * container and a merge of an empty outer list), readies it, and sends it in a
 * ReadyLocalTransaction whose last constructor argument is {@code true}. Expects a
 * CommitTransactionReply and then reads the outer list back from the store.
 *
 * @throws Exception declared so any failure inside the test kit body propagates to the runner
 */
1023 public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1024 new ShardTestKit(getSystem()) {{
1025 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1026 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1027 "testReadyLocalTransactionWithImmediateCommit");
1029 waitUntilLeader(shard);
1031 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1033 final DataTreeModification modification = dataStore.newModification();
// Apply the write and the merge to the local modification.
1035 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1036 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1037 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1038 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1040 final String txId = "tx1";
// The modification is readied before it is handed to the shard.
1041 modification.ready();
1042 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1044 shard.tell(readyMessage, getRef());
1046 expectMsgClass(CommitTransactionReply.class);
1048 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1049 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
/**
 * Same local modification as the immediate-commit variant, but the ReadyLocalTransaction is
 * built with {@code false} as its last argument: the shard is expected to answer with a
 * ReadyTransactionReply, after which the test drives CanCommitTransaction and
 * CommitTransaction explicitly and reads the outer list back from the store.
 *
 * @throws Exception declared so any failure inside the test kit body propagates to the runner
 */
1054 public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1055 new ShardTestKit(getSystem()) {{
1056 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1057 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1058 "testReadyLocalTransactionWithThreePhaseCommit");
1060 waitUntilLeader(shard);
1062 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1064 final DataTreeModification modification = dataStore.newModification();
// Apply the write and the merge to the local modification.
1066 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1067 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1068 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1069 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1071 final String txId = "tx1";
1072 modification.ready();
1073 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1075 shard.tell(readyMessage, getRef());
1077 expectMsgClass(ReadyTransactionReply.class);
1079 // Send the CanCommitTransaction message.
1081 shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1082 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1083 expectMsgClass(CanCommitTransactionReply.class));
1084 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1086 // Send the CommitTransaction message.
1088 shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1089 expectMsgClass(CommitTransactionReply.class);
1091 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1092 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
/** Runs the shared persistence-disabled commit scenario with readWrite = true. */
1097 public void testReadWriteCommitWithPersistenceDisabled() throws Throwable {
1098 testCommitWithPersistenceDisabled(true);
/**
 * Runs the shared persistence-disabled commit scenario with readWrite = false, so that the
 * write-only path is exercised (the read-write path is covered by
 * testReadWriteCommitWithPersistenceDisabled).
 */
1102 public void testWriteOnlyCommitWithPersistenceDisabled() throws Throwable {
1103 testCommitWithPersistenceDisabled(false);
/**
 * Shared scenario: turns persistence off on the datastore context builder, readies a write
 * transaction backed by a mock cohort, drives CanCommitTransaction and CommitTransaction, and
 * verifies the cohort's phase order and that the container ended up in the store.
 *
 * @param readWrite passed through to prepareReadyTransactionMessage and used in the actor name
 * @throws Throwable declared so any failure inside the test kit body propagates to the caller
 */
1106 private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
// Must be set before the shard's Props are created below.
1107 dataStoreContextBuilder.persistent(false);
1108 new ShardTestKit(getSystem()) {{
1109 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1110 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1111 "testCommitWithPersistenceDisabled-" + readWrite);
1113 waitUntilLeader(shard);
1115 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1117 // Set up a simulated transaction with a mock cohort.
1119 final String transactionID = "tx";
1120 final MutableCompositeModification modification = new MutableCompositeModification();
1121 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1122 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1123 TestModel.TEST_PATH, containerNode, modification);
1125 final FiniteDuration duration = duration("5 seconds");
1127 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1128 expectMsgClass(duration, ReadyTransactionReply.class);
1130 // Send the CanCommitTransaction message.
1132 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1133 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1134 expectMsgClass(duration, CanCommitTransactionReply.class));
1135 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1137 // Send the CommitTransaction message.
1139 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1140 expectMsgClass(duration, CommitTransactionReply.class);
1142 final InOrder inOrder = inOrder(cohort);
1143 inOrder.verify(cohort).canCommit();
1144 inOrder.verify(cohort).preCommit();
1145 inOrder.verify(cohort).commit();
1147 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1148 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
/** Runs the shared no-modifications commit scenario with readWrite = true. */
1153 public void testReadWriteCommitWhenTransactionHasNoModifications() {
1154 testCommitWhenTransactionHasNoModifications(true);
/** Runs the shared no-modifications commit scenario with readWrite = false. */
1158 public void testWriteOnlyCommitWhenTransactionHasNoModifications() {
1159 testCommitWhenTransactionHasNoModifications(false);
/**
 * Shared scenario: commits a transaction whose mock cohort reports an unmodified candidate
 * (built by mockUnmodifiedCandidate) and then checks, via the ShardStats MBean, that the
 * committed-transaction count is 1 while the commit index is still -1.
 *
 * @param readWrite passed through to prepareReadyTransactionMessage and used in the actor name
 */
1162 private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
1163 // Note that persistence is enabled which would normally result in the entry getting written to the journal
1164 // but here that need not happen
1165 new ShardTestKit(getSystem()) {
1167 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1168 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1169 "testCommitWhenTransactionHasNoModifications-" + readWrite);
1171 waitUntilLeader(shard);
1173 final String transactionID = "tx1";
// Empty composite modification: nothing is added to it in this scenario.
1174 final MutableCompositeModification modification = new MutableCompositeModification();
// Every phase of the mock cohort succeeds immediately.
1175 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1176 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1177 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1178 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1179 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1181 final FiniteDuration duration = duration("5 seconds");
1183 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1184 expectMsgClass(duration, ReadyTransactionReply.class);
1186 // Send the CanCommitTransaction message.
1188 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1189 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1190 expectMsgClass(duration, CanCommitTransactionReply.class));
1191 assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CommitTransaction message.
1193 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1194 expectMsgClass(duration, CommitTransactionReply.class);
1196 final InOrder inOrder = inOrder(cohort);
1197 inOrder.verify(cohort).canCommit();
1198 inOrder.verify(cohort).preCommit();
1199 inOrder.verify(cohort).commit();
// Ask the shard for its stats bean.
1201 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1202 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1204 // Use MBean for verification
1205 // Committed transaction count should increase as usual
1206 assertEquals(1,shardStats.getCommittedTransactionsCount());
1208 // Commit index should not advance because this does not go into the journal
1209 assertEquals(-1, shardStats.getCommitIndex());
/** Runs the shared has-modifications commit scenario with readWrite = true. */
1215 public void testReadWriteCommitWhenTransactionHasModifications() {
1216 testCommitWhenTransactionHasModifications(true);
/** Runs the shared has-modifications commit scenario with readWrite = false. */
1220 public void testWriteOnlyCommitWhenTransactionHasModifications() {
1221 testCommitWhenTransactionHasModifications(false);
/**
 * Shared scenario: commits a transaction whose composite modification contains one
 * DeleteModification and whose mock cohort returns a candidate from mockCandidate, then
 * checks, via the ShardStats MBean, that the committed-transaction count is 1 and the commit
 * index is 0.
 *
 * @param readWrite passed through to prepareReadyTransactionMessage and used in the actor name
 */
1224 private void testCommitWhenTransactionHasModifications(final boolean readWrite){
1225 new ShardTestKit(getSystem()) {
1227 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1228 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1229 "testCommitWhenTransactionHasModifications-" + readWrite);
1231 waitUntilLeader(shard);
1233 final String transactionID = "tx1";
1234 final MutableCompositeModification modification = new MutableCompositeModification();
// A delete at the path produced by an empty YangInstanceIdentifier builder.
1235 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
// Every phase of the mock cohort succeeds immediately.
1236 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1237 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1238 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1239 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1240 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1242 final FiniteDuration duration = duration("5 seconds");
1244 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1245 expectMsgClass(duration, ReadyTransactionReply.class);
1247 // Send the CanCommitTransaction message.
1249 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1250 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1251 expectMsgClass(duration, CanCommitTransactionReply.class));
1252 assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CommitTransaction message.
1254 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1255 expectMsgClass(duration, CommitTransactionReply.class);
1257 final InOrder inOrder = inOrder(cohort);
1258 inOrder.verify(cohort).canCommit();
1259 inOrder.verify(cohort).preCommit();
1260 inOrder.verify(cohort).commit();
// Ask the shard for its stats bean.
1262 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1263 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1265 // Use MBean for verification
1266 // Committed transaction count should increase as usual
1267 assertEquals(1, shardStats.getCommittedTransactionsCount());
1269 // Commit index should advance as we do not have an empty modification
1270 assertEquals(0, shardStats.getCommitIndex());
/**
 * Runs the commit-phase failure scenario twice in the same test method: first with
 * readWrite = true, then with readWrite = false. A failure in the first run prevents the
 * second from executing.
 */
1276 public void testCommitPhaseFailure() throws Throwable {
1277 testCommitPhaseFailure(true);
1278 testCommitPhaseFailure(false);
/**
 * Shared scenario with two readied transactions. tx1's cohort succeeds in canCommit and
 * preCommit but returns a failed future from commit; the test expects a Status.Failure for
 * tx1's CommitTransaction and then waits for tx2's pending CanCommitTransaction ask to
 * complete. Finally it verifies that cohort2.canCommit() was invoked after all of cohort1's
 * phases.
 *
 * @param readWrite passed through to prepareReadyTransactionMessage and used in the actor name
 * @throws Throwable declared so any failure inside the test kit body propagates to the caller
 */
1281 private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
1282 new ShardTestKit(getSystem()) {{
1283 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1284 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1285 "testCommitPhaseFailure-" + readWrite);
1287 waitUntilLeader(shard);
1289 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
// commit phase: cohort1.commit() is stubbed below to return a failed future.
1292 final String transactionID1 = "tx1";
1293 final MutableCompositeModification modification1 = new MutableCompositeModification();
1294 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1295 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1296 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1297 doReturn(Futures.immediateFailedFuture(new RuntimeException("mock"))).when(cohort1).commit();
1298 doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
// Only canCommit is stubbed for the second cohort.
1300 final String transactionID2 = "tx2";
1301 final MutableCompositeModification modification2 = new MutableCompositeModification();
1302 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1303 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1305 final FiniteDuration duration = duration("5 seconds");
1306 final Timeout timeout = new Timeout(duration);
// Ready both transactions before starting any commit phase.
1308 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1309 expectMsgClass(duration, ReadyTransactionReply.class);
1311 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1312 expectMsgClass(duration, ReadyTransactionReply.class);
1314 // Send the CanCommitTransaction message for the first Tx.
1316 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1317 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1318 expectMsgClass(duration, CanCommitTransactionReply.class));
1319 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1321 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1322 // processed after the first Tx completes.
// Patterns.ask is used so this reply goes to the future rather than to the test kit's mailbox.
1324 final Future<Object> canCommitFuture = Patterns.ask(shard,
1325 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1327 // Send the CommitTransaction message for the first Tx. This should send back an error
1328 // and trigger the 2nd Tx to proceed.
1330 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1331 expectMsgClass(duration, akka.actor.Status.Failure.class);
1333 // Wait for the 2nd Tx to complete the canCommit phase.
1335 final CountDownLatch latch = new CountDownLatch(1);
1336 canCommitFuture.onComplete(new OnComplete<Object>() {
// NOTE(review): the callback body is not visible in this listing; presumably it counts down
// the latch - confirm.
1338 public void onComplete(final Throwable t, final Object resp) {
1341 }, getSystem().dispatcher());
1343 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// Cross-mock ordering: cohort2 must not have been asked before cohort1 finished.
1345 final InOrder inOrder = inOrder(cohort1, cohort2);
1346 inOrder.verify(cohort1).canCommit();
1347 inOrder.verify(cohort1).preCommit();
1348 inOrder.verify(cohort1).commit();
1349 inOrder.verify(cohort2).canCommit();
/**
 * Runs the pre-commit-phase failure scenario twice in the same test method: first with
 * readWrite = true, then with readWrite = false.
 */
1354 public void testPreCommitPhaseFailure() throws Throwable {
1355 testPreCommitPhaseFailure(true);
1356 testPreCommitPhaseFailure(false);
/**
 * Shared scenario with two readied transactions. tx1's cohort succeeds in canCommit but
 * returns a failed future (IllegalStateException) from preCommit; the test expects a
 * Status.Failure for tx1's CommitTransaction and then waits for tx2's pending
 * CanCommitTransaction ask to complete. Finally it verifies the order cohort1.canCommit,
 * cohort1.preCommit, cohort2.canCommit.
 *
 * @param readWrite passed through to prepareReadyTransactionMessage and used in the actor name
 * @throws Throwable declared so any failure inside the test kit body propagates to the caller
 */
1359 private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
1360 new ShardTestKit(getSystem()) {{
1361 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1362 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1363 "testPreCommitPhaseFailure-" + readWrite);
1365 waitUntilLeader(shard);
// First cohort: canCommit succeeds, preCommit fails.
1367 final String transactionID1 = "tx1";
1368 final MutableCompositeModification modification1 = new MutableCompositeModification();
1369 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1370 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1371 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
// Second cohort: only canCommit is stubbed.
1373 final String transactionID2 = "tx2";
1374 final MutableCompositeModification modification2 = new MutableCompositeModification();
1375 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1376 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1378 final FiniteDuration duration = duration("5 seconds");
1379 final Timeout timeout = new Timeout(duration);
// Ready both transactions before starting any commit phase.
1381 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1382 expectMsgClass(duration, ReadyTransactionReply.class);
1384 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1385 expectMsgClass(duration, ReadyTransactionReply.class);
1387 // Send the CanCommitTransaction message for the first Tx.
1389 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1390 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1391 expectMsgClass(duration, CanCommitTransactionReply.class));
1392 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1394 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1395 // processed after the first Tx completes.
// Patterns.ask is used so this reply goes to the future rather than to the test kit's mailbox.
1397 final Future<Object> canCommitFuture = Patterns.ask(shard,
1398 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1400 // Send the CommitTransaction message for the first Tx. This should send back an error
1401 // and trigger the 2nd Tx to proceed.
1403 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1404 expectMsgClass(duration, akka.actor.Status.Failure.class);
1406 // Wait for the 2nd Tx to complete the canCommit phase.
1408 final CountDownLatch latch = new CountDownLatch(1);
1409 canCommitFuture.onComplete(new OnComplete<Object>() {
// NOTE(review): the callback body is not visible in this listing; presumably it counts down
// the latch - confirm.
1411 public void onComplete(final Throwable t, final Object resp) {
1414 }, getSystem().dispatcher());
1416 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// cohort1.commit() is deliberately absent from the expected sequence.
1418 final InOrder inOrder = inOrder(cohort1, cohort2);
1419 inOrder.verify(cohort1).canCommit();
1420 inOrder.verify(cohort1).preCommit();
1421 inOrder.verify(cohort2).canCommit();
/**
 * Runs the can-commit-phase failure scenario twice in the same test method: first with
 * readWrite = true, then with readWrite = false.
 */
1426 public void testCanCommitPhaseFailure() throws Throwable {
1427 testCanCommitPhaseFailure(true);
1428 testCanCommitPhaseFailure(false);
/**
 * Shared scenario: readies tx1 with a cohort whose canCommit returns a failed future and
 * expects a Status.Failure for its CanCommitTransaction. Then re-stubs canCommit on the same
 * mock to succeed, readies tx2 and expects its CanCommitTransaction to be answered with
 * canCommit = true.
 *
 * @param readWrite passed through to prepareReadyTransactionMessage and used in the actor name
 * @throws Throwable declared so any failure inside the test kit body propagates to the caller
 */
1431 private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1432 new ShardTestKit(getSystem()) {{
1433 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1434 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1435 "testCanCommitPhaseFailure-" + readWrite);
1437 waitUntilLeader(shard);
1439 final FiniteDuration duration = duration("5 seconds");
1441 final String transactionID1 = "tx1";
1442 final MutableCompositeModification modification = new MutableCompositeModification();
1443 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
// canCommit fails for the first transaction.
1444 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1446 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1447 expectMsgClass(duration, ReadyTransactionReply.class);
1449 // Send the CanCommitTransaction message.
1451 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1452 expectMsgClass(duration, akka.actor.Status.Failure.class);
1454 // Send another can commit to ensure the failed one got cleaned up.
// The same mock cohort and modification are reused for tx2, with canCommit now succeeding.
1458 final String transactionID2 = "tx2";
1459 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1461 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1462 expectMsgClass(duration, ReadyTransactionReply.class);
1464 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1465 final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1466 expectMsgClass(CanCommitTransactionReply.class));
1467 assertEquals("getCanCommit", true, reply.getCanCommit());
/**
 * Runs the can-commit false-response scenario twice in the same test method: first with
 * readWrite = true, then with readWrite = false.
 */
1472 public void testCanCommitPhaseFalseResponse() throws Throwable {
1473 testCanCommitPhaseFalseResponse(true);
1474 testCanCommitPhaseFalseResponse(false);
/**
 * Shared scenario: readies tx1 with a cohort whose canCommit completes with FALSE and expects
 * a CanCommitTransactionReply carrying canCommit = false (a normal reply, not a Failure). Then
 * re-stubs canCommit on the same mock to return TRUE, readies tx2 and expects canCommit = true.
 *
 * @param readWrite passed through to prepareReadyTransactionMessage and used in the actor name
 * @throws Throwable declared so any failure inside the test kit body propagates to the caller
 */
1477 private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1478 new ShardTestKit(getSystem()) {{
1479 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1480 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1481 "testCanCommitPhaseFalseResponse-" + readWrite);
1483 waitUntilLeader(shard);
1485 final FiniteDuration duration = duration("5 seconds");
1487 final String transactionID1 = "tx1";
1488 final MutableCompositeModification modification = new MutableCompositeModification();
1489 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
// canCommit completes successfully, but with a FALSE verdict.
1490 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1492 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1493 expectMsgClass(duration, ReadyTransactionReply.class);
1495 // Send the CanCommitTransaction message.
1497 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1498 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1499 expectMsgClass(CanCommitTransactionReply.class));
1500 assertEquals("getCanCommit", false, reply.getCanCommit());
1502 // Send another can commit to ensure the failed one got cleaned up.
// The same mock cohort and modification are reused for tx2, with canCommit now returning TRUE.
1506 final String transactionID2 = "tx2";
1507 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1509 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1510 expectMsgClass(duration, ReadyTransactionReply.class);
1512 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1513 reply = CanCommitTransactionReply.fromSerializable(
1514 expectMsgClass(CanCommitTransactionReply.class));
1515 assertEquals("getCanCommit", true, reply.getCanCommit());
// Runs the failed-canCommit immediate-commit scenario twice, once per value of the readWrite flag.
1520 public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1521 testImmediateCommitWithCanCommitPhaseFailure(true);
1522 testImmediateCommitWithCanCommitPhaseFailure(false);
// Readies "tx1" with a mock cohort whose canCommit() future fails with IllegalStateException("mock") and
// expects an akka Status.Failure in response to the ready message itself. It then readies "tx2" on the same
// mock, with canCommit/preCommit/commit stubbed to succeed, and expects a CommitTransactionReply.
// The trailing `true` passed to prepareReadyTransactionMessage is presumably the immediate-commit flag
// (going by the test name) - TODO confirm against the helper.
1525 private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1526 new ShardTestKit(getSystem()) {{
1527 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1528 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1529 "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1531 waitUntilLeader(shard);
1533 final FiniteDuration duration = duration("5 seconds");
1535 final String transactionID1 = "tx1";
1536 final MutableCompositeModification modification = new MutableCompositeModification();
1537 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
// canCommit() on the mock returns an already-failed future.
1538 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1540 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
1542 expectMsgClass(duration, akka.actor.Status.Failure.class);
1544 // Send another can commit to ensure the failed one got cleaned up.
1548 final String transactionID2 = "tx2";
// Re-stub the same mock so that all three commit phases complete immediately.
1549 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1550 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1551 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
// Mock candidate handed out by cohort.getCandidate(): root path EMPTY, root node reporting UNMODIFIED.
1552 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1553 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1554 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1555 doReturn(candidateRoot).when(candidate).getRootNode();
1556 doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
1557 doReturn(candidate).when(cohort).getCandidate();
1559 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1561 expectMsgClass(duration, CommitTransactionReply.class);
// Runs the canCommit-false immediate-commit scenario twice, once per value of the readWrite flag.
1566 public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1567 testImmediateCommitWithCanCommitPhaseFalseResponse(true);
1568 testImmediateCommitWithCanCommitPhaseFalseResponse(false);
// Readies "tx1" with a mock cohort whose canCommit() future yields FALSE and expects an akka Status.Failure
// in response to the ready message. It then readies "tx2" on the same mock, with canCommit/preCommit/commit
// stubbed to succeed, and expects a CommitTransactionReply.
// The trailing `true` passed to prepareReadyTransactionMessage is presumably the immediate-commit flag
// (going by the test name) - TODO confirm against the helper.
1571 private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1572 new ShardTestKit(getSystem()) {{
1573 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1574 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1575 "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
1577 waitUntilLeader(shard);
1579 final FiniteDuration duration = duration("5 seconds");
1581 final String transactionID = "tx1";
1582 final MutableCompositeModification modification = new MutableCompositeModification();
1583 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
// canCommit() on the mock resolves immediately to FALSE.
1584 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1586 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1588 expectMsgClass(duration, akka.actor.Status.Failure.class);
1590 // Send another can commit to ensure the failed one got cleaned up.
1594 final String transactionID2 = "tx2";
// Re-stub the same mock so that all three commit phases complete immediately.
1595 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1596 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1597 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
// Mock candidate handed out by cohort.getCandidate(): root path EMPTY, root node reporting UNMODIFIED.
1598 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1599 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1600 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1601 doReturn(candidateRoot).when(candidate).getRootNode();
1602 doReturn(YangInstanceIdentifier.EMPTY).when(candidate).getRootPath();
1603 doReturn(candidate).when(cohort).getCandidate();
1605 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1607 expectMsgClass(duration, CommitTransactionReply.class);
// Runs the abort-before-finish-commit scenario twice, once per value of the readWrite flag.
1612 public void testAbortBeforeFinishCommit() throws Throwable {
1613 testAbortBeforeFinishCommit(true);
1614 testAbortBeforeFinishCommit(false);
// Drives "tx1" through ready, canCommit and commit while a preCommit hook calls doAbortTransaction directly
// on the shard, then asserts that TestModel.TEST_PATH can still be read from the shard's store.
// readWrite is forwarded to prepareReadyTransactionMessage and also makes the test actor name unique.
1617 private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
1618 new ShardTestKit(getSystem()) {{
1619 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1620 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1621 "testAbortBeforeFinishCommit-" + readWrite);
1623 waitUntilLeader(shard);
1625 final FiniteDuration duration = duration("5 seconds");
1626 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1628 final String transactionID = "tx1";
// Hook passed to setupMockWriteTransaction below; presumably invoked when the mock cohort's preCommit
// runs - TODO confirm against the helper. It runs the real preCommit, then aborts the transaction.
1629 final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1630 new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1632 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1633 final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1635 // Simulate an AbortTransaction message occurring during replication, after
1636 // persisting and before finishing the commit to the in-memory store.
1637 // We have no followers so due to optimizations in the RaftActor, it does not
1638 // attempt replication and thus we can't send an AbortTransaction message b/c
1639 // it would be processed too late after CommitTransaction completes. So we'll
1640 // simulate an AbortTransaction message occurring during replication by calling
1641 // the shard directly.
1643 shard.underlyingActor().doAbortTransaction(transactionID, null);
1645 return preCommitFuture;
1649 final MutableCompositeModification modification = new MutableCompositeModification();
1650 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1651 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1652 modification, preCommit);
1654 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1655 expectMsgClass(duration, ReadyTransactionReply.class);
1657 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1658 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1659 expectMsgClass(duration, CanCommitTransactionReply.class));
1660 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1662 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1663 expectMsgClass(duration, CommitTransactionReply.class);
1665 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1667 // Since we're simulating an abort occurring during replication and before finish commit,
1668 // the data should still get written to the in-memory store since we've gotten past
1669 // canCommit and preCommit and persisted the data.
1670 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
// Runs the commit-timeout scenario twice, once per value of the readWrite flag.
1675 public void testTransactionCommitTimeout() throws Throwable {
1676 testTransactionCommitTimeout(true);
1677 testTransactionCommitTimeout(false);
// Sets the shard transaction commit timeout to 1 second, readies two transactions and can-commits the first
// without ever committing it. It then checks that the second transaction's canCommit is still answered, that a
// late CommitTransaction for the first yields an akka Status.Failure, and that the second commits and its
// list entry can be read back from the shard.
1680 private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
1681 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1683 new ShardTestKit(getSystem()) {{
1684 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1685 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1686 "testTransactionCommitTimeout-" + readWrite);
1688 waitUntilLeader(shard);
1690 final FiniteDuration duration = duration("5 seconds");
1692 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Pre-populate the store with the test container and an empty outer list before readying the transactions.
1694 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1695 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1696 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1698 // Create 1st Tx - will timeout
1700 final String transactionID1 = "tx1";
1701 final MutableCompositeModification modification1 = new MutableCompositeModification();
1702 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1703 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1704 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1705 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// NOTE(review): the second transaction is identified as "tx3" (and its cohort as "cohort3") although the
// local variables are suffixed 2.
1710 final String transactionID2 = "tx3";
1711 final MutableCompositeModification modification2 = new MutableCompositeModification();
1712 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1713 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1714 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1716 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1721 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1722 expectMsgClass(duration, ReadyTransactionReply.class);
1724 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1725 expectMsgClass(duration, ReadyTransactionReply.class);
1727 // canCommit 1st Tx. We don't send the commit so it should timeout.
1729 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1730 expectMsgClass(duration, CanCommitTransactionReply.class);
1732 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1734 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1735 expectMsgClass(duration, CanCommitTransactionReply.class);
1737 // Try to commit the 1st Tx - should fail as it's not the current Tx.
1739 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1740 expectMsgClass(duration, akka.actor.Status.Failure.class);
1742 // Commit the 2nd Tx.
1744 shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1745 expectMsgClass(duration, CommitTransactionReply.class);
// The entry written by the second transaction must be readable from the shard.
1747 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1748 assertNotNull(listNodePath + " not found", node);
// Sets the shard transaction commit queue capacity to 2 and checks that readying a third transaction is
// answered with an akka Status.Failure, and that a later CanCommitTransaction for that third transaction
// fails as well.
1753 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1754 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1756 new ShardTestKit(getSystem()) {{
1757 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1758 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1759 "testTransactionCommitQueueCapacityExceeded");
1761 waitUntilLeader(shard);
1763 final FiniteDuration duration = duration("5 seconds");
1765 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Set up three mock write transactions, one more than the configured capacity.
1767 final String transactionID1 = "tx1";
1768 final MutableCompositeModification modification1 = new MutableCompositeModification();
1769 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1770 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1772 final String transactionID2 = "tx2";
1773 final MutableCompositeModification modification2 = new MutableCompositeModification();
1774 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1775 TestModel.OUTER_LIST_PATH,
1776 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1779 final String transactionID3 = "tx3";
1780 final MutableCompositeModification modification3 = new MutableCompositeModification();
1781 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1782 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
// The first two ready messages are accepted.
1786 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1787 expectMsgClass(duration, ReadyTransactionReply.class);
1789 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1790 expectMsgClass(duration, ReadyTransactionReply.class);
1792 // The 3rd Tx should exceed queue capacity and fail.
1794 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1795 expectMsgClass(duration, akka.actor.Status.Failure.class);
1797 // canCommit 1st Tx.
1799 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1800 expectMsgClass(duration, CanCommitTransactionReply.class);
1802 // canCommit the 2nd Tx - it should get queued.
// No reply is awaited for this message; the next expectation is for the 3rd Tx's failure.
1804 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1806 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1808 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1809 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Configures a 1300 ms commit-queue expiry timeout and a 1 second transaction commit timeout, readies three
// transactions, sends CanCommitTransaction only for the last one and expects its CanCommitTransactionReply
// within 5 seconds.
1814 public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1815 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1817 new ShardTestKit(getSystem()) {{
1818 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1819 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1820 "testTransactionCommitWithPriorExpiredCohortEntries");
1822 waitUntilLeader(shard);
1824 final FiniteDuration duration = duration("5 seconds");
1826 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Ready tx1 and tx2 (both writing TEST_PATH); neither is ever can-committed.
1828 final String transactionID1 = "tx1";
1829 final MutableCompositeModification modification1 = new MutableCompositeModification();
1830 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1831 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1833 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1834 expectMsgClass(duration, ReadyTransactionReply.class);
1836 final String transactionID2 = "tx2";
1837 final MutableCompositeModification modification2 = new MutableCompositeModification();
1838 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1839 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1841 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1842 expectMsgClass(duration, ReadyTransactionReply.class);
// Ready tx3, which writes TEST2_PATH.
1844 final String transactionID3 = "tx3";
1845 final MutableCompositeModification modification3 = new MutableCompositeModification();
1846 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1847 TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1849 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1850 expectMsgClass(duration, ReadyTransactionReply.class);
1852 // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1853 // should expire from the queue and the last one should be processed.
1855 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1856 expectMsgClass(duration, CanCommitTransactionReply.class);
// Configures a 1300 ms commit-queue expiry timeout and a 1 second transaction commit timeout. It can-commits
// tx1, readies tx2 (never can-committed) and sends a ReadyLocalTransaction for tx3, then commits tx1.
// Two CommitTransactionReply messages are expected (tx1 and tx3), and TEST2_PATH, written by tx3, must be
// readable from the shard afterwards.
1861 public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
1862 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1864 new ShardTestKit(getSystem()) {{
1865 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1866 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1867 "testTransactionCommitWithSubsequentExpiredCohortEntry");
1869 waitUntilLeader(shard);
1871 final FiniteDuration duration = duration("5 seconds");
1873 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1875 final String transactionID1 = "tx1";
1876 final MutableCompositeModification modification1 = new MutableCompositeModification();
1877 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1878 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1880 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1881 expectMsgClass(duration, ReadyTransactionReply.class);
1883 // CanCommit the first one so it's the current in-progress CohortEntry.
1885 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1886 expectMsgClass(duration, CanCommitTransactionReply.class);
1888 // Ready the second Tx.
1890 final String transactionID2 = "tx2";
1891 final MutableCompositeModification modification2 = new MutableCompositeModification();
1892 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1893 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1895 shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1896 expectMsgClass(duration, ReadyTransactionReply.class);
1898 // Ready the third Tx.
// Unlike tx1/tx2 this one uses a real DataTreeModification wrapped in a ReadyLocalTransaction; the trailing
// `true` is presumably the immediate-commit flag - TODO confirm against ReadyLocalTransaction.
1900 final String transactionID3 = "tx3";
1901 final DataTreeModification modification3 = dataStore.newModification();
1902 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1903 .apply(modification3);
1904 modification3.ready();
1905 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
1907 shard.tell(readyMessage, getRef());
1909 // Commit the first Tx. After completing, the second should expire from the queue and the third
1912 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1913 expectMsgClass(duration, CommitTransactionReply.class);
1915 // Expect commit reply from the third Tx.
1917 expectMsgClass(duration, CommitTransactionReply.class);
1919 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1920 assertNotNull(TestModel.TEST2_PATH + " not found", node);
// Sends a CanCommitTransaction for transaction id "tx", which was never readied in this test, and expects an
// akka Status.Failure within 5 seconds.
// NOTE(review): unlike the sibling tests, this one does not call waitUntilLeader before messaging the shard.
1925 public void testCanCommitBeforeReadyFailure() throws Throwable {
1926 new ShardTestKit(getSystem()) {{
1927 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1928 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1929 "testCanCommitBeforeReadyFailure");
1931 shard.tell(new CanCommitTransaction("tx", CURRENT_VERSION).toSerializable(), getRef());
1932 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// Runs the abort-current-transaction scenario twice, once per value of the readWrite flag.
1937 public void testAbortCurrentTransaction() throws Throwable {
1938 testAbortCurrentTransaction(true);
1939 testAbortCurrentTransaction(false);
// Readies two transactions backed by mock cohorts and can-commits the first. It asks for canCommit on the
// second via Patterns.ask, aborts the first, then waits for the second's canCommit future and verifies that
// cohort1.canCommit() was invoked before cohort2.canCommit().
1942 private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
1943 new ShardTestKit(getSystem()) {{
1944 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1945 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1946 "testAbortCurrentTransaction-" + readWrite);
1948 waitUntilLeader(shard);
1950 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1952 final String transactionID1 = "tx1";
1953 final MutableCompositeModification modification1 = new MutableCompositeModification();
1954 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1955 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1956 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1958 final String transactionID2 = "tx2";
1959 final MutableCompositeModification modification2 = new MutableCompositeModification();
1960 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1961 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
// The same 5 second bound is used for expectMsgClass, for the ask timeout and for Await.ready.
1963 final FiniteDuration duration = duration("5 seconds");
1964 final Timeout timeout = new Timeout(duration);
1966 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1967 expectMsgClass(duration, ReadyTransactionReply.class);
1969 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1970 expectMsgClass(duration, ReadyTransactionReply.class);
1972 // Send the CanCommitTransaction message for the first Tx.
1974 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1975 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1976 expectMsgClass(duration, CanCommitTransactionReply.class));
1977 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1979 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1980 // processed after the first Tx completes.
// Patterns.ask is used instead of tell so the reply is delivered to the returned future rather than to getRef().
1982 final Future<Object> canCommitFuture = Patterns.ask(shard,
1983 new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1985 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1988 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1989 expectMsgClass(duration, AbortTransactionReply.class);
1991 // Wait for the 2nd Tx to complete the canCommit phase.
1993 Await.ready(canCommitFuture, duration);
// Verify the relative order of the two canCommit() invocations across both mocks.
1995 final InOrder inOrder = inOrder(cohort1, cohort2);
1996 inOrder.verify(cohort1).canCommit();
1997 inOrder.verify(cohort2).canCommit();
// Runs the abort-queued-transaction scenario twice, once per value of the readWrite flag.
2002 public void testAbortQueuedTransaction() throws Throwable {
2003 testAbortQueuedTransaction(true);
2004 testAbortQueuedTransaction(false);
// Readies a transaction with a mock cohort, aborts it before any canCommit, and verifies that cohort.abort()
// was called. It then waits for the shard to handle TX_COMMIT_TIMEOUT_CHECK_MESSAGE, after which the pending
// commit queue must be empty and a CanCommitTransaction for the aborted id must fail with IllegalStateException.
2007 private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
2008 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2009 new ShardTestKit(getSystem()) {{
// Holds the latch the Shard subclass below counts down; it stays null until the test arms it.
// NOTE(review): "cleaupCheckLatch" looks like a misspelling of "cleanupCheckLatch".
2010 final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2011 @SuppressWarnings("serial")
2012 final Creator<Shard> creator = new Creator<Shard>() {
2014 public Shard create() throws Exception {
2015 return new Shard(newShardBuilder()) {
// Delegates to the real handler first, then signals the latch (if armed) for each timeout-check message.
2017 public void handleCommand(final Object message) {
2018 super.handleCommand(message);
2019 if(TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
2020 if(cleaupCheckLatch.get() != null) {
2021 cleaupCheckLatch.get().countDown();
2029 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2030 Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2031 Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
2033 waitUntilLeader(shard);
2035 final String transactionID = "tx1";
2037 final MutableCompositeModification modification = new MutableCompositeModification();
2038 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2039 doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2041 final FiniteDuration duration = duration("5 seconds");
2045 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
2046 expectMsgClass(duration, ReadyTransactionReply.class);
2048 assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2050 // Send the AbortTransaction message.
2052 shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
2053 expectMsgClass(duration, AbortTransactionReply.class);
2055 verify(cohort).abort();
2057 // Verify the tx cohort is removed from queue at the cleanup check interval.
// Arm the latch only now, so only timeout-check messages handled after the abort are counted.
2059 cleaupCheckLatch.set(new CountDownLatch(1));
2060 assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2061 cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2063 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2065 // Now send CanCommitTransaction - should fail.
2067 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
2069 Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2070 assertTrue("Failure type", failure instanceof IllegalStateException);
// Runs the snapshot-creation scenario with persistence enabled, using actor name "testCreateSnapshot".
2075 public void testCreateSnapshot() throws Exception {
2076 testCreateSnapshot(true, "testCreateSnapshot");
// Runs the snapshot-creation scenario with persistence disabled, using actor name
// "testCreateSnapshotWithNonPersistentData".
2080 public void testCreateSnapshotWithNonPersistentData() throws Exception {
2081 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
// Writes TEST_PATH to a shard, triggers snapshot capture twice through the SnapshotManager and, after each
// capture, checks that a Snapshot was handed to saveSnapshot and that its deserialized state equals the
// store's root node read beforehand.
// persistent is applied to dataStoreContextBuilder; shardActorName names the test actor.
2084 @SuppressWarnings("serial")
2085 private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
// Counted down by TestShard.handleCommand below; replaced with a fresh latch after every validation.
2087 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
// Receives whatever object is passed to saveSnapshot on the wrapping persistence provider.
2089 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2090 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2091 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
// Records the snapshot object, then forwards to the superclass implementation.
2096 public void saveSnapshot(final Object o) {
2097 savedSnapshot.set(o);
2098 super.saveSnapshot(o);
2102 dataStoreContextBuilder.persistent(persistent);
2104 new ShardTestKit(getSystem()) {{
2105 class TestShard extends Shard {
2107 protected TestShard(AbstractBuilder<?, ?> builder) {
// Wrap the shard's own persistence provider so saveSnapshot calls can be observed.
2109 setPersistence(new TestPersistentDataProvider(super.persistence()));
// Delegates to the real handler, then counts the latch down on SaveSnapshotSuccess or on a message whose
// toString() is "commit_snapshot".
2113 public void handleCommand(final Object message) {
2114 super.handleCommand(message);
2116 // XXX: commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
2117 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
2118 latch.get().countDown();
// Re-declared as public, presumably so the test can reach the context through the TestShard cast below.
2123 public RaftActorContext getRaftActorContext() {
2124 return super.getRaftActorContext();
2128 final Creator<Shard> creator = new Creator<Shard>() {
2130 public Shard create() throws Exception {
2131 return new TestShard(newShardBuilder());
2135 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2136 Props.create(new DelegatingShardCreator(creator)), shardActorName);
2138 waitUntilLeader(shard);
2139 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Root of the store as it stands after the write; every saved snapshot is compared against it.
2141 final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2143 // Trigger creation of a snapshot by calling capture() on the shard's SnapshotManager directly.
2144 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2145 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2146 awaitAndValidateSnapshot(expectedRoot);
// Capture a second time to check that snapshotting still works after the first one completed.
2148 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2149 awaitAndValidateSnapshot(expectedRoot);
// Waits up to 5 seconds for the latch, checks that the saved object is a Snapshot whose state matches
// expectedRoot, then resets the latch and the saved reference for the next round.
2152 private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException {
2153 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2155 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2156 savedSnapshot.get() instanceof Snapshot);
2158 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2160 latch.set(new CountDownLatch(1));
2161 savedSnapshot.set(null);
// Deserializes the snapshot state into a NormalizedNode and asserts that it equals expectedRoot.
2164 private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2166 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2167 assertEquals("Root node", expectedRoot, actual);
2173 * This test simply verifies that the applySnapShot logic will work
2174 * @throws ReadFailedException
2175 * @throws DataValidationFailedException
2178 public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
// Works on a stand-alone in-memory OPERATIONAL data tree; no shard actor is involved in this test.
2179 final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
2180 store.setSchemaContext(SCHEMA_CONTEXT);
// Seed the tree with the test container.
2182 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2183 putTransaction.write(TestModel.TEST_PATH,
2184 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2185 commitTransaction(store, putTransaction);
// Capture the whole tree from the root (an identifier built with no path arguments).
2188 final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2190 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
// Delete the root and write the captured tree back in a single modification.
2192 writeTransaction.delete(YangInstanceIdentifier.builder().build());
2193 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2195 commitTransaction(store, writeTransaction);
// The tree read back after the delete-and-rewrite must equal the tree captured before it.
2197 final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2199 assertEquals(expected, actual);
// Creates one shard from a DatastoreContext built with persistent(true) and one built with persistent(false),
// and asserts that persistence().isRecoveryApplicable() is true for the former and false for the latter.
2203 public void testRecoveryApplicable(){
2205 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2206 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2208 final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2209 schemaContext(SCHEMA_CONTEXT).props();
// Same settings as above except for persistent(false).
2211 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2212 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2214 final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2215 schemaContext(SCHEMA_CONTEXT).props();
2217 new ShardTestKit(getSystem()) {{
2218 final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
2220 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2222 final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
2224 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
// Starts a shard with persistent(true), then sends it a DatastoreContext with persistent(false) followed by
// one with persistent(true), asserting after each step that persistence().isRecoveryApplicable() follows the
// flag.
2229 public void testOnDatastoreContext() {
2230 new ShardTestKit(getSystem()) {{
2231 dataStoreContextBuilder.persistent(true);
// NOTE(review): created without withDispatcher(DefaultDispatcherId()), unlike most tests in this class;
// presumably so each tell below is handled before the following assertion runs - TODO confirm.
2233 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
2235 assertEquals("isRecoveryApplicable", true,
2236 shard.underlyingActor().persistence().isRecoveryApplicable());
2238 waitUntilLeader(shard);
// Switch persistence off by sending the shard a freshly built context.
2240 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2242 assertEquals("isRecoveryApplicable", false,
2243 shard.underlyingActor().persistence().isRecoveryApplicable());
// Switch persistence back on.
2245 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2247 assertEquals("isRecoveryApplicable", true,
2248 shard.underlyingActor().persistence().isRecoveryApplicable());
// Registers a MessageCollectorActor as role-change listener on a leader shard and expects a
// RegisterRoleChangeListenerReply plus a ShardLeaderStateChanged carrying the shard's own data tree.
// It then sends the shard a RequestVote and expects a ShardLeaderStateChanged without a local data tree.
2253 public void testRegisterRoleChangeListener() throws Exception {
2254 new ShardTestKit(getSystem()) {
2256 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2257 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2258 "testRegisterRoleChangeListener");
2260 waitUntilLeader(shard);
2262 final TestActorRef<MessageCollectorActor> listener =
2263 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
// The listener actor is passed as the sender of the registration message.
2265 shard.tell(new RegisterRoleChangeListener(), listener);
2267 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2269 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2270 ShardLeaderStateChanged.class);
2271 assertEquals("getLocalShardDataTree present", true,
2272 leaderStateChanged.getLocalShardDataTree().isPresent());
// assertSame: the notification must carry the very DataTree instance held by the shard's data store.
2273 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2274 leaderStateChanged.getLocalShardDataTree().get());
// Drop the messages collected so far so the next expectFirstMatching only sees new notifications.
2276 MessageCollectorActor.clearMessages(listener);
2278 // Force a leader change
2280 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2282 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2283 ShardLeaderStateChanged.class);
2284 assertEquals("getLocalShardDataTree present", false,
2285 leaderStateChanged.getLocalShardDataTree().isPresent());
2291 public void testFollowerInitialSyncStatus() throws Exception {
2292 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2293 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2294 "testFollowerInitialSyncStatus");
2296 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2298 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2300 shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2302 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2306 public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
2307 new ShardTestKit(getSystem()) {{
2308 String testName = "testClusteredDataChangeListenerDelayedRegistration";
2309 dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
2310 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2312 final MockDataChangeListener listener = new MockDataChangeListener(1);
2313 final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2314 actorFactory.generateActorId(testName + "-DataChangeListener"));
2316 setupInMemorySnapshotStore();
2318 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2319 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2320 actorFactory.generateActorId(testName + "-shard"));
2322 waitUntilNoLeader(shard);
2324 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2326 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2327 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2328 RegisterChangeListenerReply.class);
2329 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2331 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
2332 customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2334 listener.waitForChangeEvents();
2339 public void testClusteredDataChangeListenerRegistration() throws Exception {
2340 new ShardTestKit(getSystem()) {{
2341 String testName = "testClusteredDataChangeListenerRegistration";
2342 final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2343 actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2345 final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2346 actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2348 final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2349 Shard.builder().id(followerShardID).
2350 datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2351 peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2352 "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2353 withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2355 final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2356 Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2357 peerAddresses(Collections.singletonMap(followerShardID.toString(),
2358 "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2359 withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2361 leaderShard.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
2362 String leaderPath = waitUntilLeader(followerShard);
2363 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2365 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2366 final MockDataChangeListener listener = new MockDataChangeListener(1);
2367 final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2368 actorFactory.generateActorId(testName + "-DataChangeListener"));
2370 followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2371 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2372 RegisterChangeListenerReply.class);
2373 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2375 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2377 listener.waitForChangeEvents();
2382 public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
2383 new ShardTestKit(getSystem()) {{
2384 String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
2385 dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
2386 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2388 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2389 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2390 actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2392 setupInMemorySnapshotStore();
2394 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2395 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2396 actorFactory.generateActorId(testName + "-shard"));
2398 waitUntilNoLeader(shard);
2400 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2402 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2403 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2404 RegisterDataTreeChangeListenerReply.class);
2405 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2407 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
2408 customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2410 listener.waitForChangeEvents();
2415 public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2416 new ShardTestKit(getSystem()) {{
2417 String testName = "testClusteredDataTreeChangeListenerRegistration";
2418 final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2419 actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2421 final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2422 actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2424 final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2425 Shard.builder().id(followerShardID).
2426 datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2427 peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2428 "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2429 withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2431 final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2432 Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2433 peerAddresses(Collections.singletonMap(followerShardID.toString(),
2434 "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2435 withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2437 leaderShard.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
2438 String leaderPath = waitUntilLeader(followerShard);
2439 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2441 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2442 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2443 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2444 actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2446 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2447 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2448 RegisterDataTreeChangeListenerReply.class);
2449 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2451 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2453 listener.waitForChangeEvents();
2458 public void testServerRemoved() throws Exception {
2459 final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
2461 final ActorRef shard = parent.underlyingActor().context().actorOf(
2462 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2463 "testServerRemoved");
2465 shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2467 MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);