1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.CheckedFuture;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Ignore;
43 import org.junit.Test;
44 import org.mockito.InOrder;
45 import org.mockito.invocation.InvocationOnMock;
46 import org.mockito.stubbing.Answer;
47 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
49 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
60 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
61 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
62 import org.opendaylight.controller.cluster.datastore.modification.Modification;
63 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
64 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
65 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
66 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
67 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
68 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
69 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
70 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
71 import org.opendaylight.controller.cluster.raft.Snapshot;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
73 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
74 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
75 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
76 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
77 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
78 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
79 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
80 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
81 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
82 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
83 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
84 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
85 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
86 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
87 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
88 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
89 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
90 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
91 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
92 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
93 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
94 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
95 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
96 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
97 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
98 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
99 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
100 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
101 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
102 import scala.concurrent.Await;
103 import scala.concurrent.Future;
104 import scala.concurrent.duration.FiniteDuration;
106 public class ShardTest extends AbstractActorTest {
// Schema context shared by the tests in this class, created once from TestModel.
108 private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
// Class-wide counter; each ShardTest instance consumes one value when building shardID below.
110 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
// Identifier of the shard under test: member "member-1", shard "inventory" and a type of
// "config" followed by the next NEXT_SHARD_NUM value, so every test instance gets a distinct id.
// NOTE(review): presumably this keeps journal/snapshot entries keyed by shardID.toString()
// from colliding between tests - confirm.
112 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
113 .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
// Datastore settings passed to every Shard created here: journal recovery log batch size 3,
// snapshot batch count 5000 and a 100 ms heartbeat interval.
115 private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
116 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
117 shardHeartbeatIntervalInMillis(100).build();
// Clears the in-memory snapshot store and the in-memory journal.
// NOTE(review): presumably the JUnit per-test setup hook; the annotation line is not visible here.
120 public void setUp() {
121 InMemorySnapshotStore.clear();
122 InMemoryJournal.clear();
// Clears the in-memory snapshot store and the in-memory journal again once a test is done.
// NOTE(review): presumably the JUnit per-test teardown hook; the annotation line is not visible here.
126 public void tearDown() {
127 InMemorySnapshotStore.clear();
128 InMemoryJournal.clear();
// Builds the Props used to create a Shard actor for shardID, passing an empty
// ShardIdentifier-to-String map (presumably the peer addresses - confirm against Shard.props),
// this test's dataStoreContext and the shared SCHEMA_CONTEXT.
131 private Props newShardProps() {
132 return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
133 dataStoreContext, SCHEMA_CONTEXT);
// Registers a data change listener on a shard, checks the RegisterChangeListenerReply path and
// then waits for the listener to be notified after data is written at the registered path.
137 public void testRegisterChangeListener() throws Exception {
138 new ShardTestKit(getSystem()) {{
139 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
140 newShardProps(), "testRegisterChangeListener");
// waitUntilLeader is defined outside this view; by its name it blocks until the shard is leader.
142 waitUntilLeader(shard);
// Replace the shard's schema context with the full one from SchemaContextHelper.
144 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// Mock listener (constructed with 1, presumably the number of expected change events) wrapped
// in a DataChangeListener actor.
146 MockDataChangeListener listener = new MockDataChangeListener(1);
147 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
148 "testRegisterChangeListener-DataChangeListener");
// Ask the shard to register the listener actor for BASE-scope changes at TEST_PATH; the reply
// is sent to this test kit's actor.
150 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
151 dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
153 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
154 RegisterChangeListenerReply.class);
// The registration path must be a child of the shard actor whose name starts with '$'.
155 String replyPath = reply.getListenerRegistrationPath().toString();
156 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
157 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// Write a container node at TEST_PATH and wait for the listener to receive the change event.
159 YangInstanceIdentifier path = TestModel.TEST_PATH;
160 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
162 listener.waitForChangeEvents(path);
// Stop the listener actor and the shard.
164 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
165 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Registers a change listener while the shard is not yet the leader and verifies that the
// listener is still notified of data already in the store once the shard proceeds with the election.
169 @SuppressWarnings("serial")
171 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
172 // This test tests the timing window in which a change listener is registered before the
173 // shard becomes the leader. We verify that the listener is registered and notified of the
174 // existing data when the shard becomes the leader.
175 new ShardTestKit(getSystem()) {{
176 // For this test, we want to send the RegisterChangeListener message after the shard
177 // has recovered from persistence and before it becomes the leader. So we subclass
178 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
179 // we know that the shard has been initialized to a follower and has started the
180 // election process. The following 2 CountDownLatches are used to coordinate the
181 // ElectionTimeout with the sending of the RegisterChangeListener message.
182 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
183 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
184 Creator<Shard> creator = new Creator<Shard>() {
// True until the first ElectionTimeout has been intercepted; later ones go to the base Shard.
185 boolean firstElectionTimeout = true;
188 public Shard create() throws Exception {
189 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
190 dataStoreContext, SCHEMA_CONTEXT) {
192 public void onReceiveCommand(final Object message) throws Exception {
193 if(message instanceof ElectionTimeout && firstElectionTimeout) {
194 // Got the first ElectionTimeout. We don't forward it to the
195 // base Shard yet until we've sent the RegisterChangeListener
196 // message. So we signal the onFirstElectionTimeout latch to tell
197 // the main thread to send the RegisterChangeListener message and
198 // start a thread to wait on the onChangeListenerRegistered latch,
199 // which the main thread signals after it has sent the message.
200 // After the onChangeListenerRegistered is triggered, we send the
201 // original ElectionTimeout message to proceed with the election.
202 firstElectionTimeout = false;
203 final ActorRef self = getSelf();
// NOTE(review): the thread-creation lines are not visible in this listing; per the comment
// above, the wait (at most 5 seconds) and the re-send of the message run on a separate thread.
207 Uninterruptibles.awaitUninterruptibly(
208 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
209 self.tell(message, self);
// Tell the test thread that the first ElectionTimeout has arrived.
213 onFirstElectionTimeout.countDown();
// All other messages are handled by the base Shard implementation.
215 super.onReceiveCommand(message);
222 MockDataChangeListener listener = new MockDataChangeListener(1);
223 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
224 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
// Create the shard through the intercepting Creator defined above.
226 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
227 Props.create(new DelegatingShardCreator(creator)),
228 "testRegisterChangeListenerWhenNotLeaderInitially");
230 // Write initial data into the in-memory store.
231 YangInstanceIdentifier path = TestModel.TEST_PATH;
232 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
234 // Wait until the shard receives the first ElectionTimeout message.
235 assertEquals("Got first ElectionTimeout", true,
236 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
238 // Now send the RegisterChangeListener and wait for the reply.
239 shard.tell(new RegisterChangeListener(path, dclActor.path(),
240 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
242 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
243 RegisterChangeListenerReply.class);
244 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
246 // Sanity check - verify the shard is not the leader yet.
247 shard.tell(new FindLeader(), getRef());
248 FindLeaderReply findLeadeReply =
249 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
250 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
252 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
253 // with the election process.
254 onChangeListenerRegistered.countDown();
256 // Wait for the shard to become the leader and notify our listener with the existing
257 // data in the store.
258 listener.waitForChangeEvents(path);
// Stop the listener actor and the shard.
260 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
261 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Sends a CreateTransaction for a read-only transaction "txn-1" to a leader shard and checks
// that the reply carries the path of a transaction actor created under the shard.
266 public void testCreateTransaction(){
267 new ShardTestKit(getSystem()) {{
268 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
270 waitUntilLeader(shard);
272 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// The transaction type is passed as the ordinal of TransactionType.READ_ONLY, and the message
// is sent in its serializable form.
274 shard.tell(new CreateTransaction("txn-1",
275 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
277 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
278 CreateTransactionReply.class);
// The transaction actor path is expected to contain the shard's path plus "shard-txn-1".
280 String path = reply.getTransactionActorPath().toString();
281 assertTrue("Unexpected transaction path " + path,
282 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
284 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Same check as testCreateTransaction, but the CreateTransaction message carries the extra
// argument "foobar" (presumably the transaction chain id, given the test name - confirm).
289 public void testCreateTransactionOnChain(){
290 new ShardTestKit(getSystem()) {{
291 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
293 waitUntilLeader(shard);
// NOTE(review): the final argument line of this tell(...) call (the sender) is not visible
// in this listing.
295 shard.tell(new CreateTransaction("txn-1",
296 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
299 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
300 CreateTransactionReply.class);
// The transaction actor path is expected to contain the shard's path plus "shard-txn-1".
302 String path = reply.getTransactionActorPath().toString();
303 assertTrue("Unexpected transaction path " + path,
304 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
306 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that handling a PeerAddressResolved message stores the resolved address in the
// Raft actor context's peer address map under shardID.toString().
310 @SuppressWarnings("serial")
312 public void testPeerAddressResolved() throws Exception {
313 new ShardTestKit(getSystem()) {{
314 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Shard subclass constructed with a single map entry shardID -> null. It exposes the Raft
// context's peer addresses and counts down recoveryComplete after the base onRecoveryComplete().
315 class TestShard extends Shard {
317 super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
318 dataStoreContext, SCHEMA_CONTEXT);
321 Map<String, String> getPeerAddresses() {
322 return getRaftActorContext().getPeerAddresses();
326 protected void onRecoveryComplete() {
328 super.onRecoveryComplete();
330 recoveryComplete.countDown();
335 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
336 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
338 public TestShard create() throws Exception {
339 return new TestShard();
341 })), "testPeerAddressResolved");
// Instead of waiting for leadership, wait up to 5 seconds for the recovery latch.
343 //waitUntilLeader(shard);
344 assertEquals("Recovery complete", true,
345 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// Deliver the message by calling onReceiveCommand directly on the underlying actor rather
// than sending it through the mailbox.
347 String address = "akka://foobar";
348 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
350 assertEquals("getPeerAddresses", address,
351 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
353 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Reads the store contents after a write, feeds them back to the shard as an ApplySnapshot
// message and asserts that the store contents read afterwards equal the ones captured before.
358 public void testApplySnapshot() throws Exception {
359 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
360 "testApplySnapshot");
362 NormalizedNodeToNodeCodec codec =
363 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
// Put a container node into the store and capture the whole tree from the root (empty) path.
365 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
367 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
368 NormalizedNode<?,?> expected = readStore(shard, root);
// Encode the captured tree and use its bytes as the snapshot state, with an empty list of
// unapplied log entries.
370 NormalizedNodeMessages.Container encode = codec.encode(expected);
372 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
373 encode.getNormalizedNode().toByteString().toByteArray(),
374 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
// Invoke the handler directly on the underlying actor.
376 shard.underlyingActor().onReceiveCommand(applySnapshot);
378 NormalizedNode<?,?> actual = readStore(shard, root);
380 assertEquals(expected, actual);
382 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Passes an ApplyState message wrapping a single write modification directly to the shard
// and asserts that the written node can then be read back from the store.
386 public void testApplyState() throws Exception {
388 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
390 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Wrap a write of `node` at TEST_PATH in a composite modification payload carried by a
// replicated log entry.
392 MutableCompositeModification compMod = new MutableCompositeModification();
393 compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
394 Payload payload = new CompositeModificationPayload(compMod.toSerializable());
395 ApplyState applyState = new ApplyState(null, "test",
396 new ReplicatedLogImplEntry(1, 2, payload));
// Invoke the handler directly on the underlying actor.
398 shard.underlyingActor().onReceiveCommand(applyState);
400 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
401 assertEquals("Applied state", node, actual);
403 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Seeds the in-memory snapshot store and journal for shardID, starts a shard, waits for
// recovery to complete and then verifies the recovered list entries and the log indexes
// reported by the shard MBean.
406 @SuppressWarnings("serial")
408 public void testRecovery() throws Exception {
410 // Set up the InMemorySnapshotStore.
// A standalone in-memory data store is used only to produce the snapshot contents.
412 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
413 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
415 DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
416 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
417 DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
418 commitCohort.preCommit().get();
419 commitCohort.commit().get();
// Read the whole tree from the root (empty) path.
421 DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
422 NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
// NOTE(review): the argument line of encode(...) is not visible in this listing; presumably
// it is the `root` node read above.
424 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
425 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
427 getNormalizedNode().toByteString().toByteArray(),
428 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
430 // Set up the InMemoryJournal.
// Journal entry 0 writes an empty outer list at OUTER_LIST_PATH.
432 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
433 new WriteModification(TestModel.OUTER_LIST_PATH,
434 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
// Journal entries 1..(nListEntries-5) each merge one list entry keyed by i; every key is
// remembered in listEntryKeys for the verification below.
// NOTE(review): the payload argument of these entries is on a line not visible here.
437 int nListEntries = 16;
438 Set<Integer> listEntryKeys = new HashSet<>();
439 for(int i = 1; i <= nListEntries-5; i++) {
440 listEntryKeys.add(Integer.valueOf(i));
441 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
442 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
443 Modification mod = new MergeModification(path,
444 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
446 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
// NOTE(review): this loop starts at 11, which the loop above already covers
// (nListEntries-5 == 11), so journal index 11 is added by both loops - confirm this is intended.
450 // Add some of the new CompositeModificationByteStringPayload
451 for(int i = 11; i <= nListEntries; i++) {
452 listEntryKeys.add(Integer.valueOf(i));
453 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
454 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
455 Modification mod = new MergeModification(path,
456 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
458 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
459 newByteStringPayload(mod)));
// The last journal record is an ApplyLogEntries(nListEntries) stored at index nListEntries + 1.
463 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
464 new ApplyLogEntries(nListEntries));
466 // Create the actor and wait for recovery complete.
468 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// The Shard subclass counts down the latch after the base onRecoveryComplete() has run.
470 Creator<Shard> creator = new Creator<Shard>() {
472 public Shard create() throws Exception {
473 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
474 dataStoreContext, SCHEMA_CONTEXT) {
476 protected void onRecoveryComplete() {
478 super.onRecoveryComplete();
480 recoveryComplete.countDown();
487 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
488 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
490 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
492 // Verify data in the data store.
494 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
495 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
496 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
497 outerList.getValue() instanceof Iterable);
// Each recovered entry must be a MapEntryNode whose id leaf value is one of the expected
// keys; remove(...) returning false means the key was unexpected or seen twice.
498 for(Object entry: (Iterable<?>) outerList.getValue()) {
499 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
500 entry instanceof MapEntryNode);
501 MapEntryNode mapEntry = (MapEntryNode)entry;
502 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
503 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
504 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
505 Object value = idLeaf.get().getValue();
506 assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
507 listEntryKeys.remove(value));
// Any key still left in the set was never recovered.
510 if(!listEntryKeys.isEmpty()) {
511 fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
// The last log index, commit index and last-applied index reported by the shard MBean must
// all equal nListEntries.
515 assertEquals("Last log index", nListEntries,
516 shard.underlyingActor().getShardMBean().getLastLogIndex());
517 assertEquals("Commit index", nListEntries,
518 shard.underlyingActor().getShardMBean().getCommitIndex());
519 assertEquals("Last applied", nListEntries,
520 shard.underlyingActor().getShardMBean().getLastApplied());
522 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Collects the given modifications, in order, into a MutableCompositeModification and wraps
// its serializable form in a CompositeModificationPayload.
525 private CompositeModificationPayload newPayload(final Modification... mods) {
526 MutableCompositeModification compMod = new MutableCompositeModification();
527 for(Modification mod: mods) {
528 compMod.addModification(mod);
531 return new CompositeModificationPayload(compMod.toSerializable());
// Same as newPayload, but wraps the serializable form of the composite modification in a
// CompositeModificationByteStringPayload instead.
534 private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
535 MutableCompositeModification compMod = new MutableCompositeModification();
536 for(Modification mod: mods) {
537 compMod.addModification(mod);
540 return new CompositeModificationByteStringPayload(compMod.toSerializable());
// Convenience overload: delegates to the six-argument setupMockWriteTransaction with a null
// preCommit function, so the mock's preCommit() falls through to the real cohort.
544 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
545 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
546 final MutableCompositeModification modification) {
547 return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
// Creates a Mockito mock cohort named cohortName whose canCommit/preCommit/commit/abort calls
// are forwarded to a real cohort obtained by writing `data` at `path` in dataStore. If
// preCommit is non-null it is applied to the real cohort instead of calling its preCommit().
// A WriteModification for the same path and data is also appended to `modification`.
// NOTE(review): the return statement is not visible in this listing; presumably the mock
// cohort is returned.
550 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
551 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
552 final MutableCompositeModification modification,
553 final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
// Ready a real write-only transaction to obtain the cohort that does the actual work.
555 DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
556 tx.write(path, data);
557 final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
558 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
// canCommit() is forwarded to the real cohort.
560 doAnswer(new Answer<ListenableFuture<Boolean>>() {
562 public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
563 return realCohort.canCommit();
565 }).when(cohort).canCommit();
// preCommit() uses the caller-supplied function when one was given, else the real cohort.
567 doAnswer(new Answer<ListenableFuture<Void>>() {
569 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
570 if(preCommit != null) {
571 return preCommit.apply(realCohort);
573 return realCohort.preCommit();
576 }).when(cohort).preCommit();
// commit() is forwarded to the real cohort.
578 doAnswer(new Answer<ListenableFuture<Void>>() {
580 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
581 return realCohort.commit();
583 }).when(cohort).commit();
// abort() is forwarded to the real cohort.
585 doAnswer(new Answer<ListenableFuture<Void>>() {
587 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
588 return realCohort.abort();
590 }).when(cohort).abort();
// Record the same write in the caller's composite modification.
592 modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
// Readies three transactions on one shard, then drives canCommit/commit for them and verifies
// that the cohorts were invoked strictly in transaction order (tx1, tx2, tx3), that the
// written data is in the store and that the last log index reaches 2.
597 @SuppressWarnings({ "unchecked" })
599 public void testConcurrentThreePhaseCommits() throws Throwable {
600 new ShardTestKit(getSystem()) {{
// The shard Props are switched to the default dispatcher id before the actor is created.
601 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
602 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
603 "testConcurrentThreePhaseCommits");
605 waitUntilLeader(shard);
607 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
609 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// tx1 writes the test container at TEST_PATH.
611 String transactionID1 = "tx1";
612 MutableCompositeModification modification1 = new MutableCompositeModification();
613 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
614 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
// tx2 writes an empty outer list at OUTER_LIST_PATH.
// NOTE(review): the closing argument line of this call is not visible in this listing.
616 String transactionID2 = "tx2";
617 MutableCompositeModification modification2 = new MutableCompositeModification();
618 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
619 TestModel.OUTER_LIST_PATH,
620 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
// tx3 writes the outer list entry with id 1.
// NOTE(review): the closing argument line of this call is not visible in this listing.
623 String transactionID3 = "tx3";
624 MutableCompositeModification modification3 = new MutableCompositeModification();
625 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
626 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
627 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
628 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// NOTE(review): timeoutSec is declared on a line that is not visible in this listing.
632 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
633 final Timeout timeout = new Timeout(duration);
635 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
636 // by the ShardTransaction.
638 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
639 cohort1, modification1, true), getRef());
640 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
641 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
// The cohort path in the reply must be the shard's own path.
642 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
644 // Send the CanCommitTransaction message for the first Tx.
646 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
647 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
648 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
649 assertEquals("Can commit", true, canCommitReply.getCanCommit());
651 // Send the ForwardedReadyTransaction for the next 2 Tx's.
653 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
654 cohort2, modification2, true), getRef());
655 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
657 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
658 cohort3, modification3, true), getRef());
659 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
661 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
662 // processed after the first Tx completes.
// Patterns.ask is used here so each reply is delivered through its own Future.
664 Future<Object> canCommitFuture1 = Patterns.ask(shard,
665 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
667 Future<Object> canCommitFuture2 = Patterns.ask(shard,
668 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
670 // Send the CommitTransaction message for the first Tx. After it completes, it should
671 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
673 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
674 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
676 // Wait for the next 2 Tx's to complete.
// caughtEx carries a failure from a callback thread back to the test thread; commitLatch is
// counted down once per completed commit future (two expected).
678 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
679 final CountDownLatch commitLatch = new CountDownLatch(2);
// Base callback: records a failure in caughtEx and checks that the response has the expected
// class.
// NOTE(review): several lines of onComplete (the branching around these statements) are not
// visible in this listing.
681 class OnFutureComplete extends OnComplete<Object> {
682 private final Class<?> expRespType;
684 OnFutureComplete(final Class<?> expRespType) {
685 this.expRespType = expRespType;
689 public void onComplete(final Throwable error, final Object resp) {
691 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
694 assertEquals("Commit response type", expRespType, resp.getClass());
696 } catch (Exception e) {
// Hook for subclasses; OnCanCommitFutureComplete overrides it below.
702 void onSuccess(final Object resp) throws Exception {
// Callback for a commit reply: runs the base checks, then counts down commitLatch.
706 class OnCommitFutureComplete extends OnFutureComplete {
707 OnCommitFutureComplete() {
708 super(CommitTransactionReply.SERIALIZABLE_CLASS);
712 public void onComplete(final Throwable error, final Object resp) {
713 super.onComplete(error, resp);
714 commitLatch.countDown();
// Callback for a canCommit reply: asserts canCommit is true, then asks the shard to commit
// the same transaction and attaches an OnCommitFutureComplete to that request.
718 class OnCanCommitFutureComplete extends OnFutureComplete {
719 private final String transactionID;
721 OnCanCommitFutureComplete(final String transactionID) {
722 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
723 this.transactionID = transactionID;
727 void onSuccess(final Object resp) throws Exception {
728 CanCommitTransactionReply canCommitReply =
729 CanCommitTransactionReply.fromSerializable(resp);
730 assertEquals("Can commit", true, canCommitReply.getCanCommit());
732 Future<Object> commitFuture = Patterns.ask(shard,
733 new CommitTransaction(transactionID).toSerializable(), timeout);
734 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
// Attach the canCommit callbacks for tx2 and tx3; they run on the actor system's dispatcher.
738 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
739 getSystem().dispatcher());
741 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
742 getSystem().dispatcher());
// Wait for both commits, then rethrow any failure recorded by a callback before asserting.
744 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
746 if(caughtEx.get() != null) {
747 throw caughtEx.get();
750 assertEquals("Commits complete", true, done);
// Each cohort must go through canCommit, preCommit and commit, and the three transactions
// must be processed in order without interleaving.
752 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
753 inOrder.verify(cohort1).canCommit();
754 inOrder.verify(cohort1).preCommit();
755 inOrder.verify(cohort1).commit();
756 inOrder.verify(cohort2).canCommit();
757 inOrder.verify(cohort2).preCommit();
758 inOrder.verify(cohort2).commit();
759 inOrder.verify(cohort3).canCommit();
760 inOrder.verify(cohort3).preCommit();
761 inOrder.verify(cohort3).commit();
763 // Verify data in the data store.
// Only the first entry of the outer list is inspected; its id leaf must be 1.
765 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
766 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
767 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
768 outerList.getValue() instanceof Iterable);
769 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
770 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
771 entry instanceof MapEntryNode);
772 MapEntryNode mapEntry = (MapEntryNode)entry;
773 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
774 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
775 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
776 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
// Poll up to 20 * 5 times, sleeping 50 ms between checks, until the MBean reports a last log
// index of 2.
// NOTE(review): the statement inside the if is not visible in this listing; presumably it
// leaves the loop.
778 for(int i = 0; i < 20 * 5; i++) {
779 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
780 if(lastLogIndex == 2) {
783 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
786 assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
788 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
793 public void testCommitPhaseFailure() throws Throwable {
794 new ShardTestKit(getSystem()) {{
795 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
796 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
797 "testCommitPhaseFailure");
799 waitUntilLeader(shard);
801 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
804 String transactionID1 = "tx1";
805 MutableCompositeModification modification1 = new MutableCompositeModification();
806 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
807 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
808 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
809 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
811 String transactionID2 = "tx2";
812 MutableCompositeModification modification2 = new MutableCompositeModification();
813 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
814 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
816 FiniteDuration duration = duration("5 seconds");
817 final Timeout timeout = new Timeout(duration);
819 // Simulate the ForwardedReadyTransaction messages that would be sent
820 // by the ShardTransaction.
822 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
823 cohort1, modification1, true), getRef());
824 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
826 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
827 cohort2, modification2, true), getRef());
828 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
830 // Send the CanCommitTransaction message for the first Tx.
832 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
833 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
834 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
835 assertEquals("Can commit", true, canCommitReply.getCanCommit());
837 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
838 // processed after the first Tx completes.
840 Future<Object> canCommitFuture = Patterns.ask(shard,
841 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
843 // Send the CommitTransaction message for the first Tx. This should send back an error
844 // and trigger the 2nd Tx to proceed.
846 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
847 expectMsgClass(duration, akka.actor.Status.Failure.class);
849 // Wait for the 2nd Tx to complete the canCommit phase.
851 final CountDownLatch latch = new CountDownLatch(1);
852 canCommitFuture.onComplete(new OnComplete<Object>() {
854 public void onComplete(final Throwable t, final Object resp) {
857 }, getSystem().dispatcher());
859 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
861 InOrder inOrder = inOrder(cohort1, cohort2);
862 inOrder.verify(cohort1).canCommit();
863 inOrder.verify(cohort1).preCommit();
864 inOrder.verify(cohort1).commit();
865 inOrder.verify(cohort2).canCommit();
867 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
872 public void testPreCommitPhaseFailure() throws Throwable {
873 new ShardTestKit(getSystem()) {{
874 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
875 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
876 "testPreCommitPhaseFailure");
878 waitUntilLeader(shard);
880 String transactionID = "tx1";
881 MutableCompositeModification modification = new MutableCompositeModification();
882 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
883 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
884 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
886 FiniteDuration duration = duration("5 seconds");
888 // Simulate the ForwardedReadyTransaction messages that would be sent
889 // by the ShardTransaction.
891 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
892 cohort, modification, true), getRef());
893 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
895 // Send the CanCommitTransaction message.
897 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
898 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
899 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
900 assertEquals("Can commit", true, canCommitReply.getCanCommit());
902 // Send the CommitTransaction message. This should send back an error
903 // for preCommit failure.
905 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
906 expectMsgClass(duration, akka.actor.Status.Failure.class);
908 InOrder inOrder = inOrder(cohort);
909 inOrder.verify(cohort).canCommit();
910 inOrder.verify(cohort).preCommit();
912 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
917 public void testCanCommitPhaseFailure() throws Throwable {
918 new ShardTestKit(getSystem()) {{
919 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
920 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
921 "testCanCommitPhaseFailure");
923 waitUntilLeader(shard);
925 final FiniteDuration duration = duration("5 seconds");
927 String transactionID = "tx1";
928 MutableCompositeModification modification = new MutableCompositeModification();
929 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
930 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
932 // Simulate the ForwardedReadyTransaction messages that would be sent
933 // by the ShardTransaction.
935 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
936 cohort, modification, true), getRef());
937 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
939 // Send the CanCommitTransaction message.
941 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
942 expectMsgClass(duration, akka.actor.Status.Failure.class);
944 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
949 @Ignore("This test will work only if replication is turned on. Needs modification due to optimizations added to Shard/RaftActor.")
950 public void testAbortBeforeFinishCommit() throws Throwable {
951 new ShardTestKit(getSystem()) {{
952 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
953 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
954 "testAbortBeforeFinishCommit");
956 waitUntilLeader(shard);
958 final FiniteDuration duration = duration("5 seconds");
959 final Timeout timeout = new Timeout(duration);
961 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
963 final String transactionID = "tx1";
964 final CountDownLatch abortComplete = new CountDownLatch(1);
965 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
966 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
968 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
969 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
971 Future<Object> abortFuture = Patterns.ask(shard,
972 new AbortTransaction(transactionID).toSerializable(), timeout);
973 abortFuture.onComplete(new OnComplete<Object>() {
975 public void onComplete(final Throwable e, final Object resp) {
976 abortComplete.countDown();
978 }, getSystem().dispatcher());
980 return preCommitFuture;
984 MutableCompositeModification modification = new MutableCompositeModification();
985 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
986 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
987 modification, preCommit);
989 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
990 cohort, modification, true), getRef());
991 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
993 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
994 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
995 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
996 assertEquals("Can commit", true, canCommitReply.getCanCommit());
998 Future<Object> commitFuture = Patterns.ask(shard,
999 new CommitTransaction(transactionID).toSerializable(), timeout);
1001 assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
1003 Await.result(commitFuture, duration);
1005 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1006 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1008 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1013 public void testTransactionCommitTimeout() throws Throwable {
1014 dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
1016 new ShardTestKit(getSystem()) {{
1017 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1018 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1019 "testTransactionCommitTimeout");
1021 waitUntilLeader(shard);
1023 final FiniteDuration duration = duration("5 seconds");
1025 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1027 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1028 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1029 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1031 // Create 1st Tx - will timeout
1033 String transactionID1 = "tx1";
1034 MutableCompositeModification modification1 = new MutableCompositeModification();
1035 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1036 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1037 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1038 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1043 String transactionID2 = "tx3";
1044 MutableCompositeModification modification2 = new MutableCompositeModification();
1045 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1046 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1047 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1049 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1054 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1055 cohort1, modification1, true), getRef());
1056 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1058 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1059 cohort2, modification2, true), getRef());
1060 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1062 // canCommit 1st Tx. We don't send the commit so it should timeout.
1064 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1065 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1067 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1069 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1070 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1072 // Commit the 2nd Tx.
1074 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1075 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1077 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1078 assertNotNull(listNodePath + " not found", node);
1080 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1085 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1086 dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
1088 new ShardTestKit(getSystem()) {{
1089 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1090 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1091 "testTransactionCommitQueueCapacityExceeded");
1093 waitUntilLeader(shard);
1095 final FiniteDuration duration = duration("5 seconds");
1097 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1099 String transactionID1 = "tx1";
1100 MutableCompositeModification modification1 = new MutableCompositeModification();
1101 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1102 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1104 String transactionID2 = "tx2";
1105 MutableCompositeModification modification2 = new MutableCompositeModification();
1106 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1107 TestModel.OUTER_LIST_PATH,
1108 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1111 String transactionID3 = "tx3";
1112 MutableCompositeModification modification3 = new MutableCompositeModification();
1113 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1114 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1118 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1119 cohort1, modification1, true), getRef());
1120 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1122 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1123 cohort2, modification2, true), getRef());
1124 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1126 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1127 cohort3, modification3, true), getRef());
1128 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1130 // canCommit 1st Tx.
1132 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1133 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1135 // canCommit the 2nd Tx - it should get queued.
1137 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1139 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1141 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1142 expectMsgClass(duration, akka.actor.Status.Failure.class);
1144 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1149 public void testCanCommitBeforeReadyFailure() throws Throwable {
1150 new ShardTestKit(getSystem()) {{
1151 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1152 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1153 "testCanCommitBeforeReadyFailure");
1155 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1156 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1158 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1163 public void testAbortTransaction() throws Throwable {
1164 new ShardTestKit(getSystem()) {{
1165 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1166 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1167 "testAbortTransaction");
1169 waitUntilLeader(shard);
1171 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1173 String transactionID1 = "tx1";
1174 MutableCompositeModification modification1 = new MutableCompositeModification();
1175 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1176 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1177 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1179 String transactionID2 = "tx2";
1180 MutableCompositeModification modification2 = new MutableCompositeModification();
1181 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1182 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1184 FiniteDuration duration = duration("5 seconds");
1185 final Timeout timeout = new Timeout(duration);
1187 // Simulate the ForwardedReadyTransaction messages that would be sent
1188 // by the ShardTransaction.
1190 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1191 cohort1, modification1, true), getRef());
1192 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1194 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1195 cohort2, modification2, true), getRef());
1196 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1198 // Send the CanCommitTransaction message for the first Tx.
1200 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1201 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1202 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1203 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1205 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1206 // processed after the first Tx completes.
1208 Future<Object> canCommitFuture = Patterns.ask(shard,
1209 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1211 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1214 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1215 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1217 // Wait for the 2nd Tx to complete the canCommit phase.
1219 final CountDownLatch latch = new CountDownLatch(1);
1220 canCommitFuture.onComplete(new OnComplete<Object>() {
1222 public void onComplete(final Throwable t, final Object resp) {
1225 }, getSystem().dispatcher());
1227 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1229 InOrder inOrder = inOrder(cohort1, cohort2);
1230 inOrder.verify(cohort1).canCommit();
1231 inOrder.verify(cohort2).canCommit();
1233 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1238 public void testCreateSnapshot() throws IOException, InterruptedException {
1239 testCreateSnapshot(true, "testCreateSnapshot");
1243 public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
1244 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1247 @SuppressWarnings("serial")
1248 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
1249 final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
1250 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
1252 new ShardTestKit(getSystem()) {{
1253 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1254 Creator<Shard> creator = new Creator<Shard>() {
1256 public Shard create() throws Exception {
1257 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1258 dataStoreContext, SCHEMA_CONTEXT) {
1260 protected void commitSnapshot(final long sequenceNumber) {
1261 super.commitSnapshot(sequenceNumber);
1262 latch.get().countDown();
1268 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1269 Props.create(new DelegatingShardCreator(creator)), shardActorName);
1271 waitUntilLeader(shard);
1273 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1275 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1277 latch.set(new CountDownLatch(1));
1278 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1280 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1282 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * This test simply verifies that the applySnapShot logic will work.
 *
 * @throws ReadFailedException if reading from the in-memory data store fails
 */
1291 public void testInMemoryDataStoreRestore() throws ReadFailedException {
1292 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1294 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1296 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1297 putTransaction.write(TestModel.TEST_PATH,
1298 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1299 commitTransaction(putTransaction);
1302 NormalizedNode<?, ?> expected = readStore(store);
1304 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1306 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1307 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1309 commitTransaction(writeTransaction);
1311 NormalizedNode<?, ?> actual = readStore(store);
1313 assertEquals(expected, actual);
1317 public void testRecoveryApplicable(){
1319 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1320 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1322 final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1323 persistentContext, SCHEMA_CONTEXT);
1325 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1326 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1328 final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1329 nonPersistentContext, SCHEMA_CONTEXT);
1331 new ShardTestKit(getSystem()) {{
1332 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1333 persistentProps, "testPersistence1");
1335 assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1337 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1339 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1340 nonPersistentProps, "testPersistence2");
1342 assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1344 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1351 private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1352 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1353 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1354 transaction.read(YangInstanceIdentifier.builder().build());
1356 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1358 NormalizedNode<?, ?> normalizedNode = optional.get();
1360 transaction.close();
1362 return normalizedNode;
1365 private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1366 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1367 ListenableFuture<Void> future =
1368 commitCohort.preCommit();
1371 future = commitCohort.commit();
1373 } catch (InterruptedException | ExecutionException e) {
1377 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1378 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1380 public void onDataChanged(
1381 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1387 static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1388 throws ExecutionException, InterruptedException {
1389 DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1391 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1392 transaction.read(id);
1394 Optional<NormalizedNode<?, ?>> optional = future.get();
1395 NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1397 transaction.close();
1402 private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
1403 throws ExecutionException, InterruptedException {
1404 DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1406 transaction.write(id, node);
1408 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1409 commitCohort.preCommit().get();
1410 commitCohort.commit().get();
1413 @SuppressWarnings("serial")
1414 private static final class DelegatingShardCreator implements Creator<Shard> {
1415 private final Creator<Shard> delegate;
1417 DelegatingShardCreator(final Creator<Shard> delegate) {
1418 this.delegate = delegate;
1422 public Shard create() throws Exception {
1423 return delegate.create();