2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Status.Failure;
28 import akka.dispatch.Dispatchers;
29 import akka.dispatch.OnComplete;
30 import akka.japi.Creator;
31 import akka.pattern.Patterns;
32 import akka.persistence.SaveSnapshotSuccess;
33 import akka.testkit.TestActorRef;
34 import akka.util.Timeout;
35 import com.google.common.base.Function;
36 import com.google.common.base.Optional;
37 import com.google.common.util.concurrent.Futures;
38 import com.google.common.util.concurrent.ListenableFuture;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.io.IOException;
41 import java.util.Collections;
42 import java.util.HashSet;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.atomic.AtomicReference;
49 import org.junit.Test;
50 import org.mockito.InOrder;
51 import org.opendaylight.controller.cluster.DataPersistenceProvider;
52 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
62 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
65 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
74 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
75 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
76 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
77 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
78 import org.opendaylight.controller.cluster.datastore.modification.Modification;
79 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
80 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
81 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
82 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
83 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
84 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
86 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
87 import org.opendaylight.controller.cluster.raft.RaftActorContext;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
89 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
90 import org.opendaylight.controller.cluster.raft.Snapshot;
91 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
92 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
93 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
94 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
95 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
96 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
97 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
98 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
99 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
100 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
101 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
102 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
103 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
104 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
105 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
106 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
107 import org.opendaylight.yangtools.yang.common.QName;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
110 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
111 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
122 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
123 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
124 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
125 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
126 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
127 import scala.concurrent.Await;
128 import scala.concurrent.Future;
129 import scala.concurrent.duration.FiniteDuration;
131 public class ShardTest extends AbstractShardTest {
132 private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
134 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
137 public void testRegisterChangeListener() throws Exception {
138 new ShardTestKit(getSystem()) {{
139 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
140 newShardProps(), "testRegisterChangeListener");
142 waitUntilLeader(shard);
144 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
146 final MockDataChangeListener listener = new MockDataChangeListener(1);
147 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
148 "testRegisterChangeListener-DataChangeListener");
150 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
151 dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
153 final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
154 RegisterChangeListenerReply.class);
155 final String replyPath = reply.getListenerRegistrationPath().toString();
156 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
157 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
159 final YangInstanceIdentifier path = TestModel.TEST_PATH;
160 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
162 listener.waitForChangeEvents(path);
164 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
165 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
169 @SuppressWarnings("serial")
171 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
172 // This test tests the timing window in which a change listener is registered before the
173 // shard becomes the leader. We verify that the listener is registered and notified of the
174 // existing data when the shard becomes the leader.
175 new ShardTestKit(getSystem()) {{
176 // For this test, we want to send the RegisterChangeListener message after the shard
177 // has recovered from persistence and before it becomes the leader. So we subclass
178 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
179 // we know that the shard has been initialized to a follower and has started the
180 // election process. The following 2 CountDownLatches are used to coordinate the
181 // ElectionTimeout with the sending of the RegisterChangeListener message.
182 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
183 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
184 final Creator<Shard> creator = new Creator<Shard>() {
185 boolean firstElectionTimeout = true;
188 public Shard create() throws Exception {
189 // Use a non persistent provider because this test actually invokes persist on the journal
190 // this will cause all other messages to not be queued properly after that.
191 // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
192 // it does do a persist)
193 return new Shard(shardID, Collections.<String,String>emptyMap(),
194 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
196 public void onReceiveCommand(final Object message) throws Exception {
197 if(message instanceof ElectionTimeout && firstElectionTimeout) {
198 // Got the first ElectionTimeout. We don't forward it to the
199 // base Shard yet until we've sent the RegisterChangeListener
200 // message. So we signal the onFirstElectionTimeout latch to tell
201 // the main thread to send the RegisterChangeListener message and
202 // start a thread to wait on the onChangeListenerRegistered latch,
203 // which the main thread signals after it has sent the message.
204 // After the onChangeListenerRegistered is triggered, we send the
205 // original ElectionTimeout message to proceed with the election.
206 firstElectionTimeout = false;
207 final ActorRef self = getSelf();
211 Uninterruptibles.awaitUninterruptibly(
212 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
213 self.tell(message, self);
217 onFirstElectionTimeout.countDown();
219 super.onReceiveCommand(message);
226 final MockDataChangeListener listener = new MockDataChangeListener(1);
227 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
228 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
230 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
231 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
232 "testRegisterChangeListenerWhenNotLeaderInitially");
234 // Write initial data into the in-memory store.
235 final YangInstanceIdentifier path = TestModel.TEST_PATH;
236 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
238 // Wait until the shard receives the first ElectionTimeout message.
239 assertEquals("Got first ElectionTimeout", true,
240 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
242 // Now send the RegisterChangeListener and wait for the reply.
243 shard.tell(new RegisterChangeListener(path, dclActor,
244 AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
246 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
247 RegisterChangeListenerReply.class);
248 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
250 // Sanity check - verify the shard is not the leader yet.
251 shard.tell(new FindLeader(), getRef());
252 final FindLeaderReply findLeadeReply =
253 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
254 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
256 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
257 // with the election process.
258 onChangeListenerRegistered.countDown();
260 // Wait for the shard to become the leader and notify our listener with the existing
261 // data in the store.
262 listener.waitForChangeEvents(path);
264 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
265 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
270 public void testRegisterDataTreeChangeListener() throws Exception {
271 new ShardTestKit(getSystem()) {{
272 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
273 newShardProps(), "testRegisterDataTreeChangeListener");
275 waitUntilLeader(shard);
277 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
279 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
280 final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
281 "testRegisterDataTreeChangeListener-DataTreeChangeListener");
283 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
285 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
286 RegisterDataTreeChangeListenerReply.class);
287 final String replyPath = reply.getListenerRegistrationPath().toString();
288 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
289 "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
291 final YangInstanceIdentifier path = TestModel.TEST_PATH;
292 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
294 listener.waitForChangeEvents();
296 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
297 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
301 @SuppressWarnings("serial")
303 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
304 new ShardTestKit(getSystem()) {{
305 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
306 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
307 final Creator<Shard> creator = new Creator<Shard>() {
308 boolean firstElectionTimeout = true;
311 public Shard create() throws Exception {
312 return new Shard(shardID, Collections.<String,String>emptyMap(),
313 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
315 public void onReceiveCommand(final Object message) throws Exception {
316 if(message instanceof ElectionTimeout && firstElectionTimeout) {
317 firstElectionTimeout = false;
318 final ActorRef self = getSelf();
322 Uninterruptibles.awaitUninterruptibly(
323 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
324 self.tell(message, self);
328 onFirstElectionTimeout.countDown();
330 super.onReceiveCommand(message);
337 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
338 final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
339 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
341 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
342 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
343 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
345 final YangInstanceIdentifier path = TestModel.TEST_PATH;
346 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
348 assertEquals("Got first ElectionTimeout", true,
349 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
351 shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
352 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
353 RegisterDataTreeChangeListenerReply.class);
354 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
356 shard.tell(new FindLeader(), getRef());
357 final FindLeaderReply findLeadeReply =
358 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
359 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
361 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
363 onChangeListenerRegistered.countDown();
365 // TODO: investigate why we do not receive data chage events
366 listener.waitForChangeEvents();
368 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
369 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
374 public void testCreateTransaction(){
375 new ShardTestKit(getSystem()) {{
376 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
378 waitUntilLeader(shard);
380 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
382 shard.tell(new CreateTransaction("txn-1",
383 TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
385 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
386 CreateTransactionReply.class);
388 final String path = reply.getTransactionActorPath().toString();
389 assertTrue("Unexpected transaction path " + path,
390 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
392 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
397 public void testCreateTransactionOnChain(){
398 new ShardTestKit(getSystem()) {{
399 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
401 waitUntilLeader(shard);
403 shard.tell(new CreateTransaction("txn-1",
404 TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
407 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
408 CreateTransactionReply.class);
410 final String path = reply.getTransactionActorPath().toString();
411 assertTrue("Unexpected transaction path " + path,
412 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
414 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
418 @SuppressWarnings("serial")
420 public void testPeerAddressResolved() throws Exception {
421 new ShardTestKit(getSystem()) {{
422 final CountDownLatch recoveryComplete = new CountDownLatch(1);
423 class TestShard extends Shard {
425 super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
426 newDatastoreContext(), SCHEMA_CONTEXT);
429 String getPeerAddress(String id) {
430 return getRaftActorContext().getPeerAddress(id);
434 protected void onRecoveryComplete() {
436 super.onRecoveryComplete();
438 recoveryComplete.countDown();
443 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
444 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
446 public TestShard create() throws Exception {
447 return new TestShard();
449 })), "testPeerAddressResolved");
451 assertEquals("Recovery complete", true,
452 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
454 final String address = "akka://foobar";
455 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
457 assertEquals("getPeerAddress", address,
458 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
460 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
465 public void testApplySnapshot() throws Exception {
467 ShardTestKit testkit = new ShardTestKit(getSystem());
469 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
470 "testApplySnapshot");
472 testkit.waitUntilLeader(shard);
474 final DataTree store = InMemoryDataTreeFactory.getInstance().create();
475 store.setSchemaContext(SCHEMA_CONTEXT);
477 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
478 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
479 withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
480 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
482 writeToStore(store, TestModel.TEST_PATH, container);
484 final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
485 final NormalizedNode<?,?> expected = readStore(store, root);
487 final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
488 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
490 shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
492 final NormalizedNode<?,?> actual = readStore(shard, root);
494 assertEquals("Root node", expected, actual);
496 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
500 public void testApplyState() throws Exception {
502 ShardTestKit testkit = new ShardTestKit(getSystem());
504 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
506 testkit.waitUntilLeader(shard);
508 final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
510 final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
511 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
513 shard.underlyingActor().onReceiveCommand(applyState);
515 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
516 assertEquals("Applied state", node, actual);
518 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
522 public void testApplyStateWithCandidatePayload() throws Exception {
524 ShardTestKit testkit = new ShardTestKit(getSystem());
526 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
528 testkit.waitUntilLeader(shard);
530 final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
531 final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
533 final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
534 DataTreeCandidatePayload.create(candidate)));
536 shard.underlyingActor().onReceiveCommand(applyState);
538 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
539 assertEquals("Applied state", node, actual);
541 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
544 DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
545 final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
546 testStore.setSchemaContext(SCHEMA_CONTEXT);
548 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
550 final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
552 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
553 SerializationUtils.serializeNormalizedNode(root),
554 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
558 private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
559 source.validate(mod);
560 final DataTreeCandidate candidate = source.prepare(mod);
561 source.commit(candidate);
562 return DataTreeCandidatePayload.create(candidate);
566 public void testDataTreeCandidateRecovery() throws Exception {
567 // Set up the InMemorySnapshotStore.
568 final DataTree source = setupInMemorySnapshotStore();
570 final DataTreeModification writeMod = source.takeSnapshot().newModification();
571 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
573 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
575 // Set up the InMemoryJournal.
576 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
578 final int nListEntries = 16;
579 final Set<Integer> listEntryKeys = new HashSet<>();
581 // Add some ModificationPayload entries
582 for (int i = 1; i <= nListEntries; i++) {
583 listEntryKeys.add(Integer.valueOf(i));
585 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
586 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
588 final DataTreeModification mod = source.takeSnapshot().newModification();
589 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
591 InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
592 payloadForModification(source, mod)));
595 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
596 new ApplyJournalEntries(nListEntries));
598 testRecovery(listEntryKeys);
602 public void testModicationRecovery() throws Exception {
604 // Set up the InMemorySnapshotStore.
605 setupInMemorySnapshotStore();
607 // Set up the InMemoryJournal.
609 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
611 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
612 new WriteModification(TestModel.OUTER_LIST_PATH,
613 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
615 final int nListEntries = 16;
616 final Set<Integer> listEntryKeys = new HashSet<>();
618 // Add some ModificationPayload entries
619 for(int i = 1; i <= nListEntries; i++) {
620 listEntryKeys.add(Integer.valueOf(i));
621 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
622 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
623 final Modification mod = new MergeModification(path,
624 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
625 InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
626 newModificationPayload(mod)));
629 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
630 new ApplyJournalEntries(nListEntries));
632 testRecovery(listEntryKeys);
635 private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
636 final MutableCompositeModification compMod = new MutableCompositeModification();
637 for(final Modification mod: mods) {
638 compMod.addModification(mod);
641 return new ModificationPayload(compMod);
645 public void testConcurrentThreePhaseCommits() throws Throwable {
646 new ShardTestKit(getSystem()) {{
647 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
648 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
649 "testConcurrentThreePhaseCommits");
651 waitUntilLeader(shard);
653 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
655 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
657 final String transactionID1 = "tx1";
658 final MutableCompositeModification modification1 = new MutableCompositeModification();
659 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
660 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
662 final String transactionID2 = "tx2";
663 final MutableCompositeModification modification2 = new MutableCompositeModification();
664 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
665 TestModel.OUTER_LIST_PATH,
666 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
669 final String transactionID3 = "tx3";
670 final MutableCompositeModification modification3 = new MutableCompositeModification();
671 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
672 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
673 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
674 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
677 final long timeoutSec = 5;
678 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
679 final Timeout timeout = new Timeout(duration);
681 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
682 // by the ShardTransaction.
684 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
685 cohort1, modification1, true, false), getRef());
686 final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
687 expectMsgClass(duration, ReadyTransactionReply.class));
688 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
690 // Send the CanCommitTransaction message for the first Tx.
692 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
693 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
694 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
695 assertEquals("Can commit", true, canCommitReply.getCanCommit());
697 // Send the ForwardedReadyTransaction for the next 2 Tx's.
699 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
700 cohort2, modification2, true, false), getRef());
701 expectMsgClass(duration, ReadyTransactionReply.class);
703 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
704 cohort3, modification3, true, false), getRef());
705 expectMsgClass(duration, ReadyTransactionReply.class);
707 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
708 // processed after the first Tx completes.
710 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
711 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
713 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
714 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
716 // Send the CommitTransaction message for the first Tx. After it completes, it should
717 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
719 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
720 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
722 // Wait for the next 2 Tx's to complete.
724 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
725 final CountDownLatch commitLatch = new CountDownLatch(2);
727 class OnFutureComplete extends OnComplete<Object> {
728 private final Class<?> expRespType;
730 OnFutureComplete(final Class<?> expRespType) {
731 this.expRespType = expRespType;
735 public void onComplete(final Throwable error, final Object resp) {
737 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
740 assertEquals("Commit response type", expRespType, resp.getClass());
742 } catch (final Exception e) {
748 void onSuccess(final Object resp) throws Exception {
752 class OnCommitFutureComplete extends OnFutureComplete {
753 OnCommitFutureComplete() {
754 super(CommitTransactionReply.SERIALIZABLE_CLASS);
758 public void onComplete(final Throwable error, final Object resp) {
759 super.onComplete(error, resp);
760 commitLatch.countDown();
764 class OnCanCommitFutureComplete extends OnFutureComplete {
765 private final String transactionID;
767 OnCanCommitFutureComplete(final String transactionID) {
768 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
769 this.transactionID = transactionID;
773 void onSuccess(final Object resp) throws Exception {
774 final CanCommitTransactionReply canCommitReply =
775 CanCommitTransactionReply.fromSerializable(resp);
776 assertEquals("Can commit", true, canCommitReply.getCanCommit());
778 final Future<Object> commitFuture = Patterns.ask(shard,
779 new CommitTransaction(transactionID).toSerializable(), timeout);
780 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
784 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
785 getSystem().dispatcher());
787 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
788 getSystem().dispatcher());
790 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
792 if(caughtEx.get() != null) {
793 throw caughtEx.get();
796 assertEquals("Commits complete", true, done);
798 final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
799 inOrder.verify(cohort1).canCommit();
800 inOrder.verify(cohort1).preCommit();
801 inOrder.verify(cohort1).commit();
802 inOrder.verify(cohort2).canCommit();
803 inOrder.verify(cohort2).preCommit();
804 inOrder.verify(cohort2).commit();
805 inOrder.verify(cohort3).canCommit();
806 inOrder.verify(cohort3).preCommit();
807 inOrder.verify(cohort3).commit();
809 // Verify data in the data store.
811 verifyOuterListEntry(shard, 1);
813 verifyLastApplied(shard, 2);
815 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
819 private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
820 final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
821 return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
824 private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
825 final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
826 final int messagesSent) {
827 final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
828 batched.addModification(new WriteModification(path, data));
829 batched.setReady(ready);
830 batched.setDoCommitOnReady(doCommitOnReady);
831 batched.setTotalMessagesSent(messagesSent);
836 public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
837 new ShardTestKit(getSystem()) {{
838 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
839 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
840 "testBatchedModificationsWithNoCommitOnReady");
842 waitUntilLeader(shard);
844 final String transactionID = "tx";
845 final FiniteDuration duration = duration("5 seconds");
847 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
848 final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
850 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
851 if(mockCohort.get() == null) {
852 mockCohort.set(createDelegatingMockCohort("cohort", actual));
855 return mockCohort.get();
859 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
861 // Send a BatchedModifications to start a transaction.
863 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
864 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
865 expectMsgClass(duration, BatchedModificationsReply.class);
867 // Send a couple more BatchedModifications.
869 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
870 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
871 expectMsgClass(duration, BatchedModificationsReply.class);
873 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
874 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
875 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
876 expectMsgClass(duration, ReadyTransactionReply.class);
878 // Send the CanCommitTransaction message.
880 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
881 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
882 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
883 assertEquals("Can commit", true, canCommitReply.getCanCommit());
885 // Send the CanCommitTransaction message.
887 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
888 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
890 final InOrder inOrder = inOrder(mockCohort.get());
891 inOrder.verify(mockCohort.get()).canCommit();
892 inOrder.verify(mockCohort.get()).preCommit();
893 inOrder.verify(mockCohort.get()).commit();
895 // Verify data in the data store.
897 verifyOuterListEntry(shard, 1);
899 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
904 public void testBatchedModificationsWithCommitOnReady() throws Throwable {
905 new ShardTestKit(getSystem()) {{
906 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
907 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
908 "testBatchedModificationsWithCommitOnReady");
910 waitUntilLeader(shard);
912 final String transactionID = "tx";
913 final FiniteDuration duration = duration("5 seconds");
915 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
916 final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
918 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
919 if(mockCohort.get() == null) {
920 mockCohort.set(createDelegatingMockCohort("cohort", actual));
923 return mockCohort.get();
927 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
929 // Send a BatchedModifications to start a transaction.
931 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
932 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
933 expectMsgClass(duration, BatchedModificationsReply.class);
935 // Send a couple more BatchedModifications.
937 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
938 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
939 expectMsgClass(duration, BatchedModificationsReply.class);
941 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
942 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
943 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
945 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
947 final InOrder inOrder = inOrder(mockCohort.get());
948 inOrder.verify(mockCohort.get()).canCommit();
949 inOrder.verify(mockCohort.get()).preCommit();
950 inOrder.verify(mockCohort.get()).commit();
952 // Verify data in the data store.
954 verifyOuterListEntry(shard, 1);
956 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
960 @Test(expected=IllegalStateException.class)
961 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
962 new ShardTestKit(getSystem()) {{
963 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
964 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
965 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
967 waitUntilLeader(shard);
969 final String transactionID = "tx1";
970 final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
971 batched.setReady(true);
972 batched.setTotalMessagesSent(2);
974 shard.tell(batched, getRef());
976 final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
978 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
980 if(failure != null) {
981 throw failure.cause();
987 public void testBatchedModificationsWithOperationFailure() throws Throwable {
988 new ShardTestKit(getSystem()) {{
989 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
990 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
991 "testBatchedModificationsWithOperationFailure");
993 waitUntilLeader(shard);
995 // Test merge with invalid data. An exception should occur when the merge is applied. Note that
996 // write will not validate the children for performance reasons.
998 String transactionID = "tx1";
1000 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1001 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
1002 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1004 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
1005 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
1006 shard.tell(batched, getRef());
1007 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1009 Throwable cause = failure.cause();
1011 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1012 batched.setReady(true);
1013 batched.setTotalMessagesSent(2);
1015 shard.tell(batched, getRef());
1017 failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1018 assertEquals("Failure cause", cause, failure.cause());
1020 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1024 @SuppressWarnings("unchecked")
1025 private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1026 final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1027 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1028 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1029 outerList.getValue() instanceof Iterable);
1030 final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1031 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1032 entry instanceof MapEntryNode);
1033 final MapEntryNode mapEntry = (MapEntryNode)entry;
1034 final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1035 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1036 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1037 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1041 public void testBatchedModificationsOnTransactionChain() throws Throwable {
1042 new ShardTestKit(getSystem()) {{
1043 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1044 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1045 "testBatchedModificationsOnTransactionChain");
1047 waitUntilLeader(shard);
1049 final String transactionChainID = "txChain";
1050 final String transactionID1 = "tx1";
1051 final String transactionID2 = "tx2";
1053 final FiniteDuration duration = duration("5 seconds");
1055 // Send a BatchedModifications to start a chained write transaction and ready it.
1057 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1058 final YangInstanceIdentifier path = TestModel.TEST_PATH;
1059 shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1060 containerNode, true, false, 1), getRef());
1061 expectMsgClass(duration, ReadyTransactionReply.class);
1063 // Create a read Tx on the same chain.
1065 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1066 transactionChainID).toSerializable(), getRef());
1068 final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1070 getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1071 final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1072 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1074 // Commit the write transaction.
1076 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1077 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1078 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1079 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1081 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1082 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1084 // Verify data in the data store.
1086 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1087 assertEquals("Stored node", containerNode, actualNode);
1089 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1094 public void testOnBatchedModificationsWhenNotLeader() {
1095 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1096 new ShardTestKit(getSystem()) {{
1097 final Creator<Shard> creator = new Creator<Shard>() {
1098 private static final long serialVersionUID = 1L;
1101 public Shard create() throws Exception {
1102 return new Shard(shardID, Collections.<String,String>emptyMap(),
1103 newDatastoreContext(), SCHEMA_CONTEXT) {
1105 protected boolean isLeader() {
1106 return overrideLeaderCalls.get() ? false : super.isLeader();
1110 protected ActorSelection getLeader() {
1111 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1118 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1119 Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1121 waitUntilLeader(shard);
1123 overrideLeaderCalls.set(true);
1125 final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1127 shard.tell(batched, ActorRef.noSender());
1129 expectMsgEquals(batched);
1131 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1136 public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1137 new ShardTestKit(getSystem()) {{
1138 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1139 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1140 "testForwardedReadyTransactionWithImmediateCommit");
1142 waitUntilLeader(shard);
1144 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1146 final String transactionID = "tx1";
1147 final MutableCompositeModification modification = new MutableCompositeModification();
1148 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1149 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1150 TestModel.TEST_PATH, containerNode, modification);
1152 final FiniteDuration duration = duration("5 seconds");
1154 // Simulate the ForwardedReadyTransaction messages that would be sent
1155 // by the ShardTransaction.
1157 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1158 cohort, modification, true, true), getRef());
1160 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1162 final InOrder inOrder = inOrder(cohort);
1163 inOrder.verify(cohort).canCommit();
1164 inOrder.verify(cohort).preCommit();
1165 inOrder.verify(cohort).commit();
1167 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1168 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1170 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1175 public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1176 new ShardTestKit(getSystem()) {{
1177 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1178 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1179 "testReadyLocalTransactionWithImmediateCommit");
1181 waitUntilLeader(shard);
1183 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1185 final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1187 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1188 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1189 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1190 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1192 final String txId = "tx1";
1193 modification.ready();
1194 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1196 shard.tell(readyMessage, getRef());
1198 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1200 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1201 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1203 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1208 public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1209 new ShardTestKit(getSystem()) {{
1210 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1211 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1212 "testReadyLocalTransactionWithThreePhaseCommit");
1214 waitUntilLeader(shard);
1216 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1218 final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1220 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1221 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1222 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1223 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1225 final String txId = "tx1";
1226 modification.ready();
1227 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1229 shard.tell(readyMessage, getRef());
1231 expectMsgClass(ReadyTransactionReply.class);
1233 // Send the CanCommitTransaction message.
1235 shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1236 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1237 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1238 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1240 // Send the CanCommitTransaction message.
1242 shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1243 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1245 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1246 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1248 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1253 public void testCommitWithPersistenceDisabled() throws Throwable {
1254 dataStoreContextBuilder.persistent(false);
1255 new ShardTestKit(getSystem()) {{
1256 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1257 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1258 "testCommitWithPersistenceDisabled");
1260 waitUntilLeader(shard);
1262 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1264 // Setup a simulated transactions with a mock cohort.
1266 final String transactionID = "tx";
1267 final MutableCompositeModification modification = new MutableCompositeModification();
1268 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1269 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1270 TestModel.TEST_PATH, containerNode, modification);
1272 final FiniteDuration duration = duration("5 seconds");
1274 // Simulate the ForwardedReadyTransaction messages that would be sent
1275 // by the ShardTransaction.
1277 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1278 cohort, modification, true, false), getRef());
1279 expectMsgClass(duration, ReadyTransactionReply.class);
1281 // Send the CanCommitTransaction message.
1283 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1284 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1285 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1286 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1288 // Send the CanCommitTransaction message.
1290 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1291 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1293 final InOrder inOrder = inOrder(cohort);
1294 inOrder.verify(cohort).canCommit();
1295 inOrder.verify(cohort).preCommit();
1296 inOrder.verify(cohort).commit();
1298 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1299 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1301 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1305 private static DataTreeCandidateTip mockCandidate(final String name) {
1306 final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1307 final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1308 doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1309 doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1310 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1311 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1312 return mockCandidate;
1315 private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1316 final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1317 final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1318 doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1319 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1320 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1321 return mockCandidate;
1325 public void testCommitWhenTransactionHasNoModifications(){
1326 // Note that persistence is enabled which would normally result in the entry getting written to the journal
1327 // but here that need not happen
1328 new ShardTestKit(getSystem()) {
1330 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1331 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1332 "testCommitWhenTransactionHasNoModifications");
1334 waitUntilLeader(shard);
1336 final String transactionID = "tx1";
1337 final MutableCompositeModification modification = new MutableCompositeModification();
1338 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1339 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1340 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1341 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1342 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1344 final FiniteDuration duration = duration("5 seconds");
1346 // Simulate the ForwardedReadyTransaction messages that would be sent
1347 // by the ShardTransaction.
1349 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1350 cohort, modification, true, false), getRef());
1351 expectMsgClass(duration, ReadyTransactionReply.class);
1353 // Send the CanCommitTransaction message.
1355 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1356 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1357 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1358 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1360 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1361 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1363 final InOrder inOrder = inOrder(cohort);
1364 inOrder.verify(cohort).canCommit();
1365 inOrder.verify(cohort).preCommit();
1366 inOrder.verify(cohort).commit();
1368 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1369 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1371 // Use MBean for verification
1372 // Committed transaction count should increase as usual
1373 assertEquals(1,shardStats.getCommittedTransactionsCount());
1375 // Commit index should not advance because this does not go into the journal
1376 assertEquals(-1, shardStats.getCommitIndex());
1378 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1385 public void testCommitWhenTransactionHasModifications(){
1386 new ShardTestKit(getSystem()) {
1388 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1389 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1390 "testCommitWhenTransactionHasModifications");
1392 waitUntilLeader(shard);
1394 final String transactionID = "tx1";
1395 final MutableCompositeModification modification = new MutableCompositeModification();
1396 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1397 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1398 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1399 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1400 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1401 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1403 final FiniteDuration duration = duration("5 seconds");
1405 // Simulate the ForwardedReadyTransaction messages that would be sent
1406 // by the ShardTransaction.
1408 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1409 cohort, modification, true, false), getRef());
1410 expectMsgClass(duration, ReadyTransactionReply.class);
1412 // Send the CanCommitTransaction message.
1414 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1415 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1416 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1417 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1419 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1420 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1422 final InOrder inOrder = inOrder(cohort);
1423 inOrder.verify(cohort).canCommit();
1424 inOrder.verify(cohort).preCommit();
1425 inOrder.verify(cohort).commit();
1427 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1428 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1430 // Use MBean for verification
1431 // Committed transaction count should increase as usual
1432 assertEquals(1, shardStats.getCommittedTransactionsCount());
1434 // Commit index should advance as we do not have an empty modification
1435 assertEquals(0, shardStats.getCommitIndex());
1437 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1444 public void testCommitPhaseFailure() throws Throwable {
1445 new ShardTestKit(getSystem()) {{
1446 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1447 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1448 "testCommitPhaseFailure");
1450 waitUntilLeader(shard);
1452 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1455 final String transactionID1 = "tx1";
1456 final MutableCompositeModification modification1 = new MutableCompositeModification();
1457 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1458 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1459 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1460 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1461 doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1463 final String transactionID2 = "tx2";
1464 final MutableCompositeModification modification2 = new MutableCompositeModification();
1465 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1466 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1468 final FiniteDuration duration = duration("5 seconds");
1469 final Timeout timeout = new Timeout(duration);
1471 // Simulate the ForwardedReadyTransaction messages that would be sent
1472 // by the ShardTransaction.
1474 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1475 cohort1, modification1, true, false), getRef());
1476 expectMsgClass(duration, ReadyTransactionReply.class);
1478 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1479 cohort2, modification2, true, false), getRef());
1480 expectMsgClass(duration, ReadyTransactionReply.class);
1482 // Send the CanCommitTransaction message for the first Tx.
1484 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1485 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1486 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1487 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1489 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1490 // processed after the first Tx completes.
1492 final Future<Object> canCommitFuture = Patterns.ask(shard,
1493 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1495 // Send the CommitTransaction message for the first Tx. This should send back an error
1496 // and trigger the 2nd Tx to proceed.
1498 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1499 expectMsgClass(duration, akka.actor.Status.Failure.class);
1501 // Wait for the 2nd Tx to complete the canCommit phase.
1503 final CountDownLatch latch = new CountDownLatch(1);
1504 canCommitFuture.onComplete(new OnComplete<Object>() {
1506 public void onComplete(final Throwable t, final Object resp) {
1509 }, getSystem().dispatcher());
1511 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1513 final InOrder inOrder = inOrder(cohort1, cohort2);
1514 inOrder.verify(cohort1).canCommit();
1515 inOrder.verify(cohort1).preCommit();
1516 inOrder.verify(cohort1).commit();
1517 inOrder.verify(cohort2).canCommit();
1519 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1524 public void testPreCommitPhaseFailure() throws Throwable {
1525 new ShardTestKit(getSystem()) {{
1526 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1527 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1528 "testPreCommitPhaseFailure");
1530 waitUntilLeader(shard);
1532 final String transactionID1 = "tx1";
1533 final MutableCompositeModification modification1 = new MutableCompositeModification();
1534 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1535 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1536 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1538 final String transactionID2 = "tx2";
1539 final MutableCompositeModification modification2 = new MutableCompositeModification();
1540 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1541 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1543 final FiniteDuration duration = duration("5 seconds");
1544 final Timeout timeout = new Timeout(duration);
1546 // Simulate the ForwardedReadyTransaction messages that would be sent
1547 // by the ShardTransaction.
1549 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1550 cohort1, modification1, true, false), getRef());
1551 expectMsgClass(duration, ReadyTransactionReply.class);
1553 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1554 cohort2, modification2, true, false), getRef());
1555 expectMsgClass(duration, ReadyTransactionReply.class);
1557 // Send the CanCommitTransaction message for the first Tx.
1559 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1560 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1561 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1562 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1564 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1565 // processed after the first Tx completes.
1567 final Future<Object> canCommitFuture = Patterns.ask(shard,
1568 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1570 // Send the CommitTransaction message for the first Tx. This should send back an error
1571 // and trigger the 2nd Tx to proceed.
1573 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1574 expectMsgClass(duration, akka.actor.Status.Failure.class);
1576 // Wait for the 2nd Tx to complete the canCommit phase.
1578 final CountDownLatch latch = new CountDownLatch(1);
1579 canCommitFuture.onComplete(new OnComplete<Object>() {
1581 public void onComplete(final Throwable t, final Object resp) {
1584 }, getSystem().dispatcher());
1586 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1588 final InOrder inOrder = inOrder(cohort1, cohort2);
1589 inOrder.verify(cohort1).canCommit();
1590 inOrder.verify(cohort1).preCommit();
1591 inOrder.verify(cohort2).canCommit();
1593 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1598 public void testCanCommitPhaseFailure() throws Throwable {
1599 new ShardTestKit(getSystem()) {{
1600 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1601 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1602 "testCanCommitPhaseFailure");
1604 waitUntilLeader(shard);
1606 final FiniteDuration duration = duration("5 seconds");
1608 final String transactionID1 = "tx1";
1609 final MutableCompositeModification modification = new MutableCompositeModification();
1610 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1611 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1613 // Simulate the ForwardedReadyTransaction messages that would be sent
1614 // by the ShardTransaction.
1616 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1617 cohort, modification, true, false), getRef());
1618 expectMsgClass(duration, ReadyTransactionReply.class);
1620 // Send the CanCommitTransaction message.
1622 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1623 expectMsgClass(duration, akka.actor.Status.Failure.class);
1625 // Send another can commit to ensure the failed one got cleaned up.
1629 final String transactionID2 = "tx2";
1630 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1632 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1633 cohort, modification, true, false), getRef());
1634 expectMsgClass(duration, ReadyTransactionReply.class);
1636 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1637 final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1638 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1639 assertEquals("getCanCommit", true, reply.getCanCommit());
1641 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1646 public void testCanCommitPhaseFalseResponse() throws Throwable {
1647 new ShardTestKit(getSystem()) {{
1648 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1649 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1650 "testCanCommitPhaseFalseResponse");
1652 waitUntilLeader(shard);
1654 final FiniteDuration duration = duration("5 seconds");
1656 final String transactionID1 = "tx1";
1657 final MutableCompositeModification modification = new MutableCompositeModification();
1658 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1659 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1661 // Simulate the ForwardedReadyTransaction messages that would be sent
1662 // by the ShardTransaction.
1664 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1665 cohort, modification, true, false), getRef());
1666 expectMsgClass(duration, ReadyTransactionReply.class);
1668 // Send the CanCommitTransaction message.
1670 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1671 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1672 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1673 assertEquals("getCanCommit", false, reply.getCanCommit());
1675 // Send another can commit to ensure the failed one got cleaned up.
1679 final String transactionID2 = "tx2";
1680 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1682 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1683 cohort, modification, true, false), getRef());
1684 expectMsgClass(duration, ReadyTransactionReply.class);
1686 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1687 reply = CanCommitTransactionReply.fromSerializable(
1688 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1689 assertEquals("getCanCommit", true, reply.getCanCommit());
1691 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1696 public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1697 new ShardTestKit(getSystem()) {{
1698 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1699 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1700 "testImmediateCommitWithCanCommitPhaseFailure");
1702 waitUntilLeader(shard);
1704 final FiniteDuration duration = duration("5 seconds");
1706 final String transactionID1 = "tx1";
1707 final MutableCompositeModification modification = new MutableCompositeModification();
1708 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1709 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1711 // Simulate the ForwardedReadyTransaction messages that would be sent
1712 // by the ShardTransaction.
1714 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1715 cohort, modification, true, true), getRef());
1717 expectMsgClass(duration, akka.actor.Status.Failure.class);
1719 // Send another can commit to ensure the failed one got cleaned up.
1723 final String transactionID2 = "tx2";
1724 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1725 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1726 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1727 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1728 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1729 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1730 doReturn(candidateRoot).when(candidate).getRootNode();
1731 doReturn(candidate).when(cohort).getCandidate();
1733 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1734 cohort, modification, true, true), getRef());
1736 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1738 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1743 public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1744 new ShardTestKit(getSystem()) {{
1745 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1746 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1747 "testImmediateCommitWithCanCommitPhaseFalseResponse");
1749 waitUntilLeader(shard);
1751 final FiniteDuration duration = duration("5 seconds");
1753 final String transactionID = "tx1";
1754 final MutableCompositeModification modification = new MutableCompositeModification();
1755 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1756 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1758 // Simulate the ForwardedReadyTransaction messages that would be sent
1759 // by the ShardTransaction.
1761 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1762 cohort, modification, true, true), getRef());
1764 expectMsgClass(duration, akka.actor.Status.Failure.class);
1766 // Send another can commit to ensure the failed one got cleaned up.
1770 final String transactionID2 = "tx2";
1771 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1772 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1773 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1774 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1775 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1776 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1777 doReturn(candidateRoot).when(candidate).getRootNode();
1778 doReturn(candidate).when(cohort).getCandidate();
1780 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1781 cohort, modification, true, true), getRef());
1783 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1785 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1790 public void testAbortBeforeFinishCommit() throws Throwable {
1791 new ShardTestKit(getSystem()) {{
1792 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1793 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1794 "testAbortBeforeFinishCommit");
1796 waitUntilLeader(shard);
1798 final FiniteDuration duration = duration("5 seconds");
1799 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1801 final String transactionID = "tx1";
1802 final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1803 new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1805 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1806 final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1808 // Simulate an AbortTransaction message occurring during replication, after
1809 // persisting and before finishing the commit to the in-memory store.
1810 // We have no followers so due to optimizations in the RaftActor, it does not
1811 // attempt replication and thus we can't send an AbortTransaction message b/c
1812 // it would be processed too late after CommitTransaction completes. So we'll
1813 // simulate an AbortTransaction message occurring during replication by calling
1814 // the shard directly.
1816 shard.underlyingActor().doAbortTransaction(transactionID, null);
1818 return preCommitFuture;
1822 final MutableCompositeModification modification = new MutableCompositeModification();
1823 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1824 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1825 modification, preCommit);
1827 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1828 cohort, modification, true, false), getRef());
1829 expectMsgClass(duration, ReadyTransactionReply.class);
1831 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1832 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1833 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1834 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1836 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1837 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1839 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1841 // Since we're simulating an abort occurring during replication and before finish commit,
1842 // the data should still get written to the in-memory store since we've gotten past
1843 // canCommit and preCommit and persisted the data.
1844 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1846 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1851 public void testTransactionCommitTimeout() throws Throwable {
1852 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1854 new ShardTestKit(getSystem()) {{
1855 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1856 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1857 "testTransactionCommitTimeout");
1859 waitUntilLeader(shard);
1861 final FiniteDuration duration = duration("5 seconds");
1863 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1865 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1866 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1867 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1869 // Create 1st Tx - will timeout
1871 final String transactionID1 = "tx1";
1872 final MutableCompositeModification modification1 = new MutableCompositeModification();
1873 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1874 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1875 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1876 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1881 final String transactionID2 = "tx3";
1882 final MutableCompositeModification modification2 = new MutableCompositeModification();
1883 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1884 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1885 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1887 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1892 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1893 cohort1, modification1, true, false), getRef());
1894 expectMsgClass(duration, ReadyTransactionReply.class);
1896 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1897 cohort2, modification2, true, false), getRef());
1898 expectMsgClass(duration, ReadyTransactionReply.class);
1900 // canCommit 1st Tx. We don't send the commit so it should timeout.
1902 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1903 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1905 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1907 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1908 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1910 // Try to commit the 1st Tx - should fail as it's not the current Tx.
1912 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1913 expectMsgClass(duration, akka.actor.Status.Failure.class);
1915 // Commit the 2nd Tx.
1917 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1918 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1920 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1921 assertNotNull(listNodePath + " not found", node);
1923 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1928 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1929 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1931 new ShardTestKit(getSystem()) {{
1932 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1933 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1934 "testTransactionCommitQueueCapacityExceeded");
1936 waitUntilLeader(shard);
1938 final FiniteDuration duration = duration("5 seconds");
1940 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1942 final String transactionID1 = "tx1";
1943 final MutableCompositeModification modification1 = new MutableCompositeModification();
1944 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1945 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1947 final String transactionID2 = "tx2";
1948 final MutableCompositeModification modification2 = new MutableCompositeModification();
1949 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1950 TestModel.OUTER_LIST_PATH,
1951 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1954 final String transactionID3 = "tx3";
1955 final MutableCompositeModification modification3 = new MutableCompositeModification();
1956 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1957 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1961 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1962 cohort1, modification1, true, false), getRef());
1963 expectMsgClass(duration, ReadyTransactionReply.class);
1965 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1966 cohort2, modification2, true, false), getRef());
1967 expectMsgClass(duration, ReadyTransactionReply.class);
1969 // The 3rd Tx should exceed queue capacity and fail.
1971 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1972 cohort3, modification3, true, false), getRef());
1973 expectMsgClass(duration, akka.actor.Status.Failure.class);
1975 // canCommit 1st Tx.
1977 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1978 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1980 // canCommit the 2nd Tx - it should get queued.
1982 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1984 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1986 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1987 expectMsgClass(duration, akka.actor.Status.Failure.class);
1989 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1994 public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1995 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1997 new ShardTestKit(getSystem()) {{
1998 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1999 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2000 "testTransactionCommitWithPriorExpiredCohortEntries");
2002 waitUntilLeader(shard);
2004 final FiniteDuration duration = duration("5 seconds");
2006 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2008 final String transactionID1 = "tx1";
2009 final MutableCompositeModification modification1 = new MutableCompositeModification();
2010 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2011 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2013 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2014 cohort1, modification1, true, false), getRef());
2015 expectMsgClass(duration, ReadyTransactionReply.class);
2017 final String transactionID2 = "tx2";
2018 final MutableCompositeModification modification2 = new MutableCompositeModification();
2019 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2020 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2022 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2023 cohort2, modification2, true, false), getRef());
2024 expectMsgClass(duration, ReadyTransactionReply.class);
2026 final String transactionID3 = "tx3";
2027 final MutableCompositeModification modification3 = new MutableCompositeModification();
2028 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2029 TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2031 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2032 cohort3, modification3, true, false), getRef());
2033 expectMsgClass(duration, ReadyTransactionReply.class);
2035 // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2036 // should expire from the queue and the last one should be processed.
2038 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2039 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2041 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2046 public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2047 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2049 new ShardTestKit(getSystem()) {{
2050 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2051 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2052 "testTransactionCommitWithSubsequentExpiredCohortEntry");
2054 waitUntilLeader(shard);
2056 final FiniteDuration duration = duration("5 seconds");
2058 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2060 final String transactionID1 = "tx1";
2061 final MutableCompositeModification modification1 = new MutableCompositeModification();
2062 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2063 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2065 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2066 cohort1, modification1, true, false), getRef());
2067 expectMsgClass(duration, ReadyTransactionReply.class);
2069 // CanCommit the first one so it's the current in-progress CohortEntry.
2071 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2072 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2074 // Ready the second Tx.
2076 final String transactionID2 = "tx2";
2077 final MutableCompositeModification modification2 = new MutableCompositeModification();
2078 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2079 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2081 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2082 cohort2, modification2, true, false), getRef());
2083 expectMsgClass(duration, ReadyTransactionReply.class);
2085 // Ready the third Tx.
2087 final String transactionID3 = "tx3";
2088 final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2089 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2090 .apply(modification3);
2091 modification3.ready();
2092 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2094 shard.tell(readyMessage, getRef());
2096 // Commit the first Tx. After completing, the second should expire from the queue and the third
2099 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2100 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2102 // Expect commit reply from the third Tx.
2104 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2106 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2107 assertNotNull(TestModel.TEST2_PATH + " not found", node);
2109 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2114 public void testCanCommitBeforeReadyFailure() throws Throwable {
2115 new ShardTestKit(getSystem()) {{
2116 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2117 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2118 "testCanCommitBeforeReadyFailure");
2120 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2121 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2123 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2128 public void testAbortCurrentTransaction() throws Throwable {
2129 new ShardTestKit(getSystem()) {{
2130 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2131 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2132 "testAbortCurrentTransaction");
2134 waitUntilLeader(shard);
2136 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2138 final String transactionID1 = "tx1";
2139 final MutableCompositeModification modification1 = new MutableCompositeModification();
2140 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2141 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2142 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2144 final String transactionID2 = "tx2";
2145 final MutableCompositeModification modification2 = new MutableCompositeModification();
2146 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2147 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2149 final FiniteDuration duration = duration("5 seconds");
2150 final Timeout timeout = new Timeout(duration);
2152 // Simulate the ForwardedReadyTransaction messages that would be sent
2153 // by the ShardTransaction.
2155 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2156 cohort1, modification1, true, false), getRef());
2157 expectMsgClass(duration, ReadyTransactionReply.class);
2159 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2160 cohort2, modification2, true, false), getRef());
2161 expectMsgClass(duration, ReadyTransactionReply.class);
2163 // Send the CanCommitTransaction message for the first Tx.
2165 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2166 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2167 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2168 assertEquals("Can commit", true, canCommitReply.getCanCommit());
2170 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2171 // processed after the first Tx completes.
2173 final Future<Object> canCommitFuture = Patterns.ask(shard,
2174 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2176 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2179 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2180 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2182 // Wait for the 2nd Tx to complete the canCommit phase.
2184 Await.ready(canCommitFuture, duration);
2186 final InOrder inOrder = inOrder(cohort1, cohort2);
2187 inOrder.verify(cohort1).canCommit();
2188 inOrder.verify(cohort2).canCommit();
2190 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2195 public void testAbortQueuedTransaction() throws Throwable {
2196 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2197 new ShardTestKit(getSystem()) {{
2198 final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2199 @SuppressWarnings("serial")
2200 final Creator<Shard> creator = new Creator<Shard>() {
2202 public Shard create() throws Exception {
2203 return new Shard(shardID, Collections.<String,String>emptyMap(),
2204 dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
2206 public void onReceiveCommand(final Object message) throws Exception {
2207 super.onReceiveCommand(message);
2208 if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2209 if(cleaupCheckLatch.get() != null) {
2210 cleaupCheckLatch.get().countDown();
2218 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2219 Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2220 Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
2222 waitUntilLeader(shard);
2224 final String transactionID = "tx1";
2226 final MutableCompositeModification modification = new MutableCompositeModification();
2227 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2228 doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2230 final FiniteDuration duration = duration("5 seconds");
2234 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
2235 cohort, modification, true, false), getRef());
2236 expectMsgClass(duration, ReadyTransactionReply.class);
2238 assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2240 // Send the AbortTransaction message.
2242 shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2243 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2245 verify(cohort).abort();
2247 // Verify the tx cohort is removed from queue at the cleanup check interval.
2249 cleaupCheckLatch.set(new CountDownLatch(1));
2250 assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2251 cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2253 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2255 // Now send CanCommitTransaction - should fail.
2257 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2259 Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2260 assertTrue("Failure type", failure instanceof IllegalStateException);
2262 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2267 public void testCreateSnapshot() throws Exception {
2268 testCreateSnapshot(true, "testCreateSnapshot");
2272 public void testCreateSnapshotWithNonPersistentData() throws Exception {
2273 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2276 @SuppressWarnings("serial")
2277 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2279 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2281 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2282 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2283 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2288 public void saveSnapshot(final Object o) {
2289 savedSnapshot.set(o);
2290 super.saveSnapshot(o);
2294 dataStoreContextBuilder.persistent(persistent);
2296 new ShardTestKit(getSystem()) {{
2297 class TestShard extends Shard {
2299 protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
2300 final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
2301 super(name, peerAddresses, datastoreContext, schemaContext);
2302 setPersistence(new TestPersistentDataProvider(super.persistence()));
2306 public void handleCommand(final Object message) {
2307 super.handleCommand(message);
2309 if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2310 latch.get().countDown();
2315 public RaftActorContext getRaftActorContext() {
2316 return super.getRaftActorContext();
2320 final Creator<Shard> creator = new Creator<Shard>() {
2322 public Shard create() throws Exception {
2323 return new TestShard(shardID, Collections.<String,String>emptyMap(),
2324 newDatastoreContext(), SCHEMA_CONTEXT);
2328 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2329 Props.create(new DelegatingShardCreator(creator)), shardActorName);
2331 waitUntilLeader(shard);
2333 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2335 final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2337 // Trigger creation of a snapshot by ensuring
2338 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2339 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2341 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2343 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2344 savedSnapshot.get() instanceof Snapshot);
2346 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2348 latch.set(new CountDownLatch(1));
2349 savedSnapshot.set(null);
2351 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2353 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2355 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2356 savedSnapshot.get() instanceof Snapshot);
2358 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2360 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2363 private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2365 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2366 assertEquals("Root node", expectedRoot, actual);
2372 * This test simply verifies that the applySnapShot logic will work
2373 * @throws ReadFailedException
2374 * @throws DataValidationFailedException
2377 public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2378 final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2379 store.setSchemaContext(SCHEMA_CONTEXT);
2381 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2382 putTransaction.write(TestModel.TEST_PATH,
2383 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2384 commitTransaction(store, putTransaction);
2387 final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2389 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2391 writeTransaction.delete(YangInstanceIdentifier.builder().build());
2392 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2394 commitTransaction(store, writeTransaction);
2396 final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2398 assertEquals(expected, actual);
2402 public void testRecoveryApplicable(){
2404 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2405 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2407 final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2408 persistentContext, SCHEMA_CONTEXT);
2410 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2411 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2413 final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2414 nonPersistentContext, SCHEMA_CONTEXT);
2416 new ShardTestKit(getSystem()) {{
2417 final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2418 persistentProps, "testPersistence1");
2420 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2422 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2424 final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2425 nonPersistentProps, "testPersistence2");
2427 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2429 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2436 public void testOnDatastoreContext() {
2437 new ShardTestKit(getSystem()) {{
2438 dataStoreContextBuilder.persistent(true);
2440 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2442 assertEquals("isRecoveryApplicable", true,
2443 shard.underlyingActor().persistence().isRecoveryApplicable());
2445 waitUntilLeader(shard);
2447 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2449 assertEquals("isRecoveryApplicable", false,
2450 shard.underlyingActor().persistence().isRecoveryApplicable());
2452 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2454 assertEquals("isRecoveryApplicable", true,
2455 shard.underlyingActor().persistence().isRecoveryApplicable());
2457 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2462 public void testRegisterRoleChangeListener() throws Exception {
2463 new ShardTestKit(getSystem()) {
2465 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2466 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2467 "testRegisterRoleChangeListener");
2469 waitUntilLeader(shard);
2471 final TestActorRef<MessageCollectorActor> listener =
2472 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2474 shard.tell(new RegisterRoleChangeListener(), listener);
2476 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2478 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2479 ShardLeaderStateChanged.class);
2480 assertEquals("getLocalShardDataTree present", true,
2481 leaderStateChanged.getLocalShardDataTree().isPresent());
2482 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2483 leaderStateChanged.getLocalShardDataTree().get());
2485 MessageCollectorActor.clearMessages(listener);
2487 // Force a leader change
2489 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2491 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2492 ShardLeaderStateChanged.class);
2493 assertEquals("getLocalShardDataTree present", false,
2494 leaderStateChanged.getLocalShardDataTree().isPresent());
2496 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2502 public void testFollowerInitialSyncStatus() throws Exception {
2503 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2504 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2505 "testFollowerInitialSyncStatus");
2507 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2509 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2511 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2513 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2515 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2518 private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2519 modification.ready();
2520 store.validate(modification);
2521 store.commit(store.prepare(modification));
2525 public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
2526 new ShardTestKit(getSystem()) {{
2527 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
2528 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
2529 final Creator<Shard> creator = new Creator<Shard>() {
2530 boolean firstElectionTimeout = true;
2533 public Shard create() throws Exception {
2534 return new Shard(shardID, Collections.<String,String>emptyMap(),
2535 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
2537 public void onReceiveCommand(final Object message) throws Exception {
2538 if(message instanceof ElectionTimeout && firstElectionTimeout) {
2539 firstElectionTimeout = false;
2540 final ActorRef self = getSelf();
2544 Uninterruptibles.awaitUninterruptibly(
2545 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
2546 self.tell(message, self);
2550 onFirstElectionTimeout.countDown();
2552 super.onReceiveCommand(message);
2559 final MockDataChangeListener listener = new MockDataChangeListener(1);
2560 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2561 "testDataChangeListenerOnFollower-DataChangeListener");
2563 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2564 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
2565 withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
2567 assertEquals("Got first ElectionTimeout", true,
2568 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
2570 shard.tell(new FindLeader(), getRef());
2571 final FindLeaderReply findLeadeReply =
2572 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2573 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
2575 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2577 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2578 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2579 RegisterChangeListenerReply.class);
2580 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2582 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2584 onChangeListenerRegistered.countDown();
2586 listener.waitForChangeEvents();
2588 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2589 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2594 public void testClusteredDataChangeListernerRegistration() throws Exception {
2595 new ShardTestKit(getSystem()) {{
2596 final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
2597 .shardName("inventory").type("config").build();
2599 final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
2600 .shardName("inventory").type("config").build();
2601 final Creator<Shard> followerShardCreator = new Creator<Shard>() {
2604 public Shard create() throws Exception {
2605 return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
2606 "akka://test/user/" + member2ShardID.toString()),
2607 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
2609 public void onReceiveCommand(final Object message) throws Exception {
2611 if(!(message instanceof ElectionTimeout)) {
2612 super.onReceiveCommand(message);
2619 final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
2622 public Shard create() throws Exception {
2623 return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
2624 "akka://test/user/" + member1ShardID.toString()),
2625 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
2630 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2631 Props.create(new DelegatingShardCreator(followerShardCreator)),
2632 member1ShardID.toString());
2634 final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
2635 Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
2636 member2ShardID.toString());
2637 // Sleep to let election happen
2638 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
2640 shard.tell(new FindLeader(), getRef());
2641 final FindLeaderReply findLeaderReply =
2642 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2643 assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
2645 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2646 final MockDataChangeListener listener = new MockDataChangeListener(1);
2647 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2648 "testDataChangeListenerOnFollower-DataChangeListener");
2650 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2651 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2652 RegisterChangeListenerReply.class);
2653 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2655 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2657 listener.waitForChangeEvents();
2659 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2660 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());