2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
24 import akka.actor.ActorRef;
25 import akka.actor.ActorSelection;
26 import akka.actor.PoisonPill;
27 import akka.actor.Props;
28 import akka.actor.Status.Failure;
29 import akka.dispatch.Dispatchers;
30 import akka.dispatch.OnComplete;
31 import akka.japi.Creator;
32 import akka.pattern.Patterns;
33 import akka.persistence.SaveSnapshotSuccess;
34 import akka.testkit.TestActorRef;
35 import akka.util.Timeout;
36 import com.google.common.base.Function;
37 import com.google.common.base.Optional;
38 import com.google.common.util.concurrent.Futures;
39 import com.google.common.util.concurrent.ListenableFuture;
40 import com.google.common.util.concurrent.Uninterruptibles;
41 import java.io.IOException;
42 import java.util.Collections;
43 import java.util.HashSet;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.atomic.AtomicBoolean;
49 import java.util.concurrent.atomic.AtomicReference;
50 import org.junit.Test;
51 import org.mockito.InOrder;
52 import org.opendaylight.controller.cluster.DataPersistenceProvider;
53 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
63 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
64 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
65 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
74 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
75 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
76 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
77 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
78 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
79 import org.opendaylight.controller.cluster.datastore.modification.Modification;
80 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
81 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
82 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
83 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
84 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
85 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
86 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
87 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
88 import org.opendaylight.controller.cluster.raft.RaftActorContext;
89 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
90 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
91 import org.opendaylight.controller.cluster.raft.Snapshot;
92 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
93 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
94 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
95 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
96 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
97 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
98 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
99 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
100 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
101 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
102 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
103 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
104 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
105 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
106 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
107 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
108 import org.opendaylight.yangtools.yang.common.QName;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
111 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
113 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
115 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
122 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
123 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
124 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
125 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
126 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
127 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
128 import scala.concurrent.Await;
129 import scala.concurrent.Future;
130 import scala.concurrent.duration.FiniteDuration;
132 public class ShardTest extends AbstractShardTest {
133 private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
135 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
138 public void testRegisterChangeListener() throws Exception {
139 new ShardTestKit(getSystem()) {{
140 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
141 newShardProps(), "testRegisterChangeListener");
143 waitUntilLeader(shard);
145 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
147 final MockDataChangeListener listener = new MockDataChangeListener(1);
148 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
149 "testRegisterChangeListener-DataChangeListener");
151 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
152 dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
154 final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
155 RegisterChangeListenerReply.class);
156 final String replyPath = reply.getListenerRegistrationPath().toString();
157 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
158 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
160 final YangInstanceIdentifier path = TestModel.TEST_PATH;
161 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
163 listener.waitForChangeEvents(path);
165 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
166 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
170 @SuppressWarnings("serial")
172 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
173 // This test tests the timing window in which a change listener is registered before the
174 // shard becomes the leader. We verify that the listener is registered and notified of the
175 // existing data when the shard becomes the leader.
176 new ShardTestKit(getSystem()) {{
177 // For this test, we want to send the RegisterChangeListener message after the shard
178 // has recovered from persistence and before it becomes the leader. So we subclass
179 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
180 // we know that the shard has been initialized to a follower and has started the
181 // election process. The following 2 CountDownLatches are used to coordinate the
182 // ElectionTimeout with the sending of the RegisterChangeListener message.
183 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
184 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
185 final Creator<Shard> creator = new Creator<Shard>() {
186 boolean firstElectionTimeout = true;
189 public Shard create() throws Exception {
190 // Use a non persistent provider because this test actually invokes persist on the journal
191 // this will cause all other messages to not be queued properly after that.
192 // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
193 // it does do a persist)
194 return new Shard(shardID, Collections.<String,String>emptyMap(),
195 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
197 public void onReceiveCommand(final Object message) throws Exception {
198 if(message instanceof ElectionTimeout && firstElectionTimeout) {
199 // Got the first ElectionTimeout. We don't forward it to the
200 // base Shard yet until we've sent the RegisterChangeListener
201 // message. So we signal the onFirstElectionTimeout latch to tell
202 // the main thread to send the RegisterChangeListener message and
203 // start a thread to wait on the onChangeListenerRegistered latch,
204 // which the main thread signals after it has sent the message.
205 // After the onChangeListenerRegistered is triggered, we send the
206 // original ElectionTimeout message to proceed with the election.
207 firstElectionTimeout = false;
208 final ActorRef self = getSelf();
212 Uninterruptibles.awaitUninterruptibly(
213 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
214 self.tell(message, self);
218 onFirstElectionTimeout.countDown();
220 super.onReceiveCommand(message);
227 final MockDataChangeListener listener = new MockDataChangeListener(1);
228 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
229 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
231 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
232 Props.create(new DelegatingShardCreator(creator)),
233 "testRegisterChangeListenerWhenNotLeaderInitially");
235 // Write initial data into the in-memory store.
236 final YangInstanceIdentifier path = TestModel.TEST_PATH;
237 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
239 // Wait until the shard receives the first ElectionTimeout message.
240 assertEquals("Got first ElectionTimeout", true,
241 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
243 // Now send the RegisterChangeListener and wait for the reply.
244 shard.tell(new RegisterChangeListener(path, dclActor,
245 AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
247 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
248 RegisterChangeListenerReply.class);
249 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
251 // Sanity check - verify the shard is not the leader yet.
252 shard.tell(new FindLeader(), getRef());
253 final FindLeaderReply findLeadeReply =
254 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
255 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
257 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
258 // with the election process.
259 onChangeListenerRegistered.countDown();
261 // Wait for the shard to become the leader and notify our listener with the existing
262 // data in the store.
263 listener.waitForChangeEvents(path);
265 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
266 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
271 public void testRegisterDataTreeChangeListener() throws Exception {
272 new ShardTestKit(getSystem()) {{
273 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
274 newShardProps(), "testRegisterDataTreeChangeListener");
276 waitUntilLeader(shard);
278 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
280 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
281 final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
282 "testRegisterDataTreeChangeListener-DataTreeChangeListener");
284 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
286 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
287 RegisterDataTreeChangeListenerReply.class);
288 final String replyPath = reply.getListenerRegistrationPath().toString();
289 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
290 "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
292 final YangInstanceIdentifier path = TestModel.TEST_PATH;
293 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
295 listener.waitForChangeEvents();
297 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
298 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
302 @SuppressWarnings("serial")
304 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
305 new ShardTestKit(getSystem()) {{
306 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
307 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
308 final Creator<Shard> creator = new Creator<Shard>() {
309 boolean firstElectionTimeout = true;
312 public Shard create() throws Exception {
313 return new Shard(shardID, Collections.<String,String>emptyMap(),
314 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
316 public void onReceiveCommand(final Object message) throws Exception {
317 if(message instanceof ElectionTimeout && firstElectionTimeout) {
318 firstElectionTimeout = false;
319 final ActorRef self = getSelf();
323 Uninterruptibles.awaitUninterruptibly(
324 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
325 self.tell(message, self);
329 onFirstElectionTimeout.countDown();
331 super.onReceiveCommand(message);
338 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
339 final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
340 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
342 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
343 Props.create(new DelegatingShardCreator(creator)),
344 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
346 final YangInstanceIdentifier path = TestModel.TEST_PATH;
347 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
349 assertEquals("Got first ElectionTimeout", true,
350 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
352 shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
353 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
354 RegisterDataTreeChangeListenerReply.class);
355 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
357 shard.tell(new FindLeader(), getRef());
358 final FindLeaderReply findLeadeReply =
359 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
360 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
362 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
364 onChangeListenerRegistered.countDown();
366 // TODO: investigate why we do not receive data chage events
367 listener.waitForChangeEvents();
369 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
370 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
375 public void testCreateTransaction(){
376 new ShardTestKit(getSystem()) {{
377 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
379 waitUntilLeader(shard);
381 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
383 shard.tell(new CreateTransaction("txn-1",
384 TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
386 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
387 CreateTransactionReply.class);
389 final String path = reply.getTransactionActorPath().toString();
390 assertTrue("Unexpected transaction path " + path,
391 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
393 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
398 public void testCreateTransactionOnChain(){
399 new ShardTestKit(getSystem()) {{
400 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
402 waitUntilLeader(shard);
404 shard.tell(new CreateTransaction("txn-1",
405 TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
408 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
409 CreateTransactionReply.class);
411 final String path = reply.getTransactionActorPath().toString();
412 assertTrue("Unexpected transaction path " + path,
413 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
415 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
419 @SuppressWarnings("serial")
421 public void testPeerAddressResolved() throws Exception {
422 new ShardTestKit(getSystem()) {{
423 final CountDownLatch recoveryComplete = new CountDownLatch(1);
424 class TestShard extends Shard {
426 super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
427 newDatastoreContext(), SCHEMA_CONTEXT);
430 Map<String, String> getPeerAddresses() {
431 return getRaftActorContext().getPeerAddresses();
435 protected void onRecoveryComplete() {
437 super.onRecoveryComplete();
439 recoveryComplete.countDown();
444 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
445 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
447 public TestShard create() throws Exception {
448 return new TestShard();
450 })), "testPeerAddressResolved");
452 //waitUntilLeader(shard);
453 assertEquals("Recovery complete", true,
454 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
456 final String address = "akka://foobar";
457 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
459 assertEquals("getPeerAddresses", address,
460 ((TestShard) shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
462 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
467 public void testApplySnapshot() throws Exception {
469 ShardTestKit testkit = new ShardTestKit(getSystem());
471 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
472 "testApplySnapshot");
474 testkit.waitUntilLeader(shard);
476 final DataTree store = InMemoryDataTreeFactory.getInstance().create();
477 store.setSchemaContext(SCHEMA_CONTEXT);
479 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
480 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
481 withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
482 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
484 writeToStore(store, TestModel.TEST_PATH, container);
486 final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
487 final NormalizedNode<?,?> expected = readStore(store, root);
489 final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
490 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
492 shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
494 final NormalizedNode<?,?> actual = readStore(shard, root);
496 assertEquals("Root node", expected, actual);
498 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
502 public void testApplyState() throws Exception {
504 ShardTestKit testkit = new ShardTestKit(getSystem());
506 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
508 testkit.waitUntilLeader(shard);
510 final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
512 final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
513 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
515 shard.underlyingActor().onReceiveCommand(applyState);
517 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
518 assertEquals("Applied state", node, actual);
520 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
524 public void testApplyStateWithCandidatePayload() throws Exception {
526 ShardTestKit testkit = new ShardTestKit(getSystem());
528 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
530 testkit.waitUntilLeader(shard);
532 final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
533 final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
535 final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
536 DataTreeCandidatePayload.create(candidate)));
538 shard.underlyingActor().onReceiveCommand(applyState);
540 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
541 assertEquals("Applied state", node, actual);
543 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
546 DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
547 final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
548 testStore.setSchemaContext(SCHEMA_CONTEXT);
550 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
552 final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
554 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
555 SerializationUtils.serializeNormalizedNode(root),
556 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
560 private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
561 source.validate(mod);
562 final DataTreeCandidate candidate = source.prepare(mod);
563 source.commit(candidate);
564 return DataTreeCandidatePayload.create(candidate);
568 public void testDataTreeCandidateRecovery() throws Exception {
569 // Set up the InMemorySnapshotStore.
570 final DataTree source = setupInMemorySnapshotStore();
572 final DataTreeModification writeMod = source.takeSnapshot().newModification();
573 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
575 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
577 // Set up the InMemoryJournal.
578 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
580 final int nListEntries = 16;
581 final Set<Integer> listEntryKeys = new HashSet<>();
583 // Add some ModificationPayload entries
584 for (int i = 1; i <= nListEntries; i++) {
585 listEntryKeys.add(Integer.valueOf(i));
587 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
588 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
590 final DataTreeModification mod = source.takeSnapshot().newModification();
591 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
593 InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
594 payloadForModification(source, mod)));
597 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
598 new ApplyJournalEntries(nListEntries));
600 testRecovery(listEntryKeys);
604 public void testModicationRecovery() throws Exception {
606 // Set up the InMemorySnapshotStore.
607 setupInMemorySnapshotStore();
609 // Set up the InMemoryJournal.
611 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
613 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
614 new WriteModification(TestModel.OUTER_LIST_PATH,
615 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
617 final int nListEntries = 16;
618 final Set<Integer> listEntryKeys = new HashSet<>();
620 // Add some ModificationPayload entries
621 for(int i = 1; i <= nListEntries; i++) {
622 listEntryKeys.add(Integer.valueOf(i));
623 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
624 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
625 final Modification mod = new MergeModification(path,
626 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
627 InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
628 newModificationPayload(mod)));
631 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
632 new ApplyJournalEntries(nListEntries));
634 testRecovery(listEntryKeys);
637 private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
638 final MutableCompositeModification compMod = new MutableCompositeModification();
639 for(final Modification mod: mods) {
640 compMod.addModification(mod);
643 return new ModificationPayload(compMod);
647 public void testConcurrentThreePhaseCommits() throws Throwable {
648 new ShardTestKit(getSystem()) {{
649 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
650 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
651 "testConcurrentThreePhaseCommits");
653 waitUntilLeader(shard);
655 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
657 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
659 final String transactionID1 = "tx1";
660 final MutableCompositeModification modification1 = new MutableCompositeModification();
661 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
662 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
664 final String transactionID2 = "tx2";
665 final MutableCompositeModification modification2 = new MutableCompositeModification();
666 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
667 TestModel.OUTER_LIST_PATH,
668 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
671 final String transactionID3 = "tx3";
672 final MutableCompositeModification modification3 = new MutableCompositeModification();
673 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
674 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
675 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
676 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
679 final long timeoutSec = 5;
680 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
681 final Timeout timeout = new Timeout(duration);
683 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
684 // by the ShardTransaction.
686 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
687 cohort1, modification1, true, false), getRef());
688 final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
689 expectMsgClass(duration, ReadyTransactionReply.class));
690 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
692 // Send the CanCommitTransaction message for the first Tx.
694 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
695 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
696 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
697 assertEquals("Can commit", true, canCommitReply.getCanCommit());
699 // Send the ForwardedReadyTransaction for the next 2 Tx's.
701 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
702 cohort2, modification2, true, false), getRef());
703 expectMsgClass(duration, ReadyTransactionReply.class);
705 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
706 cohort3, modification3, true, false), getRef());
707 expectMsgClass(duration, ReadyTransactionReply.class);
709 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
710 // processed after the first Tx completes.
712 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
713 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
715 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
716 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
718 // Send the CommitTransaction message for the first Tx. After it completes, it should
719 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
721 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
722 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
724 // Wait for the next 2 Tx's to complete.
726 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
727 final CountDownLatch commitLatch = new CountDownLatch(2);
729 class OnFutureComplete extends OnComplete<Object> {
730 private final Class<?> expRespType;
732 OnFutureComplete(final Class<?> expRespType) {
733 this.expRespType = expRespType;
737 public void onComplete(final Throwable error, final Object resp) {
739 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
742 assertEquals("Commit response type", expRespType, resp.getClass());
744 } catch (final Exception e) {
750 void onSuccess(final Object resp) throws Exception {
754 class OnCommitFutureComplete extends OnFutureComplete {
755 OnCommitFutureComplete() {
756 super(CommitTransactionReply.SERIALIZABLE_CLASS);
760 public void onComplete(final Throwable error, final Object resp) {
761 super.onComplete(error, resp);
762 commitLatch.countDown();
766 class OnCanCommitFutureComplete extends OnFutureComplete {
767 private final String transactionID;
769 OnCanCommitFutureComplete(final String transactionID) {
770 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
771 this.transactionID = transactionID;
775 void onSuccess(final Object resp) throws Exception {
776 final CanCommitTransactionReply canCommitReply =
777 CanCommitTransactionReply.fromSerializable(resp);
778 assertEquals("Can commit", true, canCommitReply.getCanCommit());
780 final Future<Object> commitFuture = Patterns.ask(shard,
781 new CommitTransaction(transactionID).toSerializable(), timeout);
782 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
786 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
787 getSystem().dispatcher());
789 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
790 getSystem().dispatcher());
792 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
794 if(caughtEx.get() != null) {
795 throw caughtEx.get();
798 assertEquals("Commits complete", true, done);
800 final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
801 inOrder.verify(cohort1).canCommit();
802 inOrder.verify(cohort1).preCommit();
803 inOrder.verify(cohort1).commit();
804 inOrder.verify(cohort2).canCommit();
805 inOrder.verify(cohort2).preCommit();
806 inOrder.verify(cohort2).commit();
807 inOrder.verify(cohort3).canCommit();
808 inOrder.verify(cohort3).preCommit();
809 inOrder.verify(cohort3).commit();
811 // Verify data in the data store.
813 verifyOuterListEntry(shard, 1);
815 verifyLastApplied(shard, 2);
817 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
821 private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
822 final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
823 return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
826 private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
827 final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
828 final int messagesSent) {
829 final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
830 batched.addModification(new WriteModification(path, data));
831 batched.setReady(ready);
832 batched.setDoCommitOnReady(doCommitOnReady);
833 batched.setTotalMessagesSent(messagesSent);
838 public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
839 new ShardTestKit(getSystem()) {{
840 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
841 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
842 "testBatchedModificationsWithNoCommitOnReady");
844 waitUntilLeader(shard);
846 final String transactionID = "tx";
847 final FiniteDuration duration = duration("5 seconds");
849 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
850 final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
852 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
853 if(mockCohort.get() == null) {
854 mockCohort.set(createDelegatingMockCohort("cohort", actual));
857 return mockCohort.get();
861 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
863 // Send a BatchedModifications to start a transaction.
865 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
866 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
867 expectMsgClass(duration, BatchedModificationsReply.class);
869 // Send a couple more BatchedModifications.
871 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
872 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
873 expectMsgClass(duration, BatchedModificationsReply.class);
875 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
876 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
877 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
878 expectMsgClass(duration, ReadyTransactionReply.class);
880 // Send the CanCommitTransaction message.
882 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
883 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
884 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
885 assertEquals("Can commit", true, canCommitReply.getCanCommit());
887 // Send the CanCommitTransaction message.
889 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
890 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
892 final InOrder inOrder = inOrder(mockCohort.get());
893 inOrder.verify(mockCohort.get()).canCommit();
894 inOrder.verify(mockCohort.get()).preCommit();
895 inOrder.verify(mockCohort.get()).commit();
897 // Verify data in the data store.
899 verifyOuterListEntry(shard, 1);
901 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
906 public void testBatchedModificationsWithCommitOnReady() throws Throwable {
907 new ShardTestKit(getSystem()) {{
908 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
909 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
910 "testBatchedModificationsWithCommitOnReady");
912 waitUntilLeader(shard);
914 final String transactionID = "tx";
915 final FiniteDuration duration = duration("5 seconds");
917 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
918 final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
920 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
921 if(mockCohort.get() == null) {
922 mockCohort.set(createDelegatingMockCohort("cohort", actual));
925 return mockCohort.get();
929 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
931 // Send a BatchedModifications to start a transaction.
933 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
934 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
935 expectMsgClass(duration, BatchedModificationsReply.class);
937 // Send a couple more BatchedModifications.
939 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
940 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
941 expectMsgClass(duration, BatchedModificationsReply.class);
943 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
944 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
945 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
947 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
949 final InOrder inOrder = inOrder(mockCohort.get());
950 inOrder.verify(mockCohort.get()).canCommit();
951 inOrder.verify(mockCohort.get()).preCommit();
952 inOrder.verify(mockCohort.get()).commit();
954 // Verify data in the data store.
956 verifyOuterListEntry(shard, 1);
958 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
962 @Test(expected=IllegalStateException.class)
963 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
964 new ShardTestKit(getSystem()) {{
965 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
966 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
967 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
969 waitUntilLeader(shard);
971 final String transactionID = "tx1";
972 final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
973 batched.setReady(true);
974 batched.setTotalMessagesSent(2);
976 shard.tell(batched, getRef());
978 final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
980 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
982 if(failure != null) {
983 throw failure.cause();
989 public void testBatchedModificationsWithOperationFailure() throws Throwable {
990 new ShardTestKit(getSystem()) {{
991 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
992 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
993 "testBatchedModificationsWithOperationFailure");
995 waitUntilLeader(shard);
997 // Test merge with invalid data. An exception should occur when the merge is applied. Note that
998 // write will not validate the children for performance reasons.
1000 String transactionID = "tx1";
1002 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1003 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
1004 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1006 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
1007 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
1008 shard.tell(batched, getRef());
1009 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1011 Throwable cause = failure.cause();
1013 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1014 batched.setReady(true);
1015 batched.setTotalMessagesSent(2);
1017 shard.tell(batched, getRef());
1019 failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1020 assertEquals("Failure cause", cause, failure.cause());
1022 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1026 @SuppressWarnings("unchecked")
1027 private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1028 final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1029 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1030 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1031 outerList.getValue() instanceof Iterable);
1032 final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1033 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1034 entry instanceof MapEntryNode);
1035 final MapEntryNode mapEntry = (MapEntryNode)entry;
1036 final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1037 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1038 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1039 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1043 public void testBatchedModificationsOnTransactionChain() throws Throwable {
1044 new ShardTestKit(getSystem()) {{
1045 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1046 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1047 "testBatchedModificationsOnTransactionChain");
1049 waitUntilLeader(shard);
1051 final String transactionChainID = "txChain";
1052 final String transactionID1 = "tx1";
1053 final String transactionID2 = "tx2";
1055 final FiniteDuration duration = duration("5 seconds");
1057 // Send a BatchedModifications to start a chained write transaction and ready it.
1059 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1060 final YangInstanceIdentifier path = TestModel.TEST_PATH;
1061 shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1062 containerNode, true, false, 1), getRef());
1063 expectMsgClass(duration, ReadyTransactionReply.class);
1065 // Create a read Tx on the same chain.
1067 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1068 transactionChainID).toSerializable(), getRef());
1070 final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1072 getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1073 final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1074 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1076 // Commit the write transaction.
1078 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1079 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1080 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1081 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1083 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1084 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1086 // Verify data in the data store.
1088 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1089 assertEquals("Stored node", containerNode, actualNode);
1091 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1096 public void testOnBatchedModificationsWhenNotLeader() {
1097 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1098 new ShardTestKit(getSystem()) {{
1099 final Creator<Shard> creator = new Creator<Shard>() {
1100 private static final long serialVersionUID = 1L;
1103 public Shard create() throws Exception {
1104 return new Shard(shardID, Collections.<String,String>emptyMap(),
1105 newDatastoreContext(), SCHEMA_CONTEXT) {
1107 protected boolean isLeader() {
1108 return overrideLeaderCalls.get() ? false : super.isLeader();
1112 protected ActorSelection getLeader() {
1113 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1120 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1121 Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1123 waitUntilLeader(shard);
1125 overrideLeaderCalls.set(true);
1127 final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1129 shard.tell(batched, ActorRef.noSender());
1131 expectMsgEquals(batched);
1133 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1138 public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1139 new ShardTestKit(getSystem()) {{
1140 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1141 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1142 "testForwardedReadyTransactionWithImmediateCommit");
1144 waitUntilLeader(shard);
1146 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1148 final String transactionID = "tx1";
1149 final MutableCompositeModification modification = new MutableCompositeModification();
1150 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1151 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1152 TestModel.TEST_PATH, containerNode, modification);
1154 final FiniteDuration duration = duration("5 seconds");
1156 // Simulate the ForwardedReadyTransaction messages that would be sent
1157 // by the ShardTransaction.
1159 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1160 cohort, modification, true, true), getRef());
1162 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1164 final InOrder inOrder = inOrder(cohort);
1165 inOrder.verify(cohort).canCommit();
1166 inOrder.verify(cohort).preCommit();
1167 inOrder.verify(cohort).commit();
1169 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1170 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1172 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1177 public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1178 new ShardTestKit(getSystem()) {{
1179 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1180 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1181 "testReadyLocalTransactionWithImmediateCommit");
1183 waitUntilLeader(shard);
1185 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1187 final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1189 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1190 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1191 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1192 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1194 final String txId = "tx1";
1195 modification.ready();
1196 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1198 shard.tell(readyMessage, getRef());
1200 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1202 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1203 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1205 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1210 public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1211 new ShardTestKit(getSystem()) {{
1212 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1213 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1214 "testReadyLocalTransactionWithThreePhaseCommit");
1216 waitUntilLeader(shard);
1218 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1220 final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1222 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1223 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1224 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1225 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1227 final String txId = "tx1";
1228 modification.ready();
1229 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1231 shard.tell(readyMessage, getRef());
1233 expectMsgClass(ReadyTransactionReply.class);
1235 // Send the CanCommitTransaction message.
1237 shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1238 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1239 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1240 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1242 // Send the CanCommitTransaction message.
1244 shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1245 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1247 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1248 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1250 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1255 public void testCommitWithPersistenceDisabled() throws Throwable {
1256 dataStoreContextBuilder.persistent(false);
1257 new ShardTestKit(getSystem()) {{
1258 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1259 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1260 "testCommitWithPersistenceDisabled");
1262 waitUntilLeader(shard);
1264 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1266 // Setup a simulated transactions with a mock cohort.
1268 final String transactionID = "tx";
1269 final MutableCompositeModification modification = new MutableCompositeModification();
1270 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1271 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1272 TestModel.TEST_PATH, containerNode, modification);
1274 final FiniteDuration duration = duration("5 seconds");
1276 // Simulate the ForwardedReadyTransaction messages that would be sent
1277 // by the ShardTransaction.
1279 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1280 cohort, modification, true, false), getRef());
1281 expectMsgClass(duration, ReadyTransactionReply.class);
1283 // Send the CanCommitTransaction message.
1285 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1286 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1287 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1288 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1290 // Send the CanCommitTransaction message.
1292 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1293 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1295 final InOrder inOrder = inOrder(cohort);
1296 inOrder.verify(cohort).canCommit();
1297 inOrder.verify(cohort).preCommit();
1298 inOrder.verify(cohort).commit();
1300 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1301 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1303 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1307 private static DataTreeCandidateTip mockCandidate(final String name) {
1308 final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1309 final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1310 doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1311 doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1312 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1313 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1314 return mockCandidate;
1317 private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1318 final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1319 final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1320 doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1321 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1322 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1323 return mockCandidate;
1327 public void testCommitWhenTransactionHasNoModifications(){
1328 // Note that persistence is enabled which would normally result in the entry getting written to the journal
1329 // but here that need not happen
1330 new ShardTestKit(getSystem()) {
1332 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1333 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1334 "testCommitWhenTransactionHasNoModifications");
1336 waitUntilLeader(shard);
1338 final String transactionID = "tx1";
1339 final MutableCompositeModification modification = new MutableCompositeModification();
1340 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1341 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1342 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1343 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1344 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1346 final FiniteDuration duration = duration("5 seconds");
1348 // Simulate the ForwardedReadyTransaction messages that would be sent
1349 // by the ShardTransaction.
1351 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1352 cohort, modification, true, false), getRef());
1353 expectMsgClass(duration, ReadyTransactionReply.class);
1355 // Send the CanCommitTransaction message.
1357 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1358 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1359 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1360 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1362 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1363 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1365 final InOrder inOrder = inOrder(cohort);
1366 inOrder.verify(cohort).canCommit();
1367 inOrder.verify(cohort).preCommit();
1368 inOrder.verify(cohort).commit();
1370 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1371 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1373 // Use MBean for verification
1374 // Committed transaction count should increase as usual
1375 assertEquals(1,shardStats.getCommittedTransactionsCount());
1377 // Commit index should not advance because this does not go into the journal
1378 assertEquals(-1, shardStats.getCommitIndex());
1380 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1387 public void testCommitWhenTransactionHasModifications(){
1388 new ShardTestKit(getSystem()) {
1390 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1391 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1392 "testCommitWhenTransactionHasModifications");
1394 waitUntilLeader(shard);
1396 final String transactionID = "tx1";
1397 final MutableCompositeModification modification = new MutableCompositeModification();
1398 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1399 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1400 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1401 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1402 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1403 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1405 final FiniteDuration duration = duration("5 seconds");
1407 // Simulate the ForwardedReadyTransaction messages that would be sent
1408 // by the ShardTransaction.
1410 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1411 cohort, modification, true, false), getRef());
1412 expectMsgClass(duration, ReadyTransactionReply.class);
1414 // Send the CanCommitTransaction message.
1416 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1417 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1418 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1419 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1421 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1422 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1424 final InOrder inOrder = inOrder(cohort);
1425 inOrder.verify(cohort).canCommit();
1426 inOrder.verify(cohort).preCommit();
1427 inOrder.verify(cohort).commit();
1429 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1430 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1432 // Use MBean for verification
1433 // Committed transaction count should increase as usual
1434 assertEquals(1, shardStats.getCommittedTransactionsCount());
1436 // Commit index should advance as we do not have an empty modification
1437 assertEquals(0, shardStats.getCommitIndex());
1439 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1446 public void testCommitPhaseFailure() throws Throwable {
1447 new ShardTestKit(getSystem()) {{
1448 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1449 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1450 "testCommitPhaseFailure");
1452 waitUntilLeader(shard);
1454 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1457 final String transactionID1 = "tx1";
1458 final MutableCompositeModification modification1 = new MutableCompositeModification();
1459 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1460 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1461 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1462 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1463 doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1465 final String transactionID2 = "tx2";
1466 final MutableCompositeModification modification2 = new MutableCompositeModification();
1467 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1468 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1470 final FiniteDuration duration = duration("5 seconds");
1471 final Timeout timeout = new Timeout(duration);
1473 // Simulate the ForwardedReadyTransaction messages that would be sent
1474 // by the ShardTransaction.
1476 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1477 cohort1, modification1, true, false), getRef());
1478 expectMsgClass(duration, ReadyTransactionReply.class);
1480 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1481 cohort2, modification2, true, false), getRef());
1482 expectMsgClass(duration, ReadyTransactionReply.class);
1484 // Send the CanCommitTransaction message for the first Tx.
1486 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1487 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1488 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1489 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1491 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1492 // processed after the first Tx completes.
1494 final Future<Object> canCommitFuture = Patterns.ask(shard,
1495 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1497 // Send the CommitTransaction message for the first Tx. This should send back an error
1498 // and trigger the 2nd Tx to proceed.
1500 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1501 expectMsgClass(duration, akka.actor.Status.Failure.class);
1503 // Wait for the 2nd Tx to complete the canCommit phase.
1505 final CountDownLatch latch = new CountDownLatch(1);
1506 canCommitFuture.onComplete(new OnComplete<Object>() {
1508 public void onComplete(final Throwable t, final Object resp) {
1511 }, getSystem().dispatcher());
1513 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1515 final InOrder inOrder = inOrder(cohort1, cohort2);
1516 inOrder.verify(cohort1).canCommit();
1517 inOrder.verify(cohort1).preCommit();
1518 inOrder.verify(cohort1).commit();
1519 inOrder.verify(cohort2).canCommit();
1521 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1526 public void testPreCommitPhaseFailure() throws Throwable {
1527 new ShardTestKit(getSystem()) {{
1528 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1529 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1530 "testPreCommitPhaseFailure");
1532 waitUntilLeader(shard);
1534 final String transactionID1 = "tx1";
1535 final MutableCompositeModification modification1 = new MutableCompositeModification();
1536 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1537 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1538 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1540 final String transactionID2 = "tx2";
1541 final MutableCompositeModification modification2 = new MutableCompositeModification();
1542 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1543 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1545 final FiniteDuration duration = duration("5 seconds");
1546 final Timeout timeout = new Timeout(duration);
1548 // Simulate the ForwardedReadyTransaction messages that would be sent
1549 // by the ShardTransaction.
1551 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1552 cohort1, modification1, true, false), getRef());
1553 expectMsgClass(duration, ReadyTransactionReply.class);
1555 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1556 cohort2, modification2, true, false), getRef());
1557 expectMsgClass(duration, ReadyTransactionReply.class);
1559 // Send the CanCommitTransaction message for the first Tx.
1561 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1562 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1563 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1564 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1566 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1567 // processed after the first Tx completes.
1569 final Future<Object> canCommitFuture = Patterns.ask(shard,
1570 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1572 // Send the CommitTransaction message for the first Tx. This should send back an error
1573 // and trigger the 2nd Tx to proceed.
1575 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1576 expectMsgClass(duration, akka.actor.Status.Failure.class);
1578 // Wait for the 2nd Tx to complete the canCommit phase.
1580 final CountDownLatch latch = new CountDownLatch(1);
1581 canCommitFuture.onComplete(new OnComplete<Object>() {
1583 public void onComplete(final Throwable t, final Object resp) {
1586 }, getSystem().dispatcher());
1588 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1590 final InOrder inOrder = inOrder(cohort1, cohort2);
1591 inOrder.verify(cohort1).canCommit();
1592 inOrder.verify(cohort1).preCommit();
1593 inOrder.verify(cohort2).canCommit();
1595 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1600 public void testCanCommitPhaseFailure() throws Throwable {
1601 new ShardTestKit(getSystem()) {{
1602 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1603 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1604 "testCanCommitPhaseFailure");
1606 waitUntilLeader(shard);
1608 final FiniteDuration duration = duration("5 seconds");
1610 final String transactionID1 = "tx1";
1611 final MutableCompositeModification modification = new MutableCompositeModification();
1612 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1613 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1615 // Simulate the ForwardedReadyTransaction messages that would be sent
1616 // by the ShardTransaction.
1618 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1619 cohort, modification, true, false), getRef());
1620 expectMsgClass(duration, ReadyTransactionReply.class);
1622 // Send the CanCommitTransaction message.
1624 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1625 expectMsgClass(duration, akka.actor.Status.Failure.class);
1627 // Send another can commit to ensure the failed one got cleaned up.
1631 final String transactionID2 = "tx2";
1632 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1634 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1635 cohort, modification, true, false), getRef());
1636 expectMsgClass(duration, ReadyTransactionReply.class);
1638 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1639 final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1640 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1641 assertEquals("getCanCommit", true, reply.getCanCommit());
1643 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1648 public void testCanCommitPhaseFalseResponse() throws Throwable {
1649 new ShardTestKit(getSystem()) {{
1650 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1651 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1652 "testCanCommitPhaseFalseResponse");
1654 waitUntilLeader(shard);
1656 final FiniteDuration duration = duration("5 seconds");
1658 final String transactionID1 = "tx1";
1659 final MutableCompositeModification modification = new MutableCompositeModification();
1660 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1661 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1663 // Simulate the ForwardedReadyTransaction messages that would be sent
1664 // by the ShardTransaction.
1666 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1667 cohort, modification, true, false), getRef());
1668 expectMsgClass(duration, ReadyTransactionReply.class);
1670 // Send the CanCommitTransaction message.
1672 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1673 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1674 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1675 assertEquals("getCanCommit", false, reply.getCanCommit());
1677 // Send another can commit to ensure the failed one got cleaned up.
1681 final String transactionID2 = "tx2";
1682 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1684 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1685 cohort, modification, true, false), getRef());
1686 expectMsgClass(duration, ReadyTransactionReply.class);
1688 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1689 reply = CanCommitTransactionReply.fromSerializable(
1690 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1691 assertEquals("getCanCommit", true, reply.getCanCommit());
1693 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1698 public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1699 new ShardTestKit(getSystem()) {{
1700 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1701 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1702 "testImmediateCommitWithCanCommitPhaseFailure");
1704 waitUntilLeader(shard);
1706 final FiniteDuration duration = duration("5 seconds");
1708 final String transactionID1 = "tx1";
1709 final MutableCompositeModification modification = new MutableCompositeModification();
1710 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1711 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1713 // Simulate the ForwardedReadyTransaction messages that would be sent
1714 // by the ShardTransaction.
1716 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1717 cohort, modification, true, true), getRef());
1719 expectMsgClass(duration, akka.actor.Status.Failure.class);
1721 // Send another can commit to ensure the failed one got cleaned up.
1725 final String transactionID2 = "tx2";
1726 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1727 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1728 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1729 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1730 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1731 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1732 doReturn(candidateRoot).when(candidate).getRootNode();
1733 doReturn(candidate).when(cohort).getCandidate();
1735 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1736 cohort, modification, true, true), getRef());
1738 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1740 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1745 public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1746 new ShardTestKit(getSystem()) {{
1747 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1748 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1749 "testImmediateCommitWithCanCommitPhaseFalseResponse");
1751 waitUntilLeader(shard);
1753 final FiniteDuration duration = duration("5 seconds");
1755 final String transactionID = "tx1";
1756 final MutableCompositeModification modification = new MutableCompositeModification();
1757 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1758 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1760 // Simulate the ForwardedReadyTransaction messages that would be sent
1761 // by the ShardTransaction.
1763 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1764 cohort, modification, true, true), getRef());
1766 expectMsgClass(duration, akka.actor.Status.Failure.class);
1768 // Send another can commit to ensure the failed one got cleaned up.
1772 final String transactionID2 = "tx2";
1773 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1774 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1775 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1776 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1777 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1778 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1779 doReturn(candidateRoot).when(candidate).getRootNode();
1780 doReturn(candidate).when(cohort).getCandidate();
1782 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1783 cohort, modification, true, true), getRef());
1785 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1787 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1792 public void testAbortBeforeFinishCommit() throws Throwable {
1793 new ShardTestKit(getSystem()) {{
1794 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1795 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1796 "testAbortBeforeFinishCommit");
1798 waitUntilLeader(shard);
1800 final FiniteDuration duration = duration("5 seconds");
1801 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1803 final String transactionID = "tx1";
1804 final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1805 new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1807 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1808 final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1810 // Simulate an AbortTransaction message occurring during replication, after
1811 // persisting and before finishing the commit to the in-memory store.
1812 // We have no followers so due to optimizations in the RaftActor, it does not
1813 // attempt replication and thus we can't send an AbortTransaction message b/c
1814 // it would be processed too late after CommitTransaction completes. So we'll
1815 // simulate an AbortTransaction message occurring during replication by calling
1816 // the shard directly.
1818 shard.underlyingActor().doAbortTransaction(transactionID, null);
1820 return preCommitFuture;
1824 final MutableCompositeModification modification = new MutableCompositeModification();
1825 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1826 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1827 modification, preCommit);
1829 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1830 cohort, modification, true, false), getRef());
1831 expectMsgClass(duration, ReadyTransactionReply.class);
1833 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1834 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1835 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1836 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1838 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1839 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1841 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1843 // Since we're simulating an abort occurring during replication and before finish commit,
1844 // the data should still get written to the in-memory store since we've gotten past
1845 // canCommit and preCommit and persisted the data.
1846 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1848 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1853 public void testTransactionCommitTimeout() throws Throwable {
1854 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1856 new ShardTestKit(getSystem()) {{
1857 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1858 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1859 "testTransactionCommitTimeout");
1861 waitUntilLeader(shard);
1863 final FiniteDuration duration = duration("5 seconds");
1865 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1867 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1868 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1869 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1871 // Create 1st Tx - will timeout
1873 final String transactionID1 = "tx1";
1874 final MutableCompositeModification modification1 = new MutableCompositeModification();
1875 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1876 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1877 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1878 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1883 final String transactionID2 = "tx3";
1884 final MutableCompositeModification modification2 = new MutableCompositeModification();
1885 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1886 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1887 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1889 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1894 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1895 cohort1, modification1, true, false), getRef());
1896 expectMsgClass(duration, ReadyTransactionReply.class);
1898 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1899 cohort2, modification2, true, false), getRef());
1900 expectMsgClass(duration, ReadyTransactionReply.class);
1902 // canCommit 1st Tx. We don't send the commit so it should timeout.
1904 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1905 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1907 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1909 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1910 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1912 // Try to commit the 1st Tx - should fail as it's not the current Tx.
1914 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1915 expectMsgClass(duration, akka.actor.Status.Failure.class);
1917 // Commit the 2nd Tx.
1919 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1920 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1922 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1923 assertNotNull(listNodePath + " not found", node);
1925 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1930 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1931 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1933 new ShardTestKit(getSystem()) {{
1934 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1935 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1936 "testTransactionCommitQueueCapacityExceeded");
1938 waitUntilLeader(shard);
1940 final FiniteDuration duration = duration("5 seconds");
1942 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1944 final String transactionID1 = "tx1";
1945 final MutableCompositeModification modification1 = new MutableCompositeModification();
1946 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1947 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1949 final String transactionID2 = "tx2";
1950 final MutableCompositeModification modification2 = new MutableCompositeModification();
1951 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1952 TestModel.OUTER_LIST_PATH,
1953 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1956 final String transactionID3 = "tx3";
1957 final MutableCompositeModification modification3 = new MutableCompositeModification();
1958 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1959 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1963 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1964 cohort1, modification1, true, false), getRef());
1965 expectMsgClass(duration, ReadyTransactionReply.class);
1967 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1968 cohort2, modification2, true, false), getRef());
1969 expectMsgClass(duration, ReadyTransactionReply.class);
1971 // The 3rd Tx should exceed queue capacity and fail.
1973 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1974 cohort3, modification3, true, false), getRef());
1975 expectMsgClass(duration, akka.actor.Status.Failure.class);
1977 // canCommit 1st Tx.
1979 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1980 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1982 // canCommit the 2nd Tx - it should get queued.
1984 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1986 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1988 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1989 expectMsgClass(duration, akka.actor.Status.Failure.class);
1991 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1996 public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1997 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1999 new ShardTestKit(getSystem()) {{
2000 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2001 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2002 "testTransactionCommitWithPriorExpiredCohortEntries");
2004 waitUntilLeader(shard);
2006 final FiniteDuration duration = duration("5 seconds");
2008 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2010 final String transactionID1 = "tx1";
2011 final MutableCompositeModification modification1 = new MutableCompositeModification();
2012 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2013 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2015 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2016 cohort1, modification1, true, false), getRef());
2017 expectMsgClass(duration, ReadyTransactionReply.class);
2019 final String transactionID2 = "tx2";
2020 final MutableCompositeModification modification2 = new MutableCompositeModification();
2021 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2022 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2024 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2025 cohort2, modification2, true, false), getRef());
2026 expectMsgClass(duration, ReadyTransactionReply.class);
2028 final String transactionID3 = "tx3";
2029 final MutableCompositeModification modification3 = new MutableCompositeModification();
2030 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2031 TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2033 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2034 cohort3, modification3, true, false), getRef());
2035 expectMsgClass(duration, ReadyTransactionReply.class);
2037 // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2038 // should expire from the queue and the last one should be processed.
2040 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2041 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2043 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2048 public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2049 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2051 new ShardTestKit(getSystem()) {{
2052 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2053 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2054 "testTransactionCommitWithSubsequentExpiredCohortEntry");
2056 waitUntilLeader(shard);
2058 final FiniteDuration duration = duration("5 seconds");
2060 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2062 final String transactionID1 = "tx1";
2063 final MutableCompositeModification modification1 = new MutableCompositeModification();
2064 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2065 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2067 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2068 cohort1, modification1, true, false), getRef());
2069 expectMsgClass(duration, ReadyTransactionReply.class);
2071 // CanCommit the first one so it's the current in-progress CohortEntry.
2073 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2074 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2076 // Ready the second Tx.
2078 final String transactionID2 = "tx2";
2079 final MutableCompositeModification modification2 = new MutableCompositeModification();
2080 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2081 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2083 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2084 cohort2, modification2, true, false), getRef());
2085 expectMsgClass(duration, ReadyTransactionReply.class);
2087 // Ready the third Tx.
2089 final String transactionID3 = "tx3";
2090 final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2091 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2092 .apply(modification3);
2093 modification3.ready();
2094 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2096 shard.tell(readyMessage, getRef());
2098 // Commit the first Tx. After completing, the second should expire from the queue and the third
2101 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2102 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2104 // Expect commit reply from the third Tx.
2106 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2108 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2109 assertNotNull(TestModel.TEST2_PATH + " not found", node);
2111 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2116 public void testCanCommitBeforeReadyFailure() throws Throwable {
2117 new ShardTestKit(getSystem()) {{
2118 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2119 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2120 "testCanCommitBeforeReadyFailure");
2122 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2123 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2125 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2130 public void testAbortCurrentTransaction() throws Throwable {
2131 new ShardTestKit(getSystem()) {{
2132 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2133 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2134 "testAbortCurrentTransaction");
2136 waitUntilLeader(shard);
2138 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2140 final String transactionID1 = "tx1";
2141 final MutableCompositeModification modification1 = new MutableCompositeModification();
2142 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2143 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2144 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2146 final String transactionID2 = "tx2";
2147 final MutableCompositeModification modification2 = new MutableCompositeModification();
2148 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2149 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2151 final FiniteDuration duration = duration("5 seconds");
2152 final Timeout timeout = new Timeout(duration);
2154 // Simulate the ForwardedReadyTransaction messages that would be sent
2155 // by the ShardTransaction.
2157 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2158 cohort1, modification1, true, false), getRef());
2159 expectMsgClass(duration, ReadyTransactionReply.class);
2161 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2162 cohort2, modification2, true, false), getRef());
2163 expectMsgClass(duration, ReadyTransactionReply.class);
2165 // Send the CanCommitTransaction message for the first Tx.
2167 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2168 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2169 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2170 assertEquals("Can commit", true, canCommitReply.getCanCommit());
2172 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2173 // processed after the first Tx completes.
2175 final Future<Object> canCommitFuture = Patterns.ask(shard,
2176 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2178 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2181 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2182 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2184 // Wait for the 2nd Tx to complete the canCommit phase.
2186 Await.ready(canCommitFuture, duration);
2188 final InOrder inOrder = inOrder(cohort1, cohort2);
2189 inOrder.verify(cohort1).canCommit();
2190 inOrder.verify(cohort2).canCommit();
2192 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2197 public void testAbortQueuedTransaction() throws Throwable {
2198 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2199 new ShardTestKit(getSystem()) {{
2200 final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2201 @SuppressWarnings("serial")
2202 final Creator<Shard> creator = new Creator<Shard>() {
2204 public Shard create() throws Exception {
2205 return new Shard(shardID, Collections.<String,String>emptyMap(),
2206 dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
2208 public void onReceiveCommand(final Object message) throws Exception {
2209 super.onReceiveCommand(message);
2210 if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2211 if(cleaupCheckLatch.get() != null) {
2212 cleaupCheckLatch.get().countDown();
2220 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2221 Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2222 Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
2224 waitUntilLeader(shard);
2226 final String transactionID = "tx1";
2228 final MutableCompositeModification modification = new MutableCompositeModification();
2229 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2230 doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2232 final FiniteDuration duration = duration("5 seconds");
2236 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
2237 cohort, modification, true, false), getRef());
2238 expectMsgClass(duration, ReadyTransactionReply.class);
2240 assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2242 // Send the AbortTransaction message.
2244 shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2245 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2247 verify(cohort).abort();
2249 // Verify the tx cohort is removed from queue at the cleanup check interval.
2251 cleaupCheckLatch.set(new CountDownLatch(1));
2252 assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2253 cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2255 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2257 // Now send CanCommitTransaction - should fail.
2259 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2261 Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2262 assertTrue("Failure type", failure instanceof IllegalStateException);
2264 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2269 public void testCreateSnapshot() throws Exception {
2270 testCreateSnapshot(true, "testCreateSnapshot");
2274 public void testCreateSnapshotWithNonPersistentData() throws Exception {
2275 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2278 @SuppressWarnings("serial")
2279 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2281 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2283 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2284 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2285 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2290 public void saveSnapshot(final Object o) {
2291 savedSnapshot.set(o);
2292 super.saveSnapshot(o);
2296 dataStoreContextBuilder.persistent(persistent);
2298 new ShardTestKit(getSystem()) {{
2299 class TestShard extends Shard {
2301 protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
2302 final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
2303 super(name, peerAddresses, datastoreContext, schemaContext);
2304 setPersistence(new TestPersistentDataProvider(super.persistence()));
2308 public void handleCommand(final Object message) {
2309 super.handleCommand(message);
2311 if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2312 latch.get().countDown();
2317 public RaftActorContext getRaftActorContext() {
2318 return super.getRaftActorContext();
2322 final Creator<Shard> creator = new Creator<Shard>() {
2324 public Shard create() throws Exception {
2325 return new TestShard(shardID, Collections.<String,String>emptyMap(),
2326 newDatastoreContext(), SCHEMA_CONTEXT);
2330 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2331 Props.create(new DelegatingShardCreator(creator)), shardActorName);
2333 waitUntilLeader(shard);
2335 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2337 final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2339 // Trigger creation of a snapshot by ensuring
2340 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2341 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2343 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2345 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2346 savedSnapshot.get() instanceof Snapshot);
2348 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2350 latch.set(new CountDownLatch(1));
2351 savedSnapshot.set(null);
2353 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2355 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2357 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2358 savedSnapshot.get() instanceof Snapshot);
2360 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2362 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2365 private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2367 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2368 assertEquals("Root node", expectedRoot, actual);
2374 * This test simply verifies that the applySnapShot logic will work
2375 * @throws ReadFailedException
2376 * @throws DataValidationFailedException
2379 public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2380 final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2381 store.setSchemaContext(SCHEMA_CONTEXT);
2383 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2384 putTransaction.write(TestModel.TEST_PATH,
2385 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2386 commitTransaction(store, putTransaction);
2389 final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2391 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2393 writeTransaction.delete(YangInstanceIdentifier.builder().build());
2394 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2396 commitTransaction(store, writeTransaction);
2398 final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2400 assertEquals(expected, actual);
2404 public void testRecoveryApplicable(){
2406 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2407 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2409 final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2410 persistentContext, SCHEMA_CONTEXT);
2412 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2413 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2415 final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2416 nonPersistentContext, SCHEMA_CONTEXT);
2418 new ShardTestKit(getSystem()) {{
2419 final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2420 persistentProps, "testPersistence1");
2422 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2424 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2426 final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2427 nonPersistentProps, "testPersistence2");
2429 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2431 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2438 public void testOnDatastoreContext() {
2439 new ShardTestKit(getSystem()) {{
2440 dataStoreContextBuilder.persistent(true);
2442 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2444 assertEquals("isRecoveryApplicable", true,
2445 shard.underlyingActor().persistence().isRecoveryApplicable());
2447 waitUntilLeader(shard);
2449 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2451 assertEquals("isRecoveryApplicable", false,
2452 shard.underlyingActor().persistence().isRecoveryApplicable());
2454 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2456 assertEquals("isRecoveryApplicable", true,
2457 shard.underlyingActor().persistence().isRecoveryApplicable());
2459 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2464 public void testRegisterRoleChangeListener() throws Exception {
2465 new ShardTestKit(getSystem()) {
2467 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2468 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2469 "testRegisterRoleChangeListener");
2471 waitUntilLeader(shard);
2473 final TestActorRef<MessageCollectorActor> listener =
2474 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2476 shard.tell(new RegisterRoleChangeListener(), listener);
2478 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2480 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2481 ShardLeaderStateChanged.class);
2482 assertEquals("getLocalShardDataTree present", true,
2483 leaderStateChanged.getLocalShardDataTree().isPresent());
2484 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2485 leaderStateChanged.getLocalShardDataTree().get());
2487 MessageCollectorActor.clearMessages(listener);
2489 // Force a leader change
2491 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2493 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2494 ShardLeaderStateChanged.class);
2495 assertEquals("getLocalShardDataTree present", false,
2496 leaderStateChanged.getLocalShardDataTree().isPresent());
2498 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2504 public void testFollowerInitialSyncStatus() throws Exception {
2505 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2506 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2507 "testFollowerInitialSyncStatus");
2509 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2511 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2513 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2515 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2517 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2520 private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2521 modification.ready();
2522 store.validate(modification);
2523 store.commit(store.prepare(modification));
2527 public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
2528 new ShardTestKit(getSystem()) {{
2529 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
2530 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
2531 final Creator<Shard> creator = new Creator<Shard>() {
2532 boolean firstElectionTimeout = true;
2535 public Shard create() throws Exception {
2536 return new Shard(shardID, Collections.<String,String>emptyMap(),
2537 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
2539 public void onReceiveCommand(final Object message) throws Exception {
2540 if(message instanceof ElectionTimeout && firstElectionTimeout) {
2541 firstElectionTimeout = false;
2542 final ActorRef self = getSelf();
2546 Uninterruptibles.awaitUninterruptibly(
2547 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
2548 self.tell(message, self);
2552 onFirstElectionTimeout.countDown();
2554 super.onReceiveCommand(message);
2561 final MockDataChangeListener listener = new MockDataChangeListener(1);
2562 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2563 "testDataChangeListenerOnFollower-DataChangeListener");
2565 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2566 Props.create(new DelegatingShardCreator(creator)),
2567 "testDataChangeListenerOnFollower");
2569 assertEquals("Got first ElectionTimeout", true,
2570 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
2572 shard.tell(new FindLeader(), getRef());
2573 final FindLeaderReply findLeadeReply =
2574 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2575 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
2577 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2579 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2580 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2581 RegisterChangeListenerReply.class);
2582 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2584 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2586 onChangeListenerRegistered.countDown();
2588 listener.waitForChangeEvents();
2590 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2591 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2596 public void testClusteredDataChangeListernerRegistration() throws Exception {
2597 new ShardTestKit(getSystem()) {{
2598 final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
2599 .shardName("inventory").type("config").build();
2601 final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
2602 .shardName("inventory").type("config").build();
2603 final Creator<Shard> followerShardCreator = new Creator<Shard>() {
2606 public Shard create() throws Exception {
2607 return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
2608 "akka://test/user/" + member2ShardID.toString()),
2609 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
2611 public void onReceiveCommand(final Object message) throws Exception {
2613 if(!(message instanceof ElectionTimeout)) {
2614 super.onReceiveCommand(message);
2621 final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
2624 public Shard create() throws Exception {
2625 return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
2626 "akka://test/user/" + member1ShardID.toString()),
2627 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
2632 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2633 Props.create(new DelegatingShardCreator(followerShardCreator)),
2634 member1ShardID.toString());
2636 final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
2637 Props.create(new DelegatingShardCreator(leaderShardCreator)),
2638 member2ShardID.toString());
2639 // Sleep to let election happen
2640 Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
2642 shard.tell(new FindLeader(), getRef());
2643 final FindLeaderReply findLeaderReply =
2644 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2645 assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
2647 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2648 final MockDataChangeListener listener = new MockDataChangeListener(1);
2649 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2650 "testDataChangeListenerOnFollower-DataChangeListener");
2652 shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2653 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2654 RegisterChangeListenerReply.class);
2655 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2657 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2659 listener.waitForChangeEvents();
2661 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2662 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());