1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
15 import akka.actor.ActorRef;
16 import akka.actor.ActorSelection;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.Status.Failure;
20 import akka.dispatch.Dispatchers;
21 import akka.dispatch.OnComplete;
22 import akka.japi.Creator;
23 import akka.pattern.Patterns;
24 import akka.persistence.SaveSnapshotSuccess;
25 import akka.testkit.TestActorRef;
26 import akka.util.Timeout;
27 import com.google.common.base.Function;
28 import com.google.common.base.Optional;
29 import com.google.common.util.concurrent.Futures;
30 import com.google.common.util.concurrent.ListenableFuture;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.HashSet;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.Test;
42 import org.mockito.InOrder;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
45 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
46 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
50 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
65 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
66 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
67 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
68 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
69 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
70 import org.opendaylight.controller.cluster.datastore.modification.Modification;
71 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
72 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
73 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
74 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
75 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
76 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
77 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
78 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
79 import org.opendaylight.controller.cluster.raft.RaftActorContext;
80 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
81 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
82 import org.opendaylight.controller.cluster.raft.Snapshot;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
84 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
85 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
86 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
88 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
89 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
90 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
91 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
92 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
93 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
94 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
95 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
96 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
97 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
98 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
99 import org.opendaylight.yangtools.yang.common.QName;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
101 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
102 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
103 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
104 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
115 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
116 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
117 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
118 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
119 import scala.concurrent.Await;
120 import scala.concurrent.Future;
121 import scala.concurrent.duration.FiniteDuration;
123 public class ShardTest extends AbstractShardTest {
124 private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
126 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
128 final CountDownLatch recoveryComplete = new CountDownLatch(1);
130 protected Props newShardPropsWithRecoveryComplete() {
132 final Creator<Shard> creator = new Creator<Shard>() {
134 public Shard create() throws Exception {
135 return new Shard(shardID, Collections.<String,String>emptyMap(),
136 newDatastoreContext(), SCHEMA_CONTEXT) {
138 protected void onRecoveryComplete() {
140 super.onRecoveryComplete();
142 recoveryComplete.countDown();
148 return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
152 public void testRegisterChangeListener() throws Exception {
153 new ShardTestKit(getSystem()) {{
154 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
155 newShardProps(), "testRegisterChangeListener");
157 waitUntilLeader(shard);
159 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
161 final MockDataChangeListener listener = new MockDataChangeListener(1);
162 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
163 "testRegisterChangeListener-DataChangeListener");
165 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
166 dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
168 final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
169 RegisterChangeListenerReply.class);
170 final String replyPath = reply.getListenerRegistrationPath().toString();
171 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
172 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
174 final YangInstanceIdentifier path = TestModel.TEST_PATH;
175 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
177 listener.waitForChangeEvents(path);
179 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
180 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
184 @SuppressWarnings("serial")
186 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
187 // This test tests the timing window in which a change listener is registered before the
188 // shard becomes the leader. We verify that the listener is registered and notified of the
189 // existing data when the shard becomes the leader.
190 new ShardTestKit(getSystem()) {{
191 // For this test, we want to send the RegisterChangeListener message after the shard
192 // has recovered from persistence and before it becomes the leader. So we subclass
193 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
194 // we know that the shard has been initialized to a follower and has started the
195 // election process. The following 2 CountDownLatches are used to coordinate the
196 // ElectionTimeout with the sending of the RegisterChangeListener message.
197 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
198 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
199 final Creator<Shard> creator = new Creator<Shard>() {
200 boolean firstElectionTimeout = true;
203 public Shard create() throws Exception {
204 // Use a non persistent provider because this test actually invokes persist on the journal
205 // this will cause all other messages to not be queued properly after that.
206 // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
207 // it does do a persist)
208 return new Shard(shardID, Collections.<String,String>emptyMap(),
209 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
211 public void onReceiveCommand(final Object message) throws Exception {
212 if(message instanceof ElectionTimeout && firstElectionTimeout) {
213 // Got the first ElectionTimeout. We don't forward it to the
214 // base Shard yet until we've sent the RegisterChangeListener
215 // message. So we signal the onFirstElectionTimeout latch to tell
216 // the main thread to send the RegisterChangeListener message and
217 // start a thread to wait on the onChangeListenerRegistered latch,
218 // which the main thread signals after it has sent the message.
219 // After the onChangeListenerRegistered is triggered, we send the
220 // original ElectionTimeout message to proceed with the election.
221 firstElectionTimeout = false;
222 final ActorRef self = getSelf();
226 Uninterruptibles.awaitUninterruptibly(
227 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
228 self.tell(message, self);
232 onFirstElectionTimeout.countDown();
234 super.onReceiveCommand(message);
241 final MockDataChangeListener listener = new MockDataChangeListener(1);
242 final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
243 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
245 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
246 Props.create(new DelegatingShardCreator(creator)),
247 "testRegisterChangeListenerWhenNotLeaderInitially");
249 // Write initial data into the in-memory store.
250 final YangInstanceIdentifier path = TestModel.TEST_PATH;
251 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
253 // Wait until the shard receives the first ElectionTimeout message.
254 assertEquals("Got first ElectionTimeout", true,
255 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
257 // Now send the RegisterChangeListener and wait for the reply.
258 shard.tell(new RegisterChangeListener(path, dclActor,
259 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
261 final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
262 RegisterChangeListenerReply.class);
263 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
265 // Sanity check - verify the shard is not the leader yet.
266 shard.tell(new FindLeader(), getRef());
267 final FindLeaderReply findLeadeReply =
268 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
269 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
271 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
272 // with the election process.
273 onChangeListenerRegistered.countDown();
275 // Wait for the shard to become the leader and notify our listener with the existing
276 // data in the store.
277 listener.waitForChangeEvents(path);
279 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
280 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
285 public void testRegisterDataTreeChangeListener() throws Exception {
286 new ShardTestKit(getSystem()) {{
287 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
288 newShardProps(), "testRegisterDataTreeChangeListener");
290 waitUntilLeader(shard);
292 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
294 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
295 final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
296 "testRegisterDataTreeChangeListener-DataTreeChangeListener");
298 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
300 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
301 RegisterDataTreeChangeListenerReply.class);
302 final String replyPath = reply.getListenerRegistrationPath().toString();
303 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
304 "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
306 final YangInstanceIdentifier path = TestModel.TEST_PATH;
307 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
309 listener.waitForChangeEvents();
311 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
312 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
316 @SuppressWarnings("serial")
318 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
319 new ShardTestKit(getSystem()) {{
320 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
321 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
322 final Creator<Shard> creator = new Creator<Shard>() {
323 boolean firstElectionTimeout = true;
326 public Shard create() throws Exception {
327 return new Shard(shardID, Collections.<String,String>emptyMap(),
328 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
330 public void onReceiveCommand(final Object message) throws Exception {
331 if(message instanceof ElectionTimeout && firstElectionTimeout) {
332 firstElectionTimeout = false;
333 final ActorRef self = getSelf();
337 Uninterruptibles.awaitUninterruptibly(
338 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
339 self.tell(message, self);
343 onFirstElectionTimeout.countDown();
345 super.onReceiveCommand(message);
352 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
353 final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
354 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
356 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
357 Props.create(new DelegatingShardCreator(creator)),
358 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
360 final YangInstanceIdentifier path = TestModel.TEST_PATH;
361 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
363 assertEquals("Got first ElectionTimeout", true,
364 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
366 shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
367 final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
368 RegisterDataTreeChangeListenerReply.class);
369 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
371 shard.tell(new FindLeader(), getRef());
372 final FindLeaderReply findLeadeReply =
373 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
374 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
376 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
378 onChangeListenerRegistered.countDown();
380 // TODO: investigate why we do not receive data chage events
381 listener.waitForChangeEvents();
383 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
384 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
389 public void testCreateTransaction(){
390 new ShardTestKit(getSystem()) {{
391 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
393 waitUntilLeader(shard);
395 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
397 shard.tell(new CreateTransaction("txn-1",
398 TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
400 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
401 CreateTransactionReply.class);
403 final String path = reply.getTransactionActorPath().toString();
404 assertTrue("Unexpected transaction path " + path,
405 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
407 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
412 public void testCreateTransactionOnChain(){
413 new ShardTestKit(getSystem()) {{
414 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
416 waitUntilLeader(shard);
418 shard.tell(new CreateTransaction("txn-1",
419 TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
422 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
423 CreateTransactionReply.class);
425 final String path = reply.getTransactionActorPath().toString();
426 assertTrue("Unexpected transaction path " + path,
427 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
429 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
433 @SuppressWarnings("serial")
435 public void testPeerAddressResolved() throws Exception {
436 new ShardTestKit(getSystem()) {{
437 final CountDownLatch recoveryComplete = new CountDownLatch(1);
438 class TestShard extends Shard {
440 super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
441 newDatastoreContext(), SCHEMA_CONTEXT);
444 Map<String, String> getPeerAddresses() {
445 return getRaftActorContext().getPeerAddresses();
449 protected void onRecoveryComplete() {
451 super.onRecoveryComplete();
453 recoveryComplete.countDown();
458 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
459 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
461 public TestShard create() throws Exception {
462 return new TestShard();
464 })), "testPeerAddressResolved");
466 //waitUntilLeader(shard);
467 assertEquals("Recovery complete", true,
468 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
470 final String address = "akka://foobar";
471 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
473 assertEquals("getPeerAddresses", address,
474 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
476 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
481 public void testApplySnapshot() throws Exception {
482 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
483 "testApplySnapshot");
485 final DataTree store = InMemoryDataTreeFactory.getInstance().create();
486 store.setSchemaContext(SCHEMA_CONTEXT);
488 final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
489 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
490 withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
491 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
493 writeToStore(store, TestModel.TEST_PATH, container);
495 final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
496 final NormalizedNode<?,?> expected = readStore(store, root);
498 final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
499 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
501 shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
503 final NormalizedNode<?,?> actual = readStore(shard, root);
505 assertEquals("Root node", expected, actual);
507 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
511 public void testApplyState() throws Exception {
513 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
515 final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
517 final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
518 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
520 shard.underlyingActor().onReceiveCommand(applyState);
522 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
523 assertEquals("Applied state", node, actual);
525 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
529 public void testApplyStateWithCandidatePayload() throws Exception {
531 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
533 recoveryComplete.await(5, TimeUnit.SECONDS);
535 final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
536 final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
538 final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
539 DataTreeCandidatePayload.create(candidate)));
541 shard.underlyingActor().onReceiveCommand(applyState);
543 final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
544 assertEquals("Applied state", node, actual);
546 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
549 DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
550 final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
551 testStore.setSchemaContext(SCHEMA_CONTEXT);
553 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
555 final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
557 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
558 SerializationUtils.serializeNormalizedNode(root),
559 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
563 private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
564 source.validate(mod);
565 final DataTreeCandidate candidate = source.prepare(mod);
566 source.commit(candidate);
567 return DataTreeCandidatePayload.create(candidate);
571 public void testDataTreeCandidateRecovery() throws Exception {
572 // Set up the InMemorySnapshotStore.
573 final DataTree source = setupInMemorySnapshotStore();
575 final DataTreeModification writeMod = source.takeSnapshot().newModification();
576 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
578 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
580 // Set up the InMemoryJournal.
581 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
583 final int nListEntries = 16;
584 final Set<Integer> listEntryKeys = new HashSet<>();
586 // Add some ModificationPayload entries
587 for (int i = 1; i <= nListEntries; i++) {
588 listEntryKeys.add(Integer.valueOf(i));
590 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
591 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
593 final DataTreeModification mod = source.takeSnapshot().newModification();
594 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
596 InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
597 payloadForModification(source, mod)));
600 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
601 new ApplyJournalEntries(nListEntries));
603 testRecovery(listEntryKeys);
607 public void testModicationRecovery() throws Exception {
609 // Set up the InMemorySnapshotStore.
610 setupInMemorySnapshotStore();
612 // Set up the InMemoryJournal.
614 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
616 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
617 new WriteModification(TestModel.OUTER_LIST_PATH,
618 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
620 final int nListEntries = 16;
621 final Set<Integer> listEntryKeys = new HashSet<>();
623 // Add some ModificationPayload entries
624 for(int i = 1; i <= nListEntries; i++) {
625 listEntryKeys.add(Integer.valueOf(i));
626 final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
627 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
628 final Modification mod = new MergeModification(path,
629 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
630 InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
631 newModificationPayload(mod)));
634 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
635 new ApplyJournalEntries(nListEntries));
637 testRecovery(listEntryKeys);
640 private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
641 final MutableCompositeModification compMod = new MutableCompositeModification();
642 for(final Modification mod: mods) {
643 compMod.addModification(mod);
646 return new ModificationPayload(compMod);
650 public void testConcurrentThreePhaseCommits() throws Throwable {
651 new ShardTestKit(getSystem()) {{
652 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
653 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
654 "testConcurrentThreePhaseCommits");
656 waitUntilLeader(shard);
658 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
660 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
662 final String transactionID1 = "tx1";
663 final MutableCompositeModification modification1 = new MutableCompositeModification();
664 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
665 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
667 final String transactionID2 = "tx2";
668 final MutableCompositeModification modification2 = new MutableCompositeModification();
669 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
670 TestModel.OUTER_LIST_PATH,
671 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
674 final String transactionID3 = "tx3";
675 final MutableCompositeModification modification3 = new MutableCompositeModification();
676 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
677 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
678 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
679 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
682 final long timeoutSec = 5;
683 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
684 final Timeout timeout = new Timeout(duration);
686 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
687 // by the ShardTransaction.
689 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
690 cohort1, modification1, true, false), getRef());
691 final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
692 expectMsgClass(duration, ReadyTransactionReply.class));
693 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
695 // Send the CanCommitTransaction message for the first Tx.
697 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
698 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
699 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
700 assertEquals("Can commit", true, canCommitReply.getCanCommit());
702 // Send the ForwardedReadyTransaction for the next 2 Tx's.
704 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
705 cohort2, modification2, true, false), getRef());
706 expectMsgClass(duration, ReadyTransactionReply.class);
708 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
709 cohort3, modification3, true, false), getRef());
710 expectMsgClass(duration, ReadyTransactionReply.class);
712 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
713 // processed after the first Tx completes.
715 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
716 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
718 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
719 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
721 // Send the CommitTransaction message for the first Tx. After it completes, it should
722 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
724 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
725 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
727 // Wait for the next 2 Tx's to complete.
729 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
730 final CountDownLatch commitLatch = new CountDownLatch(2);
732 class OnFutureComplete extends OnComplete<Object> {
733 private final Class<?> expRespType;
735 OnFutureComplete(final Class<?> expRespType) {
736 this.expRespType = expRespType;
740 public void onComplete(final Throwable error, final Object resp) {
742 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
745 assertEquals("Commit response type", expRespType, resp.getClass());
747 } catch (final Exception e) {
753 void onSuccess(final Object resp) throws Exception {
757 class OnCommitFutureComplete extends OnFutureComplete {
758 OnCommitFutureComplete() {
759 super(CommitTransactionReply.SERIALIZABLE_CLASS);
763 public void onComplete(final Throwable error, final Object resp) {
764 super.onComplete(error, resp);
765 commitLatch.countDown();
769 class OnCanCommitFutureComplete extends OnFutureComplete {
770 private final String transactionID;
772 OnCanCommitFutureComplete(final String transactionID) {
773 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
774 this.transactionID = transactionID;
778 void onSuccess(final Object resp) throws Exception {
779 final CanCommitTransactionReply canCommitReply =
780 CanCommitTransactionReply.fromSerializable(resp);
781 assertEquals("Can commit", true, canCommitReply.getCanCommit());
783 final Future<Object> commitFuture = Patterns.ask(shard,
784 new CommitTransaction(transactionID).toSerializable(), timeout);
785 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
789 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
790 getSystem().dispatcher());
792 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
793 getSystem().dispatcher());
795 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
797 if(caughtEx.get() != null) {
798 throw caughtEx.get();
801 assertEquals("Commits complete", true, done);
803 final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
804 inOrder.verify(cohort1).canCommit();
805 inOrder.verify(cohort1).preCommit();
806 inOrder.verify(cohort1).commit();
807 inOrder.verify(cohort2).canCommit();
808 inOrder.verify(cohort2).preCommit();
809 inOrder.verify(cohort2).commit();
810 inOrder.verify(cohort3).canCommit();
811 inOrder.verify(cohort3).preCommit();
812 inOrder.verify(cohort3).commit();
814 // Verify data in the data store.
816 verifyOuterListEntry(shard, 1);
818 verifyLastApplied(shard, 2);
820 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
824 private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
825 final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
826 return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
829 private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
830 final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
831 final int messagesSent) {
832 final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
833 batched.addModification(new WriteModification(path, data));
834 batched.setReady(ready);
835 batched.setDoCommitOnReady(doCommitOnReady);
836 batched.setTotalMessagesSent(messagesSent);
841 public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
842 new ShardTestKit(getSystem()) {{
843 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
844 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
845 "testBatchedModificationsWithNoCommitOnReady");
847 waitUntilLeader(shard);
849 final String transactionID = "tx";
850 final FiniteDuration duration = duration("5 seconds");
852 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
853 final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
855 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
856 if(mockCohort.get() == null) {
857 mockCohort.set(createDelegatingMockCohort("cohort", actual));
860 return mockCohort.get();
864 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
866 // Send a BatchedModifications to start a transaction.
868 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
869 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
870 expectMsgClass(duration, BatchedModificationsReply.class);
872 // Send a couple more BatchedModifications.
874 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
875 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
876 expectMsgClass(duration, BatchedModificationsReply.class);
878 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
879 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
880 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
881 expectMsgClass(duration, ReadyTransactionReply.class);
883 // Send the CanCommitTransaction message.
885 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
886 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
887 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
888 assertEquals("Can commit", true, canCommitReply.getCanCommit());
890 // Send the CanCommitTransaction message.
892 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
893 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
895 final InOrder inOrder = inOrder(mockCohort.get());
896 inOrder.verify(mockCohort.get()).canCommit();
897 inOrder.verify(mockCohort.get()).preCommit();
898 inOrder.verify(mockCohort.get()).commit();
900 // Verify data in the data store.
902 verifyOuterListEntry(shard, 1);
904 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
909 public void testBatchedModificationsWithCommitOnReady() throws Throwable {
910 new ShardTestKit(getSystem()) {{
911 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
912 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
913 "testBatchedModificationsWithCommitOnReady");
915 waitUntilLeader(shard);
917 final String transactionID = "tx";
918 final FiniteDuration duration = duration("5 seconds");
920 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
921 final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
923 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
924 if(mockCohort.get() == null) {
925 mockCohort.set(createDelegatingMockCohort("cohort", actual));
928 return mockCohort.get();
932 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
934 // Send a BatchedModifications to start a transaction.
936 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
937 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
938 expectMsgClass(duration, BatchedModificationsReply.class);
940 // Send a couple more BatchedModifications.
942 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
943 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
944 expectMsgClass(duration, BatchedModificationsReply.class);
946 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
947 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
948 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
950 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
952 final InOrder inOrder = inOrder(mockCohort.get());
953 inOrder.verify(mockCohort.get()).canCommit();
954 inOrder.verify(mockCohort.get()).preCommit();
955 inOrder.verify(mockCohort.get()).commit();
957 // Verify data in the data store.
959 verifyOuterListEntry(shard, 1);
961 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
965 @Test(expected=IllegalStateException.class)
966 public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
967 new ShardTestKit(getSystem()) {{
968 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
969 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
970 "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
972 waitUntilLeader(shard);
974 final String transactionID = "tx1";
975 final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
976 batched.setReady(true);
977 batched.setTotalMessagesSent(2);
979 shard.tell(batched, getRef());
981 final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
983 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
985 if(failure != null) {
986 throw failure.cause();
991 @SuppressWarnings("unchecked")
992 private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
993 final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
994 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
995 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
996 outerList.getValue() instanceof Iterable);
997 final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
998 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
999 entry instanceof MapEntryNode);
1000 final MapEntryNode mapEntry = (MapEntryNode)entry;
1001 final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1002 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1003 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1004 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1008 public void testBatchedModificationsOnTransactionChain() throws Throwable {
1009 new ShardTestKit(getSystem()) {{
1010 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1011 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1012 "testBatchedModificationsOnTransactionChain");
1014 waitUntilLeader(shard);
1016 final String transactionChainID = "txChain";
1017 final String transactionID1 = "tx1";
1018 final String transactionID2 = "tx2";
1020 final FiniteDuration duration = duration("5 seconds");
1022 // Send a BatchedModifications to start a chained write transaction and ready it.
1024 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1025 final YangInstanceIdentifier path = TestModel.TEST_PATH;
1026 shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1027 containerNode, true, false, 1), getRef());
1028 expectMsgClass(duration, ReadyTransactionReply.class);
1030 // Create a read Tx on the same chain.
1032 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1033 transactionChainID).toSerializable(), getRef());
1035 final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1037 getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1038 final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1039 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1041 // Commit the write transaction.
1043 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1044 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1045 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1046 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1048 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1049 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1051 // Verify data in the data store.
1053 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1054 assertEquals("Stored node", containerNode, actualNode);
1056 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1061 public void testOnBatchedModificationsWhenNotLeader() {
1062 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1063 new ShardTestKit(getSystem()) {{
1064 final Creator<Shard> creator = new Creator<Shard>() {
1065 private static final long serialVersionUID = 1L;
1068 public Shard create() throws Exception {
1069 return new Shard(shardID, Collections.<String,String>emptyMap(),
1070 newDatastoreContext(), SCHEMA_CONTEXT) {
1072 protected boolean isLeader() {
1073 return overrideLeaderCalls.get() ? false : super.isLeader();
1077 protected ActorSelection getLeader() {
1078 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1085 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1086 Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1088 waitUntilLeader(shard);
1090 overrideLeaderCalls.set(true);
1092 final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1094 shard.tell(batched, ActorRef.noSender());
1096 expectMsgEquals(batched);
1098 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1103 public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1104 new ShardTestKit(getSystem()) {{
1105 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1106 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1107 "testForwardedReadyTransactionWithImmediateCommit");
1109 waitUntilLeader(shard);
1111 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1113 final String transactionID = "tx1";
1114 final MutableCompositeModification modification = new MutableCompositeModification();
1115 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1116 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1117 TestModel.TEST_PATH, containerNode, modification);
1119 final FiniteDuration duration = duration("5 seconds");
1121 // Simulate the ForwardedReadyTransaction messages that would be sent
1122 // by the ShardTransaction.
1124 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1125 cohort, modification, true, true), getRef());
1127 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1129 final InOrder inOrder = inOrder(cohort);
1130 inOrder.verify(cohort).canCommit();
1131 inOrder.verify(cohort).preCommit();
1132 inOrder.verify(cohort).commit();
1134 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1135 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1137 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1142 public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1143 new ShardTestKit(getSystem()) {{
1144 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1145 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1146 "testReadyLocalTransactionWithImmediateCommit");
1148 waitUntilLeader(shard);
1150 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1152 final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1154 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1155 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1156 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1157 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1159 final String txId = "tx1";
1160 modification.ready();
1161 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1163 shard.tell(readyMessage, getRef());
1165 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1167 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1168 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1170 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1175 public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1176 new ShardTestKit(getSystem()) {{
1177 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1178 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1179 "testReadyLocalTransactionWithThreePhaseCommit");
1181 waitUntilLeader(shard);
1183 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1185 final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1187 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1188 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1189 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1190 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1192 final String txId = "tx1";
1193 modification.ready();
1194 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1196 shard.tell(readyMessage, getRef());
1198 expectMsgClass(ReadyTransactionReply.class);
1200 // Send the CanCommitTransaction message.
1202 shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1203 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1204 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1205 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1207 // Send the CanCommitTransaction message.
1209 shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1210 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1212 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1213 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1215 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1220 public void testCommitWithPersistenceDisabled() throws Throwable {
1221 dataStoreContextBuilder.persistent(false);
1222 new ShardTestKit(getSystem()) {{
1223 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1224 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1225 "testCommitWithPersistenceDisabled");
1227 waitUntilLeader(shard);
1229 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1231 // Setup a simulated transactions with a mock cohort.
1233 final String transactionID = "tx";
1234 final MutableCompositeModification modification = new MutableCompositeModification();
1235 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1236 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1237 TestModel.TEST_PATH, containerNode, modification);
1239 final FiniteDuration duration = duration("5 seconds");
1241 // Simulate the ForwardedReadyTransaction messages that would be sent
1242 // by the ShardTransaction.
1244 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1245 cohort, modification, true, false), getRef());
1246 expectMsgClass(duration, ReadyTransactionReply.class);
1248 // Send the CanCommitTransaction message.
1250 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1251 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1252 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1253 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1255 // Send the CanCommitTransaction message.
1257 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1258 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1260 final InOrder inOrder = inOrder(cohort);
1261 inOrder.verify(cohort).canCommit();
1262 inOrder.verify(cohort).preCommit();
1263 inOrder.verify(cohort).commit();
1265 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1266 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1268 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1272 private static DataTreeCandidateTip mockCandidate(final String name) {
1273 final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1274 final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1275 doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1276 doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1277 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1278 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1279 return mockCandidate;
1282 private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1283 final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1284 final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1285 doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1286 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1287 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1288 return mockCandidate;
1292 public void testCommitWhenTransactionHasNoModifications(){
1293 // Note that persistence is enabled which would normally result in the entry getting written to the journal
1294 // but here that need not happen
1295 new ShardTestKit(getSystem()) {
1297 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1298 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1299 "testCommitWhenTransactionHasNoModifications");
1301 waitUntilLeader(shard);
1303 final String transactionID = "tx1";
1304 final MutableCompositeModification modification = new MutableCompositeModification();
1305 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1306 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1307 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1308 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1309 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1311 final FiniteDuration duration = duration("5 seconds");
1313 // Simulate the ForwardedReadyTransaction messages that would be sent
1314 // by the ShardTransaction.
1316 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1317 cohort, modification, true, false), getRef());
1318 expectMsgClass(duration, ReadyTransactionReply.class);
1320 // Send the CanCommitTransaction message.
1322 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1323 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1324 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1325 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1327 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1328 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1330 final InOrder inOrder = inOrder(cohort);
1331 inOrder.verify(cohort).canCommit();
1332 inOrder.verify(cohort).preCommit();
1333 inOrder.verify(cohort).commit();
1335 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1336 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1338 // Use MBean for verification
1339 // Committed transaction count should increase as usual
1340 assertEquals(1,shardStats.getCommittedTransactionsCount());
1342 // Commit index should not advance because this does not go into the journal
1343 assertEquals(-1, shardStats.getCommitIndex());
1345 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1352 public void testCommitWhenTransactionHasModifications(){
1353 new ShardTestKit(getSystem()) {
1355 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1356 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1357 "testCommitWhenTransactionHasModifications");
1359 waitUntilLeader(shard);
1361 final String transactionID = "tx1";
1362 final MutableCompositeModification modification = new MutableCompositeModification();
1363 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1364 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1365 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1366 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1367 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1368 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1370 final FiniteDuration duration = duration("5 seconds");
1372 // Simulate the ForwardedReadyTransaction messages that would be sent
1373 // by the ShardTransaction.
1375 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1376 cohort, modification, true, false), getRef());
1377 expectMsgClass(duration, ReadyTransactionReply.class);
1379 // Send the CanCommitTransaction message.
1381 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1382 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1383 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1384 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1386 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1387 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1389 final InOrder inOrder = inOrder(cohort);
1390 inOrder.verify(cohort).canCommit();
1391 inOrder.verify(cohort).preCommit();
1392 inOrder.verify(cohort).commit();
1394 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1395 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1397 // Use MBean for verification
1398 // Committed transaction count should increase as usual
1399 assertEquals(1, shardStats.getCommittedTransactionsCount());
1401 // Commit index should advance as we do not have an empty modification
1402 assertEquals(0, shardStats.getCommitIndex());
1404 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1411 public void testCommitPhaseFailure() throws Throwable {
1412 new ShardTestKit(getSystem()) {{
1413 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1414 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1415 "testCommitPhaseFailure");
1417 waitUntilLeader(shard);
1419 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1422 final String transactionID1 = "tx1";
1423 final MutableCompositeModification modification1 = new MutableCompositeModification();
1424 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1425 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1426 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1427 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1428 doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1430 final String transactionID2 = "tx2";
1431 final MutableCompositeModification modification2 = new MutableCompositeModification();
1432 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1433 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1435 final FiniteDuration duration = duration("5 seconds");
1436 final Timeout timeout = new Timeout(duration);
1438 // Simulate the ForwardedReadyTransaction messages that would be sent
1439 // by the ShardTransaction.
1441 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1442 cohort1, modification1, true, false), getRef());
1443 expectMsgClass(duration, ReadyTransactionReply.class);
1445 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1446 cohort2, modification2, true, false), getRef());
1447 expectMsgClass(duration, ReadyTransactionReply.class);
1449 // Send the CanCommitTransaction message for the first Tx.
1451 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1452 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1453 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1454 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1456 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1457 // processed after the first Tx completes.
1459 final Future<Object> canCommitFuture = Patterns.ask(shard,
1460 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1462 // Send the CommitTransaction message for the first Tx. This should send back an error
1463 // and trigger the 2nd Tx to proceed.
1465 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1466 expectMsgClass(duration, akka.actor.Status.Failure.class);
1468 // Wait for the 2nd Tx to complete the canCommit phase.
1470 final CountDownLatch latch = new CountDownLatch(1);
1471 canCommitFuture.onComplete(new OnComplete<Object>() {
1473 public void onComplete(final Throwable t, final Object resp) {
1476 }, getSystem().dispatcher());
1478 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1480 final InOrder inOrder = inOrder(cohort1, cohort2);
1481 inOrder.verify(cohort1).canCommit();
1482 inOrder.verify(cohort1).preCommit();
1483 inOrder.verify(cohort1).commit();
1484 inOrder.verify(cohort2).canCommit();
1486 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1491 public void testPreCommitPhaseFailure() throws Throwable {
1492 new ShardTestKit(getSystem()) {{
1493 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1494 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1495 "testPreCommitPhaseFailure");
1497 waitUntilLeader(shard);
1499 final String transactionID1 = "tx1";
1500 final MutableCompositeModification modification1 = new MutableCompositeModification();
1501 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1502 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1503 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1505 final String transactionID2 = "tx2";
1506 final MutableCompositeModification modification2 = new MutableCompositeModification();
1507 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1508 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1510 final FiniteDuration duration = duration("5 seconds");
1511 final Timeout timeout = new Timeout(duration);
1513 // Simulate the ForwardedReadyTransaction messages that would be sent
1514 // by the ShardTransaction.
1516 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1517 cohort1, modification1, true, false), getRef());
1518 expectMsgClass(duration, ReadyTransactionReply.class);
1520 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1521 cohort2, modification2, true, false), getRef());
1522 expectMsgClass(duration, ReadyTransactionReply.class);
1524 // Send the CanCommitTransaction message for the first Tx.
1526 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1527 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1528 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1529 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1531 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1532 // processed after the first Tx completes.
1534 final Future<Object> canCommitFuture = Patterns.ask(shard,
1535 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1537 // Send the CommitTransaction message for the first Tx. This should send back an error
1538 // and trigger the 2nd Tx to proceed.
1540 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1541 expectMsgClass(duration, akka.actor.Status.Failure.class);
1543 // Wait for the 2nd Tx to complete the canCommit phase.
1545 final CountDownLatch latch = new CountDownLatch(1);
1546 canCommitFuture.onComplete(new OnComplete<Object>() {
1548 public void onComplete(final Throwable t, final Object resp) {
1551 }, getSystem().dispatcher());
1553 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1555 final InOrder inOrder = inOrder(cohort1, cohort2);
1556 inOrder.verify(cohort1).canCommit();
1557 inOrder.verify(cohort1).preCommit();
1558 inOrder.verify(cohort2).canCommit();
1560 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1565 public void testCanCommitPhaseFailure() throws Throwable {
1566 new ShardTestKit(getSystem()) {{
1567 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1568 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1569 "testCanCommitPhaseFailure");
1571 waitUntilLeader(shard);
1573 final FiniteDuration duration = duration("5 seconds");
1575 final String transactionID1 = "tx1";
1576 final MutableCompositeModification modification = new MutableCompositeModification();
1577 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1578 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1580 // Simulate the ForwardedReadyTransaction messages that would be sent
1581 // by the ShardTransaction.
1583 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1584 cohort, modification, true, false), getRef());
1585 expectMsgClass(duration, ReadyTransactionReply.class);
1587 // Send the CanCommitTransaction message.
1589 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1590 expectMsgClass(duration, akka.actor.Status.Failure.class);
1592 // Send another can commit to ensure the failed one got cleaned up.
1596 final String transactionID2 = "tx2";
1597 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1599 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1600 cohort, modification, true, false), getRef());
1601 expectMsgClass(duration, ReadyTransactionReply.class);
1603 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1604 final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1605 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1606 assertEquals("getCanCommit", true, reply.getCanCommit());
1608 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1613 public void testCanCommitPhaseFalseResponse() throws Throwable {
1614 new ShardTestKit(getSystem()) {{
1615 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1616 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1617 "testCanCommitPhaseFalseResponse");
1619 waitUntilLeader(shard);
1621 final FiniteDuration duration = duration("5 seconds");
1623 final String transactionID1 = "tx1";
1624 final MutableCompositeModification modification = new MutableCompositeModification();
1625 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1626 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1628 // Simulate the ForwardedReadyTransaction messages that would be sent
1629 // by the ShardTransaction.
1631 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1632 cohort, modification, true, false), getRef());
1633 expectMsgClass(duration, ReadyTransactionReply.class);
1635 // Send the CanCommitTransaction message.
1637 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1638 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1639 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1640 assertEquals("getCanCommit", false, reply.getCanCommit());
1642 // Send another can commit to ensure the failed one got cleaned up.
1646 final String transactionID2 = "tx2";
1647 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1649 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1650 cohort, modification, true, false), getRef());
1651 expectMsgClass(duration, ReadyTransactionReply.class);
1653 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1654 reply = CanCommitTransactionReply.fromSerializable(
1655 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1656 assertEquals("getCanCommit", true, reply.getCanCommit());
1658 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1663 public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1664 new ShardTestKit(getSystem()) {{
1665 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1666 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1667 "testImmediateCommitWithCanCommitPhaseFailure");
1669 waitUntilLeader(shard);
1671 final FiniteDuration duration = duration("5 seconds");
1673 final String transactionID1 = "tx1";
1674 final MutableCompositeModification modification = new MutableCompositeModification();
1675 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1676 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1678 // Simulate the ForwardedReadyTransaction messages that would be sent
1679 // by the ShardTransaction.
1681 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1682 cohort, modification, true, true), getRef());
1684 expectMsgClass(duration, akka.actor.Status.Failure.class);
1686 // Send another can commit to ensure the failed one got cleaned up.
1690 final String transactionID2 = "tx2";
1691 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1692 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1693 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1694 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1695 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1696 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1697 doReturn(candidateRoot).when(candidate).getRootNode();
1698 doReturn(candidate).when(cohort).getCandidate();
1700 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1701 cohort, modification, true, true), getRef());
1703 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1705 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1710 public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1711 new ShardTestKit(getSystem()) {{
1712 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1713 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1714 "testImmediateCommitWithCanCommitPhaseFalseResponse");
1716 waitUntilLeader(shard);
1718 final FiniteDuration duration = duration("5 seconds");
1720 final String transactionID = "tx1";
1721 final MutableCompositeModification modification = new MutableCompositeModification();
1722 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1723 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1725 // Simulate the ForwardedReadyTransaction messages that would be sent
1726 // by the ShardTransaction.
1728 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1729 cohort, modification, true, true), getRef());
1731 expectMsgClass(duration, akka.actor.Status.Failure.class);
1733 // Send another can commit to ensure the failed one got cleaned up.
1737 final String transactionID2 = "tx2";
1738 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1739 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1740 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1741 final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1742 final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1743 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1744 doReturn(candidateRoot).when(candidate).getRootNode();
1745 doReturn(candidate).when(cohort).getCandidate();
1747 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1748 cohort, modification, true, true), getRef());
1750 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1752 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1757 public void testAbortBeforeFinishCommit() throws Throwable {
1758 new ShardTestKit(getSystem()) {{
1759 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1760 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1761 "testAbortBeforeFinishCommit");
1763 waitUntilLeader(shard);
1765 final FiniteDuration duration = duration("5 seconds");
1766 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1768 final String transactionID = "tx1";
1769 final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1770 new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1772 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1773 final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1775 // Simulate an AbortTransaction message occurring during replication, after
1776 // persisting and before finishing the commit to the in-memory store.
1777 // We have no followers so due to optimizations in the RaftActor, it does not
1778 // attempt replication and thus we can't send an AbortTransaction message b/c
1779 // it would be processed too late after CommitTransaction completes. So we'll
1780 // simulate an AbortTransaction message occurring during replication by calling
1781 // the shard directly.
1783 shard.underlyingActor().doAbortTransaction(transactionID, null);
1785 return preCommitFuture;
1789 final MutableCompositeModification modification = new MutableCompositeModification();
1790 final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1791 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1792 modification, preCommit);
1794 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1795 cohort, modification, true, false), getRef());
1796 expectMsgClass(duration, ReadyTransactionReply.class);
1798 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1799 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1800 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1801 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1803 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1804 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1806 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1808 // Since we're simulating an abort occurring during replication and before finish commit,
1809 // the data should still get written to the in-memory store since we've gotten past
1810 // canCommit and preCommit and persisted the data.
1811 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1813 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1818 public void testTransactionCommitTimeout() throws Throwable {
1819 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1821 new ShardTestKit(getSystem()) {{
1822 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1823 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1824 "testTransactionCommitTimeout");
1826 waitUntilLeader(shard);
1828 final FiniteDuration duration = duration("5 seconds");
1830 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1832 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1833 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1834 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1836 // Create 1st Tx - will timeout
1838 final String transactionID1 = "tx1";
1839 final MutableCompositeModification modification1 = new MutableCompositeModification();
1840 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1841 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1842 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1843 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1848 final String transactionID2 = "tx3";
1849 final MutableCompositeModification modification2 = new MutableCompositeModification();
1850 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1851 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1852 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1854 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1859 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1860 cohort1, modification1, true, false), getRef());
1861 expectMsgClass(duration, ReadyTransactionReply.class);
1863 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1864 cohort2, modification2, true, false), getRef());
1865 expectMsgClass(duration, ReadyTransactionReply.class);
1867 // canCommit 1st Tx. We don't send the commit so it should timeout.
1869 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1870 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1872 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1874 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1875 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1877 // Try to commit the 1st Tx - should fail as it's not the current Tx.
1879 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1880 expectMsgClass(duration, akka.actor.Status.Failure.class);
1882 // Commit the 2nd Tx.
1884 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1885 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1887 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1888 assertNotNull(listNodePath + " not found", node);
1890 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1895 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1896 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1898 new ShardTestKit(getSystem()) {{
1899 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1900 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1901 "testTransactionCommitQueueCapacityExceeded");
1903 waitUntilLeader(shard);
1905 final FiniteDuration duration = duration("5 seconds");
1907 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1909 final String transactionID1 = "tx1";
1910 final MutableCompositeModification modification1 = new MutableCompositeModification();
1911 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1912 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1914 final String transactionID2 = "tx2";
1915 final MutableCompositeModification modification2 = new MutableCompositeModification();
1916 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1917 TestModel.OUTER_LIST_PATH,
1918 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1921 final String transactionID3 = "tx3";
1922 final MutableCompositeModification modification3 = new MutableCompositeModification();
1923 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1924 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1928 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1929 cohort1, modification1, true, false), getRef());
1930 expectMsgClass(duration, ReadyTransactionReply.class);
1932 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1933 cohort2, modification2, true, false), getRef());
1934 expectMsgClass(duration, ReadyTransactionReply.class);
1936 // The 3rd Tx should exceed queue capacity and fail.
1938 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1939 cohort3, modification3, true, false), getRef());
1940 expectMsgClass(duration, akka.actor.Status.Failure.class);
1942 // canCommit 1st Tx.
1944 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1945 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1947 // canCommit the 2nd Tx - it should get queued.
1949 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1951 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1953 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1954 expectMsgClass(duration, akka.actor.Status.Failure.class);
1956 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1961 public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1962 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1964 new ShardTestKit(getSystem()) {{
1965 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1966 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1967 "testTransactionCommitWithPriorExpiredCohortEntries");
1969 waitUntilLeader(shard);
1971 final FiniteDuration duration = duration("5 seconds");
1973 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1975 final String transactionID1 = "tx1";
1976 final MutableCompositeModification modification1 = new MutableCompositeModification();
1977 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1978 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1980 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1981 cohort1, modification1, true, false), getRef());
1982 expectMsgClass(duration, ReadyTransactionReply.class);
1984 final String transactionID2 = "tx2";
1985 final MutableCompositeModification modification2 = new MutableCompositeModification();
1986 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1987 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1989 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1990 cohort2, modification2, true, false), getRef());
1991 expectMsgClass(duration, ReadyTransactionReply.class);
1993 final String transactionID3 = "tx3";
1994 final MutableCompositeModification modification3 = new MutableCompositeModification();
1995 final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1996 TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1998 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1999 cohort3, modification3, true, false), getRef());
2000 expectMsgClass(duration, ReadyTransactionReply.class);
2002 // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2003 // should expire from the queue and the last one should be processed.
2005 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2006 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2008 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2013 public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2014 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2016 new ShardTestKit(getSystem()) {{
2017 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2018 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2019 "testTransactionCommitWithSubsequentExpiredCohortEntry");
2021 waitUntilLeader(shard);
2023 final FiniteDuration duration = duration("5 seconds");
2025 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2027 final String transactionID1 = "tx1";
2028 final MutableCompositeModification modification1 = new MutableCompositeModification();
2029 final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2030 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2032 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2033 cohort1, modification1, true, false), getRef());
2034 expectMsgClass(duration, ReadyTransactionReply.class);
2036 // CanCommit the first one so it's the current in-progress CohortEntry.
2038 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2039 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2041 // Ready the second Tx.
2043 final String transactionID2 = "tx2";
2044 final MutableCompositeModification modification2 = new MutableCompositeModification();
2045 final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2046 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2048 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2049 cohort2, modification2, true, false), getRef());
2050 expectMsgClass(duration, ReadyTransactionReply.class);
2052 // Ready the third Tx.
2054 final String transactionID3 = "tx3";
2055 final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2056 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2057 .apply(modification3);
2058 modification3.ready();
2059 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2061 shard.tell(readyMessage, getRef());
2063 // Commit the first Tx. After completing, the second should expire from the queue and the third
2066 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2067 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2069 // Expect commit reply from the third Tx.
2071 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2073 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2074 assertNotNull(TestModel.TEST2_PATH + " not found", node);
2076 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2081 public void testCanCommitBeforeReadyFailure() throws Throwable {
2082 new ShardTestKit(getSystem()) {{
2083 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2084 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2085 "testCanCommitBeforeReadyFailure");
2087 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2088 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2090 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2095 public void testAbortTransaction() throws Throwable {
2096 new ShardTestKit(getSystem()) {{
2097 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2098 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2099 "testAbortTransaction");
2101 waitUntilLeader(shard);
2103 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2105 final String transactionID1 = "tx1";
2106 final MutableCompositeModification modification1 = new MutableCompositeModification();
2107 final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2108 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2109 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2111 final String transactionID2 = "tx2";
2112 final MutableCompositeModification modification2 = new MutableCompositeModification();
2113 final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2114 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2116 final FiniteDuration duration = duration("5 seconds");
2117 final Timeout timeout = new Timeout(duration);
2119 // Simulate the ForwardedReadyTransaction messages that would be sent
2120 // by the ShardTransaction.
2122 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2123 cohort1, modification1, true, false), getRef());
2124 expectMsgClass(duration, ReadyTransactionReply.class);
2126 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2127 cohort2, modification2, true, false), getRef());
2128 expectMsgClass(duration, ReadyTransactionReply.class);
2130 // Send the CanCommitTransaction message for the first Tx.
2132 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2133 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2134 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2135 assertEquals("Can commit", true, canCommitReply.getCanCommit());
2137 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2138 // processed after the first Tx completes.
2140 final Future<Object> canCommitFuture = Patterns.ask(shard,
2141 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2143 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2146 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2147 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2149 // Wait for the 2nd Tx to complete the canCommit phase.
2151 Await.ready(canCommitFuture, duration);
2153 final InOrder inOrder = inOrder(cohort1, cohort2);
2154 inOrder.verify(cohort1).canCommit();
2155 inOrder.verify(cohort2).canCommit();
2157 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2162 public void testCreateSnapshot() throws Exception {
2163 testCreateSnapshot(true, "testCreateSnapshot");
2167 public void testCreateSnapshotWithNonPersistentData() throws Exception {
2168 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2171 @SuppressWarnings("serial")
2172 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2174 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2176 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2177 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2178 TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2183 public void saveSnapshot(final Object o) {
2184 savedSnapshot.set(o);
2185 super.saveSnapshot(o);
2189 dataStoreContextBuilder.persistent(persistent);
2191 new ShardTestKit(getSystem()) {{
2192 class TestShard extends Shard {
2194 protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
2195 final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
2196 super(name, peerAddresses, datastoreContext, schemaContext);
2197 setPersistence(new TestPersistentDataProvider(super.persistence()));
2201 public void handleCommand(final Object message) {
2202 super.handleCommand(message);
2204 if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2205 latch.get().countDown();
2210 public RaftActorContext getRaftActorContext() {
2211 return super.getRaftActorContext();
2215 final Creator<Shard> creator = new Creator<Shard>() {
2217 public Shard create() throws Exception {
2218 return new TestShard(shardID, Collections.<String,String>emptyMap(),
2219 newDatastoreContext(), SCHEMA_CONTEXT);
2223 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2224 Props.create(new DelegatingShardCreator(creator)), shardActorName);
2226 waitUntilLeader(shard);
2228 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2230 final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2232 // Trigger creation of a snapshot by ensuring
2233 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2234 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2236 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2238 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2239 savedSnapshot.get() instanceof Snapshot);
2241 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2243 latch.set(new CountDownLatch(1));
2244 savedSnapshot.set(null);
2246 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2248 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2250 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2251 savedSnapshot.get() instanceof Snapshot);
2253 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2255 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2258 private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2260 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2261 assertEquals("Root node", expectedRoot, actual);
2267 * This test simply verifies that the applySnapShot logic will work
2268 * @throws ReadFailedException
2269 * @throws DataValidationFailedException
2272 public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2273 final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2274 store.setSchemaContext(SCHEMA_CONTEXT);
2276 final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2277 putTransaction.write(TestModel.TEST_PATH,
2278 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2279 commitTransaction(store, putTransaction);
2282 final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2284 final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2286 writeTransaction.delete(YangInstanceIdentifier.builder().build());
2287 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2289 commitTransaction(store, writeTransaction);
2291 final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2293 assertEquals(expected, actual);
2297 public void testRecoveryApplicable(){
2299 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2300 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2302 final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2303 persistentContext, SCHEMA_CONTEXT);
2305 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2306 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2308 final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2309 nonPersistentContext, SCHEMA_CONTEXT);
2311 new ShardTestKit(getSystem()) {{
2312 final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2313 persistentProps, "testPersistence1");
2315 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2317 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2319 final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2320 nonPersistentProps, "testPersistence2");
2322 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2324 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2331 public void testOnDatastoreContext() {
2332 new ShardTestKit(getSystem()) {{
2333 dataStoreContextBuilder.persistent(true);
2335 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2337 assertEquals("isRecoveryApplicable", true,
2338 shard.underlyingActor().persistence().isRecoveryApplicable());
2340 waitUntilLeader(shard);
2342 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2344 assertEquals("isRecoveryApplicable", false,
2345 shard.underlyingActor().persistence().isRecoveryApplicable());
2347 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2349 assertEquals("isRecoveryApplicable", true,
2350 shard.underlyingActor().persistence().isRecoveryApplicable());
2352 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2357 public void testRegisterRoleChangeListener() throws Exception {
2358 new ShardTestKit(getSystem()) {
2360 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2361 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2362 "testRegisterRoleChangeListener");
2364 waitUntilLeader(shard);
2366 final TestActorRef<MessageCollectorActor> listener =
2367 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2369 shard.tell(new RegisterRoleChangeListener(), listener);
2371 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2373 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2374 ShardLeaderStateChanged.class);
2375 assertEquals("getLocalShardDataTree present", true,
2376 leaderStateChanged.getLocalShardDataTree().isPresent());
2377 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2378 leaderStateChanged.getLocalShardDataTree().get());
2380 MessageCollectorActor.clearMessages(listener);
2382 // Force a leader change
2384 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2386 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2387 ShardLeaderStateChanged.class);
2388 assertEquals("getLocalShardDataTree present", false,
2389 leaderStateChanged.getLocalShardDataTree().isPresent());
2391 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2397 public void testFollowerInitialSyncStatus() throws Exception {
2398 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2399 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2400 "testFollowerInitialSyncStatus");
2402 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2404 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2406 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2408 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2410 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2413 private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2414 modification.ready();
2415 store.validate(modification);
2416 store.commit(store.prepare(modification));