1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.japi.Procedure;
21 import akka.pattern.Patterns;
22 import akka.persistence.SnapshotSelectionCriteria;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.HashSet;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.mockito.InOrder;
46 import org.mockito.invocation.InvocationOnMock;
47 import org.mockito.stubbing.Answer;
48 import org.opendaylight.controller.cluster.DataPersistenceProvider;
49 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
50 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
51 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
58 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
63 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
64 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
65 import org.opendaylight.controller.cluster.datastore.modification.Modification;
66 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
67 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
68 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
69 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
70 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
71 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
72 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
73 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
74 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
75 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
76 import org.opendaylight.controller.cluster.raft.Snapshot;
77 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
78 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
79 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
80 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
81 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
82 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
83 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
84 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
85 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
86 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
87 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
88 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
89 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
90 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
91 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
92 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
93 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
94 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
95 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
96 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
97 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
98 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
99 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
101 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
102 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
103 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
104 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
105 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
106 import scala.concurrent.Await;
107 import scala.concurrent.Future;
108 import scala.concurrent.duration.FiniteDuration;
110 public class ShardTest extends AbstractActorTest {
// Schema context passed to every Shard constructed in this class.
112 private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
// Counter whose next value is appended to the shard type so each ShardTest instance
// gets a distinct shard identifier.
114 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
// Identifier of the shard under test: member "member-1", shard "inventory" and a type
// of "config" followed by the next value of NEXT_SHARD_NUM.
116 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
117 .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
// Builder for the DatastoreContext given to shards: journal recovery log batch size 3,
// snapshot batch count 5000 and heartbeat interval 100 ms.
119 private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
120 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
121 shardHeartbeatIntervalInMillis(100);
/**
 * Test fixture set-up: clears the in-memory snapshot store and the in-memory journal.
 */
124 public void setUp() {
126 InMemorySnapshotStore.clear();
127 InMemoryJournal.clear();
/**
 * Test fixture tear-down: clears the in-memory snapshot store and the in-memory journal.
 */
131 public void tearDown() {
132 InMemorySnapshotStore.clear();
133 InMemoryJournal.clear();
/**
 * Builds a DatastoreContext from the settings accumulated in dataStoreContextBuilder.
 *
 * @return the result of dataStoreContextBuilder.build()
 */
136 private DatastoreContext newDatastoreContext() {
137 return dataStoreContextBuilder.build();
/**
 * Creates the Props for a Shard identified by shardID, with an empty peer map, a
 * DatastoreContext from newDatastoreContext() and SCHEMA_CONTEXT.
 *
 * @return Props obtained from Shard.props(...)
 */
140 private Props newShardProps() {
141 return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
142 newDatastoreContext(), SCHEMA_CONTEXT);
/**
 * Registers a data change listener actor on a shard for TestModel.TEST_PATH, checks the
 * path returned in the RegisterChangeListenerReply, writes a container node at that path
 * and then waits for the listener to report a change event for it.
 */
146 public void testRegisterChangeListener() throws Exception {
147 new ShardTestKit(getSystem()) {{
148 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
149 newShardProps(), "testRegisterChangeListener");
151 waitUntilLeader(shard);
153 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// The mock listener is constructed with 1 (presumably the number of expected change
// events - confirm against MockDataChangeListener).
155 MockDataChangeListener listener = new MockDataChangeListener(1);
156 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
157 "testRegisterChangeListener-DataChangeListener");
159 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
160 dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
162 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
163 RegisterChangeListenerReply.class);
// The registration path must be a child of the shard actor whose name starts with '$'.
164 String replyPath = reply.getListenerRegistrationPath().toString();
165 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
166 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
168 YangInstanceIdentifier path = TestModel.TEST_PATH;
169 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
171 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
173 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
174 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies that a change listener registered while the shard is not yet the leader still
 * gets registered and is later notified of data that was already in the store.
 * A Shard subclass intercepts the first ElectionTimeout and two latches coordinate that
 * interception with the test thread's RegisterChangeListener message.
 */
178 @SuppressWarnings("serial")
180 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
181 // This test tests the timing window in which a change listener is registered before the
182 // shard becomes the leader. We verify that the listener is registered and notified of the
183 // existing data when the shard becomes the leader.
184 new ShardTestKit(getSystem()) {{
185 // For this test, we want to send the RegisterChangeListener message after the shard
186 // has recovered from persistence and before it becomes the leader. So we subclass
187 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
188 // we know that the shard has been initialized to a follower and has started the
189 // election process. The following 2 CountDownLatches are used to coordinate the
190 // ElectionTimeout with the sending of the RegisterChangeListener message.
191 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
192 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
193 Creator<Shard> creator = new Creator<Shard>() {
// Flipped to false once the first ElectionTimeout has been intercepted, so later
// ElectionTimeout messages go straight to the base Shard.
194 boolean firstElectionTimeout = true;
197 public Shard create() throws Exception {
198 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
199 newDatastoreContext(), SCHEMA_CONTEXT) {
201 public void onReceiveCommand(final Object message) throws Exception {
202 if(message instanceof ElectionTimeout && firstElectionTimeout) {
203 // Got the first ElectionTimeout. We don't forward it to the
204 // base Shard yet until we've sent the RegisterChangeListener
205 // message. So we signal the onFirstElectionTimeout latch to tell
206 // the main thread to send the RegisterChangeListener message and
207 // start a thread to wait on the onChangeListenerRegistered latch,
208 // which the main thread signals after it has sent the message.
209 // After the onChangeListenerRegistered is triggered, we send the
210 // original ElectionTimeout message to proceed with the election.
211 firstElectionTimeout = false;
212 final ActorRef self = getSelf();
// Wait at most 5 seconds for the test thread's signal, then re-deliver the
// original ElectionTimeout to this actor.
216 Uninterruptibles.awaitUninterruptibly(
217 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
218 self.tell(message, self);
222 onFirstElectionTimeout.countDown();
224 super.onReceiveCommand(message);
231 MockDataChangeListener listener = new MockDataChangeListener(1);
232 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
233 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
235 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
236 Props.create(new DelegatingShardCreator(creator)),
237 "testRegisterChangeListenerWhenNotLeaderInitially");
239 // Write initial data into the in-memory store.
240 YangInstanceIdentifier path = TestModel.TEST_PATH;
241 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
243 // Wait until the shard receives the first ElectionTimeout message.
244 assertEquals("Got first ElectionTimeout", true,
245 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
247 // Now send the RegisterChangeListener and wait for the reply.
248 shard.tell(new RegisterChangeListener(path, dclActor.path(),
249 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
251 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
252 RegisterChangeListenerReply.class);
253 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
255 // Sanity check - verify the shard is not the leader yet.
256 shard.tell(new FindLeader(), getRef());
257 FindLeaderReply findLeadeReply =
258 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
259 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
261 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
262 // with the election process.
263 onChangeListenerRegistered.countDown();
265 // Wait for the shard to become the leader and notify our listener with the existing
266 // data in the store.
267 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
269 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
270 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Sends a serialized CreateTransaction for "txn-1" with the READ_ONLY transaction type to
 * a shard and checks that the transaction actor path in the reply contains the expected
 * "shard-txn-1" path under the shard actor.
 */
275 public void testCreateTransaction(){
276 new ShardTestKit(getSystem()) {{
277 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
279 waitUntilLeader(shard);
281 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// The transaction type is passed as the enum's ordinal value.
283 shard.tell(new CreateTransaction("txn-1",
284 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
286 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
287 CreateTransactionReply.class);
289 String path = reply.getTransactionActorPath().toString();
290 assertTrue("Unexpected transaction path " + path,
291 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
293 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Sends a serialized CreateTransaction for "txn-1" with the READ_ONLY transaction type and
 * an extra "foobar" argument (presumably the transaction chain id - confirm against
 * CreateTransaction), then checks that the transaction actor path in the reply contains
 * the expected "shard-txn-1" path under the shard actor.
 */
298 public void testCreateTransactionOnChain(){
299 new ShardTestKit(getSystem()) {{
300 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
302 waitUntilLeader(shard);
304 shard.tell(new CreateTransaction("txn-1",
305 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
308 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
309 CreateTransactionReply.class);
311 String path = reply.getTransactionActorPath().toString();
312 assertTrue("Unexpected transaction path " + path,
313 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
315 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies that handling a PeerAddressResolved message stores the resolved address in the
 * peer address map exposed by the shard's Raft actor context.
 */
319 @SuppressWarnings("serial")
321 public void testPeerAddressResolved() throws Exception {
322 new ShardTestKit(getSystem()) {{
// Counted down by the TestShard once onRecoveryComplete() has run.
323 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Shard subclass that starts with shardID mapped to a null address and exposes the
// Raft context's peer addresses for the assertion below.
324 class TestShard extends Shard {
326 super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
327 newDatastoreContext(), SCHEMA_CONTEXT);
330 Map<String, String> getPeerAddresses() {
331 return getRaftActorContext().getPeerAddresses();
335 protected void onRecoveryComplete() {
337 super.onRecoveryComplete();
339 recoveryComplete.countDown();
344 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
345 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
347 public TestShard create() throws Exception {
348 return new TestShard();
350 })), "testPeerAddressResolved");
352 //waitUntilLeader(shard);
353 assertEquals("Recovery complete", true,
354 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// Deliver the message directly to the underlying actor instead of via tell().
356 String address = "akka://foobar";
357 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
// The peer address map is keyed by the string form of the shard identifier.
359 assertEquals("getPeerAddresses", address,
360 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
362 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Builds a snapshot from the root node of a standalone InMemoryDOMDataStore, hands it to
 * the shard in an ApplySnapshot message and verifies that the shard's root node equals
 * the node the snapshot was built from.
 */
367 public void testApplySnapshot() throws Exception {
368 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
369 "testApplySnapshot");
// Separate data store used only to produce the expected root node.
371 InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
372 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
374 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
376 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
377 NormalizedNode<?,?> expected = readStore(store, root);
// Snapshot state is the expected root serialized with SerializationUtils; no
// unapplied log entries are included.
379 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
380 SerializationUtils.serializeNormalizedNode(expected),
381 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
// Deliver the message directly to the underlying actor instead of via tell().
383 shard.underlyingActor().onReceiveCommand(applySnapshot);
385 NormalizedNode<?,?> actual = readStore(shard, root);
387 assertEquals("Root node", expected, actual);
389 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Builds a snapshot whose state bytes are the root node encoded with
 * NormalizedNodeToNodeCodec (rather than SerializationUtils), hands it to the shard in an
 * ApplySnapshot message and verifies that the shard's root node equals the node the
 * snapshot was built from.
 */
393 public void testApplyHelium2VersionSnapshot() throws Exception {
// The actor name is specific to this test. Reusing "testApplySnapshot" could clash with
// the actor created by testApplySnapshot, which is only stopped asynchronously via
// PoisonPill (assuming both tests share one actor system - confirm in AbstractActorTest).
394 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
395 "testApplyHelium2VersionSnapshot");
397 NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
// Separate data store used only to produce the expected root node.
399 InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
400 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
402 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
404 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
405 NormalizedNode<?,?> expected = readStore(store, root);
407 NormalizedNodeMessages.Container encode = codec.encode(expected);
// Snapshot state is the byte array of the encoded node message; no unapplied log
// entries are included.
409 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
410 encode.getNormalizedNode().toByteString().toByteArray(),
411 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
// Deliver the message directly to the underlying actor instead of via tell().
413 shard.underlyingActor().onReceiveCommand(applySnapshot);
415 NormalizedNode<?,?> actual = readStore(shard, root);
417 assertEquals("Root node", expected, actual);
419 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Delivers an ApplyState whose log entry carries a ModificationPayload with a single
 * write to TestModel.TEST_PATH, then verifies the written node can be read back from
 * the shard.
 */
423 public void testApplyState() throws Exception {
425 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
427 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// ApplyState is created with a null first argument and the identifier "test".
429 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
430 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
// Deliver the message directly to the underlying actor instead of via tell().
432 shard.underlyingActor().onReceiveCommand(applyState);
434 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
435 assertEquals("Applied state", node, actual);
437 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Delivers an ApplyState whose log entry carries a CompositeModificationByteStringPayload
 * (built by newLegacyByteStringPayload) with a single write to TestModel.TEST_PATH, then
 * verifies the written node can be read back from the shard.
 */
441 public void testApplyStateLegacy() throws Exception {
443 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
445 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// ApplyState is created with a null first argument and the identifier "test".
447 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
448 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
// Deliver the message directly to the underlying actor instead of via tell().
450 shard.underlyingActor().onReceiveCommand(applyState);
452 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
453 assertEquals("Applied state", node, actual);
455 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Seeds the in-memory snapshot store and journal for shardID and then runs the shared
 * recovery verification in testRecovery(Set).
 * The snapshot holds a root node containing TestModel.TEST_PATH. The journal holds one
 * CompositeModificationPayload entry at index 0 that writes an empty outer list, 16
 * ModificationPayload entries that each merge one list entry, and a final
 * ApplyLogEntries message.
 */
459 public void testRecovery() throws Exception {
461 // Set up the InMemorySnapshotStore.
463 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
464 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
466 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
468 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
470 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
471 SerializationUtils.serializeNormalizedNode(root),
472 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
474 // Set up the InMemoryJournal.
476 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
477 new WriteModification(TestModel.OUTER_LIST_PATH,
478 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
480 int nListEntries = 16;
// Keys 1..nListEntries that the recovered outer list is expected to contain.
481 Set<Integer> listEntryKeys = new HashSet<>();
483 // Add some ModificationPayload entries
484 for(int i = 1; i <= nListEntries; i++) {
485 listEntryKeys.add(Integer.valueOf(i));
486 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
487 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
488 Modification mod = new MergeModification(path,
489 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
490 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
491 newModificationPayload(mod)));
// Final journal record, stored one sequence number after the last list entry.
494 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
495 new ApplyLogEntries(nListEntries));
497 testRecovery(listEntryKeys);
/**
 * Seeds the in-memory snapshot store and journal for shardID using older payload formats
 * and then runs the shared recovery verification in testRecovery(Set).
 * The snapshot state is the root node encoded with NormalizedNodeToNodeCodec. The list
 * entries are journaled partly as CompositeModificationPayload and partly as
 * CompositeModificationByteStringPayload, followed by a final ApplyLogEntries message.
 */
501 public void testHelium2VersionRecovery() throws Exception {
503 // Set up the InMemorySnapshotStore.
505 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
506 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
508 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
510 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
512 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
513 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
514 getNormalizedNode().toByteString().toByteArray(),
515 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
517 // Set up the InMemoryJournal.
519 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
520 new WriteModification(TestModel.OUTER_LIST_PATH,
521 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
523 int nListEntries = 16;
// Keys that the recovered outer list is expected to contain.
524 Set<Integer> listEntryKeys = new HashSet<>();
527 // Add some CompositeModificationPayload entries
529 listEntryKeys.add(Integer.valueOf(i));
530 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
531 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
532 Modification mod = new MergeModification(path,
533 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
534 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
535 newLegacyPayload(mod)));
538 // Add some CompositeModificationByteStringPayload entries
// Continues from the current value of i up to and including nListEntries.
539 for(; i <= nListEntries; i++) {
540 listEntryKeys.add(Integer.valueOf(i));
541 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
542 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
543 Modification mod = new MergeModification(path,
544 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
545 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
546 newLegacyByteStringPayload(mod)));
// Final journal record, stored one sequence number after the last list entry.
549 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
551 testRecovery(listEntryKeys);
/**
 * Creates a shard, waits up to 5 seconds for its recovery to complete and verifies the
 * recovered state: the outer list must contain exactly one entry per expected key, and
 * the shard MBean's last log index, commit index and last applied values must each equal
 * the number of expected keys.
 *
 * @param listEntryKeys keys expected in the recovered outer list; the set is modified,
 *        since each key found in the store is removed from it
 * @throws Exception if waiting for recovery or reading the store fails
 */
554 private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
555 // Create the actor and wait for recovery complete.
// Captured before the loop below removes keys from the set.
557 int nListEntries = listEntryKeys.size();
559 final CountDownLatch recoveryComplete = new CountDownLatch(1);
561 @SuppressWarnings("serial")
562 Creator<Shard> creator = new Creator<Shard>() {
564 public Shard create() throws Exception {
565 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
566 newDatastoreContext(), SCHEMA_CONTEXT) {
568 protected void onRecoveryComplete() {
570 super.onRecoveryComplete();
572 recoveryComplete.countDown();
579 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
580 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
582 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
584 // Verify data in the data store.
586 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
587 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
588 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
589 outerList.getValue() instanceof Iterable);
590 for(Object entry: (Iterable<?>) outerList.getValue()) {
591 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
592 entry instanceof MapEntryNode);
593 MapEntryNode mapEntry = (MapEntryNode)entry;
594 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
595 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
596 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
// remove() returns false for a key that was not expected or was already seen.
597 Object value = idLeaf.get().getValue();
598 assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
599 listEntryKeys.remove(value));
// Any key still in the set was expected but not found in the store.
602 if(!listEntryKeys.isEmpty()) {
603 fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
607 assertEquals("Last log index", nListEntries,
608 shard.underlyingActor().getShardMBean().getLastLogIndex());
609 assertEquals("Commit index", nListEntries,
610 shard.underlyingActor().getShardMBean().getCommitIndex());
611 assertEquals("Last applied", nListEntries,
612 shard.underlyingActor().getShardMBean().getLastApplied());
614 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Wraps the given modifications in a CompositeModificationPayload.
 *
 * @param mods modifications to add, in order, to a new MutableCompositeModification
 * @return a payload built from the composite modification's serializable form
 */
617 private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
618 MutableCompositeModification compMod = new MutableCompositeModification();
619 for(Modification mod: mods) {
620 compMod.addModification(mod);
623 return new CompositeModificationPayload(compMod.toSerializable());
/**
 * Wraps the given modifications in a CompositeModificationByteStringPayload.
 *
 * @param mods modifications to add, in order, to a new MutableCompositeModification
 * @return a payload built from the composite modification's serializable form
 */
626 private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
627 MutableCompositeModification compMod = new MutableCompositeModification();
628 for(Modification mod: mods) {
629 compMod.addModification(mod);
632 return new CompositeModificationByteStringPayload(compMod.toSerializable());
/**
 * Wraps the given modifications in a ModificationPayload.
 *
 * @param mods modifications to add, in order, to a new MutableCompositeModification
 * @return a payload constructed directly from the composite modification
 * @throws IOException declared by this method; presumably raised by the
 *         ModificationPayload constructor - confirm
 */
635 private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
636 MutableCompositeModification compMod = new MutableCompositeModification();
637 for(Modification mod: mods) {
638 compMod.addModification(mod);
641 return new ModificationPayload(compMod);
/**
 * Convenience overload that delegates to the six-argument setupMockWriteTransaction with
 * a null preCommit function.
 *
 * @param cohortName name given to the mock cohort
 * @param dataStore data store on which the write transaction is created
 * @param path path to write
 * @param data node to write at path
 * @param modification composite modification passed through to the delegate
 * @return the cohort returned by the six-argument overload
 */
644 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
645 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
646 final MutableCompositeModification modification) {
647 return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
/**
 * Creates a write-only transaction on the data store, writes data at path and readies
 * it, then configures a Mockito mock cohort whose canCommit, preCommit, commit and abort
 * calls are answered by the real cohort. Also adds a WriteModification for path and data
 * to the supplied composite modification.
 *
 * @param cohortName name given to the mock cohort
 * @param dataStore data store on which the write transaction is created
 * @param path path to write
 * @param data node to write at path
 * @param modification composite modification that receives the WriteModification
 * @param preCommit if non-null, applied to the real cohort in place of calling its
 *        preCommit(); if null, the real cohort's preCommit() is used
 * @return a cohort for the prepared transaction (presumably the mock configured here -
 *         confirm)
 */
650 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
651 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
652 final MutableCompositeModification modification,
653 final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
655 DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
656 tx.write(path, data);
657 final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
658 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
// canCommit is forwarded to the real cohort.
660 doAnswer(new Answer<ListenableFuture<Boolean>>() {
662 public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
663 return realCohort.canCommit();
665 }).when(cohort).canCommit();
// preCommit goes through the caller-supplied function when one was given.
667 doAnswer(new Answer<ListenableFuture<Void>>() {
669 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
670 if(preCommit != null) {
671 return preCommit.apply(realCohort);
673 return realCohort.preCommit();
676 }).when(cohort).preCommit();
// commit is forwarded to the real cohort.
678 doAnswer(new Answer<ListenableFuture<Void>>() {
680 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
681 return realCohort.commit();
683 }).when(cohort).commit();
// abort is forwarded to the real cohort.
685 doAnswer(new Answer<ListenableFuture<Void>>() {
687 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
688 return realCohort.abort();
690 }).when(cohort).abort();
692 modification.addModification(new WriteModification(path, data));
697 @SuppressWarnings({ "unchecked" })
699 public void testConcurrentThreePhaseCommits() throws Throwable {
700 new ShardTestKit(getSystem()) {{
701 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
702 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
703 "testConcurrentThreePhaseCommits");
705 waitUntilLeader(shard);
707 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
709 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
711 String transactionID1 = "tx1";
712 MutableCompositeModification modification1 = new MutableCompositeModification();
713 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
714 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
716 String transactionID2 = "tx2";
717 MutableCompositeModification modification2 = new MutableCompositeModification();
718 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
719 TestModel.OUTER_LIST_PATH,
720 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
723 String transactionID3 = "tx3";
724 MutableCompositeModification modification3 = new MutableCompositeModification();
725 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
726 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
727 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
728 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
732 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
733 final Timeout timeout = new Timeout(duration);
735 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
736 // by the ShardTransaction.
738 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
739 cohort1, modification1, true), getRef());
740 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
741 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
742 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
744 // Send the CanCommitTransaction message for the first Tx.
746 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
747 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
748 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
749 assertEquals("Can commit", true, canCommitReply.getCanCommit());
751 // Send the ForwardedReadyTransaction for the next 2 Tx's.
753 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
754 cohort2, modification2, true), getRef());
755 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
757 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
758 cohort3, modification3, true), getRef());
759 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
761 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
762 // processed after the first Tx completes.
764 Future<Object> canCommitFuture1 = Patterns.ask(shard,
765 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
767 Future<Object> canCommitFuture2 = Patterns.ask(shard,
768 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
770 // Send the CommitTransaction message for the first Tx. After it completes, it should
771 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
773 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
774 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
776 // Wait for the next 2 Tx's to complete.
778 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
779 final CountDownLatch commitLatch = new CountDownLatch(2);
781 class OnFutureComplete extends OnComplete<Object> {
782 private final Class<?> expRespType;
784 OnFutureComplete(final Class<?> expRespType) {
785 this.expRespType = expRespType;
789 public void onComplete(final Throwable error, final Object resp) {
791 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
794 assertEquals("Commit response type", expRespType, resp.getClass());
796 } catch (Exception e) {
802 void onSuccess(final Object resp) throws Exception {
806 class OnCommitFutureComplete extends OnFutureComplete {
807 OnCommitFutureComplete() {
808 super(CommitTransactionReply.SERIALIZABLE_CLASS);
812 public void onComplete(final Throwable error, final Object resp) {
813 super.onComplete(error, resp);
814 commitLatch.countDown();
818 class OnCanCommitFutureComplete extends OnFutureComplete {
819 private final String transactionID;
821 OnCanCommitFutureComplete(final String transactionID) {
822 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
823 this.transactionID = transactionID;
827 void onSuccess(final Object resp) throws Exception {
828 CanCommitTransactionReply canCommitReply =
829 CanCommitTransactionReply.fromSerializable(resp);
830 assertEquals("Can commit", true, canCommitReply.getCanCommit());
832 Future<Object> commitFuture = Patterns.ask(shard,
833 new CommitTransaction(transactionID).toSerializable(), timeout);
834 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
838 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
839 getSystem().dispatcher());
841 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
842 getSystem().dispatcher());
844 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
846 if(caughtEx.get() != null) {
847 throw caughtEx.get();
850 assertEquals("Commits complete", true, done);
852 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
853 inOrder.verify(cohort1).canCommit();
854 inOrder.verify(cohort1).preCommit();
855 inOrder.verify(cohort1).commit();
856 inOrder.verify(cohort2).canCommit();
857 inOrder.verify(cohort2).preCommit();
858 inOrder.verify(cohort2).commit();
859 inOrder.verify(cohort3).canCommit();
860 inOrder.verify(cohort3).preCommit();
861 inOrder.verify(cohort3).commit();
863 // Verify data in the data store.
865 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
866 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
867 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
868 outerList.getValue() instanceof Iterable);
869 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
870 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
871 entry instanceof MapEntryNode);
872 MapEntryNode mapEntry = (MapEntryNode)entry;
873 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
874 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
875 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
876 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
878 verifyLastLogIndex(shard, 2);
880 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
884 private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
885 for(int i = 0; i < 20 * 5; i++) {
886 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
887 if(lastLogIndex == expectedValue) {
890 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
893 assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
897 public void testCommitWithPersistenceDisabled() throws Throwable {
898 dataStoreContextBuilder.persistent(false);
899 new ShardTestKit(getSystem()) {{
900 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
901 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
902 "testCommitPhaseFailure");
904 waitUntilLeader(shard);
906 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
908 // Setup a simulated transactions with a mock cohort.
910 String transactionID = "tx";
911 MutableCompositeModification modification = new MutableCompositeModification();
912 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
913 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
914 TestModel.TEST_PATH, containerNode, modification);
916 FiniteDuration duration = duration("5 seconds");
918 // Simulate the ForwardedReadyTransaction messages that would be sent
919 // by the ShardTransaction.
921 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
922 cohort, modification, true), getRef());
923 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
925 // Send the CanCommitTransaction message.
927 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
928 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
929 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
930 assertEquals("Can commit", true, canCommitReply.getCanCommit());
932 // Send the CanCommitTransaction message.
934 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
935 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
937 InOrder inOrder = inOrder(cohort);
938 inOrder.verify(cohort).canCommit();
939 inOrder.verify(cohort).preCommit();
940 inOrder.verify(cohort).commit();
942 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
943 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
945 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Readies two transactions with mock cohorts, where cohort1.commit() returns a failed future.
// Expects the CommitTransaction for the first Tx to be answered with a Status.Failure, waits
// for the queued CanCommitTransaction of the second Tx to complete, and verifies the order in
// which the cohort phases were invoked.
// NOTE(review): this listing omits some short lines, including the body of the OnComplete
// callback that presumably counts down `latch` - confirm against the full file.
950 public void testCommitPhaseFailure() throws Throwable {
951 new ShardTestKit(getSystem()) {{
952 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
953 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
954 "testCommitPhaseFailure");
956 waitUntilLeader(shard);
958 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
// commit phase.
961 String transactionID1 = "tx1";
962 MutableCompositeModification modification1 = new MutableCompositeModification();
963 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
964 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
965 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
966 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
968 String transactionID2 = "tx2";
969 MutableCompositeModification modification2 = new MutableCompositeModification();
970 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
971 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
973 FiniteDuration duration = duration("5 seconds");
974 final Timeout timeout = new Timeout(duration);
976 // Simulate the ForwardedReadyTransaction messages that would be sent
977 // by the ShardTransaction.
979 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
980 cohort1, modification1, true), getRef());
981 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
983 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
984 cohort2, modification2, true), getRef());
985 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
987 // Send the CanCommitTransaction message for the first Tx.
989 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
990 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
991 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
992 assertEquals("Can commit", true, canCommitReply.getCanCommit());
994 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
995 // processed after the first Tx completes.
// Patterns.ask is used (rather than tell/expectMsg) so the reply arrives on a future and does
// not interleave with the test kit's own message queue.
997 Future<Object> canCommitFuture = Patterns.ask(shard,
998 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1000 // Send the CommitTransaction message for the first Tx. This should send back an error
1001 // and trigger the 2nd Tx to proceed.
1003 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1004 expectMsgClass(duration, akka.actor.Status.Failure.class);
1006 // Wait for the 2nd Tx to complete the canCommit phase.
1008 final CountDownLatch latch = new CountDownLatch(1);
1009 canCommitFuture.onComplete(new OnComplete<Object>() {
1011 public void onComplete(final Throwable t, final Object resp) {
1014 }, getSystem().dispatcher());
1016 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// cohort2 is only expected to reach canCommit; no commit is sent for the 2nd Tx.
1018 InOrder inOrder = inOrder(cohort1, cohort2);
1019 inOrder.verify(cohort1).canCommit();
1020 inOrder.verify(cohort1).preCommit();
1021 inOrder.verify(cohort1).commit();
1022 inOrder.verify(cohort2).canCommit();
1024 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Readies one transaction whose mock cohort succeeds in canCommit() but returns a failed
// future from preCommit(). Expects the CommitTransaction message to be answered with a
// Status.Failure and verifies that canCommit and preCommit were invoked in that order.
1029 public void testPreCommitPhaseFailure() throws Throwable {
1030 new ShardTestKit(getSystem()) {{
1031 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1032 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1033 "testPreCommitPhaseFailure");
1035 waitUntilLeader(shard);
1037 String transactionID = "tx1";
1038 MutableCompositeModification modification = new MutableCompositeModification();
1039 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1040 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1041 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1043 FiniteDuration duration = duration("5 seconds");
1045 // Simulate the ForwardedReadyTransaction messages that would be sent
1046 // by the ShardTransaction.
1048 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1049 cohort, modification, true), getRef());
1050 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1052 // Send the CanCommitTransaction message.
1054 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1055 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1056 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1057 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1059 // Send the CommitTransaction message. This should send back an error
1060 // for preCommit failure.
1062 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1063 expectMsgClass(duration, akka.actor.Status.Failure.class);
// commit() is not stubbed on this mock and is not part of the verified sequence.
1065 InOrder inOrder = inOrder(cohort);
1066 inOrder.verify(cohort).canCommit();
1067 inOrder.verify(cohort).preCommit();
1069 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Readies one transaction whose mock cohort returns a failed future from canCommit() and
// expects the CanCommitTransaction message to be answered with a Status.Failure.
1074 public void testCanCommitPhaseFailure() throws Throwable {
1075 new ShardTestKit(getSystem()) {{
1076 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1077 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1078 "testCanCommitPhaseFailure");
1080 waitUntilLeader(shard);
1082 final FiniteDuration duration = duration("5 seconds");
1084 String transactionID = "tx1";
1085 MutableCompositeModification modification = new MutableCompositeModification();
1086 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1087 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1089 // Simulate the ForwardedReadyTransaction messages that would be sent
1090 // by the ShardTransaction.
1092 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1093 cohort, modification, true), getRef());
1094 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1096 // Send the CanCommitTransaction message.
// The reply is expected to be a failure because the mocked canCommit() future failed.
1098 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1099 expectMsgClass(duration, akka.actor.Status.Failure.class);
1101 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Simulates an abort that arrives after preCommit but before the commit is finished: the
// preCommit hook passed to setupMockWriteTransaction calls doAbortTransaction on the shard
// directly. The test then expects the CommitTransaction to still be answered with a
// CommitTransactionReply and the written node to be readable at TestModel.TEST_PATH.
1106 public void testAbortBeforeFinishCommit() throws Throwable {
1107 new ShardTestKit(getSystem()) {{
1108 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1109 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1110 "testAbortBeforeFinishCommit");
1112 waitUntilLeader(shard);
1114 final FiniteDuration duration = duration("5 seconds");
1115 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1117 final String transactionID = "tx1";
// Hook that runs the cohort's real preCommit and then aborts the transaction on the shard.
1118 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1119 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1121 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1122 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1124 // Simulate an AbortTransaction message occurring during replication, after
1125 // persisting and before finishing the commit to the in-memory store.
1126 // We have no followers so due to optimizations in the RaftActor, it does not
1127 // attempt replication and thus we can't send an AbortTransaction message b/c
1128 // it would be processed too late after CommitTransaction completes. So we'll
1129 // simulate an AbortTransaction message occurring during replication by calling
1130 // the shard directly.
1132 shard.underlyingActor().doAbortTransaction(transactionID, null);
1134 return preCommitFuture;
1138 MutableCompositeModification modification = new MutableCompositeModification();
1139 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1140 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1141 modification, preCommit);
// Drive the transaction through ready -> canCommit -> commit.
1143 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1144 cohort, modification, true), getRef());
1145 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1147 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1148 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1149 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1150 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1152 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1153 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1155 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1157 // Since we're simulating an abort occurring during replication and before finish commit,
1158 // the data should still get written to the in-memory store since we've gotten past
1159 // canCommit and preCommit and persisted the data.
1160 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1162 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Configures a 1 second shard transaction commit timeout, readies two transactions, and sends
// canCommit for the first without ever committing it. The second transaction's canCommit and
// commit are then expected to complete, and its list entry must be readable from the store.
// NOTE(review): this listing omits some short lines (trailing arguments of the
// setupMockWriteTransaction calls and a few comments) - confirm against the full file.
// NOTE(review): the 2nd transaction is labelled "tx3"/"cohort3" although it is held in
// transactionID2/cohort2; looks like a leftover label, harmless to the assertions here.
1167 public void testTransactionCommitTimeout() throws Throwable {
1168 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1170 new ShardTestKit(getSystem()) {{
1171 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1172 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1173 "testTransactionCommitTimeout");
1175 waitUntilLeader(shard);
1177 final FiniteDuration duration = duration("5 seconds");
1179 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Pre-populate the test container and an empty outer list so list entries can be written.
1181 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1182 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1183 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1185 // Create 1st Tx - will timeout
1187 String transactionID1 = "tx1";
1188 MutableCompositeModification modification1 = new MutableCompositeModification();
1189 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1190 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1191 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1192 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1197 String transactionID2 = "tx3";
1198 MutableCompositeModification modification2 = new MutableCompositeModification();
1199 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1200 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1201 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1203 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
// Ready both transactions.
1208 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1209 cohort1, modification1, true), getRef());
1210 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1212 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1213 cohort2, modification2, true), getRef());
1214 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1216 // canCommit 1st Tx. We don't send the commit so it should timeout.
1218 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1219 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1221 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1223 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1224 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1226 // Commit the 2nd Tx.
1228 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1229 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1231 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1232 assertNotNull(listNodePath + " not found", node);
1234 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Sets the shard transaction commit queue capacity to 1, readies three transactions, and
// sends canCommit for each in turn. The third canCommit is expected to be answered with a
// Status.Failure.
// NOTE(review): this listing omits some short lines (e.g. the final argument of the cohort2
// setup call) - confirm against the full file.
1239 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1240 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1242 new ShardTestKit(getSystem()) {{
1243 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1244 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1245 "testTransactionCommitQueueCapacityExceeded");
1247 waitUntilLeader(shard);
1249 final FiniteDuration duration = duration("5 seconds");
1251 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1253 String transactionID1 = "tx1";
1254 MutableCompositeModification modification1 = new MutableCompositeModification();
1255 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1256 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1258 String transactionID2 = "tx2";
1259 MutableCompositeModification modification2 = new MutableCompositeModification();
1260 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1261 TestModel.OUTER_LIST_PATH,
1262 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1265 String transactionID3 = "tx3";
1266 MutableCompositeModification modification3 = new MutableCompositeModification();
1267 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1268 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
// Ready all three transactions.
1272 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1273 cohort1, modification1, true), getRef());
1274 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1276 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1277 cohort2, modification2, true), getRef());
1278 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1280 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1281 cohort3, modification3, true), getRef());
1282 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1284 // canCommit 1st Tx.
1286 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1287 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1289 // canCommit the 2nd Tx - it should get queued.
// No reply is awaited here; the next message expected by the test is the failure below.
1291 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1293 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1295 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1296 expectMsgClass(duration, akka.actor.Status.Failure.class);
1298 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Sends a CanCommitTransaction for transaction "tx" without a preceding
// ForwardedReadyTransaction and expects a Status.Failure reply within 5 seconds.
1303 public void testCanCommitBeforeReadyFailure() throws Throwable {
1304 new ShardTestKit(getSystem()) {{
1305 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1306 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1307 "testCanCommitBeforeReadyFailure");
1309 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1310 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1312 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Readies two transactions with mock cohorts, canCommits the first, queues a canCommit for the
// second, then aborts the first. Expects an AbortTransactionReply, waits for the second
// canCommit future to complete, and verifies cohort1.canCommit ran before cohort2.canCommit.
1317 public void testAbortTransaction() throws Throwable {
1318 new ShardTestKit(getSystem()) {{
1319 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1320 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1321 "testAbortTransaction");
1323 waitUntilLeader(shard);
1325 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1327 String transactionID1 = "tx1";
1328 MutableCompositeModification modification1 = new MutableCompositeModification();
1329 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1330 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1331 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1333 String transactionID2 = "tx2";
1334 MutableCompositeModification modification2 = new MutableCompositeModification();
1335 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1336 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1338 FiniteDuration duration = duration("5 seconds");
1339 final Timeout timeout = new Timeout(duration);
1341 // Simulate the ForwardedReadyTransaction messages that would be sent
1342 // by the ShardTransaction.
1344 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1345 cohort1, modification1, true), getRef());
1346 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1348 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1349 cohort2, modification2, true), getRef());
1350 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1352 // Send the CanCommitTransaction message for the first Tx.
1354 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1355 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1356 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1357 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1359 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1360 // processed after the first Tx completes.
1362 Future<Object> canCommitFuture = Patterns.ask(shard,
1363 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1365 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
// Tx to proceed.
1368 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1369 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1371 // Wait for the 2nd Tx to complete the canCommit phase.
1373 Await.ready(canCommitFuture, duration);
1375 InOrder inOrder = inOrder(cohort1, cohort2);
1376 inOrder.verify(cohort1).canCommit();
1377 inOrder.verify(cohort2).canCommit();
1379 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Runs the shared snapshot scenario with persistence enabled, using the actor name
// "testCreateSnapshot".
1384 public void testCreateSnapshot() throws Exception {
1385 testCreateSnapshot(true, "testCreateSnapshot");
// Runs the shared snapshot scenario with persistence disabled, using the actor name
// "testCreateSnapshotWithNonPersistentData".
1389 public void testCreateSnapshotWithNonPersistentData() throws Exception {
1390 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
// Shared snapshot scenario. Creates a Shard whose persistence provider is wrapped so that the
// object handed to saveSnapshot() is captured, and whose commitSnapshot() releases a latch.
// The test writes a container, sends CaptureSnapshot twice, and after each one checks that a
// Snapshot was saved whose state deserializes to the store's root node.
//
// persistent     - value applied to dataStoreContextBuilder.persistent(...)
// shardActorName - name under which the shard actor is created
// NOTE(review): this listing omits some short lines inside the anonymous classes (e.g. the
// return statement of persistence()) - confirm against the full file.
1393 @SuppressWarnings("serial")
1394 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
// Holds the most recent object passed to saveSnapshot().
1396 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
// Forwards every call to the wrapped provider; saveSnapshot() additionally records its argument.
1397 class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1398 DataPersistenceProvider delegate;
1400 DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1401 this.delegate = delegate;
1405 public boolean isRecoveryApplicable() {
1406 return delegate.isRecoveryApplicable();
1410 public <T> void persist(T o, Procedure<T> procedure) {
1411 delegate.persist(o, procedure);
1415 public void saveSnapshot(Object o) {
1416 savedSnapshot.set(o);
1417 delegate.saveSnapshot(o);
1421 public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1422 delegate.deleteSnapshots(criteria);
1426 public void deleteMessages(long sequenceNumber) {
1427 delegate.deleteMessages(sequenceNumber);
1431 dataStoreContextBuilder.persistent(persistent);
1433 new ShardTestKit(getSystem()) {{
// The latch lives in an AtomicReference so it can be replaced before the second capture.
1434 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1435 Creator<Shard> creator = new Creator<Shard>() {
1437 public Shard create() throws Exception {
1438 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1439 newDatastoreContext(), SCHEMA_CONTEXT) {
1441 DelegatingPersistentDataProvider delegating;
// Lazily wraps the shard's own persistence provider on first use.
1444 protected DataPersistenceProvider persistence() {
1445 if(delegating == null) {
1446 delegating = new DelegatingPersistentDataProvider(super.persistence());
// Signals the waiting test once the snapshot commit has been processed.
1453 protected void commitSnapshot(final long sequenceNumber) {
1454 super.commitSnapshot(sequenceNumber);
1455 latch.get().countDown();
1461 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1462 Props.create(new DelegatingShardCreator(creator)), shardActorName);
1464 waitUntilLeader(shard);
1466 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1468 NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
// First capture.
1470 CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1);
1471 shard.tell(capture, getRef());
1473 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1475 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1476 savedSnapshot.get() instanceof Snapshot);
1478 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
// Reset the latch and the captured snapshot, then capture a second time.
1480 latch.set(new CountDownLatch(1));
1481 savedSnapshot.set(null);
1483 shard.tell(capture, getRef());
1485 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1487 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1488 savedSnapshot.get() instanceof Snapshot);
1490 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1492 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1495 private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1497 NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1498 assertEquals("Root node", expectedRoot, actual);
1504 * This test simply verifies that the applySnapShot logic will work
1505 * @throws ReadFailedException
1508 public void testInMemoryDataStoreRestore() throws ReadFailedException {
1509 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1511 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1513 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1514 putTransaction.write(TestModel.TEST_PATH,
1515 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1516 commitTransaction(putTransaction);
1519 NormalizedNode<?, ?> expected = readStore(store);
1521 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1523 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1524 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1526 commitTransaction(writeTransaction);
1528 NormalizedNode<?, ?> actual = readStore(store);
1530 assertEquals(expected, actual);
1534 public void testRecoveryApplicable(){
1536 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1537 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1539 final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1540 persistentContext, SCHEMA_CONTEXT);
1542 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1543 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1545 final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1546 nonPersistentContext, SCHEMA_CONTEXT);
1548 new ShardTestKit(getSystem()) {{
1549 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1550 persistentProps, "testPersistence1");
1552 assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1554 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1556 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1557 nonPersistentProps, "testPersistence2");
1559 assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1561 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1568 private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1569 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1570 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1571 transaction.read(YangInstanceIdentifier.builder().build());
1573 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1575 NormalizedNode<?, ?> normalizedNode = optional.get();
1577 transaction.close();
1579 return normalizedNode;
1582 private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1583 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1584 ListenableFuture<Void> future =
1585 commitCohort.preCommit();
1588 future = commitCohort.commit();
1590 } catch (InterruptedException | ExecutionException e) {
1594 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1595 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1597 public void onDataChanged(
1598 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1604 static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1605 throws ExecutionException, InterruptedException {
1606 return readStore(shard.underlyingActor().getDataStore(), id);
1609 public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
1610 throws ExecutionException, InterruptedException {
1611 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1613 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1614 transaction.read(id);
1616 Optional<NormalizedNode<?, ?>> optional = future.get();
1617 NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1619 transaction.close();
1624 static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
1625 final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1626 writeToStore(shard.underlyingActor().getDataStore(), id, node);
1629 public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
1630 final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1631 DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
1633 transaction.write(id, node);
1635 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1636 commitCohort.preCommit().get();
1637 commitCohort.commit().get();
1640 @SuppressWarnings("serial")
1641 private static final class DelegatingShardCreator implements Creator<Shard> {
1642 private final Creator<Shard> delegate;
1644 DelegatingShardCreator(final Creator<Shard> delegate) {
1645 this.delegate = delegate;
1649 public Shard create() throws Exception {
1650 return delegate.create();