2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Status.Failure;
28 import akka.dispatch.Dispatchers;
29 import akka.dispatch.OnComplete;
30 import akka.japi.Creator;
31 import akka.pattern.Patterns;
32 import akka.persistence.SaveSnapshotSuccess;
33 import akka.testkit.TestActorRef;
34 import akka.util.Timeout;
35 import com.google.common.base.Function;
36 import com.google.common.base.Optional;
37 import com.google.common.util.concurrent.Futures;
38 import com.google.common.util.concurrent.ListenableFuture;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.io.IOException;
41 import java.util.Collections;
42 import java.util.HashSet;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.Test;
49 import org.mockito.InOrder;
50 import org.opendaylight.controller.cluster.DataPersistenceProvider;
51 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
52 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
53 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
54 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
63 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
69 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
73 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
74 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
75 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
76 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
77 import org.opendaylight.controller.cluster.datastore.modification.Modification;
78 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
79 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
80 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
81 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
82 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
83 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
84 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
86 import org.opendaylight.controller.cluster.raft.RaftActorContext;
87 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
89 import org.opendaylight.controller.cluster.raft.Snapshot;
90 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
91 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
92 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
93 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
94 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
95 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
96 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
97 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
98 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
99 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
100 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
101 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
102 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
103 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
104 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
105 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
106 import org.opendaylight.yangtools.yang.common.QName;
107 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
109 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
111 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
122 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
123 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
124 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
125 import scala.concurrent.Await;
126 import scala.concurrent.Future;
127 import scala.concurrent.duration.FiniteDuration;
129 public class ShardTest extends AbstractShardTest {
// Qualified name built from a namespace URI, the revision "2014-03-13" and the local name "cars".
130 private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
// Placeholder payload stored at journal sequence number 0 by the recovery tests; as its own
// text explains, the in-memory snapshot store uses sequence number 0 and journal recovery
// starts from 1.
132 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
135 public void testRegisterChangeListener() throws Exception {
136 new ShardTestKit(getSystem()) {{
137 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
138 newShardProps(), "testRegisterChangeListener");
140 waitUntilLeader(shard);
142 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
144 final MockDataChangeListener listener = new MockDataChangeListener(1);
145 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
146 "testRegisterChangeListener-DataChangeListener");
148 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
149 dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
151 final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
152 RegisterChangeListenerReply.class);
153 final String replyPath = reply.getListenerRegistrationPath().toString();
154 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
155 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
157 final YangInstanceIdentifier path = TestModel.TEST_PATH;
158 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
160 listener.waitForChangeEvents(path);
162 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
163 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Test: a data change listener registered while the shard has no leader yet (FindLeader
// returns a null leader actor) must still be notified of the data already written to the
// store once the election is allowed to proceed.
167 @SuppressWarnings("serial")
169 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
170 // This test tests the timing window in which a change listener is registered before the
171 // shard becomes the leader. We verify that the listener is registered and notified of the
172 // existing data when the shard becomes the leader.
173 new ShardTestKit(getSystem()) {{
174 // For this test, we want to send the RegisterChangeListener message after the shard
175 // has recovered from persistence and before it becomes the leader. So we subclass
176 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
177 // we know that the shard has been initialized to a follower and has started the
178 // election process. The following 2 CountDownLatches are used to coordinate the
179 // ElectionTimeout with the sending of the RegisterChangeListener message.
180 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
181 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
182 final Creator<Shard> creator = new Creator<Shard>() {
// Flipped to false when the first ElectionTimeout is intercepted, so later ones pass through.
183 boolean firstElectionTimeout = true;
186 public Shard create() throws Exception {
187 // Use a non persistent provider because this test actually invokes persist on the journal
188 // this will cause all other messages to not be queued properly after that.
189 // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
190 // it does do a persist)
191 return new Shard(newShardBuilder()) {
193 public void onReceiveCommand(final Object message) throws Exception {
194 if(message instanceof ElectionTimeout && firstElectionTimeout) {
195 // Got the first ElectionTimeout. We don't forward it to the
196 // base Shard yet until we've sent the RegisterChangeListener
197 // message. So we signal the onFirstElectionTimeout latch to tell
198 // the main thread to send the RegisterChangeListener message and
199 // start a thread to wait on the onChangeListenerRegistered latch,
200 // which the main thread signals after it has sent the message.
201 // After the onChangeListenerRegistered is triggered, we send the
202 // original ElectionTimeout message to proceed with the election.
203 firstElectionTimeout = false;
204 final ActorRef self = getSelf();
// NOTE(review): per the comment above, the wait and re-send below are meant to run on a
// separate thread; the lines creating and starting that thread are not visible in this
// excerpt - confirm against the full file.
208 Uninterruptibles.awaitUninterruptibly(
209 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
// Re-deliver the intercepted ElectionTimeout to the shard itself.
210 self.tell(message, self);
// Tell the test thread that the first ElectionTimeout has arrived.
214 onFirstElectionTimeout.countDown();
// NOTE(review): presumably the else-branch that forwards every other message to the base
// Shard; the branch keyword is not visible in this excerpt - confirm.
216 super.onReceiveCommand(message);
223 final MockDataChangeListener listener = new MockDataChangeListener(1);
224 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
225 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
// The shard is created on the default dispatcher rather than whatever dispatcher
// TestActorRef would otherwise pick.
227 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
228 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
229 "testRegisterChangeListenerWhenNotLeaderInitially");
231 // Write initial data into the in-memory store.
232 final YangInstanceIdentifier path = TestModel.TEST_PATH;
233 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
235 // Wait until the shard receives the first ElectionTimeout message.
236 assertEquals("Got first ElectionTimeout", true,
237 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
239 // Now send the RegisterChangeListener and wait for the reply.
240 shard.tell(new RegisterChangeListener(path, dclActor,
241 AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
243 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
244 RegisterChangeListenerReply.class);
245 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
247 // Sanity check - verify the shard is not the leader yet.
248 shard.tell(new FindLeader(), getRef());
249 final FindLeaderReply findLeadeReply =
250 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
251 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
253 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
254 // with the election process.
255 onChangeListenerRegistered.countDown();
257 // Wait for the shard to become the leader and notify our listener with the existing
258 // data in the store.
259 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
261 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
262 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
267 public void testRegisterDataTreeChangeListener() throws Exception {
268 new ShardTestKit(getSystem()) {{
269 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
270 newShardProps(), "testRegisterDataTreeChangeListener");
272 waitUntilLeader(shard);
274 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
276 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
277 final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
278 "testRegisterDataTreeChangeListener-DataTreeChangeListener");
280 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
282 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
283 RegisterDataTreeChangeListenerReply.class);
284 final String replyPath = reply.getListenerRegistrationPath().toString();
285 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
286 "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
288 final YangInstanceIdentifier path = TestModel.TEST_PATH;
289 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
291 listener.waitForChangeEvents();
293 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
294 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Test: a data tree change listener registered while the shard has no leader yet (FindLeader
// returns a null leader actor) is expected to receive change events once the election is
// allowed to proceed. Two latches coordinate the first ElectionTimeout with the registration.
298 @SuppressWarnings("serial")
300 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
301 new ShardTestKit(getSystem()) {{
// Counted down by the shard when it sees its first ElectionTimeout.
302 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
// Counted down by the test once the listener registration has been sent and checked.
303 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
304 final Creator<Shard> creator = new Creator<Shard>() {
// Flipped to false when the first ElectionTimeout is intercepted, so later ones pass through.
305 boolean firstElectionTimeout = true;
308 public Shard create() throws Exception {
// The shard is built with a non-persistent datastore context.
309 return new Shard(Shard.builder().id(shardID).datastoreContext(
310 dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
312 public void onReceiveCommand(final Object message) throws Exception {
313 if(message instanceof ElectionTimeout && firstElectionTimeout) {
314 firstElectionTimeout = false;
315 final ActorRef self = getSelf();
// NOTE(review): the wait and re-send below are presumably executed on a separate thread,
// as in the sibling change-listener test; the lines creating that thread are not visible
// in this excerpt - confirm against the full file.
319 Uninterruptibles.awaitUninterruptibly(
320 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
// Re-deliver the intercepted ElectionTimeout to the shard itself.
321 self.tell(message, self);
// Tell the test thread that the first ElectionTimeout has arrived.
325 onFirstElectionTimeout.countDown();
// NOTE(review): presumably the else-branch that forwards every other message to the base
// Shard; the branch keyword is not visible in this excerpt - confirm.
327 super.onReceiveCommand(message);
334 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
335 final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
336 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
338 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
339 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
340 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
// Write initial data before the listener is registered.
342 final YangInstanceIdentifier path = TestModel.TEST_PATH;
343 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Wait (up to 5 seconds) for the shard to see its first ElectionTimeout.
345 assertEquals("Got first ElectionTimeout", true,
346 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
// Register the listener while the election is still held back.
348 shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
349 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
350 RegisterDataTreeChangeListenerReply.class);
351 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
// Sanity check: no leader is known yet.
353 shard.tell(new FindLeader(), getRef());
354 final FindLeaderReply findLeadeReply =
355 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
356 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
// Write the same node a second time, after registration (see the TODO below).
358 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Release the held-back ElectionTimeout so the election can proceed.
360 onChangeListenerRegistered.countDown();
362 // TODO: investigate why we do not receive data change events
363 listener.waitForChangeEvents();
// Stop both actors created by this test.
365 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
366 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
371 public void testCreateTransaction(){
372 new ShardTestKit(getSystem()) {{
373 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
375 waitUntilLeader(shard);
377 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
379 shard.tell(new CreateTransaction("txn-1",
380 TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
382 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
383 CreateTransactionReply.class);
385 final String path = reply.getTransactionActorPath().toString();
386 assertTrue("Unexpected transaction path " + path,
387 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
389 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
394 public void testCreateTransactionOnChain(){
395 new ShardTestKit(getSystem()) {{
396 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
398 waitUntilLeader(shard);
400 shard.tell(new CreateTransaction("txn-1",
401 TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
404 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
405 CreateTransactionReply.class);
407 final String path = reply.getTransactionActorPath().toString();
408 assertTrue("Unexpected transaction path " + path,
409 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
411 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Test: after recovery completes, delivering a PeerAddressResolved message directly to the
// shard must make the resolved address visible through the raft actor context.
415 @SuppressWarnings("serial")
417 public void testPeerAddressResolved() throws Exception {
418 new ShardTestKit(getSystem()) {{
// Counted down from the shard's onRecoveryComplete override.
419 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Shard subclass that exposes the peer address lookup and signals recovery completion.
420 class TestShard extends Shard {
// NOTE(review): the constructor header enclosing this super(...) call is not visible in
// this excerpt. The shard starts with a single peer entry whose address is null.
422 super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
423 peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
424 schemaContext(SCHEMA_CONTEXT));
// Reads the peer address for the given id from the raft actor context.
427 String getPeerAddress(String id) {
428 return getRaftActorContext().getPeerAddress(id);
432 protected void onRecoveryComplete() {
// NOTE(review): the structure around these two calls (e.g. a try/finally) is not visible
// in this excerpt - confirm against the full file.
434 super.onRecoveryComplete();
436 recoveryComplete.countDown();
441 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
442 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
444 public TestShard create() throws Exception {
445 return new TestShard();
447 })), "testPeerAddressResolved");
// Wait (up to 5 seconds) for recovery to finish before sending the message under test.
449 assertEquals("Recovery complete", true,
450 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
452 final String address = "akka://foobar";
// Invoke the handler directly on the underlying actor instead of going through the mailbox.
453 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
455 assertEquals("getPeerAddress", address,
456 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
458 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
463 public void testApplySnapshot() throws Exception {
465 ShardTestKit testkit = new ShardTestKit(getSystem());
467 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
468 "testApplySnapshot");
470 testkit.waitUntilLeader(shard);
472 final DataTree store = InMemoryDataTreeFactory.getInstance().create();
473 store.setSchemaContext(SCHEMA_CONTEXT);
475 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
476 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
477 withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
478 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
480 writeToStore(store, TestModel.TEST_PATH, container);
482 final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
483 final NormalizedNode<?,?> expected = readStore(store, root);
485 final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
486 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
488 shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
490 final NormalizedNode<?,?> actual = readStore(shard, root);
492 assertEquals("Root node", expected, actual);
494 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
498 public void testApplyState() throws Exception {
500 ShardTestKit testkit = new ShardTestKit(getSystem());
502 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
504 testkit.waitUntilLeader(shard);
506 final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
508 final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
509 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
511 shard.underlyingActor().onReceiveCommand(applyState);
513 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
514 assertEquals("Applied state", node, actual);
516 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
520 public void testApplyStateWithCandidatePayload() throws Exception {
522 ShardTestKit testkit = new ShardTestKit(getSystem());
524 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
526 testkit.waitUntilLeader(shard);
528 final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
529 final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
531 final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
532 DataTreeCandidatePayload.create(candidate)));
534 shard.underlyingActor().onReceiveCommand(applyState);
536 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
537 assertEquals("Applied state", node, actual);
539 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
542 DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
543 final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
544 testStore.setSchemaContext(SCHEMA_CONTEXT);
546 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
548 final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
550 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
551 SerializationUtils.serializeNormalizedNode(root),
552 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
556 private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
557 source.validate(mod);
558 final DataTreeCandidate candidate = source.prepare(mod);
559 source.commit(candidate);
560 return DataTreeCandidatePayload.create(candidate);
564 public void testDataTreeCandidateRecovery() throws Exception {
565 // Set up the InMemorySnapshotStore.
566 final DataTree source = setupInMemorySnapshotStore();
568 final DataTreeModification writeMod = source.takeSnapshot().newModification();
569 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
571 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
573 // Set up the InMemoryJournal.
574 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
576 final int nListEntries = 16;
577 final Set<Integer> listEntryKeys = new HashSet<>();
579 // Add some ModificationPayload entries
580 for (int i = 1; i <= nListEntries; i++) {
581 listEntryKeys.add(Integer.valueOf(i));
583 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
584 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
586 final DataTreeModification mod = source.takeSnapshot().newModification();
587 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
589 InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
590 payloadForModification(source, mod)));
593 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
594 new ApplyJournalEntries(nListEntries));
596 testRecovery(listEntryKeys);
600 public void testModicationRecovery() throws Exception {
602 // Set up the InMemorySnapshotStore.
603 setupInMemorySnapshotStore();
605 // Set up the InMemoryJournal.
607 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
609 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
610 new WriteModification(TestModel.OUTER_LIST_PATH,
611 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
613 final int nListEntries = 16;
614 final Set<Integer> listEntryKeys = new HashSet<>();
616 // Add some ModificationPayload entries
617 for(int i = 1; i <= nListEntries; i++) {
618 listEntryKeys.add(Integer.valueOf(i));
619 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
620 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
621 final Modification mod = new MergeModification(path,
622 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
623 InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
624 newModificationPayload(mod)));
627 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
628 new ApplyJournalEntries(nListEntries));
630 testRecovery(listEntryKeys);
633 private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
634 final MutableCompositeModification compMod = new MutableCompositeModification();
635 for(final Modification mod: mods) {
636 compMod.addModification(mod);
639 return new ModificationPayload(compMod);
643 public void testConcurrentThreePhaseCommits() throws Throwable {
644 new ShardTestKit(getSystem()) {{
645 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
646 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
647 "testConcurrentThreePhaseCommits");
649 waitUntilLeader(shard);
651 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
653 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
655 final String transactionID1 = "tx1";
656 final MutableCompositeModification modification1 = new MutableCompositeModification();
657 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
658 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
660 final String transactionID2 = "tx2";
661 final MutableCompositeModification modification2 = new MutableCompositeModification();
662 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
663 TestModel.OUTER_LIST_PATH,
664 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
667 final String transactionID3 = "tx3";
668 final MutableCompositeModification modification3 = new MutableCompositeModification();
669 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
670 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
671 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
672 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
675 final long timeoutSec = 5;
676 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
677 final Timeout timeout = new Timeout(duration);
679 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
680 // by the ShardTransaction.
682 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
683 cohort1, modification1, true, false), getRef());
684 final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
685 expectMsgClass(duration, ReadyTransactionReply.class));
686 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
688 // Send the CanCommitTransaction message for the first Tx.
690 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
691 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
692 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
693 assertEquals("Can commit", true, canCommitReply.getCanCommit());
695 // Send the ForwardedReadyTransaction for the next 2 Tx's.
697 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
698 cohort2, modification2, true, false), getRef());
699 expectMsgClass(duration, ReadyTransactionReply.class);
701 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
702 cohort3, modification3, true, false), getRef());
703 expectMsgClass(duration, ReadyTransactionReply.class);
705 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
706 // processed after the first Tx completes.
708 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
709 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
711 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
712 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
714 // Send the CommitTransaction message for the first Tx. After it completes, it should
715 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
717 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
718 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
720 // Wait for the next 2 Tx's to complete.
722 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
723 final CountDownLatch commitLatch = new CountDownLatch(2);
725 class OnFutureComplete extends OnComplete<Object> {
726 private final Class<?> expRespType;
728 OnFutureComplete(final Class<?> expRespType) {
729 this.expRespType = expRespType;
733 public void onComplete(final Throwable error, final Object resp) {
735 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
738 assertEquals("Commit response type", expRespType, resp.getClass());
740 } catch (final Exception e) {
746 void onSuccess(final Object resp) throws Exception {
750 class OnCommitFutureComplete extends OnFutureComplete {
751 OnCommitFutureComplete() {
752 super(CommitTransactionReply.SERIALIZABLE_CLASS);
756 public void onComplete(final Throwable error, final Object resp) {
757 super.onComplete(error, resp);
758 commitLatch.countDown();
762 class OnCanCommitFutureComplete extends OnFutureComplete {
763 private final String transactionID;
765 OnCanCommitFutureComplete(final String transactionID) {
766 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
767 this.transactionID = transactionID;
771 void onSuccess(final Object resp) throws Exception {
772 final CanCommitTransactionReply canCommitReply =
773 CanCommitTransactionReply.fromSerializable(resp);
774 assertEquals("Can commit", true, canCommitReply.getCanCommit());
776 final Future<Object> commitFuture = Patterns.ask(shard,
777 new CommitTransaction(transactionID).toSerializable(), timeout);
778 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
782 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
783 getSystem().dispatcher());
785 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
786 getSystem().dispatcher());
788 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
790 if(caughtEx.get() != null) {
791 throw caughtEx.get();
794 assertEquals("Commits complete", true, done);
796 final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
797 inOrder.verify(cohort1).canCommit();
798 inOrder.verify(cohort1).preCommit();
799 inOrder.verify(cohort1).commit();
800 inOrder.verify(cohort2).canCommit();
801 inOrder.verify(cohort2).preCommit();
802 inOrder.verify(cohort2).commit();
803 inOrder.verify(cohort3).canCommit();
804 inOrder.verify(cohort3).preCommit();
805 inOrder.verify(cohort3).commit();
807 // Verify data in the data store.
809 verifyOuterListEntry(shard, 1);
811 verifyLastApplied(shard, 2);
813 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
817 private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
818 final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
819 return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
822 private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
823 final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
824 final int messagesSent) {
825 final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
826 batched.addModification(new WriteModification(path, data));
827 batched.setReady(ready);
828 batched.setDoCommitOnReady(doCommitOnReady);
829 batched.setTotalMessagesSent(messagesSent);
834 public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
835 new ShardTestKit(getSystem()) {{
836 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
837 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
838 "testBatchedModificationsWithNoCommitOnReady");
840 waitUntilLeader(shard);
842 final String transactionID = "tx";
843 final FiniteDuration duration = duration("5 seconds");
845 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
846 final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
848 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
849 if(mockCohort.get() == null) {
850 mockCohort.set(createDelegatingMockCohort("cohort", actual));
853 return mockCohort.get();
857 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
859 // Send a BatchedModifications to start a transaction.
861 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
862 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
863 expectMsgClass(duration, BatchedModificationsReply.class);
865 // Send a couple more BatchedModifications.
867 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
868 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
869 expectMsgClass(duration, BatchedModificationsReply.class);
871 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
872 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
873 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
874 expectMsgClass(duration, ReadyTransactionReply.class);
876 // Send the CanCommitTransaction message.
878 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
879 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
880 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
881 assertEquals("Can commit", true, canCommitReply.getCanCommit());
883 // Send the CanCommitTransaction message.
885 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
886 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
888 final InOrder inOrder = inOrder(mockCohort.get());
889 inOrder.verify(mockCohort.get()).canCommit();
890 inOrder.verify(mockCohort.get()).preCommit();
891 inOrder.verify(mockCohort.get()).commit();
893 // Verify data in the data store.
895 verifyOuterListEntry(shard, 1);
897 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
902 public void testBatchedModificationsWithCommitOnReady() throws Throwable {
903 new ShardTestKit(getSystem()) {{
904 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
905 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
906 "testBatchedModificationsWithCommitOnReady");
908 waitUntilLeader(shard);
910 final String transactionID = "tx";
911 final FiniteDuration duration = duration("5 seconds");
913 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
914 final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
916 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
917 if(mockCohort.get() == null) {
918 mockCohort.set(createDelegatingMockCohort("cohort", actual));
921 return mockCohort.get();
925 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
927 // Send a BatchedModifications to start a transaction.
929 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
930 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
931 expectMsgClass(duration, BatchedModificationsReply.class);
933 // Send a couple more BatchedModifications.
935 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
936 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
937 expectMsgClass(duration, BatchedModificationsReply.class);
939 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
940 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
941 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
943 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
945 final InOrder inOrder = inOrder(mockCohort.get());
946 inOrder.verify(mockCohort.get()).canCommit();
947 inOrder.verify(mockCohort.get()).preCommit();
948 inOrder.verify(mockCohort.get()).commit();
950 // Verify data in the data store.
952 verifyOuterListEntry(shard, 1);
954 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
958 @Test(expected=IllegalStateException.class)
959 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
960 new ShardTestKit(getSystem()) {{
961 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
962 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
963 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
965 waitUntilLeader(shard);
967 final String transactionID = "tx1";
968 final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
969 batched.setReady(true);
970 batched.setTotalMessagesSent(2);
972 shard.tell(batched, getRef());
974 final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
976 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
978 if(failure != null) {
979 throw failure.cause();
985 public void testBatchedModificationsWithOperationFailure() throws Throwable {
986 new ShardTestKit(getSystem()) {{
987 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
988 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
989 "testBatchedModificationsWithOperationFailure");
991 waitUntilLeader(shard);
993 // Test merge with invalid data. An exception should occur when the merge is applied. Note that
994 // write will not validate the children for performance reasons.
996 String transactionID = "tx1";
998 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
999 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
1000 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1002 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
1003 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
1004 shard.tell(batched, getRef());
1005 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1007 Throwable cause = failure.cause();
1009 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1010 batched.setReady(true);
1011 batched.setTotalMessagesSent(2);
1013 shard.tell(batched, getRef());
1015 failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1016 assertEquals("Failure cause", cause, failure.cause());
1018 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1022 @SuppressWarnings("unchecked")
1023 private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1024 final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1025 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1026 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1027 outerList.getValue() instanceof Iterable);
1028 final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1029 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1030 entry instanceof MapEntryNode);
1031 final MapEntryNode mapEntry = (MapEntryNode)entry;
1032 final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1033 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1034 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1035 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1039 public void testBatchedModificationsOnTransactionChain() throws Throwable {
1040 new ShardTestKit(getSystem()) {{
1041 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1042 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1043 "testBatchedModificationsOnTransactionChain");
1045 waitUntilLeader(shard);
1047 final String transactionChainID = "txChain";
1048 final String transactionID1 = "tx1";
1049 final String transactionID2 = "tx2";
1051 final FiniteDuration duration = duration("5 seconds");
1053 // Send a BatchedModifications to start a chained write transaction and ready it.
1055 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1056 final YangInstanceIdentifier path = TestModel.TEST_PATH;
1057 shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1058 containerNode, true, false, 1), getRef());
1059 expectMsgClass(duration, ReadyTransactionReply.class);
1061 // Create a read Tx on the same chain.
1063 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1064 transactionChainID).toSerializable(), getRef());
1066 final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1068 getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1069 final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1070 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1072 // Commit the write transaction.
1074 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1075 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1076 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1077 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1079 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1080 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1082 // Verify data in the data store.
1084 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1085 assertEquals("Stored node", containerNode, actualNode);
1087 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1092 public void testOnBatchedModificationsWhenNotLeader() {
1093 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1094 new ShardTestKit(getSystem()) {{
1095 final Creator<Shard> creator = new Creator<Shard>() {
1096 private static final long serialVersionUID = 1L;
1099 public Shard create() throws Exception {
1100 return new Shard(newShardBuilder()) {
1102 protected boolean isLeader() {
1103 return overrideLeaderCalls.get() ? false : super.isLeader();
1107 protected ActorSelection getLeader() {
1108 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1115 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1116 Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1118 waitUntilLeader(shard);
1120 overrideLeaderCalls.set(true);
1122 final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1124 shard.tell(batched, ActorRef.noSender());
1126 expectMsgEquals(batched);
1128 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1133 public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1134 new ShardTestKit(getSystem()) {{
1135 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1136 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1137 "testForwardedReadyTransactionWithImmediateCommit");
1139 waitUntilLeader(shard);
1141 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1143 final String transactionID = "tx1";
1144 final MutableCompositeModification modification = new MutableCompositeModification();
1145 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1146 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1147 TestModel.TEST_PATH, containerNode, modification);
1149 final FiniteDuration duration = duration("5 seconds");
1151 // Simulate the ForwardedReadyTransaction messages that would be sent
1152 // by the ShardTransaction.
1154 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1155 cohort, modification, true, true), getRef());
1157 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1159 final InOrder inOrder = inOrder(cohort);
1160 inOrder.verify(cohort).canCommit();
1161 inOrder.verify(cohort).preCommit();
1162 inOrder.verify(cohort).commit();
1164 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1165 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1167 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1172 public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1173 new ShardTestKit(getSystem()) {{
1174 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1175 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1176 "testReadyLocalTransactionWithImmediateCommit");
1178 waitUntilLeader(shard);
1180 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1182 final DataTreeModification modification = dataStore.newModification();
1184 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1185 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1186 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1187 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1189 final String txId = "tx1";
1190 modification.ready();
1191 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1193 shard.tell(readyMessage, getRef());
1195 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1197 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1198 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1200 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1205 public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1206 new ShardTestKit(getSystem()) {{
1207 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1208 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1209 "testReadyLocalTransactionWithThreePhaseCommit");
1211 waitUntilLeader(shard);
1213 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1215 final DataTreeModification modification = dataStore.newModification();
1217 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1218 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1219 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1220 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1222 final String txId = "tx1";
1223 modification.ready();
1224 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1226 shard.tell(readyMessage, getRef());
1228 expectMsgClass(ReadyTransactionReply.class);
1230 // Send the CanCommitTransaction message.
1232 shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1233 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1234 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1235 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1237 // Send the CanCommitTransaction message.
1239 shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1240 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1242 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1243 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1245 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1250 public void testCommitWithPersistenceDisabled() throws Throwable {
1251 dataStoreContextBuilder.persistent(false);
1252 new ShardTestKit(getSystem()) {{
1253 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1254 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1255 "testCommitWithPersistenceDisabled");
1257 waitUntilLeader(shard);
1259 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1261 // Setup a simulated transactions with a mock cohort.
1263 final String transactionID = "tx";
1264 final MutableCompositeModification modification = new MutableCompositeModification();
1265 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1266 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1267 TestModel.TEST_PATH, containerNode, modification);
1269 final FiniteDuration duration = duration("5 seconds");
1271 // Simulate the ForwardedReadyTransaction messages that would be sent
1272 // by the ShardTransaction.
1274 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1275 cohort, modification, true, false), getRef());
1276 expectMsgClass(duration, ReadyTransactionReply.class);
1278 // Send the CanCommitTransaction message.
1280 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1281 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1282 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1283 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1285 // Send the CanCommitTransaction message.
1287 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1288 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1290 final InOrder inOrder = inOrder(cohort);
1291 inOrder.verify(cohort).canCommit();
1292 inOrder.verify(cohort).preCommit();
1293 inOrder.verify(cohort).commit();
1295 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1296 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1298 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1302 private static DataTreeCandidateTip mockCandidate(final String name) {
1303 final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1304 final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1305 doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1306 doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1307 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1308 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1309 return mockCandidate;
1312 private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1313 final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1314 final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1315 doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1316 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1317 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1318 return mockCandidate;
1322 public void testCommitWhenTransactionHasNoModifications(){
1323 // Note that persistence is enabled which would normally result in the entry getting written to the journal
1324 // but here that need not happen
1325 new ShardTestKit(getSystem()) {
1327 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1328 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1329 "testCommitWhenTransactionHasNoModifications");
1331 waitUntilLeader(shard);
1333 final String transactionID = "tx1";
1334 final MutableCompositeModification modification = new MutableCompositeModification();
1335 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1336 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1337 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1338 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1339 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1341 final FiniteDuration duration = duration("5 seconds");
1343 // Simulate the ForwardedReadyTransaction messages that would be sent
1344 // by the ShardTransaction.
1346 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1347 cohort, modification, true, false), getRef());
1348 expectMsgClass(duration, ReadyTransactionReply.class);
1350 // Send the CanCommitTransaction message.
1352 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1353 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1354 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1355 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1357 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1358 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1360 final InOrder inOrder = inOrder(cohort);
1361 inOrder.verify(cohort).canCommit();
1362 inOrder.verify(cohort).preCommit();
1363 inOrder.verify(cohort).commit();
1365 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1366 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1368 // Use MBean for verification
1369 // Committed transaction count should increase as usual
1370 assertEquals(1,shardStats.getCommittedTransactionsCount());
1372 // Commit index should not advance because this does not go into the journal
1373 assertEquals(-1, shardStats.getCommitIndex());
1375 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1382 public void testCommitWhenTransactionHasModifications(){
1383 new ShardTestKit(getSystem()) {
1385 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1386 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1387 "testCommitWhenTransactionHasModifications");
1389 waitUntilLeader(shard);
1391 final String transactionID = "tx1";
1392 final MutableCompositeModification modification = new MutableCompositeModification();
1393 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1394 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1395 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1396 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1397 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1398 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1400 final FiniteDuration duration = duration("5 seconds");
1402 // Simulate the ForwardedReadyTransaction messages that would be sent
1403 // by the ShardTransaction.
1405 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1406 cohort, modification, true, false), getRef());
1407 expectMsgClass(duration, ReadyTransactionReply.class);
1409 // Send the CanCommitTransaction message.
1411 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1412 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1413 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1414 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1416 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1417 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1419 final InOrder inOrder = inOrder(cohort);
1420 inOrder.verify(cohort).canCommit();
1421 inOrder.verify(cohort).preCommit();
1422 inOrder.verify(cohort).commit();
1424 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1425 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1427 // Use MBean for verification
1428 // Committed transaction count should increase as usual
1429 assertEquals(1, shardStats.getCommittedTransactionsCount());
1431 // Commit index should advance as we do not have an empty modification
1432 assertEquals(0, shardStats.getCommitIndex());
1434 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1441 public void testCommitPhaseFailure() throws Throwable {
1442 new ShardTestKit(getSystem()) {{
1443 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1444 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1445 "testCommitPhaseFailure");
1447 waitUntilLeader(shard);
1449 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1452 final String transactionID1 = "tx1";
1453 final MutableCompositeModification modification1 = new MutableCompositeModification();
1454 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1455 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1456 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1457 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1458 doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1460 final String transactionID2 = "tx2";
1461 final MutableCompositeModification modification2 = new MutableCompositeModification();
1462 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1463 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1465 final FiniteDuration duration = duration("5 seconds");
1466 final Timeout timeout = new Timeout(duration);
1468 // Simulate the ForwardedReadyTransaction messages that would be sent
1469 // by the ShardTransaction.
1471 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1472 cohort1, modification1, true, false), getRef());
1473 expectMsgClass(duration, ReadyTransactionReply.class);
1475 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1476 cohort2, modification2, true, false), getRef());
1477 expectMsgClass(duration, ReadyTransactionReply.class);
1479 // Send the CanCommitTransaction message for the first Tx.
1481 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1482 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1483 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1484 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1486 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1487 // processed after the first Tx completes.
1489 final Future<Object> canCommitFuture = Patterns.ask(shard,
1490 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1492 // Send the CommitTransaction message for the first Tx. This should send back an error
1493 // and trigger the 2nd Tx to proceed.
1495 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1496 expectMsgClass(duration, akka.actor.Status.Failure.class);
1498 // Wait for the 2nd Tx to complete the canCommit phase.
1500 final CountDownLatch latch = new CountDownLatch(1);
1501 canCommitFuture.onComplete(new OnComplete<Object>() {
1503 public void onComplete(final Throwable t, final Object resp) {
1506 }, getSystem().dispatcher());
1508 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1510 final InOrder inOrder = inOrder(cohort1, cohort2);
1511 inOrder.verify(cohort1).canCommit();
1512 inOrder.verify(cohort1).preCommit();
1513 inOrder.verify(cohort1).commit();
1514 inOrder.verify(cohort2).canCommit();
1516 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that when the first transaction's preCommit fails, the sender of its
// CommitTransaction receives a Status.Failure and the queued canCommit of the second
// transaction is still invoked afterwards.
1521 public void testPreCommitPhaseFailure() throws Throwable {
1522 new ShardTestKit(getSystem()) {{
1523 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1524 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1525 "testPreCommitPhaseFailure");
1527 waitUntilLeader(shard);
// cohort1 succeeds in canCommit but its preCommit future fails with an IllegalStateException.
1529 final String transactionID1 = "tx1";
1530 final MutableCompositeModification modification1 = new MutableCompositeModification();
1531 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1532 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1533 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
// cohort2 only needs canCommit stubbed; this test never commits the 2nd Tx.
1535 final String transactionID2 = "tx2";
1536 final MutableCompositeModification modification2 = new MutableCompositeModification();
1537 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1538 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1540 final FiniteDuration duration = duration("5 seconds");
1541 final Timeout timeout = new Timeout(duration);
1543 // Simulate the ForwardedReadyTransaction messages that would be sent
1544 // by the ShardTransaction.
1546 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1547 cohort1, modification1, true, false), getRef());
1548 expectMsgClass(duration, ReadyTransactionReply.class);
1550 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1551 cohort2, modification2, true, false), getRef());
1552 expectMsgClass(duration, ReadyTransactionReply.class);
1554 // Send the CanCommitTransaction message for the first Tx.
1556 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1557 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1558 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1559 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1561 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1562 // processed after the first Tx completes.
// Patterns.ask is used (rather than tell) so the reply is delivered to a Future and does not
// interleave with the replies expected on this test kit's own ref.
1564 final Future<Object> canCommitFuture = Patterns.ask(shard,
1565 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1567 // Send the CommitTransaction message for the first Tx. This should send back an error
1568 // and trigger the 2nd Tx to proceed.
1570 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1571 expectMsgClass(duration, akka.actor.Status.Failure.class);
1573 // Wait for the 2nd Tx to complete the canCommit phase.
// NOTE(review): the onComplete callback is expected to release the latch - confirm.
1575 final CountDownLatch latch = new CountDownLatch(1);
1576 canCommitFuture.onComplete(new OnComplete<Object>() {
1578 public void onComplete(final Throwable t, final Object resp) {
1581 }, getSystem().dispatcher());
// Fails the test if the latch has not been released within 5 seconds.
1583 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// Verify the interaction order: cohort1 canCommit, cohort1 preCommit, then cohort2 canCommit.
1585 final InOrder inOrder = inOrder(cohort1, cohort2);
1586 inOrder.verify(cohort1).canCommit();
1587 inOrder.verify(cohort1).preCommit();
1588 inOrder.verify(cohort2).canCommit();
// Stop the shard actor created for this test.
1590 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a canCommit whose future fails is answered with a Status.Failure, and that a
// subsequent transaction can still be readied and pass canCommit afterwards.
1595 public void testCanCommitPhaseFailure() throws Throwable {
1596 new ShardTestKit(getSystem()) {{
1597 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1598 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1599 "testCanCommitPhaseFailure");
1601 waitUntilLeader(shard);
1603 final FiniteDuration duration = duration("5 seconds");
// The mock cohort's canCommit future initially fails with an IllegalStateException.
1605 final String transactionID1 = "tx1";
1606 final MutableCompositeModification modification = new MutableCompositeModification();
1607 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1608 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1610 // Simulate the ForwardedReadyTransaction messages that would be sent
1611 // by the ShardTransaction.
1613 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1614 cohort, modification, true, false), getRef());
1615 expectMsgClass(duration, ReadyTransactionReply.class);
1617 // Send the CanCommitTransaction message.
1619 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1620 expectMsgClass(duration, akka.actor.Status.Failure.class);
1622 // Send another can commit to ensure the failed one got cleaned up.
// The same mock cohort is re-stubbed to succeed and reused for the 2nd Tx.
1626 final String transactionID2 = "tx2";
1627 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1629 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1630 cohort, modification, true, false), getRef());
1631 expectMsgClass(duration, ReadyTransactionReply.class);
// NOTE(review): unlike the other expectMsgClass calls in this test, no explicit duration is
// passed below - consider passing `duration` for consistency.
1633 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1634 final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1635 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1636 assertEquals("getCanCommit", true, reply.getCanCommit());
// Stop the shard actor created for this test.
1638 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a cohort answering canCommit with FALSE yields a CanCommitTransactionReply of
// false, and that a subsequent transaction can still be readied and pass canCommit afterwards.
1643 public void testCanCommitPhaseFalseResponse() throws Throwable {
1644 new ShardTestKit(getSystem()) {{
1645 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1646 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1647 "testCanCommitPhaseFalseResponse");
1649 waitUntilLeader(shard);
1651 final FiniteDuration duration = duration("5 seconds");
// The mock cohort's canCommit future initially completes successfully with Boolean.FALSE.
1653 final String transactionID1 = "tx1";
1654 final MutableCompositeModification modification = new MutableCompositeModification();
1655 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1656 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1658 // Simulate the ForwardedReadyTransaction messages that would be sent
1659 // by the ShardTransaction.
1661 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1662 cohort, modification, true, false), getRef());
1663 expectMsgClass(duration, ReadyTransactionReply.class);
1665 // Send the CanCommitTransaction message.
// NOTE(review): the expectMsgClass calls for the CanCommit replies in this test pass no
// explicit duration, unlike the ReadyTransactionReply expectations.
1667 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1668 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1669 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1670 assertEquals("getCanCommit", false, reply.getCanCommit());
1672 // Send another can commit to ensure the failed one got cleaned up.
// The same mock cohort is re-stubbed to answer TRUE and reused for the 2nd Tx.
1676 final String transactionID2 = "tx2";
1677 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1679 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1680 cohort, modification, true, false), getRef());
1681 expectMsgClass(duration, ReadyTransactionReply.class);
1683 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1684 reply = CanCommitTransactionReply.fromSerializable(
1685 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1686 assertEquals("getCanCommit", true, reply.getCanCommit());
// Stop the shard actor created for this test.
1688 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a ForwardedReadyTransaction whose cohort fails canCommit is answered directly
// with a Status.Failure, and that a following transaction readied the same way receives a
// CommitTransactionReply.
1693 public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1694 new ShardTestKit(getSystem()) {{
1695 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1696 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1697 "testImmediateCommitWithCanCommitPhaseFailure");
1699 waitUntilLeader(shard);
1701 final FiniteDuration duration = duration("5 seconds");
// The mock cohort's canCommit future initially fails with an IllegalStateException.
1703 final String transactionID1 = "tx1";
1704 final MutableCompositeModification modification = new MutableCompositeModification();
1705 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1706 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1708 // Simulate the ForwardedReadyTransaction messages that would be sent
1709 // by the ShardTransaction.
// NOTE(review): the last constructor argument is true here - presumably it requests an
// immediate commit on ready, which would explain why no separate CanCommitTransaction or
// CommitTransaction message is sent; confirm against ForwardedReadyTransaction.
1711 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1712 cohort, modification, true, true), getRef());
1714 expectMsgClass(duration, akka.actor.Status.Failure.class);
1716 // Send another can commit to ensure the failed one got cleaned up.
// Re-stub the same mock cohort so that all three phases succeed for the 2nd Tx.
1720 final String transactionID2 = "tx2";
1721 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1722 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1723 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
// The cohort reports a mocked candidate whose root node has modification type UNMODIFIED.
1724 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1725 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1726 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1727 doReturn(candidateRoot).when(candidate).getRootNode();
1728 doReturn(candidate).when(cohort).getCandidate();
1730 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1731 cohort, modification, true, true), getRef());
1733 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// Stop the shard actor created for this test.
1735 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a ForwardedReadyTransaction whose cohort answers canCommit with FALSE is
// answered directly with a Status.Failure, and that a following transaction readied the same
// way receives a CommitTransactionReply.
1740 public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1741 new ShardTestKit(getSystem()) {{
1742 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1743 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1744 "testImmediateCommitWithCanCommitPhaseFalseResponse");
1746 waitUntilLeader(shard);
1748 final FiniteDuration duration = duration("5 seconds");
// The mock cohort's canCommit future initially completes successfully with Boolean.FALSE.
1750 final String transactionID = "tx1";
1751 final MutableCompositeModification modification = new MutableCompositeModification();
1752 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1753 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1755 // Simulate the ForwardedReadyTransaction messages that would be sent
1756 // by the ShardTransaction.
// NOTE(review): the last constructor argument is true here - presumably it requests an
// immediate commit on ready; confirm against ForwardedReadyTransaction.
1758 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1759 cohort, modification, true, true), getRef());
// A false canCommit answer surfaces to the sender as a Status.Failure.
1761 expectMsgClass(duration, akka.actor.Status.Failure.class);
1763 // Send another can commit to ensure the failed one got cleaned up.
// Re-stub the same mock cohort so that all three phases succeed for the 2nd Tx.
1767 final String transactionID2 = "tx2";
1768 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1769 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1770 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
// The cohort reports a mocked candidate whose root node has modification type UNMODIFIED.
1771 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1772 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1773 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1774 doReturn(candidateRoot).when(candidate).getRootNode();
1775 doReturn(candidate).when(cohort).getCandidate();
1777 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1778 cohort, modification, true, true), getRef());
1780 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// Stop the shard actor created for this test.
1782 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that aborting a transaction from inside the preCommit hook (after canCommit has
// succeeded) does not stop the commit: a CommitTransactionReply is still received and the
// written node can be read back from the store.
1787 public void testAbortBeforeFinishCommit() throws Throwable {
1788 new ShardTestKit(getSystem()) {{
1789 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1790 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1791 "testAbortBeforeFinishCommit");
1793 waitUntilLeader(shard);
1795 final FiniteDuration duration = duration("5 seconds");
1796 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1798 final String transactionID = "tx1";
// preCommit hook: delegates to the cohort's preCommit, then aborts the transaction directly on
// the underlying Shard actor before handing back the preCommit future.
1799 final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1800 new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1802 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1803 final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1805 // Simulate an AbortTransaction message occurring during replication, after
1806 // persisting and before finishing the commit to the in-memory store.
1807 // We have no followers so due to optimizations in the RaftActor, it does not
1808 // attempt replication and thus we can't send an AbortTransaction message b/c
1809 // it would be processed too late after CommitTransaction completes. So we'll
1810 // simulate an AbortTransaction message occurring during replication by calling
1811 // the shard directly.
1813 shard.underlyingActor().doAbortTransaction(transactionID, null);
1815 return preCommitFuture;
// NOTE(review): setupMockWriteTransaction is defined elsewhere; presumably it builds a cohort
// that writes the given node at TEST_PATH and routes preCommit through the hook above - confirm.
1819 final MutableCompositeModification modification = new MutableCompositeModification();
1820 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1821 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1822 modification, preCommit);
// Drive the three-phase commit: ready, canCommit, then commit.
1824 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1825 cohort, modification, true, false), getRef());
1826 expectMsgClass(duration, ReadyTransactionReply.class);
1828 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1829 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1830 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1831 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1833 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1834 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1836 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1838 // Since we're simulating an abort occurring during replication and before finish commit,
1839 // the data should still get written to the in-memory store since we've gotten past
1840 // canCommit and preCommit and persisted the data.
1841 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
// Stop the shard actor created for this test.
1843 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a transaction which passed canCommit but is never committed stops blocking the
// queue: the 2nd Tx then gets its canCommit reply, a late commit of the 1st Tx fails, and the
// 2nd Tx commits and its list entry can be read back.
1848 public void testTransactionCommitTimeout() throws Throwable {
// Shorten the shard transaction commit timeout to 1 so the 1st Tx times out within the test.
1849 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1851 new ShardTestKit(getSystem()) {{
1852 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1853 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1854 "testTransactionCommitTimeout");
1856 waitUntilLeader(shard);
1858 final FiniteDuration duration = duration("5 seconds");
1860 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Pre-populate the store with the test container and an empty outer list.
1862 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1863 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1864 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1866 // Create 1st Tx - will timeout
1868 final String transactionID1 = "tx1";
1869 final MutableCompositeModification modification1 = new MutableCompositeModification();
1870 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1871 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1872 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1873 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// NOTE(review): the 2nd Tx is labelled "tx3"/"cohort3" although it is held in the
// transactionID2/cohort2 variables - confirm whether the labels are intentional.
1878 final String transactionID2 = "tx3";
1879 final MutableCompositeModification modification2 = new MutableCompositeModification();
1880 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1881 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1882 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1884 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
// Ready both transactions.
1889 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1890 cohort1, modification1, true, false), getRef());
1891 expectMsgClass(duration, ReadyTransactionReply.class);
1893 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1894 cohort2, modification2, true, false), getRef());
1895 expectMsgClass(duration, ReadyTransactionReply.class);
1897 // canCommit 1st Tx. We don't send the commit so it should timeout.
1899 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1900 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1902 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1904 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1905 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1907 // Try to commit the 1st Tx - should fail as it's not the current Tx.
1909 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1910 expectMsgClass(duration, akka.actor.Status.Failure.class);
1912 // Commit the 2nd Tx.
1914 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1915 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The list entry with key 2 (written by the 2nd Tx) must now be readable from the store.
1917 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1918 assertNotNull(listNodePath + " not found", node);
// Stop the shard actor created for this test.
1920 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that with a commit queue capacity of 2, readying a 3rd transaction is rejected with
// a Status.Failure, and that a later canCommit for that 3rd transaction also fails.
1925 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
// Limit the shard transaction commit queue capacity to 2 for this test.
1926 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1928 new ShardTestKit(getSystem()) {{
1929 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1930 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1931 "testTransactionCommitQueueCapacityExceeded");
1933 waitUntilLeader(shard);
1935 final FiniteDuration duration = duration("5 seconds");
1937 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Set up three write transactions through the setupMockWriteTransaction helper.
1939 final String transactionID1 = "tx1";
1940 final MutableCompositeModification modification1 = new MutableCompositeModification();
1941 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1942 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1944 final String transactionID2 = "tx2";
1945 final MutableCompositeModification modification2 = new MutableCompositeModification();
1946 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1947 TestModel.OUTER_LIST_PATH,
1948 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1951 final String transactionID3 = "tx3";
1952 final MutableCompositeModification modification3 = new MutableCompositeModification();
1953 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1954 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
// Ready the first two transactions; both fit within the configured capacity.
1958 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1959 cohort1, modification1, true, false), getRef());
1960 expectMsgClass(duration, ReadyTransactionReply.class);
1962 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1963 cohort2, modification2, true, false), getRef());
1964 expectMsgClass(duration, ReadyTransactionReply.class);
1966 // The 3rd Tx should exceed queue capacity and fail.
1968 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1969 cohort3, modification3, true, false), getRef());
1970 expectMsgClass(duration, akka.actor.Status.Failure.class);
1972 // canCommit 1st Tx.
1974 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1975 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1977 // canCommit the 2nd Tx - it should get queued.
// No reply is awaited for this message before the next one is sent.
1979 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1981 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1983 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1984 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Stop the shard actor created for this test.
1986 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a canCommit for the last of three readied transactions is answered even though
// canCommit is never sent for the two transactions readied before it.
1991 public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
// Use a 1300 ms commit queue expiry timeout and a commit timeout of 1 for this test.
1992 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1994 new ShardTestKit(getSystem()) {{
1995 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1996 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1997 "testTransactionCommitWithPriorExpiredCohortEntries");
1999 waitUntilLeader(shard);
2001 final FiniteDuration duration = duration("5 seconds");
2003 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Ready the 1st Tx (writes the container at TEST_PATH).
2005 final String transactionID1 = "tx1";
2006 final MutableCompositeModification modification1 = new MutableCompositeModification();
2007 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2008 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2010 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2011 cohort1, modification1, true, false), getRef());
2012 expectMsgClass(duration, ReadyTransactionReply.class);
// Ready the 2nd Tx (same path and data as the 1st).
2014 final String transactionID2 = "tx2";
2015 final MutableCompositeModification modification2 = new MutableCompositeModification();
2016 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2017 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2019 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2020 cohort2, modification2, true, false), getRef());
2021 expectMsgClass(duration, ReadyTransactionReply.class);
// Ready the 3rd Tx (writes the container at TEST2_PATH).
2023 final String transactionID3 = "tx3";
2024 final MutableCompositeModification modification3 = new MutableCompositeModification();
2025 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2026 TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2028 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2029 cohort3, modification3, true, false), getRef());
2030 expectMsgClass(duration, ReadyTransactionReply.class);
2032 // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2033 // should expire from the queue and the last one should be processed.
2035 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2036 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
// Stop the shard actor created for this test.
2038 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that after the in-progress 1st Tx commits, a 3rd Tx submitted via
// ReadyLocalTransaction still commits although canCommit is never sent for the 2nd Tx readied
// before it; the data written by the 3rd Tx must be readable afterwards.
2043 public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
// Use a 1300 ms commit queue expiry timeout and a commit timeout of 1 for this test.
2044 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2046 new ShardTestKit(getSystem()) {{
2047 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2048 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2049 "testTransactionCommitWithSubsequentExpiredCohortEntry");
2051 waitUntilLeader(shard);
2053 final FiniteDuration duration = duration("5 seconds");
2055 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Ready the 1st Tx (writes the container at TEST_PATH).
2057 final String transactionID1 = "tx1";
2058 final MutableCompositeModification modification1 = new MutableCompositeModification();
2059 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2060 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2062 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2063 cohort1, modification1, true, false), getRef());
2064 expectMsgClass(duration, ReadyTransactionReply.class);
2066 // CanCommit the first one so it's the current in-progress CohortEntry.
2068 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2069 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2071 // Ready the second Tx.
2073 final String transactionID2 = "tx2";
2074 final MutableCompositeModification modification2 = new MutableCompositeModification();
2075 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2076 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2078 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2079 cohort2, modification2, true, false), getRef());
2080 expectMsgClass(duration, ReadyTransactionReply.class);
2082 // Ready the third Tx.
// Unlike the first two, the 3rd Tx is built as a real DataTreeModification from the shard's
// data tree and submitted with a ReadyLocalTransaction message.
// NOTE(review): the trailing `true` argument presumably requests commit-on-ready - confirm.
2084 final String transactionID3 = "tx3";
2085 final DataTreeModification modification3 = dataStore.newModification();
2086 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2087 .apply(modification3);
2088 modification3.ready();
2089 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2091 shard.tell(readyMessage, getRef());
2093 // Commit the first Tx. After completing, the second should expire from the queue and the third
// Tx should then be committed (its CommitTransactionReply is expected below).
2096 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2097 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2099 // Expect commit reply from the third Tx.
2101 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The container written by the 3rd Tx at TEST2_PATH must now be readable from the store.
2103 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2104 assertNotNull(TestModel.TEST2_PATH + " not found", node);
// Stop the shard actor created for this test.
2106 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a CanCommitTransaction for a transaction ID that was never readied is answered
// with a Status.Failure.
2111 public void testCanCommitBeforeReadyFailure() throws Throwable {
2112 new ShardTestKit(getSystem()) {{
2113 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2114 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2115 "testCanCommitBeforeReadyFailure");
// No ForwardedReadyTransaction was sent for "tx", so the shard has nothing to canCommit.
2117 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2118 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// Stop the shard actor created for this test.
2120 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that aborting the transaction currently in the canCommit phase is acknowledged with
// an AbortTransactionReply and lets the queued canCommit of the next transaction run.
2125 public void testAbortCurrentTransaction() throws Throwable {
2126 new ShardTestKit(getSystem()) {{
2127 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2128 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2129 "testAbortCurrentTransaction");
2131 waitUntilLeader(shard);
2133 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2135 final String transactionID1 = "tx1";
2136 final MutableCompositeModification modification1 = new MutableCompositeModification();
2137 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2138 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2139 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2141 final String transactionID2 = "tx2";
2142 final MutableCompositeModification modification2 = new MutableCompositeModification();
2143 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2144 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2146 final FiniteDuration duration = duration("5 seconds");
2147 final Timeout timeout = new Timeout(duration);
2149 // Simulate the ForwardedReadyTransaction messages that would be sent
2150 // by the ShardTransaction.
2152 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2153 cohort1, modification1, true, false), getRef());
2154 expectMsgClass(duration, ReadyTransactionReply.class);
2156 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2157 cohort2, modification2, true, false), getRef());
2158 expectMsgClass(duration, ReadyTransactionReply.class);
2160 // Send the CanCommitTransaction message for the first Tx.
2162 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2163 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2164 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2165 assertEquals("Can commit", true, canCommitReply.getCanCommit());
2167 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2168 // processed after the first Tx completes.
// Patterns.ask is used (rather than tell) so the reply is delivered to a Future and does not
// interleave with the replies expected on this test kit's own ref.
2170 final Future<Object> canCommitFuture = Patterns.ask(shard,
2171 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2173 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
// Tx to proceed with its queued canCommit.
2176 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2177 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2179 // Wait for the 2nd Tx to complete the canCommit phase.
2181 Await.ready(canCommitFuture, duration);
// Verify the interaction order: cohort1 canCommit first, then cohort2 canCommit.
2183 final InOrder inOrder = inOrder(cohort1, cohort2);
2184 inOrder.verify(cohort1).canCommit();
2185 inOrder.verify(cohort2).canCommit();
// Stop the shard actor created for this test.
2187 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that aborting a readied (queued) transaction calls the cohort's abort, that the
// pending commit queue is empty after the next TX_COMMIT_TIMEOUT_CHECK_MESSAGE has been handled,
// and that a later canCommit for the aborted transaction fails with an IllegalStateException.
2192 public void testAbortQueuedTransaction() throws Throwable {
// Use a commit timeout of 1 for this test.
2193 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2194 new ShardTestKit(getSystem()) {{
// Holds the latch released by the Shard subclass below; stays null until the test arms it.
// NOTE(review): 'cleaupCheckLatch' looks like a typo for 'cleanupCheckLatch'.
2195 final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2196 @SuppressWarnings("serial")
2197 final Creator<Shard> creator = new Creator<Shard>() {
2199 public Shard create() throws Exception {
2200 return new Shard(newShardBuilder()) {
// Handle the message normally first, then count the latch down (if one is set) whenever the
// message was TX_COMMIT_TIMEOUT_CHECK_MESSAGE.
2202 public void onReceiveCommand(final Object message) throws Exception {
2203 super.onReceiveCommand(message);
2204 if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2205 if(cleaupCheckLatch.get() != null) {
2206 cleaupCheckLatch.get().countDown();
2214 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2215 Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2216 Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
2218 waitUntilLeader(shard);
2220 final String transactionID = "tx1";
// Only abort() is stubbed on the mock cohort; canCommit is never expected to succeed here.
2222 final MutableCompositeModification modification = new MutableCompositeModification();
2223 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2224 doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2226 final FiniteDuration duration = duration("5 seconds");
// Ready the transaction so that it sits in the pending commit queue.
2230 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
2231 cohort, modification, true, false), getRef());
2232 expectMsgClass(duration, ReadyTransactionReply.class);
2234 assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2236 // Send the AbortTransaction message.
2238 shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2239 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2241 verify(cohort).abort();
2243 // Verify the tx cohort is removed from queue at the cleanup check interval.
// Arm the latch only now, so that it waits for a timeout-check message handled after the abort.
2245 cleaupCheckLatch.set(new CountDownLatch(1));
2246 assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2247 cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2249 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2251 // Now send CanCommitTransaction - should fail.
2253 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2255 Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2256 assertTrue("Failure type", failure instanceof IllegalStateException);
// Stop the shard actor created for this test.
2258 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Runs the shared snapshot scenario (the two-argument testCreateSnapshot overload) with
// persistent=true and "testCreateSnapshot" as the shard actor name.
2263 public void testCreateSnapshot() throws Exception {
2264 testCreateSnapshot(true, "testCreateSnapshot");
// Runs the shared snapshot scenario (the two-argument testCreateSnapshot overload) with
// persistent=false and "testCreateSnapshotWithNonPersistentData" as the shard actor name.
2268 public void testCreateSnapshotWithNonPersistentData() throws Exception {
2269 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
// Shared body for the snapshot tests. It creates a shard whose persistence provider records
// every object passed to saveSnapshot, writes a container node at TestModel.TEST_PATH, asks the
// SnapshotManager to capture a snapshot twice, and after each capture checks that the saved
// Snapshot deserializes to the root node read from the store beforehand.
//
// persistent     - value applied to dataStoreContextBuilder.persistent(...) before the shard is created
// shardActorName - actor name used when creating the shard TestActorRef
2272 @SuppressWarnings("serial")
2273 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
// Counted down when the shard handles SaveSnapshotSuccess or "commit_snapshot"; it sits in an
// AtomicReference so awaitAndValidateSnapshot can swap in a fresh latch for the next capture.
2275 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
// Holds the most recent object handed to saveSnapshot.
2277 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
// Persistence provider wrapper that remembers the snapshot object before delegating.
2278 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2279 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2284 public void saveSnapshot(final Object o) {
2285 savedSnapshot.set(o);
2286 super.saveSnapshot(o);
2290 dataStoreContextBuilder.persistent(persistent);
2292 new ShardTestKit(getSystem()) {{
// Shard subclass that installs the recording persistence provider and releases the latch
// once a snapshot-related message has been handled.
2293 class TestShard extends Shard {
2295 protected TestShard(AbstractBuilder<?, ?> builder) {
// Wrap the shard's existing persistence provider so saveSnapshot calls are observed.
2297 setPersistence(new TestPersistentDataProvider(super.persistence()));
2301 public void handleCommand(final Object message) {
2302 super.handleCommand(message);
// Count down only after the superclass has processed the message.
2304 if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2305 latch.get().countDown();
// Pure pass-through to the superclass; presumably overridden to make the context
// reachable from the test code below - confirm against RaftActor's declared visibility.
2310 public RaftActorContext getRaftActorContext() {
2311 return super.getRaftActorContext();
2315 final Creator<Shard> creator = new Creator<Shard>() {
2317 public Shard create() throws Exception {
2318 return new TestShard(newShardBuilder());
2322 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2323 Props.create(new DelegatingShardCreator(creator)), shardActorName);
2325 waitUntilLeader(shard);
2326 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Read the root (empty path) now; both saved snapshots are compared against it.
2328 final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2330 // Trigger creation of a snapshot by asking the SnapshotManager to capture one.
2331 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2332 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2333 awaitAndValidateSnapshot(expectedRoot);
// Capture a second time, after the latch and saved snapshot were reset by the call above.
2335 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2336 awaitAndValidateSnapshot(expectedRoot);
2338 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Waits up to 5 seconds for the latch, asserts the saved object is a Snapshot whose state
// matches expectedRoot, then resets the latch and the saved snapshot for the next capture.
2341 private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
2342 ) throws InterruptedException {
// NOTE(review): this is plain string concatenation, so the "{}" is printed literally;
// it looks like a leftover logger placeholder in a debug print.
2343 System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
2344 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2346 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2347 savedSnapshot.get() instanceof Snapshot);
2349 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2351 latch.set(new CountDownLatch(1));
2352 savedSnapshot.set(null);
// Deserializes the snapshot state into a NormalizedNode and asserts it equals expectedRoot.
2355 private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2357 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2358 assertEquals("Root node", expectedRoot, actual);
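2365 * Verifies the data-tree operations that the applySnapshot logic relies on: deleting the root and re-writing it with previously read data must leave the tree equal to what was read.
2366 * @throws ReadFailedException if reading from the data tree fails
2367 * @throws DataValidationFailedException if a modification fails validation when committed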
// Works directly on an in-memory DataTree (no shard actor): commits a container node, reads the
// root, then deletes and re-writes the root with that same data and checks the tree is unchanged.
2370 public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2371 final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2372 store.setSchemaContext(SCHEMA_CONTEXT);
// Seed the tree with a container node at TestModel.TEST_PATH.
2374 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2375 putTransaction.write(TestModel.TEST_PATH,
2376 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2377 commitTransaction(store, putTransaction);
// The empty YangInstanceIdentifier addresses the root of the tree.
2380 final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2382 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
// Replace the whole tree: delete the root, then write back the data read above.
2384 writeTransaction.delete(YangInstanceIdentifier.builder().build());
2385 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2387 commitTransaction(store, writeTransaction);
2389 final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2391 assertEquals(expected, actual);
// Checks that a shard's persistence provider reports isRecoveryApplicable() == true when the
// DatastoreContext was built with persistent(true), and false when built with persistent(false).
2395 public void testRecoveryApplicable(){
// The two contexts below differ only in the persistent flag.
2397 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2398 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2400 final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2401 schemaContext(SCHEMA_CONTEXT).props();
2403 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2404 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2406 final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2407 schemaContext(SCHEMA_CONTEXT).props();
2409 new ShardTestKit(getSystem()) {{
2410 final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2411 persistentProps, "testPersistence1");
2413 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2415 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2417 final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2418 nonPersistentProps, "testPersistence2");
2420 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2422 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Checks that sending a new DatastoreContext to a running shard switches what its persistence
// provider reports for isRecoveryApplicable(): true -> false -> true, following the persistent flag.
2429 public void testOnDatastoreContext() {
2430 new ShardTestKit(getSystem()) {{
2431 dataStoreContextBuilder.persistent(true);
2433 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2435 assertEquals("isRecoveryApplicable", true,
2436 shard.underlyingActor().persistence().isRecoveryApplicable());
2438 waitUntilLeader(shard);
// NOTE(review): each assertion follows its tell with no wait in between, so the test
// presumably relies on the message being processed before tell returns - confirm.
2440 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2442 assertEquals("isRecoveryApplicable", false,
2443 shard.underlyingActor().persistence().isRecoveryApplicable());
2445 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2447 assertEquals("isRecoveryApplicable", true,
2448 shard.underlyingActor().persistence().isRecoveryApplicable());
2450 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Registers a role-change listener actor with a leader shard and checks the notifications it
// collects: a RegisterRoleChangeListenerReply, then a ShardLeaderStateChanged carrying the
// shard's own data tree; after a RequestVote is sent to the shard, a further
// ShardLeaderStateChanged with no local data tree is expected.
2455 public void testRegisterRoleChangeListener() throws Exception {
2456 new ShardTestKit(getSystem()) {
2458 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2459 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2460 "testRegisterRoleChangeListener");
2462 waitUntilLeader(shard);
// Collector actor that acts as the role-change listener; its received messages are inspected below.
2464 final TestActorRef<MessageCollectorActor> listener =
2465 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
// The listener is passed as the sender of the registration message.
2467 shard.tell(new RegisterRoleChangeListener(), listener);
2469 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2471 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2472 ShardLeaderStateChanged.class);
2473 assertEquals("getLocalShardDataTree present", true,
2474 leaderStateChanged.getLocalShardDataTree().isPresent());
// assertSame: the notification must carry the very same DataTree instance the shard uses.
2475 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2476 leaderStateChanged.getLocalShardDataTree().get());
// Drop what was collected so far so the next expectFirstMatching only sees new messages.
2478 MessageCollectorActor.clearMessages(listener);
2480 // Force a leader change
2482 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2484 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2485 ShardLeaderStateChanged.class);
2486 assertEquals("getLocalShardDataTree present", false,
2487 leaderStateChanged.getLocalShardDataTree().isPresent());
2489 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Checks that a FollowerInitialSyncUpStatus message is reflected in the shard MBean's
// getFollowerInitialSyncStatus(): first false, then true.
2495 public void testFollowerInitialSyncStatus() throws Exception {
2496 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2497 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2498 "testFollowerInitialSyncStatus");
// The messages are handed straight to onReceiveCommand on the underlying actor rather than
// sent with tell, so each assertion runs right after the handler returns.
2500 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2502 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2504 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2506 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2508 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Test helper that applies a modification to a DataTree: marks the modification ready,
// validates it against the store, then prepares it and commits the prepared result.
//
// store        - the data tree to commit into
// modification - the modification to apply; ready() is called on it here
// Declared to throw DataValidationFailedException.
2511 private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2512 modification.ready();
2513 store.validate(modification);
2514 store.commit(store.prepare(modification));
// Registers a clustered data change listener on a shard that the test has found to have no
// leader (FindLeaderReply with a null leader actor), and checks that the listener still
// receives a change event for a write made before the held-back ElectionTimeout is released.
2518 public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
2519 new ShardTestKit(getSystem()) {{
2520 dataStoreContextBuilder.persistent(false);
// Released by the shard when it sees its first ElectionTimeout.
2521 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
// Released by the test once the listener is registered and the write has been issued.
2522 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
2523 final Creator<Shard> creator = new Creator<Shard>() {
2524 private static final long serialVersionUID = 1L;
2525 boolean firstElectionTimeout = true;
2528 public Shard create() throws Exception {
2529 return new Shard(newShardBuilder()) {
2531 public void onReceiveCommand(final Object message) throws Exception {
// First ElectionTimeout only: hold it back until onChangeListenerRegistered is released
// (or 5 seconds pass), then re-send the same message to this actor.
2532 if(message instanceof ElectionTimeout && firstElectionTimeout) {
2533 firstElectionTimeout = false;
2534 final ActorRef self = getSelf();
2538 Uninterruptibles.awaitUninterruptibly(
2539 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
2540 self.tell(message, self);
// Tell the test that the first ElectionTimeout has arrived.
2544 onFirstElectionTimeout.countDown();
2546 super.onReceiveCommand(message);
// MockDataChangeListener(1): presumably expects exactly one change notification - confirm.
2553 final MockDataChangeListener listener = new MockDataChangeListener(1);
2554 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2555 "testDataChangeListenerOnFollower-DataChangeListener");
// NOTE(review): withDispatcher(Dispatchers.DefaultDispatcherId()) is applied twice in this
// chain; the second call looks redundant.
2557 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2558 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
2559 withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
2561 assertEquals("Got first ElectionTimeout", true,
2562 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
// With the ElectionTimeout held back, the shard must report that it has no leader.
2564 shard.tell(new FindLeader(), getRef());
2565 final FindLeaderReply findLeadeReply =
2566 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2567 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
2569 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2571 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2572 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2573 RegisterChangeListenerReply.class);
// NOTE(review): the assertion message below contains a typo ("Registratiozn").
2574 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2576 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Let the held-back ElectionTimeout proceed.
2578 onChangeListenerRegistered.countDown();
2580 listener.waitForChangeEvents();
2582 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2583 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Creates two peer shards (member-1 and member-2 of the "inventory" config shard), expects
// member-2 to be reported as leader by member-1, registers a clustered data change listener
// on member-1 and checks that it receives a change event after a write.
2588 public void testClusteredDataChangeListernerRegistration() throws Exception {
// NOTE(review): the DatastoreContext returned by build() is discarded here; only the
// persistent(false) setting on the shared builder appears to be the intended effect.
2589 dataStoreContextBuilder.persistent(false).build();
2590 new ShardTestKit(getSystem()) {{
2591 final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
2592 .shardName("inventory").type("config").build();
2594 final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
2595 .shardName("inventory").type("config").build();
// member-1: has member-2 as its only peer and drops every ElectionTimeout message
// (presumably so it never starts an election itself - confirm).
2596 final Creator<Shard> followerShardCreator = new Creator<Shard>() {
2597 private static final long serialVersionUID = 1L;
2600 public Shard create() throws Exception {
2601 return new Shard(Shard.builder().id(member1ShardID).datastoreContext(newDatastoreContext()).
2602 peerAddresses(Collections.singletonMap(member2ShardID.toString(),
2603 "akka://test/user/" + member2ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {
2605 public void onReceiveCommand(final Object message) throws Exception {
2607 if(!(message instanceof ElectionTimeout)) {
2608 super.onReceiveCommand(message);
// member-2: an unmodified Shard with member-1 as its only peer.
2615 final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
2616 private static final long serialVersionUID = 1L;
2619 public Shard create() throws Exception {
2620 return new Shard(Shard.builder().id(member2ShardID).datastoreContext(newDatastoreContext()).
2621 peerAddresses(Collections.singletonMap(member1ShardID.toString(),
2622 "akka://test/user/" + member1ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {};
// The actor names are the shard ids, matching the "akka://test/user/<id>" peer addresses above.
2627 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2628 Props.create(new DelegatingShardCreator(followerShardCreator)),
2629 member1ShardID.toString());
2631 final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
2632 Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
2633 member2ShardID.toString());
// NOTE(review): fixed 2-second sleep; the test is timing-dependent at this point.
2634 // Sleep to let election happen
2635 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
// Ask member-1 who the leader is; it must answer with member-2's actor path.
2637 shard.tell(new FindLeader(), getRef());
2638 final FindLeaderReply findLeaderReply =
2639 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2640 assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
2642 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2643 final MockDataChangeListener listener = new MockDataChangeListener(1);
2644 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2645 "testDataChangeListenerOnFollower-DataChangeListener");
// The listener is registered on member-1, not on the leader.
2647 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2648 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2649 RegisterChangeListenerReply.class);
// NOTE(review): the assertion message below contains a typo ("Registratiozn").
2650 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2652 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2654 listener.waitForChangeEvents();
2656 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2657 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());