1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
15 import akka.actor.ActorRef;
16 import akka.actor.ActorSelection;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.Status.Failure;
20 import akka.dispatch.Dispatchers;
21 import akka.dispatch.OnComplete;
22 import akka.japi.Creator;
23 import akka.pattern.Patterns;
24 import akka.persistence.SaveSnapshotSuccess;
25 import akka.testkit.TestActorRef;
26 import akka.util.Timeout;
27 import com.google.common.base.Function;
28 import com.google.common.base.Optional;
29 import com.google.common.util.concurrent.Futures;
30 import com.google.common.util.concurrent.ListenableFuture;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.HashSet;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.Test;
42 import org.mockito.InOrder;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
45 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
46 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
50 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
65 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
66 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
67 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
68 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
69 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
70 import org.opendaylight.controller.cluster.datastore.modification.Modification;
71 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
72 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
73 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
74 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
75 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
76 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
77 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
78 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
79 import org.opendaylight.controller.cluster.raft.RaftActorContext;
80 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
81 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
82 import org.opendaylight.controller.cluster.raft.Snapshot;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
84 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
85 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
86 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
88 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
89 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
90 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
91 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
92 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
93 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
94 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
95 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
96 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
97 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
98 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
99 import org.opendaylight.yangtools.yang.common.QName;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
101 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
102 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
103 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
104 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
115 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
116 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
117 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
118 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
119 import scala.concurrent.Await;
120 import scala.concurrent.Future;
121 import scala.concurrent.duration.FiniteDuration;
123 public class ShardTest extends AbstractShardTest {
// QName created from the cars test-model namespace, the "2014-03-13" revision string and the local name "cars".
124 private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
// Filler object written at journal sequence number 0 by the recovery tests; the text states the reason.
126 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
/**
 * Registers a data change listener with a shard and checks two things: the registration reply
 * carries a path below the shard actor, and a write to the registered path reaches the listener.
 */
129 public void testRegisterChangeListener() throws Exception {
130 new ShardTestKit(getSystem()) {{
131 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
132 newShardProps(), "testRegisterChangeListener");
// Do not talk to the shard until the test kit reports it as leader.
134 waitUntilLeader(shard);
136 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// NOTE(review): the constructor argument is presumably the expected number of change events - confirm
// against MockDataChangeListener.
138 final MockDataChangeListener listener = new MockDataChangeListener(1);
139 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
140 "testRegisterChangeListener-DataChangeListener");
// Register for TEST_PATH with BASE scope; the reply is delivered to the test kit's own ref.
142 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
143 dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
145 final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
146 RegisterChangeListenerReply.class);
// The registration path must be a child of the shard actor whose name starts with '$'.
147 final String replyPath = reply.getListenerRegistrationPath().toString();
148 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
149 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// Write the test container and wait for the listener to see a change for that path.
151 final YangInstanceIdentifier path = TestModel.TEST_PATH;
152 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
154 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
156 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
157 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Registers a data change listener while the shard has not yet become leader and verifies that
 * the listener is notified of data already in the store once the election is allowed to proceed.
 * Two latches order the first ElectionTimeout against the RegisterChangeListener message.
 */
161 @SuppressWarnings("serial")
163 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
164 // This test tests the timing window in which a change listener is registered before the
165 // shard becomes the leader. We verify that the listener is registered and notified of the
166 // existing data when the shard becomes the leader.
167 new ShardTestKit(getSystem()) {{
168 // For this test, we want to send the RegisterChangeListener message after the shard
169 // has recovered from persistence and before it becomes the leader. So we subclass
170 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
171 // we know that the shard has been initialized to a follower and has started the
172 // election process. The following 2 CountDownLatches are used to coordinate the
173 // ElectionTimeout with the sending of the RegisterChangeListener message.
174 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
175 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
176 final Creator<Shard> creator = new Creator<Shard>() {
// Flipped to false on the first ElectionTimeout so that only that one message is intercepted.
177 boolean firstElectionTimeout = true;
180 public Shard create() throws Exception {
181 // Use a non persistent provider because this test actually invokes persist on the journal
182 // this will cause all other messages to not be queued properly after that.
183 // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
184 // it does do a persist)
185 return new Shard(shardID, Collections.<String,String>emptyMap(),
186 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
188 public void onReceiveCommand(final Object message) throws Exception {
189 if(message instanceof ElectionTimeout && firstElectionTimeout) {
190 // Got the first ElectionTimeout. We don't forward it to the
191 // base Shard yet until we've sent the RegisterChangeListener
192 // message. So we signal the onFirstElectionTimeout latch to tell
193 // the main thread to send the RegisterChangeListener message and
194 // start a thread to wait on the onChangeListenerRegistered latch,
195 // which the main thread signals after it has sent the message.
196 // After the onChangeListenerRegistered is triggered, we send the
197 // original ElectionTimeout message to proceed with the election.
198 firstElectionTimeout = false;
199 final ActorRef self = getSelf();
// Waits at most 5 seconds for the main thread's signal, then re-sends the held ElectionTimeout to self.
203 Uninterruptibles.awaitUninterruptibly(
204 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
205 self.tell(message, self);
209 onFirstElectionTimeout.countDown();
211 super.onReceiveCommand(message);
218 final MockDataChangeListener listener = new MockDataChangeListener(1);
219 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
220 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
222 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
223 Props.create(new DelegatingShardCreator(creator)),
224 "testRegisterChangeListenerWhenNotLeaderInitially");
226 // Write initial data into the in-memory store.
227 final YangInstanceIdentifier path = TestModel.TEST_PATH;
228 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
230 // Wait until the shard receives the first ElectionTimeout message.
231 assertEquals("Got first ElectionTimeout", true,
232 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
234 // Now send the RegisterChangeListener and wait for the reply.
235 shard.tell(new RegisterChangeListener(path, dclActor,
236 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
238 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
239 RegisterChangeListenerReply.class);
240 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
242 // Sanity check - verify the shard is not the leader yet.
243 shard.tell(new FindLeader(), getRef());
244 final FindLeaderReply findLeadeReply =
245 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
246 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
248 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
249 // with the election process.
250 onChangeListenerRegistered.countDown();
252 // Wait for the shard to become the leader and notify our listener with the existing
253 // data in the store.
254 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
256 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
257 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Registers a data tree change listener with a shard and checks two things: the registration
 * reply carries a path below the shard actor, and a subsequent write produces a change event.
 */
262 public void testRegisterDataTreeChangeListener() throws Exception {
263 new ShardTestKit(getSystem()) {{
264 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
265 newShardProps(), "testRegisterDataTreeChangeListener");
// Do not talk to the shard until the test kit reports it as leader.
267 waitUntilLeader(shard);
269 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// NOTE(review): the constructor argument is presumably the expected number of change events - confirm
// against MockDataTreeChangeListener.
271 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
272 final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
273 "testRegisterDataTreeChangeListener-DataTreeChangeListener");
// Register for TEST_PATH; the reply is delivered to the test kit's own ref.
275 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
277 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
278 RegisterDataTreeChangeListenerReply.class);
// The registration path must be a child of the shard actor whose name starts with '$'.
279 final String replyPath = reply.getListenerRegistrationPath().toString();
280 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
281 "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
// Write the test container, then wait for the listener to receive its change event(s).
283 final YangInstanceIdentifier path = TestModel.TEST_PATH;
284 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
286 listener.waitForChangeEvents();
// Stop both actors created by this test.
288 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
289 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Data-tree-listener variant of the "registered before leadership" scenario: the first
 * ElectionTimeout is held back, the listener is registered while the shard reports no leader,
 * and the test then releases the election and waits for the listener to be notified.
 */
293 @SuppressWarnings("serial")
295 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
296 new ShardTestKit(getSystem()) {{
// Latches ordering the first ElectionTimeout against the listener registration sent by the test.
297 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
298 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
299 final Creator<Shard> creator = new Creator<Shard>() {
// Flipped to false on the first ElectionTimeout so that only that one message is intercepted.
300 boolean firstElectionTimeout = true;
303 public Shard create() throws Exception {
// Shard built with persistence disabled in its datastore context.
304 return new Shard(shardID, Collections.<String,String>emptyMap(),
305 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
307 public void onReceiveCommand(final Object message) throws Exception {
308 if(message instanceof ElectionTimeout && firstElectionTimeout) {
309 firstElectionTimeout = false;
310 final ActorRef self = getSelf();
// Waits at most 5 seconds for the test's signal, then re-sends the held ElectionTimeout to self.
314 Uninterruptibles.awaitUninterruptibly(
315 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
316 self.tell(message, self);
320 onFirstElectionTimeout.countDown();
322 super.onReceiveCommand(message);
329 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
330 final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
331 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
333 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
334 Props.create(new DelegatingShardCreator(creator)),
335 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
// Seed the store before the listener is registered.
337 final YangInstanceIdentifier path = TestModel.TEST_PATH;
338 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Wait until the shard has seen its first ElectionTimeout.
340 assertEquals("Got first ElectionTimeout", true,
341 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
343 shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
344 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
345 RegisterDataTreeChangeListenerReply.class);
// Assertion message corrected (was misspelled) so a failure names the right accessor.
346 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
// Sanity check: the shard must not report a leader yet.
348 shard.tell(new FindLeader(), getRef());
349 final FindLeaderReply findLeadeReply =
350 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
351 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
// Second write of the same container, issued after registration and before the election is released.
353 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Release the held ElectionTimeout so the election can proceed.
355 onChangeListenerRegistered.countDown();
357 // TODO: investigate why we do not receive data change events
358 listener.waitForChangeEvents();
// Stop both actors created by this test.
360 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
361 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Sends a serialized CreateTransaction for "txn-1" to a shard and checks that the reply's
 * transaction actor path is located under the shard actor.
 */
366 public void testCreateTransaction(){
367 new ShardTestKit(getSystem()) {{
368 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
// Do not talk to the shard until the test kit reports it as leader.
370 waitUntilLeader(shard);
372 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// The transaction type is passed as the enum ordinal of READ_ONLY.
374 shard.tell(new CreateTransaction("txn-1",
375 TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
377 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
378 CreateTransactionReply.class);
// Only a substring match: the path may carry further characters after "shard-txn-1".
380 final String path = reply.getTransactionActorPath().toString();
381 assertTrue("Unexpected transaction path " + path,
382 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
384 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Same as the plain create-transaction check, but the CreateTransaction message carries an
 * additional "foobar" argument.
 * NOTE(review): "foobar" is presumably the transaction chain id - confirm against CreateTransaction.
 */
389 public void testCreateTransactionOnChain(){
390 new ShardTestKit(getSystem()) {{
391 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
// Do not talk to the shard until the test kit reports it as leader.
393 waitUntilLeader(shard);
// The transaction type is passed as the enum ordinal of READ_ONLY.
395 shard.tell(new CreateTransaction("txn-1",
396 TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
399 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
400 CreateTransactionReply.class);
// Only a substring match: the path may carry further characters after "shard-txn-1".
402 final String path = reply.getTransactionActorPath().toString();
403 assertTrue("Unexpected transaction path " + path,
404 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
406 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Creates a shard whose peer map holds one entry with a null address, delivers a
 * PeerAddressResolved message directly to the underlying actor after recovery completes, and
 * checks that the peer map then contains the resolved address.
 */
410 @SuppressWarnings("serial")
412 public void testPeerAddressResolved() throws Exception {
413 new ShardTestKit(getSystem()) {{
// Counted down from onRecoveryComplete() so the test knows when the shard has finished recovery.
414 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Local Shard subclass that exposes the peer addresses and signals recovery completion.
415 class TestShard extends Shard {
// The single peer is keyed by shardID.toString() and starts with a null (unresolved) address.
417 super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
418 newDatastoreContext(), SCHEMA_CONTEXT);
421 Map<String, String> getPeerAddresses() {
422 return getRaftActorContext().getPeerAddresses();
426 protected void onRecoveryComplete() {
428 super.onRecoveryComplete();
430 recoveryComplete.countDown();
435 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
436 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
438 public TestShard create() throws Exception {
439 return new TestShard();
441 })), "testPeerAddressResolved");
// The test waits for recovery completion (up to 5 seconds) instead of waiting for leadership.
443 //waitUntilLeader(shard);
444 assertEquals("Recovery complete", true,
445 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// Invoke the handler directly on the underlying actor rather than sending through the mailbox.
447 final String address = "akka://foobar";
448 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
450 assertEquals("getPeerAddresses", address,
451 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
453 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Builds a reference DataTree, serializes its root into a Snapshot, applies the snapshot state
 * through the shard's snapshot cohort and checks that the shard's root node equals the reference.
 */
458 public void testApplySnapshot() throws Exception {
460 ShardTestKit testkit = new ShardTestKit(getSystem());
462 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
463 "testApplySnapshot");
465 testkit.waitUntilLeader(shard);
// Stand-alone in-memory tree used to produce the expected content.
467 final DataTree store = InMemoryDataTreeFactory.getInstance().create();
468 store.setSchemaContext(SCHEMA_CONTEXT);
// Test container holding an outer list with a single entry whose id is 1.
470 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
471 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
472 withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
473 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
475 writeToStore(store, TestModel.TEST_PATH, container);
// An identifier built with no path arguments addresses the root of the tree.
477 final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
478 final NormalizedNode<?,?> expected = readStore(store, root);
// NOTE(review): the meaning of the trailing numeric arguments (1, 2, 3, 4) is not visible here - see
// Snapshot.create.
480 final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
481 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
// Only the serialized state of the snapshot is handed to the cohort.
483 shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
485 final NormalizedNode<?,?> actual = readStore(shard, root);
487 assertEquals("Root node", expected, actual);
489 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Delivers an ApplyState message wrapping a ModificationPayload (a single write of the test
 * container) directly to the shard and checks that the node can be read back from TEST_PATH.
 */
493 public void testApplyState() throws Exception {
495 ShardTestKit testkit = new ShardTestKit(getSystem());
497 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
499 testkit.waitUntilLeader(shard);
501 final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// NOTE(review): the log entry arguments (1, 2) are presumably index and term - confirm against
// ReplicatedLogImplEntry.
503 final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
504 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
// Invoke the handler directly on the underlying actor rather than sending through the mailbox.
506 shard.underlyingActor().onReceiveCommand(applyState);
508 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
509 assertEquals("Applied state", node, actual);
511 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Delivers an ApplyState message wrapping a DataTreeCandidatePayload (built from the test
 * container at TEST_PATH) directly to the shard and checks that the node can be read back.
 */
515 public void testApplyStateWithCandidatePayload() throws Exception {
517 ShardTestKit testkit = new ShardTestKit(getSystem());
// The actor name is unique to this test. It previously reused "testApplyState", which can collide with
// the actor of that test: both are created on getSystem() and stopped via an asynchronous PoisonPill.
519 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateWithCandidatePayload");
521 testkit.waitUntilLeader(shard);
523 final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
524 final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
// NOTE(review): the log entry arguments (1, 2) are presumably index and term - confirm against
// ReplicatedLogImplEntry.
526 final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
527 DataTreeCandidatePayload.create(candidate)));
// Invoke the handler directly on the underlying actor rather than sending through the mailbox.
529 shard.underlyingActor().onReceiveCommand(applyState);
531 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
532 assertEquals("Applied state", node, actual);
534 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Creates an in-memory DataTree containing the test container, serializes the tree root and
 * registers it in InMemorySnapshotStore as a snapshot for this test's shard id.
 *
 * @return a DataTree (the return statement is not visible in this excerpt; callers use the
 *         result as the source tree for further modifications)
 * @throws DataValidationFailedException declared by the signature; not raised explicitly here
 */
537 DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
538 final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
539 testStore.setSchemaContext(SCHEMA_CONTEXT);
541 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// An identifier built with no path arguments addresses the root of the tree.
543 final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
// NOTE(review): the meaning of the trailing numeric arguments (0, 1, -1, -1) is not visible here - see
// Snapshot.create.
545 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
546 SerializationUtils.serializeNormalizedNode(root),
547 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
/**
 * Validates, prepares and commits the given modification on the source tree, then wraps the
 * resulting candidate in a DataTreeCandidatePayload.
 *
 * @param source tree the modification is applied to; it is mutated by the commit
 * @param mod modification to apply
 * @return payload created from the committed candidate
 * @throws DataValidationFailedException declared by the signature; presumably raised when
 *         validation of the modification fails - confirm against DataTree
 */
551 private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
552 source.validate(mod);
553 final DataTreeCandidate candidate = source.prepare(mod);
// The candidate is committed to the source tree before the payload is created from it.
554 source.commit(candidate);
555 return DataTreeCandidatePayload.create(candidate);
/**
 * Seeds the in-memory snapshot store and journal with DataTreeCandidatePayload entries (one
 * write of the outer list followed by 16 merged list entries) and runs the shared recovery check
 * with the set of expected list keys.
 */
559 public void testDataTreeCandidateRecovery() throws Exception {
560 // Set up the InMemorySnapshotStore.
561 final DataTree source = setupInMemorySnapshotStore();
// First modification: write an empty outer list into the source tree.
563 final DataTreeModification writeMod = source.takeSnapshot().newModification();
564 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
// Sequence number 0 holds the DUMMY_DATA filler; its text explains why.
566 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
568 // Set up the InMemoryJournal.
569 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
571 final int nListEntries = 16;
572 final Set<Integer> listEntryKeys = new HashSet<>();
574 // Add some DataTreeCandidatePayload entries
575 for (int i = 1; i <= nListEntries; i++) {
576 listEntryKeys.add(Integer.valueOf(i));
578 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
579 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
// Each entry is merged through a fresh modification taken from the current state of the source tree.
581 final DataTreeModification mod = source.takeSnapshot().newModification();
582 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
// Journal sequence number is i+1; the log entry itself is created with (i, 1, payload).
584 InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
585 payloadForModification(source, mod)));
// Final journal entry, placed after all list entries.
588 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
589 new ApplyJournalEntries(nListEntries));
591 testRecovery(listEntryKeys);
/**
 * Seeds the in-memory snapshot store and journal with ModificationPayload entries (one write of
 * the outer list followed by 16 merged list entries) and runs the shared recovery check with the
 * set of expected list keys.
 * NOTE(review): the method name looks like a misspelling of "testModificationRecovery".
 */
595 public void testModicationRecovery() throws Exception {
597 // Set up the InMemorySnapshotStore.
598 setupInMemorySnapshotStore();
600 // Set up the InMemoryJournal.
// Sequence number 0 holds the DUMMY_DATA filler; its text explains why.
602 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
// First real entry: write an empty outer list.
604 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
605 new WriteModification(TestModel.OUTER_LIST_PATH,
606 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
608 final int nListEntries = 16;
609 final Set<Integer> listEntryKeys = new HashSet<>();
611 // Add some ModificationPayload entries
612 for(int i = 1; i <= nListEntries; i++) {
613 listEntryKeys.add(Integer.valueOf(i));
614 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
615 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
616 final Modification mod = new MergeModification(path,
617 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
// Journal sequence number is i + 1; the log entry itself is created with (i, 1, payload).
618 InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
619 newModificationPayload(mod)));
// Final journal entry, placed after all list entries.
622 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
623 new ApplyJournalEntries(nListEntries));
625 testRecovery(listEntryKeys);
/**
 * Collects the given modifications, in argument order, into a MutableCompositeModification and
 * wraps it in a ModificationPayload.
 *
 * @param mods modifications to include; may be empty
 * @return payload built from the composite modification
 * @throws IOException declared by the signature; presumably raised while building the payload -
 *         confirm against ModificationPayload
 */
628 private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
629 final MutableCompositeModification compMod = new MutableCompositeModification();
630 for(final Modification mod: mods) {
631 compMod.addModification(mod);
634 return new ModificationPayload(compMod);
638 public void testConcurrentThreePhaseCommits() throws Throwable {
639 new ShardTestKit(getSystem()) {{
640 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
641 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
642 "testConcurrentThreePhaseCommits");
644 waitUntilLeader(shard);
646 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
648 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
650 final String transactionID1 = "tx1";
651 final MutableCompositeModification modification1 = new MutableCompositeModification();
652 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
653 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
655 final String transactionID2 = "tx2";
656 final MutableCompositeModification modification2 = new MutableCompositeModification();
657 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
658 TestModel.OUTER_LIST_PATH,
659 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
662 final String transactionID3 = "tx3";
663 final MutableCompositeModification modification3 = new MutableCompositeModification();
664 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
665 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
666 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
667 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
670 final long timeoutSec = 5;
671 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
672 final Timeout timeout = new Timeout(duration);
674 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
675 // by the ShardTransaction.
677 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
678 cohort1, modification1, true, false), getRef());
679 final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
680 expectMsgClass(duration, ReadyTransactionReply.class));
681 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
683 // Send the CanCommitTransaction message for the first Tx.
685 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
686 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
687 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
688 assertEquals("Can commit", true, canCommitReply.getCanCommit());
690 // Send the ForwardedReadyTransaction for the next 2 Tx's.
692 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
693 cohort2, modification2, true, false), getRef());
694 expectMsgClass(duration, ReadyTransactionReply.class);
696 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
697 cohort3, modification3, true, false), getRef());
698 expectMsgClass(duration, ReadyTransactionReply.class);
700 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
701 // processed after the first Tx completes.
703 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
704 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
706 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
707 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
709 // Send the CommitTransaction message for the first Tx. After it completes, it should
710 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
712 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
713 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
715 // Wait for the next 2 Tx's to complete.
717 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
718 final CountDownLatch commitLatch = new CountDownLatch(2);
720 class OnFutureComplete extends OnComplete<Object> {
721 private final Class<?> expRespType;
723 OnFutureComplete(final Class<?> expRespType) {
724 this.expRespType = expRespType;
728 public void onComplete(final Throwable error, final Object resp) {
730 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
733 assertEquals("Commit response type", expRespType, resp.getClass());
735 } catch (final Exception e) {
741 void onSuccess(final Object resp) throws Exception {
745 class OnCommitFutureComplete extends OnFutureComplete {
746 OnCommitFutureComplete() {
747 super(CommitTransactionReply.SERIALIZABLE_CLASS);
751 public void onComplete(final Throwable error, final Object resp) {
752 super.onComplete(error, resp);
753 commitLatch.countDown();
757 class OnCanCommitFutureComplete extends OnFutureComplete {
758 private final String transactionID;
760 OnCanCommitFutureComplete(final String transactionID) {
761 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
762 this.transactionID = transactionID;
766 void onSuccess(final Object resp) throws Exception {
767 final CanCommitTransactionReply canCommitReply =
768 CanCommitTransactionReply.fromSerializable(resp);
769 assertEquals("Can commit", true, canCommitReply.getCanCommit());
771 final Future<Object> commitFuture = Patterns.ask(shard,
772 new CommitTransaction(transactionID).toSerializable(), timeout);
773 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
777 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
778 getSystem().dispatcher());
780 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
781 getSystem().dispatcher());
783 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
785 if(caughtEx.get() != null) {
786 throw caughtEx.get();
789 assertEquals("Commits complete", true, done);
791 final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
792 inOrder.verify(cohort1).canCommit();
793 inOrder.verify(cohort1).preCommit();
794 inOrder.verify(cohort1).commit();
795 inOrder.verify(cohort2).canCommit();
796 inOrder.verify(cohort2).preCommit();
797 inOrder.verify(cohort2).commit();
798 inOrder.verify(cohort3).canCommit();
799 inOrder.verify(cohort3).preCommit();
800 inOrder.verify(cohort3).commit();
802 // Verify data in the data store.
804 verifyOuterListEntry(shard, 1);
806 verifyLastApplied(shard, 2);
808 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Convenience overload for a transaction that is not part of a transaction chain: delegates to
// the 7-argument overload, passing null as the transactionChainID.
812 private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
813 final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
814 return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
// Builds a BatchedModifications message at CURRENT_VERSION for the given transaction ID and
// transaction chain ID (the 6-argument overload passes null for the chain). The message carries a
// single WriteModification of 'data' at 'path'; its ready, doCommitOnReady and totalMessagesSent
// fields are set from the corresponding arguments.
817 private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
818 final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
819 final int messagesSent) {
820 final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
821 batched.addModification(new WriteModification(path, data));
822 batched.setReady(ready);
823 batched.setDoCommitOnReady(doCommitOnReady);
824 batched.setTotalMessagesSent(messagesSent);
// Sends three BatchedModifications for one transaction, the last one with ready=true and
// doCommitOnReady=false, then drives the commit explicitly with CanCommitTransaction and
// CommitTransaction. Verifies the order of cohort calls and the data written to the store.
829 public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
830 new ShardTestKit(getSystem()) {{
831 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
832 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
833 "testBatchedModificationsWithNoCommitOnReady");
835 waitUntilLeader(shard);
837 final String transactionID = "tx";
838 final FiniteDuration duration = duration("5 seconds");
// The decorator wraps the first cohort it is handed in a delegating mock and returns that same
// mock on every later call, so the cohort invocations can be verified with InOrder below.
840 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
841 final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
843 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
844 if(mockCohort.get() == null) {
845 mockCohort.set(createDelegatingMockCohort("cohort", actual));
848 return mockCohort.get();
852 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
854 // Send a BatchedModifications to start a transaction.
856 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
857 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
858 expectMsgClass(duration, BatchedModificationsReply.class);
860 // Send a couple more BatchedModifications.
862 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
863 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
864 expectMsgClass(duration, BatchedModificationsReply.class);
// Final batch: ready=true, doCommitOnReady=false and a total message count of 3, so a
// ReadyTransactionReply is expected rather than a commit reply.
866 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
867 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
868 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
869 expectMsgClass(duration, ReadyTransactionReply.class);
871 // Send the CanCommitTransaction message.
873 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
874 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
875 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
876 assertEquals("Can commit", true, canCommitReply.getCanCommit());
878 // Send the CommitTransaction message.
880 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
881 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The decorated cohort must have been driven through canCommit, preCommit and commit in order.
883 final InOrder inOrder = inOrder(mockCohort.get());
884 inOrder.verify(mockCohort.get()).canCommit();
885 inOrder.verify(mockCohort.get()).preCommit();
886 inOrder.verify(mockCohort.get()).commit();
888 // Verify data in the data store.
890 verifyOuterListEntry(shard, 1);
892 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Sends three BatchedModifications for one transaction, the last one with ready=true and
// doCommitOnReady=true. No CanCommitTransaction/CommitTransaction messages are sent; the test
// expects a CommitTransactionReply directly in response to the final batch, then verifies the
// order of cohort calls and the data written to the store.
897 public void testBatchedModificationsWithCommitOnReady() throws Throwable {
898 new ShardTestKit(getSystem()) {{
899 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
900 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
901 "testBatchedModificationsWithCommitOnReady");
903 waitUntilLeader(shard);
905 final String transactionID = "tx";
906 final FiniteDuration duration = duration("5 seconds");
// The decorator wraps the first cohort it is handed in a delegating mock and returns that same
// mock on every later call, so the cohort invocations can be verified with InOrder below.
908 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
909 final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
911 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
912 if(mockCohort.get() == null) {
913 mockCohort.set(createDelegatingMockCohort("cohort", actual));
916 return mockCohort.get();
920 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
922 // Send a BatchedModifications to start a transaction.
924 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
925 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
926 expectMsgClass(duration, BatchedModificationsReply.class);
928 // Send a couple more BatchedModifications.
930 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
931 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
932 expectMsgClass(duration, BatchedModificationsReply.class);
// Final batch: ready=true and doCommitOnReady=true with a total message count of 3.
934 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
935 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
936 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
// The commit reply is expected straight away, without a separate three-phase exchange.
938 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The decorated cohort must still have gone through canCommit, preCommit and commit in order.
940 final InOrder inOrder = inOrder(mockCohort.get());
941 inOrder.verify(mockCohort.get()).canCommit();
942 inOrder.verify(mockCohort.get()).preCommit();
943 inOrder.verify(mockCohort.get()).commit();
945 // Verify data in the data store.
947 verifyOuterListEntry(shard, 1);
949 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Sends a single ready BatchedModifications that declares a total of 2 messages sent, expects a
// Status.Failure reply, and rethrows the failure's cause. The @Test annotation requires that
// cause to be an IllegalStateException.
953 @Test(expected=IllegalStateException.class)
954 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
955 new ShardTestKit(getSystem()) {{
956 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
957 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
958 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
960 waitUntilLeader(shard);
962 final String transactionID = "tx1";
963 final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
964 batched.setReady(true);
// Declared count (2) does not match the one message actually sent for this transaction.
965 batched.setTotalMessagesSent(2);
967 shard.tell(batched, getRef());
969 final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// Stop the shard before rethrowing so the actor is not left running when the test exits
// via the exception.
971 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
973 if(failure != null) {
974 throw failure.cause();
// Sends a BatchedModifications containing a merge of invalid data and expects a Status.Failure,
// then sends a ready BatchedModifications for the same transaction and asserts that it fails with
// the same cause as the first message.
980 public void testBatchedModificationsWithOperationFailure() throws Throwable {
981 new ShardTestKit(getSystem()) {{
982 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
983 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
984 "testBatchedModificationsWithOperationFailure");
986 waitUntilLeader(shard);
988 // Test merge with invalid data. An exception should occur when the merge is applied. Note that
989 // write will not validate the children for performance reasons.
991 String transactionID = "tx1";
// Container for TEST_QNAME holding a leaf named JUNK_QNAME; this is the invalid payload.
993 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
994 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
995 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
997 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
998 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
999 shard.tell(batched, getRef());
1000 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// Remember the first failure's cause so the second failure can be compared against it.
1002 Throwable cause = failure.cause();
// Second message for the same transaction: ready, with a total count of 2 matching the two
// messages sent.
1004 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1005 batched.setReady(true);
1006 batched.setTotalMessagesSent(2);
1008 shard.tell(batched, getRef());
1010 failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1011 assertEquals("Failure cause", cause, failure.cause());
1013 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Reads the node at TestModel.OUTER_LIST_PATH from the shard's store and asserts that it exists,
// that its value is an Iterable, that the first entry of that Iterable is a MapEntryNode, and that
// the entry has an ID_QNAME leaf whose value equals expIDValue. Only the first entry is inspected.
// The unchecked suppression covers the cast of the list value to Iterable<Object>.
1017 @SuppressWarnings("unchecked")
1018 private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1019 final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1020 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1021 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1022 outerList.getValue() instanceof Iterable);
// Safe after the instanceof check above; the element type is verified separately below.
1023 final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1024 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1025 entry instanceof MapEntryNode);
1026 final MapEntryNode mapEntry = (MapEntryNode)entry;
1027 final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1028 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1029 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1030 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
// Readies a write transaction on a transaction chain, then creates a read-only transaction on the
// same chain and asserts that it reads the data written by the first transaction before that
// transaction has been committed. Finally commits the write and verifies the store contents.
1034 public void testBatchedModificationsOnTransactionChain() throws Throwable {
1035 new ShardTestKit(getSystem()) {{
1036 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1037 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1038 "testBatchedModificationsOnTransactionChain");
1040 waitUntilLeader(shard);
1042 final String transactionChainID = "txChain";
1043 final String transactionID1 = "tx1";
1044 final String transactionID2 = "tx2";
1046 final FiniteDuration duration = duration("5 seconds");
1048 // Send a BatchedModifications to start a chained write transaction and ready it.
1050 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1051 final YangInstanceIdentifier path = TestModel.TEST_PATH;
1052 shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1053 containerNode, true, false, 1), getRef());
1054 expectMsgClass(duration, ReadyTransactionReply.class);
1056 // Create a read Tx on the same chain.
1058 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1059 transactionChainID).toSerializable(), getRef());
1061 final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
// Read through the transaction actor named in the reply. At this point tx1 is readied but no
// CanCommitTransaction or CommitTransaction has been sent for it.
1063 getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1064 final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1065 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1067 // Commit the write transaction.
1069 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1070 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1071 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1072 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1074 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1075 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1077 // Verify data in the data store.
1079 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1080 assertEquals("Stored node", containerNode, actualNode);
1082 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Creates a Shard subclass whose isLeader()/getLeader() can be overridden via a flag, sends it a
// BatchedModifications while it reports that it is not the leader, and expects the same message
// to arrive at the test kit actor, which the overridden getLeader() presents as the leader.
1087 public void testOnBatchedModificationsWhenNotLeader() {
// While false, the shard uses its real leader logic; it is flipped to true only after
// waitUntilLeader() has returned.
1088 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1089 new ShardTestKit(getSystem()) {{
1090 final Creator<Shard> creator = new Creator<Shard>() {
1091 private static final long serialVersionUID = 1L;
1094 public Shard create() throws Exception {
1095 return new Shard(shardID, Collections.<String,String>emptyMap(),
1096 newDatastoreContext(), SCHEMA_CONTEXT) {
// Report "not leader" once the override flag is set.
1098 protected boolean isLeader() {
1099 return overrideLeaderCalls.get() ? false : super.isLeader();
// Once the override flag is set, point the leader at the test kit's own actor.
1103 protected ActorSelection getLeader() {
1104 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1111 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1112 Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1114 waitUntilLeader(shard);
1116 overrideLeaderCalls.set(true);
1118 final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
// Sent with no sender; the message is nevertheless expected at the test kit actor.
1120 shard.tell(batched, ActorRef.noSender());
1122 expectMsgEquals(batched);
1124 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Sends a ForwardedReadyTransaction for a mock write transaction and expects a
// CommitTransactionReply directly, without sending CanCommitTransaction or CommitTransaction.
// Verifies the cohort was driven through canCommit, preCommit and commit in order and that the
// written container is in the store.
1129 public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1130 new ShardTestKit(getSystem()) {{
1131 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1132 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1133 "testForwardedReadyTransactionWithImmediateCommit");
1135 waitUntilLeader(shard);
1137 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1139 final String transactionID = "tx1";
1140 final MutableCompositeModification modification = new MutableCompositeModification();
1141 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1142 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1143 TestModel.TEST_PATH, containerNode, modification);
1145 final FiniteDuration duration = duration("5 seconds");
1147 // Simulate the ForwardedReadyTransaction messages that would be sent
1148 // by the ShardTransaction.
// NOTE(review): the final constructor argument is true here and false in the three-phase tests;
// presumably it is the commit-on-ready flag - confirm against ForwardedReadyTransaction.
1150 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1151 cohort, modification, true, true), getRef());
1153 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1155 final InOrder inOrder = inOrder(cohort);
1156 inOrder.verify(cohort).canCommit();
1157 inOrder.verify(cohort).preCommit();
1158 inOrder.verify(cohort).commit();
1160 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1161 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1163 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Builds a DataTreeModification directly on a snapshot of the shard's data tree, readies it and
// sends it in a ReadyLocalTransaction whose third argument is true. Expects a
// CommitTransactionReply directly and verifies the merged outer list is in the store.
1168 public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1169 new ShardTestKit(getSystem()) {{
1170 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1171 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1172 "testReadyLocalTransactionWithImmediateCommit");
1174 waitUntilLeader(shard);
1176 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1178 final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
// Apply a write of the test container and a merge of an empty outer list to the modification.
1180 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1181 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1182 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1183 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1185 final String txId = "tx1";
// The modification is readied before it is handed to the shard.
1186 modification.ready();
// NOTE(review): third argument is true here and false in the three-phase variant; presumably
// the commit-on-ready flag - confirm against ReadyLocalTransaction.
1187 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1189 shard.tell(readyMessage, getRef());
1191 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1193 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1194 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1196 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Builds a DataTreeModification directly on a snapshot of the shard's data tree, readies it and
// sends it in a ReadyLocalTransaction whose third argument is false. Expects a
// ReadyTransactionReply, then drives the commit with CanCommitTransaction and CommitTransaction
// and verifies the merged outer list is in the store.
1201 public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1202 new ShardTestKit(getSystem()) {{
1203 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1204 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1205 "testReadyLocalTransactionWithThreePhaseCommit");
1207 waitUntilLeader(shard);
1209 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1211 final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
// Apply a write of the test container and a merge of an empty outer list to the modification.
1213 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1214 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1215 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1216 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1218 final String txId = "tx1";
// The modification is readied before it is handed to the shard.
1219 modification.ready();
1220 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1222 shard.tell(readyMessage, getRef());
1224 expectMsgClass(ReadyTransactionReply.class);
1226 // Send the CanCommitTransaction message.
1228 shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1229 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1230 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1231 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1233 // Send the CommitTransaction message.
1235 shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1236 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1238 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1239 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1241 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Runs a full ready / canCommit / commit exchange for one mock write transaction after setting
// persistent(false) on the datastore context builder, then verifies the order of cohort calls and
// that the written container is in the store.
1246 public void testCommitWithPersistenceDisabled() throws Throwable {
// Must be set before the shard is created below.
// NOTE(review): presumably newShardProps() builds its context from dataStoreContextBuilder - confirm.
1247 dataStoreContextBuilder.persistent(false);
1248 new ShardTestKit(getSystem()) {{
1249 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1250 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1251 "testCommitWithPersistenceDisabled");
1253 waitUntilLeader(shard);
1255 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1257 // Set up a simulated transaction with a mock cohort.
1259 final String transactionID = "tx";
1260 final MutableCompositeModification modification = new MutableCompositeModification();
1261 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1262 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1263 TestModel.TEST_PATH, containerNode, modification);
1265 final FiniteDuration duration = duration("5 seconds");
1267 // Simulate the ForwardedReadyTransaction messages that would be sent
1268 // by the ShardTransaction.
1270 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1271 cohort, modification, true, false), getRef());
1272 expectMsgClass(duration, ReadyTransactionReply.class);
1274 // Send the CanCommitTransaction message.
1276 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1277 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1278 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1279 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1281 // Send the CommitTransaction message.
1283 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1284 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1286 final InOrder inOrder = inOrder(cohort);
1287 inOrder.verify(cohort).canCommit();
1288 inOrder.verify(cohort).preCommit();
1289 inOrder.verify(cohort).commit();
1291 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1292 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1294 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Creates a Mockito mock DataTreeCandidateTip with the given mock name. Its root path is the empty
// YangInstanceIdentifier and its root node is a second mock (named name + "-node") that reports
// ModificationType.WRITE and returns a CARS_QNAME container node as its data-after.
1298 private static DataTreeCandidateTip mockCandidate(final String name) {
1299 final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1300 final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1301 doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1302 doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1303 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1304 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1305 return mockCandidate;
// Creates a Mockito mock DataTreeCandidateTip with the given mock name. Its root path is the empty
// YangInstanceIdentifier and its root node is a second mock (named name + "-node") that reports
// ModificationType.UNMODIFIED. Unlike mockCandidate, getDataAfter() is not stubbed.
1308 private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1309 final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1310 final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1311 doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1312 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1313 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1314 return mockCandidate;
// Commits a transaction whose cohort returns an UNMODIFIED candidate and then checks, via the
// shard's stats MBean, that the committed-transaction count is 1 while the commit index is
// still -1.
1318 public void testCommitWhenTransactionHasNoModifications(){
1319 // Note that persistence is enabled which would normally result in the entry getting written to the journal
1320 // but here that need not happen
1321 new ShardTestKit(getSystem()) {
1323 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1324 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1325 "testCommitWhenTransactionHasNoModifications");
1327 waitUntilLeader(shard);
1329 final String transactionID = "tx1";
1330 final MutableCompositeModification modification = new MutableCompositeModification();
// Fully mocked cohort: every phase succeeds immediately and the candidate is UNMODIFIED.
1331 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1332 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1333 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1334 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1335 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1337 final FiniteDuration duration = duration("5 seconds");
1339 // Simulate the ForwardedReadyTransaction messages that would be sent
1340 // by the ShardTransaction.
1342 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1343 cohort, modification, true, false), getRef());
1344 expectMsgClass(duration, ReadyTransactionReply.class);
1346 // Send the CanCommitTransaction message.
1348 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1349 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1350 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1351 assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CommitTransaction message.
1353 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1354 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1356 final InOrder inOrder = inOrder(cohort);
1357 inOrder.verify(cohort).canCommit();
1358 inOrder.verify(cohort).preCommit();
1359 inOrder.verify(cohort).commit();
// Ask the shard for its stats MBean; the reply is a ShardStats instance.
1361 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1362 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1364 // Use MBean for verification
1365 // Committed transaction count should increase as usual
1366 assertEquals(1,shardStats.getCommittedTransactionsCount());
1368 // Commit index should not advance because this does not go into the journal
1369 assertEquals(-1, shardStats.getCommitIndex());
1371 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Commits a transaction whose cohort returns a WRITE candidate and then checks, via the shard's
// stats MBean, that the committed-transaction count is 1 and the commit index has advanced to 0.
1378 public void testCommitWhenTransactionHasModifications(){
1379 new ShardTestKit(getSystem()) {
1381 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1382 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1383 "testCommitWhenTransactionHasModifications");
1385 waitUntilLeader(shard);
1387 final String transactionID = "tx1";
1388 final MutableCompositeModification modification = new MutableCompositeModification();
// Give the composite modification one entry: a delete of the empty (root) path.
1389 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
// Fully mocked cohort: every phase succeeds immediately and the candidate reports a WRITE.
1390 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1391 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1392 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1393 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1394 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1396 final FiniteDuration duration = duration("5 seconds");
1398 // Simulate the ForwardedReadyTransaction messages that would be sent
1399 // by the ShardTransaction.
1401 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1402 cohort, modification, true, false), getRef());
1403 expectMsgClass(duration, ReadyTransactionReply.class);
1405 // Send the CanCommitTransaction message.
1407 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1408 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1409 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1410 assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CommitTransaction message.
1412 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1413 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1415 final InOrder inOrder = inOrder(cohort);
1416 inOrder.verify(cohort).canCommit();
1417 inOrder.verify(cohort).preCommit();
1418 inOrder.verify(cohort).commit();
// Ask the shard for its stats MBean; the reply is a ShardStats instance.
1420 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1421 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1423 // Use MBean for verification
1424 // Committed transaction count should increase as usual
1425 assertEquals(1, shardStats.getCommittedTransactionsCount());
1427 // Commit index should advance as we do not have an empty modification
1428 assertEquals(0, shardStats.getCommitIndex());
1430 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Readies two transactions, runs the first through canCommit and commit where its cohort's
// commit() returns a failed future, and expects a Status.Failure for that commit. Then waits for
// the second transaction's queued CanCommitTransaction to complete and verifies the order in
// which the two cohorts were invoked.
1437 public void testCommitPhaseFailure() throws Throwable {
1438 new ShardTestKit(getSystem()) {{
1439 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1440 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1441 "testCommitPhaseFailure");
1443 waitUntilLeader(shard);
1445 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
// cohort1: canCommit and preCommit succeed, commit() fails with IllegalStateException("mock").
1448 final String transactionID1 = "tx1";
1449 final MutableCompositeModification modification1 = new MutableCompositeModification();
1450 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1451 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1452 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1453 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1454 doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
// cohort2: only canCommit is stubbed; the test does not drive it beyond that phase.
1456 final String transactionID2 = "tx2";
1457 final MutableCompositeModification modification2 = new MutableCompositeModification();
1458 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1459 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1461 final FiniteDuration duration = duration("5 seconds");
1462 final Timeout timeout = new Timeout(duration);
1464 // Simulate the ForwardedReadyTransaction messages that would be sent
1465 // by the ShardTransaction.
1467 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1468 cohort1, modification1, true, false), getRef());
1469 expectMsgClass(duration, ReadyTransactionReply.class);
1471 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1472 cohort2, modification2, true, false), getRef());
1473 expectMsgClass(duration, ReadyTransactionReply.class);
1475 // Send the CanCommitTransaction message for the first Tx.
1477 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1478 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1479 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1480 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1482 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1483 // processed after the first Tx completes.
// Uses ask() rather than tell() so the reply is delivered to a Future, not the test kit actor.
1485 final Future<Object> canCommitFuture = Patterns.ask(shard,
1486 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1488 // Send the CommitTransaction message for the first Tx. This should send back an error
1489 // and trigger the 2nd Tx to proceed.
1491 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1492 expectMsgClass(duration, akka.actor.Status.Failure.class);
1494 // Wait for the 2nd Tx to complete the canCommit phase.
1496 final CountDownLatch latch = new CountDownLatch(1);
1497 canCommitFuture.onComplete(new OnComplete<Object>() {
1499 public void onComplete(final Throwable t, final Object resp) {
1502 }, getSystem().dispatcher());
1504 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// cohort2.canCommit() must come only after cohort1 has gone through all three phases.
1506 final InOrder inOrder = inOrder(cohort1, cohort2);
1507 inOrder.verify(cohort1).canCommit();
1508 inOrder.verify(cohort1).preCommit();
1509 inOrder.verify(cohort1).commit();
1510 inOrder.verify(cohort2).canCommit();
1512 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: two transactions (tx1, tx2) are readied on the shard. The mocked cohort for tx1
// accepts canCommit but returns a failed future from preCommit, so the CommitTransaction
// message for tx1 is expected to be answered with a Status.Failure. The canCommit that was
// asked for tx2 is then expected to complete, and the cohort calls are verified in order.
1517 public void testPreCommitPhaseFailure() throws Throwable {
1518 new ShardTestKit(getSystem()) {{
1519 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1520 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1521 "testPreCommitPhaseFailure");
1523 waitUntilLeader(shard);
// cohort1: canCommit succeeds, preCommit fails with an IllegalStateException.
1525 final String transactionID1 = "tx1";
1526 final MutableCompositeModification modification1 = new MutableCompositeModification();
1527 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1528 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1529 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
// cohort2: only canCommit is stubbed (it succeeds).
1531 final String transactionID2 = "tx2";
1532 final MutableCompositeModification modification2 = new MutableCompositeModification();
1533 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1534 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
// The same 5 second bound is used for expectMsgClass and for the ask timeout below.
1536 final FiniteDuration duration = duration("5 seconds");
1537 final Timeout timeout = new Timeout(duration);
1539 // Simulate the ForwardedReadyTransaction messages that would be sent
1540 // by the ShardTransaction.
1542 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1543 cohort1, modification1, true, false), getRef());
1544 expectMsgClass(duration, ReadyTransactionReply.class);
1546 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1547 cohort2, modification2, true, false), getRef());
1548 expectMsgClass(duration, ReadyTransactionReply.class);
1550 // Send the CanCommitTransaction message for the first Tx.
1552 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1553 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1554 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1555 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1557 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1558 // processed after the first Tx completes.
// Asked rather than told: the reply is captured by canCommitFuture and awaited further below.
1560 final Future<Object> canCommitFuture = Patterns.ask(shard,
1561 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1563 // Send the CommitTransaction message for the first Tx. This should send back an error
1564 // and trigger the 2nd Tx to proceed.
1566 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1567 expectMsgClass(duration, akka.actor.Status.Failure.class);
1569 // Wait for the 2nd Tx to complete the canCommit phase.
1571 final CountDownLatch latch = new CountDownLatch(1);
// NOTE(review): the callback body is not visible in this excerpt; presumably it counts
// down `latch` when the future completes - confirm against the full file.
1572 canCommitFuture.onComplete(new OnComplete<Object>() {
1574 public void onComplete(final Throwable t, final Object resp) {
1577 }, getSystem().dispatcher());
// Fails the test if the latch has not been released within 5 seconds.
1579 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// Verified order: cohort1.canCommit, cohort1.preCommit, then cohort2.canCommit.
// cohort1.commit is not part of the verified sequence in this test.
1581 final InOrder inOrder = inOrder(cohort1, cohort2);
1582 inOrder.verify(cohort1).canCommit();
1583 inOrder.verify(cohort1).preCommit();
1584 inOrder.verify(cohort2).canCommit();
// Send PoisonPill to the shard actor to shut it down at the end of the test.
1586 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: the mocked cohort's canCommit returns a failed future, so the CanCommitTransaction
// message for tx1 is expected to be answered with a Status.Failure. The same mock is then
// re-stubbed to succeed and reused for tx2, whose canCommit reply is expected to be true.
1591 public void testCanCommitPhaseFailure() throws Throwable {
1592 new ShardTestKit(getSystem()) {{
1593 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1594 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1595 "testCanCommitPhaseFailure");
1597 waitUntilLeader(shard);
1599 final FiniteDuration duration = duration("5 seconds");
// The mock is named "cohort1"; its canCommit fails with an IllegalStateException.
1601 final String transactionID1 = "tx1";
1602 final MutableCompositeModification modification = new MutableCompositeModification();
1603 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1604 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1606 // Simulate the ForwardedReadyTransaction messages that would be sent
1607 // by the ShardTransaction.
1609 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1610 cohort, modification, true, false), getRef());
1611 expectMsgClass(duration, ReadyTransactionReply.class);
1613 // Send the CanCommitTransaction message.
1615 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1616 expectMsgClass(duration, akka.actor.Status.Failure.class);
1618 // Send another can commit to ensure the failed one got cleaned up.
// tx2 reuses the same cohort mock and modification object; only the canCommit stub changes.
1622 final String transactionID2 = "tx2";
1623 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1625 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1626 cohort, modification, true, false), getRef());
1627 expectMsgClass(duration, ReadyTransactionReply.class);
// NOTE(review): this expectMsgClass call passes no explicit duration, unlike the calls above.
1629 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1630 final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1631 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1632 assertEquals("getCanCommit", true, reply.getCanCommit());
// Send PoisonPill to the shard actor to shut it down at the end of the test.
1634 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: the mocked cohort's canCommit completes normally with FALSE, so the reply to the
// CanCommitTransaction message for tx1 is expected to carry getCanCommit() == false (a reply,
// not a Status.Failure). The mock is then re-stubbed to return TRUE and reused for tx2, whose
// reply is expected to carry true.
1639 public void testCanCommitPhaseFalseResponse() throws Throwable {
1640 new ShardTestKit(getSystem()) {{
1641 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1642 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1643 "testCanCommitPhaseFalseResponse");
1645 waitUntilLeader(shard);
1647 final FiniteDuration duration = duration("5 seconds");
// The mock is named "cohort1"; its canCommit yields Boolean.FALSE.
1649 final String transactionID1 = "tx1";
1650 final MutableCompositeModification modification = new MutableCompositeModification();
1651 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1652 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1654 // Simulate the ForwardedReadyTransaction messages that would be sent
1655 // by the ShardTransaction.
1657 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1658 cohort, modification, true, false), getRef());
1659 expectMsgClass(duration, ReadyTransactionReply.class);
1661 // Send the CanCommitTransaction message.
// `reply` is deliberately non-final: it is reassigned for the tx2 reply further below.
1663 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1664 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1665 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1666 assertEquals("getCanCommit", false, reply.getCanCommit());
1668 // Send another can commit to ensure the failed one got cleaned up.
// tx2 reuses the same cohort mock and modification object; only the canCommit stub changes.
1672 final String transactionID2 = "tx2";
1673 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1675 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1676 cohort, modification, true, false), getRef());
1677 expectMsgClass(duration, ReadyTransactionReply.class);
1679 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1680 reply = CanCommitTransactionReply.fromSerializable(
1681 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1682 assertEquals("getCanCommit", true, reply.getCanCommit());
// Send PoisonPill to the shard actor to shut it down at the end of the test.
1684 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: no CanCommitTransaction/CommitTransaction messages are sent in this test; the
// ForwardedReadyTransaction itself is expected to be answered with the commit outcome.
// With canCommit stubbed to fail, the reply to the tx1 ready message is expected to be a
// Status.Failure. The mock is then re-stubbed so that canCommit, preCommit and commit all
// succeed, and the tx2 ready message is expected to yield a CommitTransactionReply.
1689 public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1690 new ShardTestKit(getSystem()) {{
1691 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1692 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1693 "testImmediateCommitWithCanCommitPhaseFailure");
1695 waitUntilLeader(shard);
1697 final FiniteDuration duration = duration("5 seconds");
1699 final String transactionID1 = "tx1";
1700 final MutableCompositeModification modification = new MutableCompositeModification();
1701 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1702 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1704 // Simulate the ForwardedReadyTransaction messages that would be sent
1705 // by the ShardTransaction.
// NOTE(review): the last constructor argument is `true` here, whereas the tests that send
// explicit CanCommitTransaction messages pass `false`; presumably it requests an immediate
// commit on ready - confirm against ForwardedReadyTransaction.
1707 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1708 cohort, modification, true, true), getRef());
1710 expectMsgClass(duration, akka.actor.Status.Failure.class);
1712 // Send another can commit to ensure the failed one got cleaned up.
// Re-stub the same mock so every phase succeeds for tx2.
1716 final String transactionID2 = "tx2";
1717 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1718 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1719 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
// The cohort also hands out a mocked candidate whose root node reports UNMODIFIED.
1720 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1721 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1722 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1723 doReturn(candidateRoot).when(candidate).getRootNode();
1724 doReturn(candidate).when(cohort).getCandidate();
1726 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1727 cohort, modification, true, true), getRef());
1729 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// Send PoisonPill to the shard actor to shut it down at the end of the test.
1731 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: same message flow as the immediate-commit failure case, except that canCommit
// completes normally with FALSE instead of failing. The reply to the tx1 ready message is
// still expected to be a Status.Failure. After re-stubbing the mock so that canCommit,
// preCommit and commit succeed, the tx2 ready message is expected to yield a
// CommitTransactionReply.
1736 public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1737 new ShardTestKit(getSystem()) {{
1738 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1739 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1740 "testImmediateCommitWithCanCommitPhaseFalseResponse");
1742 waitUntilLeader(shard);
1744 final FiniteDuration duration = duration("5 seconds");
1746 final String transactionID = "tx1";
1747 final MutableCompositeModification modification = new MutableCompositeModification();
1748 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1749 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1751 // Simulate the ForwardedReadyTransaction messages that would be sent
1752 // by the ShardTransaction.
// NOTE(review): the last constructor argument is `true`; presumably it requests an immediate
// commit on ready - confirm against ForwardedReadyTransaction.
1754 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1755 cohort, modification, true, true), getRef());
1757 expectMsgClass(duration, akka.actor.Status.Failure.class);
1759 // Send another can commit to ensure the failed one got cleaned up.
// Re-stub the same mock so every phase succeeds for tx2.
1763 final String transactionID2 = "tx2";
1764 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1765 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1766 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
// The cohort also hands out a mocked candidate whose root node reports UNMODIFIED.
1767 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1768 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1769 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1770 doReturn(candidateRoot).when(candidate).getRootNode();
1771 doReturn(candidate).when(cohort).getCandidate();
1773 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1774 cohort, modification, true, true), getRef());
1776 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// Send PoisonPill to the shard actor to shut it down at the end of the test.
1778 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: an abort of the transaction is triggered from inside the preCommit hook, i.e.
// after canCommit has been answered and before the commit is finished. The test expects the
// CommitTransaction message to still be answered with a CommitTransactionReply and the
// written container to be readable from the shard afterwards.
1783 public void testAbortBeforeFinishCommit() throws Throwable {
1784 new ShardTestKit(getSystem()) {{
1785 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1786 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1787 "testAbortBeforeFinishCommit");
1789 waitUntilLeader(shard);
1791 final FiniteDuration duration = duration("5 seconds");
1792 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1794 final String transactionID = "tx1";
// Hook handed to setupMockWriteTransaction below: it calls the cohort's preCommit, then
// aborts the transaction directly on the underlying actor, and returns the preCommit future.
// NOTE(review): how the helper wires this function into the mock is not visible here;
// presumably it is invoked in place of the cohort's preCommit - confirm.
1795 final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1796 new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1798 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1799 final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1801 // Simulate an AbortTransaction message occurring during replication, after
1802 // persisting and before finishing the commit to the in-memory store.
1803 // We have no followers so due to optimizations in the RaftActor, it does not
1804 // attempt replication and thus we can't send an AbortTransaction message b/c
1805 // it would be processed too late after CommitTransaction completes. So we'll
1806 // simulate an AbortTransaction message occurring during replication by calling
1807 // the shard directly.
1809 shard.underlyingActor().doAbortTransaction(transactionID, null);
1811 return preCommitFuture;
// Mock write transaction that writes an empty TEST container at TestModel.TEST_PATH.
1815 final MutableCompositeModification modification = new MutableCompositeModification();
1816 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1817 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1818 modification, preCommit);
// Ready, canCommit and commit are driven with explicit messages, each reply bounded by `duration`.
1820 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1821 cohort, modification, true, false), getRef());
1822 expectMsgClass(duration, ReadyTransactionReply.class);
1824 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1825 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1826 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1827 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1829 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1830 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1832 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1834 // Since we're simulating an abort occurring during replication and before finish commit,
1835 // the data should still get written to the in-memory store since we've gotten past
1836 // canCommit and preCommit and persisted the data.
1837 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
// Send PoisonPill to the shard actor to shut it down at the end of the test.
1839 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: the shard is configured with a transaction commit timeout of 1 second. Two
// transactions are readied; the first is taken through canCommit but never committed in time.
// The test expects the second transaction's canCommit to be answered within 5 seconds, a
// late CommitTransaction for the first to be answered with a Status.Failure, and the second
// transaction to commit so that its list entry can be read back from the shard.
1844 public void testTransactionCommitTimeout() throws Throwable {
1845 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1847 new ShardTestKit(getSystem()) {{
1848 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1849 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1850 "testTransactionCommitTimeout");
1852 waitUntilLeader(shard);
1854 final FiniteDuration duration = duration("5 seconds");
1856 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Pre-populate the store with the TEST container and an empty outer list, which are the
// parents of the list entries written by the two transactions below.
1858 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1859 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1860 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1862 // Create 1st Tx - will timeout
// The first transaction writes the outer-list entry keyed by id 1.
1864 final String transactionID1 = "tx1";
1865 final MutableCompositeModification modification1 = new MutableCompositeModification();
1866 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1867 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1868 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1869 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// NOTE(review): the remaining argument(s) of the call above are not visible in this excerpt.
// NOTE(review): the variable is named transactionID2 but holds "tx3", and its cohort is
// named "cohort3"; no "tx2" is used in this test.
1874 final String transactionID2 = "tx3";
1875 final MutableCompositeModification modification2 = new MutableCompositeModification();
1876 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1877 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
// NOTE(review): part of the argument list of the call below is not visible in this excerpt.
1878 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1880 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
// Ready both transactions.
1885 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1886 cohort1, modification1, true, false), getRef());
1887 expectMsgClass(duration, ReadyTransactionReply.class);
1889 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1890 cohort2, modification2, true, false), getRef());
1891 expectMsgClass(duration, ReadyTransactionReply.class);
1893 // canCommit 1st Tx. We don't send the commit so it should timeout.
1895 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1896 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1898 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
// The 5 second expectMsgClass bound leaves room for the 1 second commit timeout set above.
1900 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1901 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1903 // Try to commit the 1st Tx - should fail as it's not the current Tx.
1905 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1906 expectMsgClass(duration, akka.actor.Status.Failure.class);
1908 // Commit the 2nd Tx.
1910 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1911 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The entry written by the second transaction (key id 2) must be present in the store.
1913 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1914 assertNotNull(listNodePath + " not found", node);
// Send PoisonPill to the shard actor to shut it down at the end of the test.
1916 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: the shard is configured with a transaction commit queue capacity of 2. Three
// transactions are prepared; readying the first two is expected to succeed, while readying
// the third and later sending its CanCommitTransaction are both expected to be answered with
// a Status.Failure.
1921 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1922 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1924 new ShardTestKit(getSystem()) {{
1925 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1926 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1927 "testTransactionCommitQueueCapacityExceeded");
1929 waitUntilLeader(shard);
1931 final FiniteDuration duration = duration("5 seconds");
1933 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// tx1 writes the TEST container.
1935 final String transactionID1 = "tx1";
1936 final MutableCompositeModification modification1 = new MutableCompositeModification();
1937 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1938 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
// tx2 writes an empty outer list.
// NOTE(review): the final argument(s) of this call are not visible in this excerpt.
1940 final String transactionID2 = "tx2";
1941 final MutableCompositeModification modification2 = new MutableCompositeModification();
1942 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1943 TestModel.OUTER_LIST_PATH,
1944 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
// tx3 writes the same TEST container as tx1.
1947 final String transactionID3 = "tx3";
1948 final MutableCompositeModification modification3 = new MutableCompositeModification();
1949 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1950 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
// Ready the first two transactions; both are expected to be accepted.
1954 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1955 cohort1, modification1, true, false), getRef());
1956 expectMsgClass(duration, ReadyTransactionReply.class);
1958 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1959 cohort2, modification2, true, false), getRef());
1960 expectMsgClass(duration, ReadyTransactionReply.class);
1962 // The 3rd Tx should exceed queue capacity and fail.
1964 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1965 cohort3, modification3, true, false), getRef());
1966 expectMsgClass(duration, akka.actor.Status.Failure.class);
1968 // canCommit 1st Tx.
1970 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1971 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1973 // canCommit the 2nd Tx - it should get queued.
// No reply is awaited for tx2 here; the next message the test expects is the tx3 failure.
1975 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1977 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1979 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1980 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Send PoisonPill to the shard actor to shut it down at the end of the test.
1982 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: the shard is configured with a commit queue expiry timeout of 1300 ms and a
// transaction commit timeout of 1 second. Three transactions are readied, but a
// CanCommitTransaction is sent only for the last one. The test expects that canCommit to be
// answered within 5 seconds even though the two earlier transactions were never progressed.
1987 public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1988 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1990 new ShardTestKit(getSystem()) {{
1991 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1992 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1993 "testTransactionCommitWithPriorExpiredCohortEntries");
1995 waitUntilLeader(shard);
1997 final FiniteDuration duration = duration("5 seconds");
1999 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// tx1 and tx2 both write the TEST container; each is readied right after being set up.
2001 final String transactionID1 = "tx1";
2002 final MutableCompositeModification modification1 = new MutableCompositeModification();
2003 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2004 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2006 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2007 cohort1, modification1, true, false), getRef());
2008 expectMsgClass(duration, ReadyTransactionReply.class);
2010 final String transactionID2 = "tx2";
2011 final MutableCompositeModification modification2 = new MutableCompositeModification();
2012 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2013 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2015 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2016 cohort2, modification2, true, false), getRef());
2017 expectMsgClass(duration, ReadyTransactionReply.class);
// tx3 writes a different container (TEST2).
2019 final String transactionID3 = "tx3";
2020 final MutableCompositeModification modification3 = new MutableCompositeModification();
2021 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2022 TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2024 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2025 cohort3, modification3, true, false), getRef());
2026 expectMsgClass(duration, ReadyTransactionReply.class);
2028 // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2029 // should expire from the queue and the last one should be processed.
// The 5 second bound leaves room for the 1300 ms queue expiry configured at the top.
2031 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2032 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
// Send PoisonPill to the shard actor to shut it down at the end of the test.
2034 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: the shard is configured with a commit queue expiry timeout of 1300 ms and a
// transaction commit timeout of 1 second. tx1 is readied and taken through canCommit; tx2 is
// readied but never progressed; tx3 is submitted as a ReadyLocalTransaction. After tx1 is
// committed, the test expects a second CommitTransactionReply (for tx3) and the TEST2
// container written by tx3 to be readable from the shard.
2039 public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2040 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2042 new ShardTestKit(getSystem()) {{
2043 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2044 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2045 "testTransactionCommitWithSubsequentExpiredCohortEntry");
2047 waitUntilLeader(shard);
2049 final FiniteDuration duration = duration("5 seconds");
2051 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// tx1 writes the TEST container.
2053 final String transactionID1 = "tx1";
2054 final MutableCompositeModification modification1 = new MutableCompositeModification();
2055 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2056 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2058 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2059 cohort1, modification1, true, false), getRef());
2060 expectMsgClass(duration, ReadyTransactionReply.class);
2062 // CanCommit the first one so it's the current in-progress CohortEntry.
2064 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2065 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2067 // Ready the second Tx.
// No CanCommitTransaction is ever sent for tx2 in this test.
2069 final String transactionID2 = "tx2";
2070 final MutableCompositeModification modification2 = new MutableCompositeModification();
2071 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2072 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2074 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2075 cohort2, modification2, true, false), getRef());
2076 expectMsgClass(duration, ReadyTransactionReply.class);
2078 // Ready the third Tx.
// tx3 uses a real DataTreeModification taken from a snapshot of the shard's data tree
// (not a mock cohort); it writes the TEST2 container and is readied before being sent.
// NOTE(review): the last ReadyLocalTransaction argument is `true`; presumably it requests
// a commit on ready - confirm against ReadyLocalTransaction.
2080 final String transactionID3 = "tx3";
2081 final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2082 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2083 .apply(modification3);
2084 modification3.ready();
2085 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
// No reply to this ready message is awaited at this point.
2087 shard.tell(readyMessage, getRef());
2089 // Commit the first Tx. After completing, the second should expire from the queue and the third
2092 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2093 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2095 // Expect commit reply from the third Tx.
2097 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The container written by tx3 must be present in the store.
2099 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2100 assertNotNull(TestModel.TEST2_PATH + " not found", node);
// Send PoisonPill to the shard actor to shut it down at the end of the test.
2102 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: a CanCommitTransaction is sent for transaction id "tx" without any prior ready
// message for that id; the shard is expected to answer with a Status.Failure within 5 seconds.
// Unlike the neighbouring tests, this one does not call waitUntilLeader before sending.
2107 public void testCanCommitBeforeReadyFailure() throws Throwable {
2108 new ShardTestKit(getSystem()) {{
2109 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2110 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2111 "testCanCommitBeforeReadyFailure");
2113 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2114 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// Send PoisonPill to the shard actor to shut it down at the end of the test.
2116 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: two transactions are readied; tx1 is taken through canCommit and then aborted
// with an AbortTransaction message while a canCommit for tx2 is outstanding. The test expects
// an AbortTransactionReply for tx1, the tx2 canCommit future to complete within 5 seconds,
// and cohort1.canCommit to have been invoked before cohort2.canCommit.
2121 public void testAbortCurrentTransaction() throws Throwable {
2122 new ShardTestKit(getSystem()) {{
2123 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2124 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2125 "testAbortCurrentTransaction");
2127 waitUntilLeader(shard);
2129 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
// cohort1: canCommit succeeds and abort completes normally.
2131 final String transactionID1 = "tx1";
2132 final MutableCompositeModification modification1 = new MutableCompositeModification();
2133 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2134 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2135 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
// cohort2: only canCommit is stubbed (it succeeds).
2137 final String transactionID2 = "tx2";
2138 final MutableCompositeModification modification2 = new MutableCompositeModification();
2139 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2140 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
// The same 5 second bound is used for expectMsgClass, the ask timeout and Await.ready.
2142 final FiniteDuration duration = duration("5 seconds");
2143 final Timeout timeout = new Timeout(duration);
2145 // Simulate the ForwardedReadyTransaction messages that would be sent
2146 // by the ShardTransaction.
2148 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2149 cohort1, modification1, true, false), getRef());
2150 expectMsgClass(duration, ReadyTransactionReply.class);
2152 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2153 cohort2, modification2, true, false), getRef());
2154 expectMsgClass(duration, ReadyTransactionReply.class);
2156 // Send the CanCommitTransaction message for the first Tx.
2158 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2159 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2160 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2161 assertEquals("Can commit", true, canCommitReply.getCanCommit());
2163 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2164 // processed after the first Tx completes.
// Asked rather than told: the reply is captured by canCommitFuture and awaited further below.
2166 final Future<Object> canCommitFuture = Patterns.ask(shard,
2167 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2169 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2172 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2173 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2175 // Wait for the 2nd Tx to complete the canCommit phase.
// Await.ready only waits for completion; the future's result value is not inspected here.
2177 Await.ready(canCommitFuture, duration);
// Verified order: cohort1.canCommit, then cohort2.canCommit.
2179 final InOrder inOrder = inOrder(cohort1, cohort2);
2180 inOrder.verify(cohort1).canCommit();
2181 inOrder.verify(cohort2).canCommit();
// Send PoisonPill to the shard actor to shut it down at the end of the test.
2183 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Scenario: a transaction is readied (so the pending commit queue size is 1) and then aborted
// before any CanCommitTransaction is sent. The test expects an AbortTransactionReply, the
// cohort's abort() to have been called, the queue size to be 0 after the shard has processed
// a TX_COMMIT_TIMEOUT_CHECK_MESSAGE, and a subsequent CanCommitTransaction for the same id
// to fail with an IllegalStateException.
2188 public void testAbortQueuedTransaction() throws Throwable {
2189 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2190 new ShardTestKit(getSystem()) {{
// Holds the latch the test waits on; it stays null until the test arms it further below,
// so earlier timeout-check messages are ignored by the override.
2191 final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2192 @SuppressWarnings("serial")
// Shard subclass whose onReceiveCommand first delegates to super and then, when the message
// equals TX_COMMIT_TIMEOUT_CHECK_MESSAGE and a latch has been set, counts that latch down.
2193 final Creator<Shard> creator = new Creator<Shard>() {
2195 public Shard create() throws Exception {
2196 return new Shard(shardID, Collections.<String,String>emptyMap(),
2197 dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
2199 public void onReceiveCommand(final Object message) throws Exception {
2200 super.onReceiveCommand(message);
2201 if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2202 if(cleaupCheckLatch.get() != null) {
2203 cleaupCheckLatch.get().countDown();
2211 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2212 Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2213 Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
2215 waitUntilLeader(shard);
2217 final String transactionID = "tx1";
// Only abort() is stubbed on the mock cohort; canCommit is never requested before the abort.
2219 final MutableCompositeModification modification = new MutableCompositeModification();
2220 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2221 doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2223 final FiniteDuration duration = duration("5 seconds");
2227 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
2228 cohort, modification, true, false), getRef());
2229 expectMsgClass(duration, ReadyTransactionReply.class);
// The readied transaction is the only entry in the pending commit queue.
2231 assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2233 // Send the AbortTransaction message.
2235 shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2236 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2238 verify(cohort).abort();
2240 // Verify the tx cohort is removed from queue at the cleanup check interval.
// Arm the latch now, then wait up to 5 seconds for the next timeout-check message.
2242 cleaupCheckLatch.set(new CountDownLatch(1));
2243 assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2244 cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2246 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2248 // Now send CanCommitTransaction - should fail.
2250 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
// The cause carried by the Status.Failure must be an IllegalStateException.
2252 Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2253 assertTrue("Failure type", failure instanceof IllegalStateException);
// Send PoisonPill to the shard actor to shut it down at the end of the test.
2255 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2260 public void testCreateSnapshot() throws Exception {
2261 testCreateSnapshot(true, "testCreateSnapshot");
2265 public void testCreateSnapshotWithNonPersistentData() throws Exception {
2266 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2269 @SuppressWarnings("serial")
2270 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2272 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2274 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2275 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2276 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2281 public void saveSnapshot(final Object o) {
2282 savedSnapshot.set(o);
2283 super.saveSnapshot(o);
2287 dataStoreContextBuilder.persistent(persistent);
2289 new ShardTestKit(getSystem()) {{
2290 class TestShard extends Shard {
2292 protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
2293 final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
2294 super(name, peerAddresses, datastoreContext, schemaContext);
2295 setPersistence(new TestPersistentDataProvider(super.persistence()));
2299 public void handleCommand(final Object message) {
2300 super.handleCommand(message);
2302 if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2303 latch.get().countDown();
2308 public RaftActorContext getRaftActorContext() {
2309 return super.getRaftActorContext();
2313 final Creator<Shard> creator = new Creator<Shard>() {
2315 public Shard create() throws Exception {
2316 return new TestShard(shardID, Collections.<String,String>emptyMap(),
2317 newDatastoreContext(), SCHEMA_CONTEXT);
2321 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2322 Props.create(new DelegatingShardCreator(creator)), shardActorName);
2324 waitUntilLeader(shard);
2326 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2328 final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2330 // Trigger creation of a snapshot by ensuring
2331 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2332 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2334 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2336 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2337 savedSnapshot.get() instanceof Snapshot);
2339 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2341 latch.set(new CountDownLatch(1));
2342 savedSnapshot.set(null);
2344 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2346 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2348 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2349 savedSnapshot.get() instanceof Snapshot);
2351 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2353 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2356 private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2358 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2359 assertEquals("Root node", expectedRoot, actual);
2365 * This test simply verifies that the applySnapShot logic will work
2366 * @throws ReadFailedException
2367 * @throws DataValidationFailedException
2370 public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2371 final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2372 store.setSchemaContext(SCHEMA_CONTEXT);
2374 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2375 putTransaction.write(TestModel.TEST_PATH,
2376 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2377 commitTransaction(store, putTransaction);
2380 final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2382 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2384 writeTransaction.delete(YangInstanceIdentifier.builder().build());
2385 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2387 commitTransaction(store, writeTransaction);
2389 final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2391 assertEquals(expected, actual);
2395 public void testRecoveryApplicable(){
2397 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2398 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2400 final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2401 persistentContext, SCHEMA_CONTEXT);
2403 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2404 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2406 final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2407 nonPersistentContext, SCHEMA_CONTEXT);
2409 new ShardTestKit(getSystem()) {{
2410 final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2411 persistentProps, "testPersistence1");
2413 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2415 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2417 final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2418 nonPersistentProps, "testPersistence2");
2420 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2422 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2429 public void testOnDatastoreContext() {
2430 new ShardTestKit(getSystem()) {{
2431 dataStoreContextBuilder.persistent(true);
2433 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2435 assertEquals("isRecoveryApplicable", true,
2436 shard.underlyingActor().persistence().isRecoveryApplicable());
2438 waitUntilLeader(shard);
2440 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2442 assertEquals("isRecoveryApplicable", false,
2443 shard.underlyingActor().persistence().isRecoveryApplicable());
2445 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2447 assertEquals("isRecoveryApplicable", true,
2448 shard.underlyingActor().persistence().isRecoveryApplicable());
2450 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2455 public void testRegisterRoleChangeListener() throws Exception {
2456 new ShardTestKit(getSystem()) {
2458 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2459 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2460 "testRegisterRoleChangeListener");
2462 waitUntilLeader(shard);
2464 final TestActorRef<MessageCollectorActor> listener =
2465 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2467 shard.tell(new RegisterRoleChangeListener(), listener);
2469 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2471 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2472 ShardLeaderStateChanged.class);
2473 assertEquals("getLocalShardDataTree present", true,
2474 leaderStateChanged.getLocalShardDataTree().isPresent());
2475 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2476 leaderStateChanged.getLocalShardDataTree().get());
2478 MessageCollectorActor.clearMessages(listener);
2480 // Force a leader change
2482 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2484 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2485 ShardLeaderStateChanged.class);
2486 assertEquals("getLocalShardDataTree present", false,
2487 leaderStateChanged.getLocalShardDataTree().isPresent());
2489 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2495 public void testFollowerInitialSyncStatus() throws Exception {
2496 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2497 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2498 "testFollowerInitialSyncStatus");
2500 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2502 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2504 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2506 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2508 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2511 private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2512 modification.ready();
2513 store.validate(modification);
2514 store.commit(store.prepare(modification));