1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.CheckedFuture;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.mockito.InOrder;
44 import org.mockito.invocation.InvocationOnMock;
45 import org.mockito.stubbing.Answer;
46 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
59 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
60 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
61 import org.opendaylight.controller.cluster.datastore.modification.Modification;
62 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
63 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
64 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
65 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
66 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
67 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
68 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
70 import org.opendaylight.controller.cluster.raft.Snapshot;
71 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
73 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
74 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
75 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
76 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
77 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
78 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
79 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
80 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
81 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
82 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
83 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
84 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
85 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
86 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
87 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
88 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
89 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
90 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
91 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
92 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
93 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
94 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
95 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
96 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
97 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
98 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
99 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
100 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
101 import scala.concurrent.Await;
102 import scala.concurrent.Future;
103 import scala.concurrent.duration.FiniteDuration;
105 public class ShardTest extends AbstractActorTest {
/** Schema context created once from the test model and passed to every shard built in this class. */
107 private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
/** Counter whose value is appended to the shard type below, so each test instance gets a distinct identifier. */
109 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
/**
 * Identifier of the shard under test: member "member-1", shard "inventory", type "config" + counter.
 * Its string form is also used as the key for the in-memory journal and snapshot store entries in testRecovery.
 */
111 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
112 .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
/** Datastore settings for the shard: journal recovery batch size 3, snapshot batch count 5000, heartbeat interval 100 ms. */
114 private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
115 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
116 shardHeartbeatIntervalInMillis(100).build();
/**
 * Clears the in-memory snapshot store and the in-memory journal.
 * NOTE(review): presumably the JUnit {@code @Before} hook; the annotation is not visible in this excerpt.
 */
119 public void setUp() {
120 InMemorySnapshotStore.clear();
121 InMemoryJournal.clear();
/**
 * Clears the in-memory snapshot store and the in-memory journal again, mirroring setUp.
 * NOTE(review): presumably the JUnit {@code @After} hook; the annotation is not visible in this excerpt.
 */
125 public void tearDown() {
126 InMemorySnapshotStore.clear();
127 InMemoryJournal.clear();
/**
 * Builds the Props for a shard with this test's shardID, no peers (empty peer map),
 * the test datastore context and the shared schema context.
 *
 * @return Props obtained from Shard.props for the shard under test
 */
130 private Props newShardProps() {
131 return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
132 dataStoreContext, SCHEMA_CONTEXT);
/**
 * Sends a RegisterChangeListener (BASE scope, TestModel.TEST_PATH) to a shard that is already leader and
 * checks that the RegisterChangeListenerReply carries a registration path matching
 * akka://test/user/testRegisterChangeListener/$..., then writes to the registered path and waits for the
 * mock listener to see the change.
 */
136 public void testRegisterChangeListener() throws Exception {
137 new ShardTestKit(getSystem()) {{
138 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
139 newShardProps(), "testRegisterChangeListener");
141 waitUntilLeader(shard);
143 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// The mock listener is constructed with 1 - presumably the number of change events it expects (confirm in MockDataChangeListener).
145 MockDataChangeListener listener = new MockDataChangeListener(1);
146 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
147 "testRegisterChangeListener-DataChangeListener");
// Register with the test kit as sender so the reply can be received with expectMsgClass below.
149 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
150 dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
152 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
153 RegisterChangeListenerReply.class);
154 String replyPath = reply.getListenerRegistrationPath().toString();
// The regex requires the path to sit directly under the shard actor, with a name starting with '$'.
155 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
156 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// Write at the registered path, then wait on the listener for that path.
158 YangInstanceIdentifier path = TestModel.TEST_PATH;
159 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
161 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
163 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
164 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Registers a change listener while the shard has not yet become leader and checks that the listener is
 * still notified of data already in the store once the election is allowed to proceed.
 * Two latches coordinate the test thread with a Shard subclass that holds back the first ElectionTimeout.
 */
168 @SuppressWarnings("serial")
170 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
171 // This test tests the timing window in which a change listener is registered before the
172 // shard becomes the leader. We verify that the listener is registered and notified of the
173 // existing data when the shard becomes the leader.
174 new ShardTestKit(getSystem()) {{
175 // For this test, we want to send the RegisterChangeListener message after the shard
176 // has recovered from persistence and before it becomes the leader. So we subclass
177 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
178 // we know that the shard has been initialized to a follower and has started the
179 // election process. The following 2 CountDownLatches are used to coordinate the
180 // ElectionTimeout with the sending of the RegisterChangeListener message.
181 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
182 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
183 Creator<Shard> creator = new Creator<Shard>() {
// Flag kept on the Creator; it is cleared when the first ElectionTimeout is intercepted.
184 boolean firstElectionTimeout = true;
187 public Shard create() throws Exception {
188 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
189 dataStoreContext, SCHEMA_CONTEXT) {
191 public void onReceiveCommand(final Object message) throws Exception {
192 if(message instanceof ElectionTimeout && firstElectionTimeout) {
193 // Got the first ElectionTimeout. We don't forward it to the
194 // base Shard yet until we've sent the RegisterChangeListener
195 // message. So we signal the onFirstElectionTimeout latch to tell
196 // the main thread to send the RegisterChangeListener message and
197 // start a thread to wait on the onChangeListenerRegistered latch,
198 // which the main thread signals after it has sent the message.
199 // After the onChangeListenerRegistered is triggered, we send the
200 // original ElectionTimeout message to proceed with the election.
201 firstElectionTimeout = false;
202 final ActorRef self = getSelf();
// Waits at most 5 seconds for the test thread's signal, then re-sends the held ElectionTimeout to this
// actor. (The lines that start the waiting thread are not visible in this excerpt.)
206 Uninterruptibles.awaitUninterruptibly(
207 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
208 self.tell(message, self);
212 onFirstElectionTimeout.countDown();
// Every other message goes to the real Shard implementation.
214 super.onReceiveCommand(message);
221 MockDataChangeListener listener = new MockDataChangeListener(1);
222 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
223 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
225 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
226 Props.create(new DelegatingShardCreator(creator)),
227 "testRegisterChangeListenerWhenNotLeaderInitially");
229 // Write initial data into the in-memory store.
230 YangInstanceIdentifier path = TestModel.TEST_PATH;
231 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
233 // Wait until the shard receives the first ElectionTimeout message.
234 assertEquals("Got first ElectionTimeout", true,
235 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
237 // Now send the RegisterChangeListener and wait for the reply.
238 shard.tell(new RegisterChangeListener(path, dclActor.path(),
239 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
241 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
242 RegisterChangeListenerReply.class);
243 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
245 // Sanity check - verify the shard is not the leader yet.
246 shard.tell(new FindLeader(), getRef());
247 FindLeaderReply findLeadeReply =
248 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
249 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
251 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
252 // with the election process.
253 onChangeListenerRegistered.countDown();
255 // Wait for the shard to become the leader and notify our listener with the existing
256 // data in the store.
257 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
259 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
260 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Sends a serialized CreateTransaction ("txn-1", READ_ONLY) to a leader shard and checks that the
 * CreateTransactionReply names a transaction actor path containing
 * akka://test/user/testCreateTransaction/shard-txn-1.
 */
265 public void testCreateTransaction(){
266 new ShardTestKit(getSystem()) {{
267 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
269 waitUntilLeader(shard);
271 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// The transaction type is passed as the enum's ordinal value.
273 shard.tell(new CreateTransaction("txn-1",
274 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
276 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
277 CreateTransactionReply.class);
// contains() rather than equals(): only the leading part of the actor path is pinned by this test.
279 String path = reply.getTransactionActorPath().toString();
280 assertTrue("Unexpected transaction path " + path,
281 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
283 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Same check as testCreateTransaction, but the CreateTransaction is built with a third argument "foobar"
 * (presumably the transaction chain id - confirm against CreateTransaction). The reply's transaction actor
 * path must contain akka://test/user/testCreateTransactionOnChain/shard-txn-1.
 */
288 public void testCreateTransactionOnChain(){
289 new ShardTestKit(getSystem()) {{
290 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
292 waitUntilLeader(shard);
// NOTE(review): the final argument line of this tell(...) call is not visible in this excerpt.
294 shard.tell(new CreateTransaction("txn-1",
295 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
298 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
299 CreateTransactionReply.class);
301 String path = reply.getTransactionActorPath().toString();
302 assertTrue("Unexpected transaction path " + path,
303 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
305 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Checks that handling a PeerAddressResolved message stores the resolved address in the peer address map
 * of the shard's Raft actor context.
 */
309 @SuppressWarnings("serial")
311 public void testPeerAddressResolved() throws Exception {
312 new ShardTestKit(getSystem()) {{
313 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Local Shard subclass: starts with a single peer entry (key shardID) whose address is null,
// exposes the peer address map, and counts down the latch when recovery completes.
314 class TestShard extends Shard {
316 super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
317 dataStoreContext, SCHEMA_CONTEXT);
320 Map<String, String> getPeerAddresses() {
321 return getRaftActorContext().getPeerAddresses();
325 protected void onRecoveryComplete() {
327 super.onRecoveryComplete();
329 recoveryComplete.countDown();
334 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
335 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
337 public TestShard create() throws Exception {
338 return new TestShard();
340 })), "testPeerAddressResolved");
// The test only waits for recovery to complete (5 second bound); the leader wait is commented out.
342 //waitUntilLeader(shard);
343 assertEquals("Recovery complete", true,
344 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// The message is handed to onReceiveCommand directly on the underlying actor rather than sent via tell.
346 String address = "akka://foobar";
347 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
// The peer address map is keyed by the string form of the shard identifier.
349 assertEquals("getPeerAddresses", address,
350 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
352 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Writes the test container into the shard's store, captures the store root, encodes it into a Snapshot,
 * applies that snapshot through onReceiveCommand and checks that the store root read afterwards equals
 * the root captured before.
 */
357 public void testApplySnapshot() throws Exception {
358 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
359 "testApplySnapshot");
361 NormalizedNodeToNodeCodec codec =
362 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
364 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// An identifier built with no path arguments is used as the store root.
366 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
367 NormalizedNode<?,?> expected = readStore(shard, root);
369 NormalizedNodeMessages.Container encode = codec.encode(expected);
// Snapshot state is the encoded root as a byte array, with an empty list of unapplied log entries.
// NOTE(review): the meaning of the trailing 1, 2, 3, 4 is defined by Snapshot.create - confirm there.
371 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
372 encode.getNormalizedNode().toByteString().toByteArray(),
373 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
// Invoked directly on the underlying actor, so the snapshot is handled before the next statement runs.
375 shard.underlyingActor().onReceiveCommand(applySnapshot);
377 NormalizedNode<?,?> actual = readStore(shard, root);
379 assertEquals(expected, actual);
381 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Builds an ApplyState whose log entry carries a CompositeModificationPayload with a single write of the
 * test container, hands it to onReceiveCommand and checks that the node can then be read from the store
 * at TestModel.TEST_PATH.
 */
385 public void testApplyState() throws Exception {
387 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
389 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
391 MutableCompositeModification compMod = new MutableCompositeModification();
392 compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
393 Payload payload = new CompositeModificationPayload(compMod.toSerializable());
// ApplyState is created with a null first argument and the identifier "test".
// NOTE(review): the first argument is presumably the client actor - confirm against ApplyState.
394 ApplyState applyState = new ApplyState(null, "test",
395 new ReplicatedLogImplEntry(1, 2, payload));
// Invoked directly on the underlying actor rather than sent as a message.
397 shard.underlyingActor().onReceiveCommand(applyState);
399 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
400 assertEquals("Applied state", node, actual);
402 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Seeds the in-memory snapshot store and journal for shardID, starts a shard, waits for its
 * onRecoveryComplete callback and then checks that the outer list holds an entry for every expected key
 * and that the shard MBean reports last log index, commit index and last applied equal to nListEntries.
 */
405 @SuppressWarnings("serial")
407 public void testRecovery() throws Exception {
409 // Set up the InMemorySnapshotStore.
// A standalone in-memory data store is used only to produce the root node that goes into the snapshot.
411 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
412 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
414 DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
415 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
416 DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
417 commitCohort.preCommit().get();
418 commitCohort.commit().get();
420 DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
421 NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
// NOTE(review): the argument line of encode(...) is not visible in this excerpt - presumably the root node read above.
423 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
424 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
426 getNormalizedNode().toByteString().toByteArray(),
427 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
429 // Set up the InMemoryJournal.
// Journal entry 0 writes an empty outer list; the entries added below merge list items into it.
431 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
432 new WriteModification(TestModel.OUTER_LIST_PATH,
433 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
436 int nListEntries = 16;
437 Set<Integer> listEntryKeys = new HashSet<>();
// First loop covers keys 1..11 (nListEntries-5 == 11).
// NOTE(review): the next loop starts at 11, so journal index 11 is added by both loops - confirm the overlap is intended.
438 for(int i = 1; i <= nListEntries-5; i++) {
439 listEntryKeys.add(Integer.valueOf(i));
440 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
441 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
442 Modification mod = new MergeModification(path,
443 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
445 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
449 // Add some of the new CompositeModificationByteStringPayload
450 for(int i = 11; i <= nListEntries; i++) {
451 listEntryKeys.add(Integer.valueOf(i));
452 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
453 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
454 Modification mod = new MergeModification(path,
455 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
457 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
458 newByteStringPayload(mod)));
// The last journal record is an ApplyLogEntries for index nListEntries, stored at sequence nListEntries + 1.
462 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
463 new ApplyLogEntries(nListEntries));
465 // Create the actor and wait for recovery complete.
467 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Shard subclass that counts down the latch after the base onRecoveryComplete has run.
469 Creator<Shard> creator = new Creator<Shard>() {
471 public Shard create() throws Exception {
472 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
473 dataStoreContext, SCHEMA_CONTEXT) {
475 protected void onRecoveryComplete() {
477 super.onRecoveryComplete();
479 recoveryComplete.countDown();
486 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
487 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
489 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
491 // Verify data in the data store.
493 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
494 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
495 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
496 outerList.getValue() instanceof Iterable);
497 for(Object entry: (Iterable<?>) outerList.getValue()) {
498 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
499 entry instanceof MapEntryNode);
500 MapEntryNode mapEntry = (MapEntryNode)entry;
501 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
502 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
503 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
504 Object value = idLeaf.get().getValue();
// remove() returns false for a key that was never expected or was already seen, which fails the assertion.
505 assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
506 listEntryKeys.remove(value));
// Any key still in the set was expected but not found in the recovered list.
509 if(!listEntryKeys.isEmpty()) {
510 fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
514 assertEquals("Last log index", nListEntries,
515 shard.underlyingActor().getShardMBean().getLastLogIndex());
516 assertEquals("Commit index", nListEntries,
517 shard.underlyingActor().getShardMBean().getCommitIndex());
518 assertEquals("Last applied", nListEntries,
519 shard.underlyingActor().getShardMBean().getLastApplied());
521 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Collects the given modifications, in argument order, into a MutableCompositeModification and wraps its
 * serializable form in a CompositeModificationPayload.
 *
 * @param mods modifications to include in the payload
 * @return payload built from the composite modification
 */
524 private CompositeModificationPayload newPayload(final Modification... mods) {
525 MutableCompositeModification compMod = new MutableCompositeModification();
526 for(Modification mod: mods) {
527 compMod.addModification(mod);
530 return new CompositeModificationPayload(compMod.toSerializable());
/**
 * Collects the given modifications, in argument order, into a MutableCompositeModification and wraps its
 * serializable form in a CompositeModificationByteStringPayload.
 *
 * @param mods modifications to include in the payload
 * @return byte-string payload built from the composite modification
 */
533 private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
534 MutableCompositeModification compMod = new MutableCompositeModification();
535 for(Modification mod: mods) {
536 compMod.addModification(mod);
539 return new CompositeModificationByteStringPayload(compMod.toSerializable());
/**
 * Convenience overload: delegates to the six-argument setupMockWriteTransaction with a null preCommit
 * function, in which case the mock's preCommit falls through to the real cohort's preCommit.
 *
 * @param cohortName name given to the Mockito mock
 * @param dataStore store on which the write transaction is created
 * @param path path to write
 * @param data node to write at the path
 * @param modification composite modification that also receives the write
 * @return whatever the six-argument overload returns
 */
543 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
544 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
545 final MutableCompositeModification modification) {
546 return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
/**
 * Creates a write-only transaction on the data store, writes the data at the path and readies it, then
 * builds a Mockito mock cohort whose canCommit, preCommit, commit and abort forward to the real cohort.
 * The same write is also added to the supplied composite modification.
 *
 * @param cohortName name given to the Mockito mock
 * @param dataStore store on which the write transaction is created
 * @param path path to write
 * @param data node to write at the path
 * @param modification composite modification that receives a WriteModification for path and data
 * @param preCommit optional replacement for the preCommit step; when non-null it is applied to the real
 *                  cohort instead of calling the real cohort's preCommit
 * @return NOTE(review): the return statement is not visible in this excerpt - presumably the mock cohort
 */
549 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
550 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
551 final MutableCompositeModification modification,
552 final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
554 DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
555 tx.write(path, data);
556 final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
// The mock lets the tests verify call order while the real cohort does the actual work.
557 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
559 doAnswer(new Answer<ListenableFuture<Boolean>>() {
561 public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
562 return realCohort.canCommit();
564 }).when(cohort).canCommit();
566 doAnswer(new Answer<ListenableFuture<Void>>() {
568 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
// A caller-supplied preCommit function takes precedence over the real cohort's preCommit.
569 if(preCommit != null) {
570 return preCommit.apply(realCohort);
572 return realCohort.preCommit();
575 }).when(cohort).preCommit();
577 doAnswer(new Answer<ListenableFuture<Void>>() {
579 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
580 return realCohort.commit();
582 }).when(cohort).commit();
584 doAnswer(new Answer<ListenableFuture<Void>>() {
586 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
587 return realCohort.abort();
589 }).when(cohort).abort();
// Record the same write in the caller's composite modification.
591 modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
/**
 * Drives three write transactions through the shard by message. tx1 is readied and its canCommit is
 * answered first. tx2 and tx3 are then readied and their CanCommitTransaction requests are sent with
 * Patterns.ask before tx1 is committed.
 * The test then verifies with Mockito inOrder that cohort1, cohort2 and cohort3 each ran canCommit,
 * preCommit and commit one transaction after another, that the outer list entry with id 1 is in the
 * store, and that the shard's last log index reaches 2.
 */
596 @SuppressWarnings({ "unchecked" })
598 public void testConcurrentThreePhaseCommits() throws Throwable {
599 new ShardTestKit(getSystem()) {{
// The shard is created with the default dispatcher id instead of the one TestActorRef would otherwise use.
600 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
601 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
602 "testConcurrentThreePhaseCommits");
604 waitUntilLeader(shard);
606 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
608 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// tx1 writes the test container.
610 String transactionID1 = "tx1";
611 MutableCompositeModification modification1 = new MutableCompositeModification();
612 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
613 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
// tx2 writes an empty outer list.
615 String transactionID2 = "tx2";
616 MutableCompositeModification modification2 = new MutableCompositeModification();
617 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
618 TestModel.OUTER_LIST_PATH,
619 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
// tx3 writes the outer list entry with id 1.
622 String transactionID3 = "tx3";
623 MutableCompositeModification modification3 = new MutableCompositeModification();
624 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
625 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
626 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
627 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// NOTE(review): the declaration of timeoutSec is not visible in this excerpt.
631 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
632 final Timeout timeout = new Timeout(duration);
634 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
635 // by the ShardTransaction.
637 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
638 cohort1, modification1, true), getRef());
639 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
640 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
// The shard itself is reported as the cohort to talk to for the commit phases.
641 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
643 // Send the CanCommitTransaction message for the first Tx.
645 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
646 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
647 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
648 assertEquals("Can commit", true, canCommitReply.getCanCommit());
650 // Send the ForwardedReadyTransaction for the next 2 Tx's.
652 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
653 cohort2, modification2, true), getRef());
654 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
656 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
657 cohort3, modification3, true), getRef());
658 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
660 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
661 // processed after the first Tx completes.
663 Future<Object> canCommitFuture1 = Patterns.ask(shard,
664 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
666 Future<Object> canCommitFuture2 = Patterns.ask(shard,
667 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
669 // Send the CommitTransaction message for the first Tx. After it completes, it should
670 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
672 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
673 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
675 // Wait for the next 2 Tx's to complete.
// Failures raised inside the future callbacks are stored here and rethrown on the test thread below.
677 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
// One count per remaining commit (tx2 and tx3).
678 final CountDownLatch commitLatch = new CountDownLatch(2);
// Base callback: records a failed future in caughtEx and asserts the response class.
// Subclasses hook in through onSuccess. (Some control-flow lines are not visible in this excerpt.)
680 class OnFutureComplete extends OnComplete<Object> {
681 private final Class<?> expRespType;
683 OnFutureComplete(final Class<?> expRespType) {
684 this.expRespType = expRespType;
688 public void onComplete(final Throwable error, final Object resp) {
690 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
693 assertEquals("Commit response type", expRespType, resp.getClass());
695 } catch (Exception e) {
701 void onSuccess(final Object resp) throws Exception {
// Commit callback: expects a CommitTransactionReply and counts down the latch on completion.
705 class OnCommitFutureComplete extends OnFutureComplete {
706 OnCommitFutureComplete() {
707 super(CommitTransactionReply.SERIALIZABLE_CLASS);
711 public void onComplete(final Throwable error, final Object resp) {
712 super.onComplete(error, resp);
713 commitLatch.countDown();
// CanCommit callback: on a positive reply it sends the CommitTransaction for the same transaction.
717 class OnCanCommitFutureComplete extends OnFutureComplete {
718 private final String transactionID;
720 OnCanCommitFutureComplete(final String transactionID) {
721 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
722 this.transactionID = transactionID;
726 void onSuccess(final Object resp) throws Exception {
727 CanCommitTransactionReply canCommitReply =
728 CanCommitTransactionReply.fromSerializable(resp);
729 assertEquals("Can commit", true, canCommitReply.getCanCommit());
731 Future<Object> commitFuture = Patterns.ask(shard,
732 new CommitTransaction(transactionID).toSerializable(), timeout);
733 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
737 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
738 getSystem().dispatcher());
740 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
741 getSystem().dispatcher());
743 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
// Surface a callback failure before reporting a plain timeout.
745 if(caughtEx.get() != null) {
746 throw caughtEx.get();
749 assertEquals("Commits complete", true, done);
// Each cohort must finish canCommit, preCommit and commit before the next cohort's canCommit is invoked.
751 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
752 inOrder.verify(cohort1).canCommit();
753 inOrder.verify(cohort1).preCommit();
754 inOrder.verify(cohort1).commit();
755 inOrder.verify(cohort2).canCommit();
756 inOrder.verify(cohort2).preCommit();
757 inOrder.verify(cohort2).commit();
758 inOrder.verify(cohort3).canCommit();
759 inOrder.verify(cohort3).preCommit();
760 inOrder.verify(cohort3).commit();
762 // Verify data in the data store.
764 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
765 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
766 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
767 outerList.getValue() instanceof Iterable);
// Only the first entry of the list is inspected; it must be the entry with id 1 written by tx3.
768 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
769 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
770 entry instanceof MapEntryNode);
771 MapEntryNode mapEntry = (MapEntryNode)entry;
772 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
773 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
774 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
775 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
// Poll up to 100 times, sleeping 50 ms between checks (about 5 seconds), for the last log index to become 2.
// NOTE(review): the loop-exit line is not visible in this excerpt.
777 for(int i = 0; i < 20 * 5; i++) {
778 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
779 if(lastLogIndex == 2) {
782 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
785 assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
787 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Test: a failure in the commit phase of the first of two transactions.
// cohort1's commit() is stubbed to return a failed future, so the CommitTransaction sent for
// tx1 is expected to be answered with akka.actor.Status.Failure. The CanCommitTransaction
// that was already sent for tx2 is then expected to complete within 5 seconds.
792 public void testCommitPhaseFailure() throws Throwable {
793 new ShardTestKit(getSystem()) {{
794 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
795 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
796 "testCommitPhaseFailure");
798 waitUntilLeader(shard);
800 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
// (cohort1: canCommit -> TRUE, preCommit -> completed future, commit -> failed future.)
803 String transactionID1 = "tx1";
804 MutableCompositeModification modification1 = new MutableCompositeModification();
805 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
806 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
807 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
808 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
// cohort2 only needs canCommit stubbed; this test never sends a commit for tx2.
810 String transactionID2 = "tx2";
811 MutableCompositeModification modification2 = new MutableCompositeModification();
812 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
813 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
// The same 5 second bound is used for expectMsgClass and for the ask timeout.
815 FiniteDuration duration = duration("5 seconds");
816 final Timeout timeout = new Timeout(duration);
818 // Simulate the ForwardedReadyTransaction messages that would be sent
819 // by the ShardTransaction.
821 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
822 cohort1, modification1, true), getRef());
823 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
825 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
826 cohort2, modification2, true), getRef());
827 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
829 // Send the CanCommitTransaction message for the first Tx.
831 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
832 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
833 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
834 assertEquals("Can commit", true, canCommitReply.getCanCommit());
836 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
837 // processed after the first Tx completes.
// Patterns.ask is used here (instead of tell + expectMsgClass) so the reply can be awaited
// later, after the first Tx's commit has been sent.
839 Future<Object> canCommitFuture = Patterns.ask(shard,
840 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
842 // Send the CommitTransaction message for the first Tx. This should send back an error
843 // and trigger the 2nd Tx to proceed.
845 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
846 expectMsgClass(duration, akka.actor.Status.Failure.class);
848 // Wait for the 2nd Tx to complete the canCommit phase.
850 final CountDownLatch latch = new CountDownLatch(1);
851 canCommitFuture.onComplete(new OnComplete<Object>() {
// NOTE(review): the callback body is not visible in this view; presumably it counts down
// the latch awaited below — confirm.
853 public void onComplete(final Throwable t, final Object resp) {
856 }, getSystem().dispatcher());
858 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// Verify the call order across both mocks: cohort1 went through canCommit, preCommit and
// commit before cohort2's canCommit was invoked.
860 InOrder inOrder = inOrder(cohort1, cohort2);
861 inOrder.verify(cohort1).canCommit();
862 inOrder.verify(cohort1).preCommit();
863 inOrder.verify(cohort1).commit();
864 inOrder.verify(cohort2).canCommit();
// Send a PoisonPill to the shard actor now that the test is done with it.
866 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Test: a failure in the preCommit phase.
// The mock cohort's canCommit() succeeds but preCommit() is stubbed with a failed future, so
// the CommitTransaction message is expected to be answered with akka.actor.Status.Failure.
871 public void testPreCommitPhaseFailure() throws Throwable {
872 new ShardTestKit(getSystem()) {{
873 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
874 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
875 "testPreCommitPhaseFailure");
877 waitUntilLeader(shard);
// Single simulated transaction: canCommit -> TRUE, preCommit -> failed future.
// commit() is left unstubbed.
879 String transactionID = "tx1";
880 MutableCompositeModification modification = new MutableCompositeModification();
881 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
882 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
883 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
// Upper bound for every expectMsgClass call below.
885 FiniteDuration duration = duration("5 seconds");
887 // Simulate the ForwardedReadyTransaction messages that would be sent
888 // by the ShardTransaction.
890 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
891 cohort, modification, true), getRef());
892 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
894 // Send the CanCommitTransaction message.
896 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
897 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
898 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
899 assertEquals("Can commit", true, canCommitReply.getCanCommit());
901 // Send the CommitTransaction message. This should send back an error
902 // for preCommit failure.
904 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
905 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Verify that canCommit was invoked before preCommit on the cohort.
907 InOrder inOrder = inOrder(cohort);
908 inOrder.verify(cohort).canCommit();
909 inOrder.verify(cohort).preCommit();
// Send a PoisonPill to the shard actor now that the test is done with it.
911 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Test: a failure in the canCommit phase.
// The mock cohort's canCommit() is stubbed with a failed future, so the CanCommitTransaction
// message is expected to be answered with akka.actor.Status.Failure.
916 public void testCanCommitPhaseFailure() throws Throwable {
917 new ShardTestKit(getSystem()) {{
918 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
919 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
920 "testCanCommitPhaseFailure");
922 waitUntilLeader(shard);
// Upper bound for every expectMsgClass call below.
924 final FiniteDuration duration = duration("5 seconds");
// Single simulated transaction whose canCommit fails with IllegalStateException("mock").
926 String transactionID = "tx1";
927 MutableCompositeModification modification = new MutableCompositeModification();
928 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
929 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
931 // Simulate the ForwardedReadyTransaction messages that would be sent
932 // by the ShardTransaction.
934 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
935 cohort, modification, true), getRef());
936 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
938 // Send the CanCommitTransaction message.
// The reply is expected to be a Status.Failure rather than a CanCommitTransactionReply.
940 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
941 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Send a PoisonPill to the shard actor now that the test is done with it.
943 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Test: an AbortTransaction is sent for a transaction while its preCommit is being handled.
// The test waits for the abort ask to complete, then waits for the commit ask to produce a
// result, and finally asserts that TestModel.TEST_PATH can be read from the shard's store.
948 public void testAbortBeforeFinishCommit() throws Throwable {
949 new ShardTestKit(getSystem()) {{
950 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
951 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
952 "testAbortBeforeFinishCommit");
954 waitUntilLeader(shard);
// The same 5 second bound is used for expectMsgClass, the ask timeout and Await.result.
956 final FiniteDuration duration = duration("5 seconds");
957 final Timeout timeout = new Timeout(duration);
959 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
961 final String transactionID = "tx1";
// Counted down once the reply (or failure) for the AbortTransaction ask arrives.
962 final CountDownLatch abortComplete = new CountDownLatch(1);
// Hook handed to setupMockWriteTransaction below. It calls the given cohort's preCommit(),
// fires an AbortTransaction ask at the shard, and returns the preCommit future.
// NOTE(review): setupMockWriteTransaction is defined outside this view; presumably the
// function is run when the mock cohort's preCommit() is invoked — confirm.
963 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
964 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
966 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
967 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
969 Future<Object> abortFuture = Patterns.ask(shard,
970 new AbortTransaction(transactionID).toSerializable(), timeout);
971 abortFuture.onComplete(new OnComplete<Object>() {
// The latch is released whether the ask succeeded or failed; neither argument is inspected.
973 public void onComplete(final Throwable e, final Object resp) {
974 abortComplete.countDown();
976 }, getSystem().dispatcher());
978 return preCommitFuture;
// Build the cohort for a write of a container node at TestModel.TEST_PATH, passing the hook.
982 MutableCompositeModification modification = new MutableCompositeModification();
983 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
984 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
985 modification, preCommit);
// Ready the transaction, then run canCommit and check that it reports true.
987 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
988 cohort, modification, true), getRef());
989 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
991 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
992 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
993 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
994 assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Commit via ask so the result can be awaited after the abort has completed.
996 Future<Object> commitFuture = Patterns.ask(shard,
997 new CommitTransaction(transactionID).toSerializable(), timeout);
999 assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
// Await.result is called for its waiting effect only; its return value is discarded.
1001 Await.result(commitFuture, duration);
// The written node must be readable from the shard's data store.
1003 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1004 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
// Send a PoisonPill to the shard actor now that the test is done with it.
1006 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Test: a transaction that is canCommitted but never committed.
// The datastore context is built with shardTransactionCommitTimeoutInSeconds(1). The 1st Tx
// gets a canCommit but no commit; the 2nd Tx is then canCommitted and committed, and the
// list entry it targets is expected to be readable from the store afterwards.
1011 public void testTransactionCommitTimeout() throws Throwable {
1012 dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
1014 new ShardTestKit(getSystem()) {{
1015 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1016 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1017 "testTransactionCommitTimeout");
1019 waitUntilLeader(shard);
// Upper bound for every expectMsgClass call below.
1021 final FiniteDuration duration = duration("5 seconds");
1023 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Pre-populate the store with the test container and an empty outer list.
1025 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1026 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1027 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1029 // Create 1st Tx - will timeout
// NOTE(review): the trailing arguments of the two setupMockWriteTransaction calls below are
// not visible in this view.
1031 String transactionID1 = "tx1";
1032 MutableCompositeModification modification1 = new MutableCompositeModification();
1033 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1034 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1035 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1036 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// 2nd Tx targets the outer-list entry with id 2.
// NOTE(review): the 2nd Tx is labelled "tx3"/"cohort3" although the variables are named
// transactionID2/cohort2 — confirm whether that is intentional.
1041 String transactionID2 = "tx3";
1042 MutableCompositeModification modification2 = new MutableCompositeModification();
1043 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1044 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1045 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1047 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
// Ready both transactions; each must be acknowledged with a ReadyTransactionReply.
1052 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1053 cohort1, modification1, true), getRef());
1054 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1056 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1057 cohort2, modification2, true), getRef());
1058 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1060 // canCommit 1st Tx. We don't send the commit so it should timeout.
1062 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1063 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1065 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1067 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1068 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1070 // Commit the 2nd Tx.
1072 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1073 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The entry addressed by listNodePath (id 2) must now be readable from the store.
1075 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1076 assertNotNull(listNodePath + " not found", node);
// Send a PoisonPill to the shard actor now that the test is done with it.
1078 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Test: exceeding the transaction commit queue capacity.
// The datastore context is built with shardTransactionCommitQueueCapacity(1). Three
// transactions are readied; the 1st is canCommitted, the 2nd canCommit is sent without
// waiting for a reply, and the 3rd canCommit is expected to be answered with Status.Failure.
1083 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1084 dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
1086 new ShardTestKit(getSystem()) {{
1087 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1088 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1089 "testTransactionCommitQueueCapacityExceeded");
1091 waitUntilLeader(shard);
// Upper bound for every expectMsgClass call below.
1093 final FiniteDuration duration = duration("5 seconds");
1095 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Three simulated write transactions built through setupMockWriteTransaction (defined
// outside this view).
1097 String transactionID1 = "tx1";
1098 MutableCompositeModification modification1 = new MutableCompositeModification();
1099 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1100 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
// NOTE(review): the trailing argument(s) of the cohort2 call are not visible in this view.
1102 String transactionID2 = "tx2";
1103 MutableCompositeModification modification2 = new MutableCompositeModification();
1104 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1105 TestModel.OUTER_LIST_PATH,
1106 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1109 String transactionID3 = "tx3";
1110 MutableCompositeModification modification3 = new MutableCompositeModification();
1111 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1112 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
// Ready all three transactions; each must be acknowledged with a ReadyTransactionReply.
1116 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1117 cohort1, modification1, true), getRef());
1118 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1120 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1121 cohort2, modification2, true), getRef());
1122 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1124 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1125 cohort3, modification3, true), getRef());
1126 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1128 // canCommit 1st Tx.
1130 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1131 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1133 // canCommit the 2nd Tx - it should get queued.
// No reply is awaited for this message before the 3rd canCommit is sent.
1135 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1137 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1139 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1140 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Send a PoisonPill to the shard actor now that the test is done with it.
1142 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Test: a CanCommitTransaction for a transaction ID ("tx") for which no
// ForwardedReadyTransaction was sent is expected to be answered with Status.Failure.
1147 public void testCanCommitBeforeReadyFailure() throws Throwable {
1148 new ShardTestKit(getSystem()) {{
// Unlike the neighbouring tests, this one does not call waitUntilLeader before messaging.
1149 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1150 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1151 "testCanCommitBeforeReadyFailure");
// Send canCommit straight away and expect a failure reply within 5 seconds.
1153 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1154 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// Send a PoisonPill to the shard actor now that the test is done with it.
1156 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Test: aborting the first of two transactions.
// tx1 is canCommitted and then aborted (AbortTransactionReply expected). The
// CanCommitTransaction already sent for tx2 is then expected to complete within 5 seconds.
1161 public void testAbortTransaction() throws Throwable {
1162 new ShardTestKit(getSystem()) {{
1163 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1164 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1165 "testAbortTransaction");
1167 waitUntilLeader(shard);
1169 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
// cohort1: canCommit -> TRUE, abort -> completed future.
1171 String transactionID1 = "tx1";
1172 MutableCompositeModification modification1 = new MutableCompositeModification();
1173 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1174 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1175 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
// cohort2 only needs canCommit stubbed.
1177 String transactionID2 = "tx2";
1178 MutableCompositeModification modification2 = new MutableCompositeModification();
1179 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1180 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
// The same 5 second bound is used for expectMsgClass and for the ask timeout.
1182 FiniteDuration duration = duration("5 seconds");
1183 final Timeout timeout = new Timeout(duration);
1185 // Simulate the ForwardedReadyTransaction messages that would be sent
1186 // by the ShardTransaction.
1188 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1189 cohort1, modification1, true), getRef());
1190 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1192 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1193 cohort2, modification2, true), getRef());
1194 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1196 // Send the CanCommitTransaction message for the first Tx.
1198 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1199 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1200 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1201 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1203 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1204 // processed after the first Tx completes.
1206 Future<Object> canCommitFuture = Patterns.ask(shard,
1207 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1209 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
// NOTE(review): the sentence above is cut off in this view; the code further down waits for
// the 2nd Tx's canCommit to complete.
1212 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1213 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1215 // Wait for the 2nd Tx to complete the canCommit phase.
1217 final CountDownLatch latch = new CountDownLatch(1);
1218 canCommitFuture.onComplete(new OnComplete<Object>() {
// NOTE(review): the callback body is not visible in this view; presumably it counts down
// the latch awaited below — confirm.
1220 public void onComplete(final Throwable t, final Object resp) {
1223 }, getSystem().dispatcher());
1225 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// Verify that cohort1's canCommit was invoked before cohort2's canCommit.
1227 InOrder inOrder = inOrder(cohort1, cohort2);
1228 inOrder.verify(cohort1).canCommit();
1229 inOrder.verify(cohort2).canCommit();
// Send a PoisonPill to the shard actor now that the test is done with it.
1231 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Runs the shared snapshot scenario with persistent=true, using "testCreateSnapshot" as the
// shard actor name.
1236 public void testCreateSnapshot() throws IOException, InterruptedException {
1237 testCreateSnapshot(true, "testCreateSnapshot");
// Runs the shared snapshot scenario with persistent=false, using
// "testCreateSnapshotWithNonPersistentData" as the shard actor name.
1241 public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
1242 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
// Shared snapshot scenario: sends two CaptureSnapshot messages to a shard and, after each
// one, expects the shard's commitSnapshot hook to have run within 5 seconds.
//   persistent     - value passed to the DatastoreContext builder's persistent(...) setting
//   shardActorName - actor name used when creating the TestActorRef
1245 @SuppressWarnings("serial")
1246 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
// This local context shadows any field of the same name and is the one passed to the Shard.
1247 final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
1248 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
1250 new ShardTestKit(getSystem()) {{
// Held in an AtomicReference so the latch can be swapped for a fresh one between rounds.
1251 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
// Creator producing a Shard subclass that signals the current latch from commitSnapshot.
1252 Creator<Shard> creator = new Creator<Shard>() {
1254 public Shard create() throws Exception {
1255 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1256 dataStoreContext, SCHEMA_CONTEXT) {
// Delegate to the real implementation first, then release whichever latch is current.
1258 protected void commitSnapshot(final long sequenceNumber) {
1259 super.commitSnapshot(sequenceNumber);
1260 latch.get().countDown();
// The anonymous creator is wrapped in DelegatingShardCreator before being given to Props.
1266 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1267 Props.create(new DelegatingShardCreator(creator)), shardActorName);
1269 waitUntilLeader(shard);
// First capture round.
1271 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1273 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
// Second capture round with a fresh latch, checking a snapshot can be taken again.
1275 latch.set(new CountDownLatch(1));
1276 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1278 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
// Send a PoisonPill to the shard actor now that the test is done with it.
1280 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1285 * This test simply verifies that the applySnapShot logic will work
1286 * @throws ReadFailedException
// Works directly on an InMemoryDOMDataStore (no shard actor): writes a container node, reads
// the store content, then deletes and rewrites that content at the empty-builder identifier
// and asserts that a second read equals the first.
1289 public void testInMemoryDataStoreRestore() throws ReadFailedException {
1290 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1292 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
// Seed the store with the test container.
1294 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1295 putTransaction.write(TestModel.TEST_PATH,
1296 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1297 commitTransaction(putTransaction);
// Capture the store content as the expected value.
1300 NormalizedNode<?, ?> expected = readStore(store);
// Delete and then rewrite the captured content at the identifier produced by an empty
// builder (presumably the root of the tree — confirm).
1302 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1304 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1305 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1307 commitTransaction(writeTransaction);
// The content read back must equal what was captured before the rewrite.
1309 NormalizedNode<?, ?> actual = readStore(store);
1311 assertEquals(expected, actual);
// Checks that the shard's data persistence provider reports isRecoveryApplicable() == true
// when the context is built with persistent(true), and false with persistent(false).
1315 public void testRecoveryApplicable(){
// Context and Props for a persistent shard.
1317 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1318 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1320 final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1321 persistentContext, SCHEMA_CONTEXT);
// Same settings except persistent(false).
1323 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1324 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1326 final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1327 nonPersistentContext, SCHEMA_CONTEXT);
1329 new ShardTestKit(getSystem()) {{
// Persistent shard: recovery must be reported as applicable.
1330 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1331 persistentProps, "testPersistence1");
1333 assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1335 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Non-persistent shard: recovery must be reported as not applicable.
1337 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1338 nonPersistentProps, "testPersistence2");
1340 assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1342 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Reads the node at the identifier produced by an empty YangInstanceIdentifier builder from
// the given store, using a read-only transaction that is closed before returning.
//   store   - the in-memory data store to read from
//   returns - the node contained in the read result
//   throws  - ReadFailedException, propagated from checkedGet()
1349 private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1350 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1351 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1352 transaction.read(YangInstanceIdentifier.builder().build());
// Waits for the read to finish.
1354 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
// NOTE(review): get() is called without an isPresent() check, so an absent result will
// throw here rather than return null.
1356 NormalizedNode<?, ?> normalizedNode = optional.get();
1358 transaction.close();
1360 return normalizedNode;
// Readies the given write transaction and drives the resulting cohort through preCommit()
// and then commit().
//   transaction - the write transaction to ready and commit
// NOTE(review): the statements that wait on the futures and the body of the catch block are
// not visible in this view; confirm whether InterruptedException/ExecutionException are
// swallowed here.
1363 private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1364 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1365 ListenableFuture<Void> future =
1366 commitCohort.preCommit();
// The same local is reused for the commit future.
1369 future = commitCohort.commit();
1371 } catch (InterruptedException | ExecutionException e) {
// Returns a new anonymous AsyncDataChangeListener for YangInstanceIdentifier/NormalizedNode.
// NOTE(review): going by the method name, onDataChanged is expected to do nothing; its body
// is not visible in this view — confirm.
1375 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1376 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1378 public void onDataChanged(
1379 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
// Reads the node at the given path from the data store of the shard behind the given
// TestActorRef, using a read-only transaction that is closed afterwards.
//   shard  - test actor reference whose underlying Shard's data store is read
//   id     - path of the node to read
//   throws - ExecutionException / InterruptedException, propagated from future.get()
// NOTE(review): the return statement is not visible in this view; presumably the method
// returns the local "node" (null when the read result is absent) — confirm.
1385 static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1386 throws ExecutionException, InterruptedException {
1387 DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1389 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1390 transaction.read(id);
// Waits for the read, then maps an absent result to null instead of throwing.
1392 Optional<NormalizedNode<?, ?>> optional = future.get();
1393 NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1395 transaction.close();
// Writes the given node at the given path directly into the data store of the shard behind
// the given TestActorRef. No messages are sent to the shard actor.
//   shard  - test actor reference whose underlying Shard's data store is written
//   id     - path to write at
//   node   - data to write
//   throws - ExecutionException / InterruptedException, propagated from the get() calls
1400 private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
1401 throws ExecutionException, InterruptedException {
1402 DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1404 transaction.write(id, node);
// Ready the transaction, then wait for preCommit and commit in turn.
// canCommit() is not invoked on the cohort here.
1406 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1407 commitCohort.preCommit().get();
1408 commitCohort.commit().get();
// Creator<Shard> that forwards create() to another Creator<Shard> supplied at construction.
// NOTE(review): presumably this exists so that Props.create receives a static nested class
// instead of an anonymous inner Creator — confirm against Akka's Props.create requirements.
1411 @SuppressWarnings("serial")
1412 private static final class DelegatingShardCreator implements Creator<Shard> {
// The creator that actually builds the Shard.
1413 private final Creator<Shard> delegate;
1415 DelegatingShardCreator(final Creator<Shard> delegate) {
1416 this.delegate = delegate;
// Returns whatever the wrapped creator produces; exceptions propagate unchanged.
1420 public Shard create() throws Exception {
1421 return delegate.create();