1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.inOrder;
10 import static org.mockito.Mockito.mock;
11 import static org.mockito.Mockito.reset;
12 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.persistence.SaveSnapshotSuccess;
22 import akka.testkit.TestActorRef;
23 import akka.util.Timeout;
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import java.io.IOException;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.List;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.InOrder;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
43 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
44 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
45 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
46 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
48 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
55 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
62 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
63 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
64 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
65 import org.opendaylight.controller.cluster.datastore.modification.Modification;
66 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
67 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
68 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
69 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
70 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
71 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
72 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
73 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
74 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
75 import org.opendaylight.controller.cluster.raft.RaftActorContext;
76 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
77 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
78 import org.opendaylight.controller.cluster.raft.Snapshot;
79 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
80 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
81 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
82 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
83 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
84 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
85 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
86 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
87 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
88 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
89 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
90 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
91 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
92 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
93 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
94 import org.opendaylight.yangtools.yang.common.QName;
95 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
96 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
97 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
98 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
99 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
100 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
101 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
102 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
103 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
105 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
109 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
110 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
111 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
112 import scala.concurrent.Await;
113 import scala.concurrent.Future;
114 import scala.concurrent.duration.FiniteDuration;
116 public class ShardTest extends AbstractShardTest {
117 private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
120 public void testRegisterChangeListener() throws Exception {
121 new ShardTestKit(getSystem()) {{
122 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
123 newShardProps(), "testRegisterChangeListener");
125 waitUntilLeader(shard);
127 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
129 MockDataChangeListener listener = new MockDataChangeListener(1);
130 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
131 "testRegisterChangeListener-DataChangeListener");
133 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
134 dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
136 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
137 RegisterChangeListenerReply.class);
138 String replyPath = reply.getListenerRegistrationPath().toString();
139 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
140 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
142 YangInstanceIdentifier path = TestModel.TEST_PATH;
143 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
145 listener.waitForChangeEvents(path);
147 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
148 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
152 @SuppressWarnings("serial")
154 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
155 // This test tests the timing window in which a change listener is registered before the
156 // shard becomes the leader. We verify that the listener is registered and notified of the
157 // existing data when the shard becomes the leader.
158 new ShardTestKit(getSystem()) {{
159 // For this test, we want to send the RegisterChangeListener message after the shard
160 // has recovered from persistence and before it becomes the leader. So we subclass
161 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
162 // we know that the shard has been initialized to a follower and has started the
163 // election process. The following 2 CountDownLatches are used to coordinate the
164 // ElectionTimeout with the sending of the RegisterChangeListener message.
165 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
166 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
167 Creator<Shard> creator = new Creator<Shard>() {
168 boolean firstElectionTimeout = true;
171 public Shard create() throws Exception {
172 // Use a non persistent provider because this test actually invokes persist on the journal
173 // this will cause all other messages to not be queued properly after that.
174 // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
175 // it does do a persist)
176 return new Shard(shardID, Collections.<String,String>emptyMap(),
177 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
179 public void onReceiveCommand(final Object message) throws Exception {
180 if(message instanceof ElectionTimeout && firstElectionTimeout) {
181 // Got the first ElectionTimeout. We don't forward it to the
182 // base Shard yet until we've sent the RegisterChangeListener
183 // message. So we signal the onFirstElectionTimeout latch to tell
184 // the main thread to send the RegisterChangeListener message and
185 // start a thread to wait on the onChangeListenerRegistered latch,
186 // which the main thread signals after it has sent the message.
187 // After the onChangeListenerRegistered is triggered, we send the
188 // original ElectionTimeout message to proceed with the election.
189 firstElectionTimeout = false;
190 final ActorRef self = getSelf();
194 Uninterruptibles.awaitUninterruptibly(
195 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
196 self.tell(message, self);
200 onFirstElectionTimeout.countDown();
202 super.onReceiveCommand(message);
209 MockDataChangeListener listener = new MockDataChangeListener(1);
210 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
211 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
213 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
214 Props.create(new DelegatingShardCreator(creator)),
215 "testRegisterChangeListenerWhenNotLeaderInitially");
217 // Write initial data into the in-memory store.
218 YangInstanceIdentifier path = TestModel.TEST_PATH;
219 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
221 // Wait until the shard receives the first ElectionTimeout message.
222 assertEquals("Got first ElectionTimeout", true,
223 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
225 // Now send the RegisterChangeListener and wait for the reply.
226 shard.tell(new RegisterChangeListener(path, dclActor,
227 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
229 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
230 RegisterChangeListenerReply.class);
231 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
233 // Sanity check - verify the shard is not the leader yet.
234 shard.tell(new FindLeader(), getRef());
235 FindLeaderReply findLeadeReply =
236 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
237 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
239 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
240 // with the election process.
241 onChangeListenerRegistered.countDown();
243 // Wait for the shard to become the leader and notify our listener with the existing
244 // data in the store.
245 listener.waitForChangeEvents(path);
247 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
248 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
253 public void testRegisterDataTreeChangeListener() throws Exception {
254 new ShardTestKit(getSystem()) {{
255 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
256 newShardProps(), "testRegisterDataTreeChangeListener");
258 waitUntilLeader(shard);
260 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
262 MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
263 ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
264 "testRegisterDataTreeChangeListener-DataTreeChangeListener");
266 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
268 RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
269 RegisterDataTreeChangeListenerReply.class);
270 String replyPath = reply.getListenerRegistrationPath().toString();
271 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
272 "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
274 YangInstanceIdentifier path = TestModel.TEST_PATH;
275 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
277 listener.waitForChangeEvents();
279 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
280 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
284 @SuppressWarnings("serial")
286 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
287 new ShardTestKit(getSystem()) {{
288 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
289 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
290 Creator<Shard> creator = new Creator<Shard>() {
291 boolean firstElectionTimeout = true;
294 public Shard create() throws Exception {
295 return new Shard(shardID, Collections.<String,String>emptyMap(),
296 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
298 public void onReceiveCommand(final Object message) throws Exception {
299 if(message instanceof ElectionTimeout && firstElectionTimeout) {
300 firstElectionTimeout = false;
301 final ActorRef self = getSelf();
305 Uninterruptibles.awaitUninterruptibly(
306 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
307 self.tell(message, self);
311 onFirstElectionTimeout.countDown();
313 super.onReceiveCommand(message);
320 MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
321 ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
322 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
324 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
325 Props.create(new DelegatingShardCreator(creator)),
326 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
328 YangInstanceIdentifier path = TestModel.TEST_PATH;
329 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
331 assertEquals("Got first ElectionTimeout", true,
332 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
334 shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
335 RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
336 RegisterDataTreeChangeListenerReply.class);
337 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
339 shard.tell(new FindLeader(), getRef());
340 FindLeaderReply findLeadeReply =
341 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
342 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
344 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
346 onChangeListenerRegistered.countDown();
348 // TODO: investigate why we do not receive data chage events
349 listener.waitForChangeEvents();
351 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
352 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
357 public void testCreateTransaction(){
358 new ShardTestKit(getSystem()) {{
359 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
361 waitUntilLeader(shard);
363 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
365 shard.tell(new CreateTransaction("txn-1",
366 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
368 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
369 CreateTransactionReply.class);
371 String path = reply.getTransactionActorPath().toString();
372 assertTrue("Unexpected transaction path " + path,
373 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
375 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
380 public void testCreateTransactionOnChain(){
381 new ShardTestKit(getSystem()) {{
382 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
384 waitUntilLeader(shard);
386 shard.tell(new CreateTransaction("txn-1",
387 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
390 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
391 CreateTransactionReply.class);
393 String path = reply.getTransactionActorPath().toString();
394 assertTrue("Unexpected transaction path " + path,
395 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
397 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
401 @SuppressWarnings("serial")
403 public void testPeerAddressResolved() throws Exception {
404 new ShardTestKit(getSystem()) {{
405 final CountDownLatch recoveryComplete = new CountDownLatch(1);
406 class TestShard extends Shard {
408 super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
409 newDatastoreContext(), SCHEMA_CONTEXT);
412 Map<String, String> getPeerAddresses() {
413 return getRaftActorContext().getPeerAddresses();
417 protected void onRecoveryComplete() {
419 super.onRecoveryComplete();
421 recoveryComplete.countDown();
426 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
427 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
429 public TestShard create() throws Exception {
430 return new TestShard();
432 })), "testPeerAddressResolved");
434 //waitUntilLeader(shard);
435 assertEquals("Recovery complete", true,
436 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
438 String address = "akka://foobar";
439 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
441 assertEquals("getPeerAddresses", address,
442 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
444 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
449 public void testApplySnapshot() throws Exception {
450 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
451 "testApplySnapshot");
453 DataTree store = InMemoryDataTreeFactory.getInstance().create();
454 store.setSchemaContext(SCHEMA_CONTEXT);
456 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
458 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
459 NormalizedNode<?,?> expected = readStore(store, root);
461 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
462 SerializationUtils.serializeNormalizedNode(expected),
463 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
465 shard.underlyingActor().onReceiveCommand(applySnapshot);
467 NormalizedNode<?,?> actual = readStore(shard, root);
469 assertEquals("Root node", expected, actual);
471 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
475 public void testApplyState() throws Exception {
477 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
479 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
481 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
482 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
484 shard.underlyingActor().onReceiveCommand(applyState);
486 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
487 assertEquals("Applied state", node, actual);
489 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
493 public void testApplyStateWithCandidatePayload() throws Exception {
495 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
497 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
498 DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
500 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
501 DataTreeCandidatePayload.create(candidate)));
503 shard.underlyingActor().onReceiveCommand(applyState);
505 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
506 assertEquals("Applied state", node, actual);
508 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
511 DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
512 DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
513 testStore.setSchemaContext(SCHEMA_CONTEXT);
515 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
517 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
519 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
520 SerializationUtils.serializeNormalizedNode(root),
521 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
525 private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
526 source.validate(mod);
527 final DataTreeCandidate candidate = source.prepare(mod);
528 source.commit(candidate);
529 return DataTreeCandidatePayload.create(candidate);
533 public void testDataTreeCandidateRecovery() throws Exception {
534 // Set up the InMemorySnapshotStore.
535 final DataTree source = setupInMemorySnapshotStore();
537 final DataTreeModification writeMod = source.takeSnapshot().newModification();
538 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
540 // Set up the InMemoryJournal.
541 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
543 int nListEntries = 16;
544 Set<Integer> listEntryKeys = new HashSet<>();
546 // Add some ModificationPayload entries
547 for (int i = 1; i <= nListEntries; i++) {
548 listEntryKeys.add(Integer.valueOf(i));
550 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
551 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
553 final DataTreeModification mod = source.takeSnapshot().newModification();
554 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
556 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
557 payloadForModification(source, mod)));
560 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
561 new ApplyJournalEntries(nListEntries));
563 testRecovery(listEntryKeys);
567 public void testModicationRecovery() throws Exception {
569 // Set up the InMemorySnapshotStore.
570 setupInMemorySnapshotStore();
572 // Set up the InMemoryJournal.
574 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
575 new WriteModification(TestModel.OUTER_LIST_PATH,
576 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
578 int nListEntries = 16;
579 Set<Integer> listEntryKeys = new HashSet<>();
581 // Add some ModificationPayload entries
582 for(int i = 1; i <= nListEntries; i++) {
583 listEntryKeys.add(Integer.valueOf(i));
584 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
585 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
586 Modification mod = new MergeModification(path,
587 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
588 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
589 newModificationPayload(mod)));
592 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
593 new ApplyJournalEntries(nListEntries));
595 testRecovery(listEntryKeys);
598 private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
599 MutableCompositeModification compMod = new MutableCompositeModification();
600 for(Modification mod: mods) {
601 compMod.addModification(mod);
604 return new ModificationPayload(compMod);
608 public void testConcurrentThreePhaseCommits() throws Throwable {
609 new ShardTestKit(getSystem()) {{
610 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
611 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
612 "testConcurrentThreePhaseCommits");
614 waitUntilLeader(shard);
616 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
618 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
620 String transactionID1 = "tx1";
621 MutableCompositeModification modification1 = new MutableCompositeModification();
622 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
623 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
625 String transactionID2 = "tx2";
626 MutableCompositeModification modification2 = new MutableCompositeModification();
627 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
628 TestModel.OUTER_LIST_PATH,
629 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
632 String transactionID3 = "tx3";
633 MutableCompositeModification modification3 = new MutableCompositeModification();
634 ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
635 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
636 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
637 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
641 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
642 final Timeout timeout = new Timeout(duration);
644 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
645 // by the ShardTransaction.
647 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
648 cohort1, modification1, true, false), getRef());
649 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
650 expectMsgClass(duration, ReadyTransactionReply.class));
651 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
653 // Send the CanCommitTransaction message for the first Tx.
655 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
656 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
657 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
658 assertEquals("Can commit", true, canCommitReply.getCanCommit());
660 // Send the ForwardedReadyTransaction for the next 2 Tx's.
662 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
663 cohort2, modification2, true, false), getRef());
664 expectMsgClass(duration, ReadyTransactionReply.class);
666 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
667 cohort3, modification3, true, false), getRef());
668 expectMsgClass(duration, ReadyTransactionReply.class);
670 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
671 // processed after the first Tx completes.
673 Future<Object> canCommitFuture1 = Patterns.ask(shard,
674 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
676 Future<Object> canCommitFuture2 = Patterns.ask(shard,
677 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
679 // Send the CommitTransaction message for the first Tx. After it completes, it should
680 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
682 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
683 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
685 // Wait for the next 2 Tx's to complete.
687 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
688 final CountDownLatch commitLatch = new CountDownLatch(2);
690 class OnFutureComplete extends OnComplete<Object> {
691 private final Class<?> expRespType;
693 OnFutureComplete(final Class<?> expRespType) {
694 this.expRespType = expRespType;
698 public void onComplete(final Throwable error, final Object resp) {
700 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
703 assertEquals("Commit response type", expRespType, resp.getClass());
705 } catch (Exception e) {
711 void onSuccess(final Object resp) throws Exception {
715 class OnCommitFutureComplete extends OnFutureComplete {
716 OnCommitFutureComplete() {
717 super(CommitTransactionReply.SERIALIZABLE_CLASS);
721 public void onComplete(final Throwable error, final Object resp) {
722 super.onComplete(error, resp);
723 commitLatch.countDown();
727 class OnCanCommitFutureComplete extends OnFutureComplete {
728 private final String transactionID;
730 OnCanCommitFutureComplete(final String transactionID) {
731 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
732 this.transactionID = transactionID;
736 void onSuccess(final Object resp) throws Exception {
737 CanCommitTransactionReply canCommitReply =
738 CanCommitTransactionReply.fromSerializable(resp);
739 assertEquals("Can commit", true, canCommitReply.getCanCommit());
741 Future<Object> commitFuture = Patterns.ask(shard,
742 new CommitTransaction(transactionID).toSerializable(), timeout);
743 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
747 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
748 getSystem().dispatcher());
750 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
751 getSystem().dispatcher());
753 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
755 if(caughtEx.get() != null) {
756 throw caughtEx.get();
759 assertEquals("Commits complete", true, done);
761 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
762 inOrder.verify(cohort1).canCommit();
763 inOrder.verify(cohort1).preCommit();
764 inOrder.verify(cohort1).commit();
765 inOrder.verify(cohort2).canCommit();
766 inOrder.verify(cohort2).preCommit();
767 inOrder.verify(cohort2).commit();
768 inOrder.verify(cohort3).canCommit();
769 inOrder.verify(cohort3).preCommit();
770 inOrder.verify(cohort3).commit();
772 // Verify data in the data store.
774 verifyOuterListEntry(shard, 1);
776 verifyLastApplied(shard, 2);
778 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
782 private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
783 NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
784 return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
787 private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
788 YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
789 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
790 batched.addModification(new WriteModification(path, data));
791 batched.setReady(ready);
792 batched.setDoCommitOnReady(doCommitOnReady);
797 public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
798 new ShardTestKit(getSystem()) {{
799 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
800 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
801 "testBatchedModificationsWithNoCommitOnReady");
803 waitUntilLeader(shard);
805 final String transactionID = "tx";
806 FiniteDuration duration = duration("5 seconds");
808 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
809 ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
811 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
812 if(mockCohort.get() == null) {
813 mockCohort.set(createDelegatingMockCohort("cohort", actual));
816 return mockCohort.get();
820 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
822 // Send a BatchedModifications to start a transaction.
824 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
825 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
826 expectMsgClass(duration, BatchedModificationsReply.class);
828 // Send a couple more BatchedModifications.
830 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
831 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
832 expectMsgClass(duration, BatchedModificationsReply.class);
834 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
835 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
836 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
837 expectMsgClass(duration, ReadyTransactionReply.class);
839 // Send the CanCommitTransaction message.
841 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
842 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
843 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
844 assertEquals("Can commit", true, canCommitReply.getCanCommit());
846 // Send the CanCommitTransaction message.
848 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
849 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
851 InOrder inOrder = inOrder(mockCohort.get());
852 inOrder.verify(mockCohort.get()).canCommit();
853 inOrder.verify(mockCohort.get()).preCommit();
854 inOrder.verify(mockCohort.get()).commit();
856 // Verify data in the data store.
858 verifyOuterListEntry(shard, 1);
860 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
865 public void testBatchedModificationsWithCommitOnReady() throws Throwable {
866 new ShardTestKit(getSystem()) {{
867 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
868 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
869 "testBatchedModificationsWithCommitOnReady");
871 waitUntilLeader(shard);
873 final String transactionID = "tx";
874 FiniteDuration duration = duration("5 seconds");
876 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
877 ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
879 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
880 if(mockCohort.get() == null) {
881 mockCohort.set(createDelegatingMockCohort("cohort", actual));
884 return mockCohort.get();
888 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
890 // Send a BatchedModifications to start a transaction.
892 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
893 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
894 expectMsgClass(duration, BatchedModificationsReply.class);
896 // Send a couple more BatchedModifications.
898 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
899 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
900 expectMsgClass(duration, BatchedModificationsReply.class);
902 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
903 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
904 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
906 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
908 InOrder inOrder = inOrder(mockCohort.get());
909 inOrder.verify(mockCohort.get()).canCommit();
910 inOrder.verify(mockCohort.get()).preCommit();
911 inOrder.verify(mockCohort.get()).commit();
913 // Verify data in the data store.
915 verifyOuterListEntry(shard, 1);
917 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
921 @SuppressWarnings("unchecked")
922 private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
923 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
924 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
925 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
926 outerList.getValue() instanceof Iterable);
927 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
928 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
929 entry instanceof MapEntryNode);
930 MapEntryNode mapEntry = (MapEntryNode)entry;
931 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
932 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
933 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
934 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
938 public void testBatchedModificationsOnTransactionChain() throws Throwable {
939 new ShardTestKit(getSystem()) {{
940 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
941 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
942 "testBatchedModificationsOnTransactionChain");
944 waitUntilLeader(shard);
946 String transactionChainID = "txChain";
947 String transactionID1 = "tx1";
948 String transactionID2 = "tx2";
950 FiniteDuration duration = duration("5 seconds");
952 // Send a BatchedModifications to start a chained write transaction and ready it.
954 ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
955 YangInstanceIdentifier path = TestModel.TEST_PATH;
956 shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
957 containerNode, true, false), getRef());
958 expectMsgClass(duration, ReadyTransactionReply.class);
960 // Create a read Tx on the same chain.
962 shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
963 transactionChainID).toSerializable(), getRef());
965 CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
967 getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
968 ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
969 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
971 // Commit the write transaction.
973 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
974 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
975 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
976 assertEquals("Can commit", true, canCommitReply.getCanCommit());
978 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
979 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
981 // Verify data in the data store.
983 NormalizedNode<?, ?> actualNode = readStore(shard, path);
984 assertEquals("Stored node", containerNode, actualNode);
986 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
991 public void testOnBatchedModificationsWhenNotLeader() {
992 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
993 new ShardTestKit(getSystem()) {{
994 Creator<Shard> creator = new Creator<Shard>() {
995 private static final long serialVersionUID = 1L;
998 public Shard create() throws Exception {
999 return new Shard(shardID, Collections.<String,String>emptyMap(),
1000 newDatastoreContext(), SCHEMA_CONTEXT) {
1002 protected boolean isLeader() {
1003 return overrideLeaderCalls.get() ? false : super.isLeader();
1007 protected ActorSelection getLeader() {
1008 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1015 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1016 Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1018 waitUntilLeader(shard);
1020 overrideLeaderCalls.set(true);
1022 BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1024 shard.tell(batched, ActorRef.noSender());
1026 expectMsgEquals(batched);
1028 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1033 public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1034 new ShardTestKit(getSystem()) {{
1035 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1036 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1037 "testForwardedReadyTransactionWithImmediateCommit");
1039 waitUntilLeader(shard);
1041 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1043 String transactionID = "tx1";
1044 MutableCompositeModification modification = new MutableCompositeModification();
1045 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1046 ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1047 TestModel.TEST_PATH, containerNode, modification);
1049 FiniteDuration duration = duration("5 seconds");
1051 // Simulate the ForwardedReadyTransaction messages that would be sent
1052 // by the ShardTransaction.
1054 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1055 cohort, modification, true, true), getRef());
1057 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1059 InOrder inOrder = inOrder(cohort);
1060 inOrder.verify(cohort).canCommit();
1061 inOrder.verify(cohort).preCommit();
1062 inOrder.verify(cohort).commit();
1064 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1065 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1067 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1072 public void testCommitWithPersistenceDisabled() throws Throwable {
1073 dataStoreContextBuilder.persistent(false);
1074 new ShardTestKit(getSystem()) {{
1075 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1076 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1077 "testCommitWithPersistenceDisabled");
1079 waitUntilLeader(shard);
1081 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1083 // Setup a simulated transactions with a mock cohort.
1085 String transactionID = "tx";
1086 MutableCompositeModification modification = new MutableCompositeModification();
1087 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1088 ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1089 TestModel.TEST_PATH, containerNode, modification);
1091 FiniteDuration duration = duration("5 seconds");
1093 // Simulate the ForwardedReadyTransaction messages that would be sent
1094 // by the ShardTransaction.
1096 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1097 cohort, modification, true, false), getRef());
1098 expectMsgClass(duration, ReadyTransactionReply.class);
1100 // Send the CanCommitTransaction message.
1102 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1103 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1104 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1105 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1107 // Send the CanCommitTransaction message.
1109 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1110 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1112 InOrder inOrder = inOrder(cohort);
1113 inOrder.verify(cohort).canCommit();
1114 inOrder.verify(cohort).preCommit();
1115 inOrder.verify(cohort).commit();
1117 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1118 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1120 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1124 private static DataTreeCandidateTip mockCandidate(final String name) {
1125 DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1126 DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1127 doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1128 doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1129 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1130 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1131 return mockCandidate;
1134 private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1135 DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1136 DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1137 doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1138 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1139 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1140 return mockCandidate;
1144 public void testCommitWhenTransactionHasNoModifications(){
1145 // Note that persistence is enabled which would normally result in the entry getting written to the journal
1146 // but here that need not happen
1147 new ShardTestKit(getSystem()) {
1149 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1150 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1151 "testCommitWhenTransactionHasNoModifications");
1153 waitUntilLeader(shard);
1155 String transactionID = "tx1";
1156 MutableCompositeModification modification = new MutableCompositeModification();
1157 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1158 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1159 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1160 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1161 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1163 FiniteDuration duration = duration("5 seconds");
1165 // Simulate the ForwardedReadyTransaction messages that would be sent
1166 // by the ShardTransaction.
1168 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1169 cohort, modification, true, false), getRef());
1170 expectMsgClass(duration, ReadyTransactionReply.class);
1172 // Send the CanCommitTransaction message.
1174 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1175 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1176 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1177 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1179 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1180 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1182 InOrder inOrder = inOrder(cohort);
1183 inOrder.verify(cohort).canCommit();
1184 inOrder.verify(cohort).preCommit();
1185 inOrder.verify(cohort).commit();
1187 // Use MBean for verification
1188 // Committed transaction count should increase as usual
1189 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
1191 // Commit index should not advance because this does not go into the journal
1192 assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
1194 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1201 public void testCommitWhenTransactionHasModifications(){
1202 new ShardTestKit(getSystem()) {
1204 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1205 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1206 "testCommitWhenTransactionHasModifications");
1208 waitUntilLeader(shard);
1210 String transactionID = "tx1";
1211 MutableCompositeModification modification = new MutableCompositeModification();
1212 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1213 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1214 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1215 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1216 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1217 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1219 FiniteDuration duration = duration("5 seconds");
1221 // Simulate the ForwardedReadyTransaction messages that would be sent
1222 // by the ShardTransaction.
1224 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1225 cohort, modification, true, false), getRef());
1226 expectMsgClass(duration, ReadyTransactionReply.class);
1228 // Send the CanCommitTransaction message.
1230 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1231 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1232 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1233 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1235 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1236 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1238 InOrder inOrder = inOrder(cohort);
1239 inOrder.verify(cohort).canCommit();
1240 inOrder.verify(cohort).preCommit();
1241 inOrder.verify(cohort).commit();
1243 // Use MBean for verification
1244 // Committed transaction count should increase as usual
1245 assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
1247 // Commit index should advance as we do not have an empty modification
1248 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
1250 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1257 public void testCommitPhaseFailure() throws Throwable {
1258 new ShardTestKit(getSystem()) {{
1259 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1260 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1261 "testCommitPhaseFailure");
1263 waitUntilLeader(shard);
1265 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1268 String transactionID1 = "tx1";
1269 MutableCompositeModification modification1 = new MutableCompositeModification();
1270 ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1271 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1272 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1273 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1274 doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1276 String transactionID2 = "tx2";
1277 MutableCompositeModification modification2 = new MutableCompositeModification();
1278 ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1279 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1281 FiniteDuration duration = duration("5 seconds");
1282 final Timeout timeout = new Timeout(duration);
1284 // Simulate the ForwardedReadyTransaction messages that would be sent
1285 // by the ShardTransaction.
1287 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1288 cohort1, modification1, true, false), getRef());
1289 expectMsgClass(duration, ReadyTransactionReply.class);
1291 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1292 cohort2, modification2, true, false), getRef());
1293 expectMsgClass(duration, ReadyTransactionReply.class);
1295 // Send the CanCommitTransaction message for the first Tx.
1297 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1298 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1299 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1300 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1302 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1303 // processed after the first Tx completes.
1305 Future<Object> canCommitFuture = Patterns.ask(shard,
1306 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1308 // Send the CommitTransaction message for the first Tx. This should send back an error
1309 // and trigger the 2nd Tx to proceed.
1311 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1312 expectMsgClass(duration, akka.actor.Status.Failure.class);
1314 // Wait for the 2nd Tx to complete the canCommit phase.
1316 final CountDownLatch latch = new CountDownLatch(1);
1317 canCommitFuture.onComplete(new OnComplete<Object>() {
1319 public void onComplete(final Throwable t, final Object resp) {
1322 }, getSystem().dispatcher());
1324 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1326 InOrder inOrder = inOrder(cohort1, cohort2);
1327 inOrder.verify(cohort1).canCommit();
1328 inOrder.verify(cohort1).preCommit();
1329 inOrder.verify(cohort1).commit();
1330 inOrder.verify(cohort2).canCommit();
1332 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1337 public void testPreCommitPhaseFailure() throws Throwable {
1338 new ShardTestKit(getSystem()) {{
1339 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1340 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1341 "testPreCommitPhaseFailure");
1343 waitUntilLeader(shard);
1345 String transactionID1 = "tx1";
1346 MutableCompositeModification modification1 = new MutableCompositeModification();
1347 ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1348 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1349 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1351 String transactionID2 = "tx2";
1352 MutableCompositeModification modification2 = new MutableCompositeModification();
1353 ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1354 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1356 FiniteDuration duration = duration("5 seconds");
1357 final Timeout timeout = new Timeout(duration);
1359 // Simulate the ForwardedReadyTransaction messages that would be sent
1360 // by the ShardTransaction.
1362 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1363 cohort1, modification1, true, false), getRef());
1364 expectMsgClass(duration, ReadyTransactionReply.class);
1366 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1367 cohort2, modification2, true, false), getRef());
1368 expectMsgClass(duration, ReadyTransactionReply.class);
1370 // Send the CanCommitTransaction message for the first Tx.
1372 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1373 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1374 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1375 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1377 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1378 // processed after the first Tx completes.
1380 Future<Object> canCommitFuture = Patterns.ask(shard,
1381 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1383 // Send the CommitTransaction message for the first Tx. This should send back an error
1384 // and trigger the 2nd Tx to proceed.
1386 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1387 expectMsgClass(duration, akka.actor.Status.Failure.class);
1389 // Wait for the 2nd Tx to complete the canCommit phase.
1391 final CountDownLatch latch = new CountDownLatch(1);
1392 canCommitFuture.onComplete(new OnComplete<Object>() {
1394 public void onComplete(final Throwable t, final Object resp) {
1397 }, getSystem().dispatcher());
1399 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1401 InOrder inOrder = inOrder(cohort1, cohort2);
1402 inOrder.verify(cohort1).canCommit();
1403 inOrder.verify(cohort1).preCommit();
1404 inOrder.verify(cohort2).canCommit();
1406 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1411 public void testCanCommitPhaseFailure() throws Throwable {
1412 new ShardTestKit(getSystem()) {{
1413 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1414 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1415 "testCanCommitPhaseFailure");
1417 waitUntilLeader(shard);
1419 final FiniteDuration duration = duration("5 seconds");
1421 String transactionID1 = "tx1";
1422 MutableCompositeModification modification = new MutableCompositeModification();
1423 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1424 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1426 // Simulate the ForwardedReadyTransaction messages that would be sent
1427 // by the ShardTransaction.
1429 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1430 cohort, modification, true, false), getRef());
1431 expectMsgClass(duration, ReadyTransactionReply.class);
1433 // Send the CanCommitTransaction message.
1435 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1436 expectMsgClass(duration, akka.actor.Status.Failure.class);
1438 // Send another can commit to ensure the failed one got cleaned up.
1442 String transactionID2 = "tx2";
1443 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1445 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1446 cohort, modification, true, false), getRef());
1447 expectMsgClass(duration, ReadyTransactionReply.class);
1449 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1450 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1451 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1452 assertEquals("getCanCommit", true, reply.getCanCommit());
1454 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1459 public void testCanCommitPhaseFalseResponse() throws Throwable {
1460 new ShardTestKit(getSystem()) {{
1461 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1462 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1463 "testCanCommitPhaseFalseResponse");
1465 waitUntilLeader(shard);
1467 final FiniteDuration duration = duration("5 seconds");
1469 String transactionID1 = "tx1";
1470 MutableCompositeModification modification = new MutableCompositeModification();
1471 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1472 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1474 // Simulate the ForwardedReadyTransaction messages that would be sent
1475 // by the ShardTransaction.
1477 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1478 cohort, modification, true, false), getRef());
1479 expectMsgClass(duration, ReadyTransactionReply.class);
1481 // Send the CanCommitTransaction message.
1483 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1484 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1485 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1486 assertEquals("getCanCommit", false, reply.getCanCommit());
1488 // Send another can commit to ensure the failed one got cleaned up.
1492 String transactionID2 = "tx2";
1493 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1495 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1496 cohort, modification, true, false), getRef());
1497 expectMsgClass(duration, ReadyTransactionReply.class);
1499 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1500 reply = CanCommitTransactionReply.fromSerializable(
1501 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1502 assertEquals("getCanCommit", true, reply.getCanCommit());
1504 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1509 public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1510 new ShardTestKit(getSystem()) {{
1511 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1512 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1513 "testImmediateCommitWithCanCommitPhaseFailure");
1515 waitUntilLeader(shard);
1517 final FiniteDuration duration = duration("5 seconds");
1519 String transactionID1 = "tx1";
1520 MutableCompositeModification modification = new MutableCompositeModification();
1521 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1522 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1524 // Simulate the ForwardedReadyTransaction messages that would be sent
1525 // by the ShardTransaction.
1527 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1528 cohort, modification, true, true), getRef());
1530 expectMsgClass(duration, akka.actor.Status.Failure.class);
1532 // Send another can commit to ensure the failed one got cleaned up.
1536 String transactionID2 = "tx2";
1537 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1538 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1539 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1540 DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1541 DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1542 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1543 doReturn(candidateRoot).when(candidate).getRootNode();
1544 doReturn(candidate).when(cohort).getCandidate();
1546 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1547 cohort, modification, true, true), getRef());
1549 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1551 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1556 public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1557 new ShardTestKit(getSystem()) {{
1558 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1559 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1560 "testImmediateCommitWithCanCommitPhaseFalseResponse");
1562 waitUntilLeader(shard);
1564 final FiniteDuration duration = duration("5 seconds");
1566 String transactionID = "tx1";
1567 MutableCompositeModification modification = new MutableCompositeModification();
1568 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1569 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1571 // Simulate the ForwardedReadyTransaction messages that would be sent
1572 // by the ShardTransaction.
1574 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1575 cohort, modification, true, true), getRef());
1577 expectMsgClass(duration, akka.actor.Status.Failure.class);
1579 // Send another can commit to ensure the failed one got cleaned up.
1583 String transactionID2 = "tx2";
1584 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1585 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1586 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1587 DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1588 DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1589 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1590 doReturn(candidateRoot).when(candidate).getRootNode();
1591 doReturn(candidate).when(cohort).getCandidate();
1593 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1594 cohort, modification, true, true), getRef());
1596 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1598 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1603 public void testAbortBeforeFinishCommit() throws Throwable {
1604 new ShardTestKit(getSystem()) {{
1605 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1606 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1607 "testAbortBeforeFinishCommit");
1609 waitUntilLeader(shard);
1611 final FiniteDuration duration = duration("5 seconds");
1612 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1614 final String transactionID = "tx1";
1615 Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1616 new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1618 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1619 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1621 // Simulate an AbortTransaction message occurring during replication, after
1622 // persisting and before finishing the commit to the in-memory store.
1623 // We have no followers so due to optimizations in the RaftActor, it does not
1624 // attempt replication and thus we can't send an AbortTransaction message b/c
1625 // it would be processed too late after CommitTransaction completes. So we'll
1626 // simulate an AbortTransaction message occurring during replication by calling
1627 // the shard directly.
1629 shard.underlyingActor().doAbortTransaction(transactionID, null);
1631 return preCommitFuture;
1635 MutableCompositeModification modification = new MutableCompositeModification();
1636 ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1637 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1638 modification, preCommit);
1640 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1641 cohort, modification, true, false), getRef());
1642 expectMsgClass(duration, ReadyTransactionReply.class);
1644 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1645 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1646 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1647 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1649 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1650 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1652 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1654 // Since we're simulating an abort occurring during replication and before finish commit,
1655 // the data should still get written to the in-memory store since we've gotten past
1656 // canCommit and preCommit and persisted the data.
1657 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1659 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1664 public void testTransactionCommitTimeout() throws Throwable {
1665 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1667 new ShardTestKit(getSystem()) {{
1668 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1669 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1670 "testTransactionCommitTimeout");
1672 waitUntilLeader(shard);
1674 final FiniteDuration duration = duration("5 seconds");
1676 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1678 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1679 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1680 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1682 // Create 1st Tx - will timeout
1684 String transactionID1 = "tx1";
1685 MutableCompositeModification modification1 = new MutableCompositeModification();
1686 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1687 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1688 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1689 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1694 String transactionID2 = "tx3";
1695 MutableCompositeModification modification2 = new MutableCompositeModification();
1696 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1697 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1698 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1700 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1705 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1706 cohort1, modification1, true, false), getRef());
1707 expectMsgClass(duration, ReadyTransactionReply.class);
1709 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1710 cohort2, modification2, true, false), getRef());
1711 expectMsgClass(duration, ReadyTransactionReply.class);
1713 // canCommit 1st Tx. We don't send the commit so it should timeout.
1715 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1716 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1718 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1720 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1721 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1723 // Try to commit the 1st Tx - should fail as it's not the current Tx.
1725 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1726 expectMsgClass(duration, akka.actor.Status.Failure.class);
1728 // Commit the 2nd Tx.
1730 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1731 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1733 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1734 assertNotNull(listNodePath + " not found", node);
1736 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1741 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1742 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1744 new ShardTestKit(getSystem()) {{
1745 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1746 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1747 "testTransactionCommitQueueCapacityExceeded");
1749 waitUntilLeader(shard);
1751 final FiniteDuration duration = duration("5 seconds");
1753 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1755 String transactionID1 = "tx1";
1756 MutableCompositeModification modification1 = new MutableCompositeModification();
1757 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1758 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1760 String transactionID2 = "tx2";
1761 MutableCompositeModification modification2 = new MutableCompositeModification();
1762 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1763 TestModel.OUTER_LIST_PATH,
1764 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1767 String transactionID3 = "tx3";
1768 MutableCompositeModification modification3 = new MutableCompositeModification();
1769 ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1770 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1774 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1775 cohort1, modification1, true, false), getRef());
1776 expectMsgClass(duration, ReadyTransactionReply.class);
1778 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1779 cohort2, modification2, true, false), getRef());
1780 expectMsgClass(duration, ReadyTransactionReply.class);
1782 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1783 cohort3, modification3, true, false), getRef());
1784 expectMsgClass(duration, ReadyTransactionReply.class);
1786 // canCommit 1st Tx.
1788 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1789 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1791 // canCommit the 2nd Tx - it should get queued.
1793 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1795 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1797 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1798 expectMsgClass(duration, akka.actor.Status.Failure.class);
1800 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1805 public void testCanCommitBeforeReadyFailure() throws Throwable {
1806 new ShardTestKit(getSystem()) {{
1807 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1808 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1809 "testCanCommitBeforeReadyFailure");
1811 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1812 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1814 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1819 public void testAbortTransaction() throws Throwable {
1820 new ShardTestKit(getSystem()) {{
1821 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1822 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1823 "testAbortTransaction");
1825 waitUntilLeader(shard);
1827 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1829 String transactionID1 = "tx1";
1830 MutableCompositeModification modification1 = new MutableCompositeModification();
1831 ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1832 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1833 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1835 String transactionID2 = "tx2";
1836 MutableCompositeModification modification2 = new MutableCompositeModification();
1837 ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1838 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1840 FiniteDuration duration = duration("5 seconds");
1841 final Timeout timeout = new Timeout(duration);
1843 // Simulate the ForwardedReadyTransaction messages that would be sent
1844 // by the ShardTransaction.
1846 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1847 cohort1, modification1, true, false), getRef());
1848 expectMsgClass(duration, ReadyTransactionReply.class);
1850 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1851 cohort2, modification2, true, false), getRef());
1852 expectMsgClass(duration, ReadyTransactionReply.class);
1854 // Send the CanCommitTransaction message for the first Tx.
1856 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1857 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1858 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1859 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1861 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1862 // processed after the first Tx completes.
1864 Future<Object> canCommitFuture = Patterns.ask(shard,
1865 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1867 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1870 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1871 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1873 // Wait for the 2nd Tx to complete the canCommit phase.
1875 Await.ready(canCommitFuture, duration);
1877 InOrder inOrder = inOrder(cohort1, cohort2);
1878 inOrder.verify(cohort1).canCommit();
1879 inOrder.verify(cohort2).canCommit();
1881 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1886 public void testCreateSnapshot() throws Exception {
1887 testCreateSnapshot(true, "testCreateSnapshot");
1891 public void testCreateSnapshotWithNonPersistentData() throws Exception {
1892 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1895 @SuppressWarnings("serial")
1896 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1898 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1900 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1901 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1902 TestPersistentDataProvider(DataPersistenceProvider delegate) {
1907 public void saveSnapshot(Object o) {
1908 savedSnapshot.set(o);
1909 super.saveSnapshot(o);
1913 dataStoreContextBuilder.persistent(persistent);
1915 new ShardTestKit(getSystem()) {{
1916 class TestShard extends Shard {
1918 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
1919 DatastoreContext datastoreContext, SchemaContext schemaContext) {
1920 super(name, peerAddresses, datastoreContext, schemaContext);
1921 setPersistence(new TestPersistentDataProvider(super.persistence()));
1925 public void handleCommand(Object message) {
1926 super.handleCommand(message);
1928 if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
1929 latch.get().countDown();
1934 public RaftActorContext getRaftActorContext() {
1935 return super.getRaftActorContext();
1939 Creator<Shard> creator = new Creator<Shard>() {
1941 public Shard create() throws Exception {
1942 return new TestShard(shardID, Collections.<String,String>emptyMap(),
1943 newDatastoreContext(), SCHEMA_CONTEXT);
1947 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1948 Props.create(new DelegatingShardCreator(creator)), shardActorName);
1950 waitUntilLeader(shard);
1952 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1954 NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1956 // Trigger creation of a snapshot by ensuring
1957 RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1958 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1960 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1962 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1963 savedSnapshot.get() instanceof Snapshot);
1965 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1967 latch.set(new CountDownLatch(1));
1968 savedSnapshot.set(null);
1970 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1972 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1974 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1975 savedSnapshot.get() instanceof Snapshot);
1977 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1979 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1982 private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1984 NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1985 assertEquals("Root node", expectedRoot, actual);
1991 * This test simply verifies that the applySnapShot logic will work
1992 * @throws ReadFailedException
1993 * @throws DataValidationFailedException
1996 public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
1997 DataTree store = InMemoryDataTreeFactory.getInstance().create();
1998 store.setSchemaContext(SCHEMA_CONTEXT);
2000 DataTreeModification putTransaction = store.takeSnapshot().newModification();
2001 putTransaction.write(TestModel.TEST_PATH,
2002 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2003 commitTransaction(store, putTransaction);
2006 NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2008 DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2010 writeTransaction.delete(YangInstanceIdentifier.builder().build());
2011 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2013 commitTransaction(store, writeTransaction);
2015 NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2017 assertEquals(expected, actual);
2021 public void testRecoveryApplicable(){
2023 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2024 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2026 final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2027 persistentContext, SCHEMA_CONTEXT);
2029 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2030 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2032 final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2033 nonPersistentContext, SCHEMA_CONTEXT);
2035 new ShardTestKit(getSystem()) {{
2036 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2037 persistentProps, "testPersistence1");
2039 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2041 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2043 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2044 nonPersistentProps, "testPersistence2");
2046 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2048 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2055 public void testOnDatastoreContext() {
2056 new ShardTestKit(getSystem()) {{
2057 dataStoreContextBuilder.persistent(true);
2059 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2061 assertEquals("isRecoveryApplicable", true,
2062 shard.underlyingActor().persistence().isRecoveryApplicable());
2064 waitUntilLeader(shard);
2066 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2068 assertEquals("isRecoveryApplicable", false,
2069 shard.underlyingActor().persistence().isRecoveryApplicable());
2071 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2073 assertEquals("isRecoveryApplicable", true,
2074 shard.underlyingActor().persistence().isRecoveryApplicable());
2076 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2081 public void testRegisterRoleChangeListener() throws Exception {
2082 new ShardTestKit(getSystem()) {
2084 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2085 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2086 "testRegisterRoleChangeListener");
2088 waitUntilLeader(shard);
2090 TestActorRef<MessageCollectorActor> listener =
2091 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2093 shard.tell(new RegisterRoleChangeListener(), listener);
2095 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
2096 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
2098 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
2100 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
2102 assertEquals(1, allMatching.size());
2108 public void testFollowerInitialSyncStatus() throws Exception {
2109 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2110 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2111 "testFollowerInitialSyncStatus");
2113 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2115 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2117 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2119 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2121 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2124 private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2125 modification.ready();
2126 store.validate(modification);
2127 store.commit(store.prepare(modification));