1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.japi.Procedure;
21 import akka.pattern.Patterns;
22 import akka.persistence.SnapshotSelectionCriteria;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.HashSet;
35 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.concurrent.atomic.AtomicReference;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.mockito.invocation.InvocationOnMock;
48 import org.mockito.stubbing.Answer;
49 import org.opendaylight.controller.cluster.DataPersistenceProvider;
50 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
51 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
52 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
61 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
65 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
66 import org.opendaylight.controller.cluster.datastore.modification.Modification;
67 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
68 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
69 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
70 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
71 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
74 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
75 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
76 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
77 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
78 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
79 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
80 import org.opendaylight.controller.cluster.raft.Snapshot;
81 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
82 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
84 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
85 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
86 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
87 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
88 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
89 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
90 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
91 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
92 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
93 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
94 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
95 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
96 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
97 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
98 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
99 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
100 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
101 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
102 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
103 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
104 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
105 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
106 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
107 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
108 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
110 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
111 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
112 import scala.concurrent.Await;
113 import scala.concurrent.Future;
114 import scala.concurrent.duration.FiniteDuration;
116 public class ShardTest extends AbstractActorTest {
// Schema context built once from TestModel and handed to every shard and data store created below.
118 private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
// Counter whose next value is appended to the shard "type" so each ShardTest instance gets its own shardID.
120 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
// Identifier of the shard under test: member "member-1", shard "inventory", type "config<N>".
122 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
123 .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
// Builder for the DatastoreContext given to shards: journal recovery log batch size 3,
// snapshot batch count 5000, heartbeat interval 100 (milliseconds, per the setter name).
125 private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
126 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
127 shardHeartbeatIntervalInMillis(100);
// Clears the in-memory snapshot store and the in-memory journal.
// NOTE(review): presumably the JUnit @Before hook; the annotation line is not visible here - confirm.
130 public void setUp() {
132 InMemorySnapshotStore.clear();
133 InMemoryJournal.clear();
// Clears the in-memory snapshot store and the in-memory journal, the same two calls setUp() makes.
// NOTE(review): presumably the JUnit @After hook; the annotation line is not visible here - confirm.
137 public void tearDown() {
138 InMemorySnapshotStore.clear();
139 InMemoryJournal.clear();
// Builds a DatastoreContext from the current state of dataStoreContextBuilder.
142 private DatastoreContext newDatastoreContext() {
143 return dataStoreContextBuilder.build();
// Creates the Props for a Shard identified by shardID, with an empty ShardIdentifier-to-String map
// (presumably the peer addresses - confirm), a freshly built DatastoreContext and SCHEMA_CONTEXT.
146 private Props newShardProps() {
147 return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
148 newDatastoreContext(), SCHEMA_CONTEXT);
// Sends a RegisterChangeListener for TEST_PATH to a shard that has become leader and checks that
// the reply's registration path is a child of the shard actor, then writes to TEST_PATH and waits
// for the mock listener to receive the change event.
152 public void testRegisterChangeListener() throws Exception {
153 new ShardTestKit(getSystem()) {{
154 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
155 newShardProps(), "testRegisterChangeListener");
157 waitUntilLeader(shard);
159 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// The mock listener is constructed with 1, presumably the number of change events it waits for.
161 MockDataChangeListener listener = new MockDataChangeListener(1);
162 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
163 "testRegisterChangeListener-DataChangeListener");
165 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
166 dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
168 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
169 RegisterChangeListenerReply.class);
// The registration path must be an auto-named ("$...") child of the shard actor.
170 String replyPath = reply.getListenerRegistrationPath().toString();
171 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
172 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
174 YangInstanceIdentifier path = TestModel.TEST_PATH;
175 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
177 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
179 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
180 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Registers a change listener while the shard is not yet the leader and checks that the listener
// still receives the data already in the store once the election is allowed to proceed.
184 @SuppressWarnings("serial")
186 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
187 // This test tests the timing window in which a change listener is registered before the
188 // shard becomes the leader. We verify that the listener is registered and notified of the
189 // existing data when the shard becomes the leader.
190 new ShardTestKit(getSystem()) {{
191 // For this test, we want to send the RegisterChangeListener message after the shard
192 // has recovered from persistence and before it becomes the leader. So we subclass
193 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
194 // we know that the shard has been initialized to a follower and has started the
195 // election process. The following 2 CountDownLatches are used to coordinate the
196 // ElectionTimeout with the sending of the RegisterChangeListener message.
197 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
198 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
199 Creator<Shard> creator = new Creator<Shard>() {
200 boolean firstElectionTimeout = true;
203 public Shard create() throws Exception {
204 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
205 newDatastoreContext(), SCHEMA_CONTEXT) {
207 public void onReceiveCommand(final Object message) throws Exception {
208 if(message instanceof ElectionTimeout && firstElectionTimeout) {
209 // Got the first ElectionTimeout. We don't forward it to the
210 // base Shard yet until we've sent the RegisterChangeListener
211 // message. So we signal the onFirstElectionTimeout latch to tell
212 // the main thread to send the RegisterChangeListener message and
213 // start a thread to wait on the onChangeListenerRegistered latch,
214 // which the main thread signals after it has sent the message.
215 // After the onChangeListenerRegistered is triggered, we send the
216 // original ElectionTimeout message to proceed with the election.
217 firstElectionTimeout = false;
218 final ActorRef self = getSelf();
// Wait at most 5 seconds for the registration latch, then re-send the held-back message to self.
222 Uninterruptibles.awaitUninterruptibly(
223 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
224 self.tell(message, self);
228 onFirstElectionTimeout.countDown();
230 super.onReceiveCommand(message);
237 MockDataChangeListener listener = new MockDataChangeListener(1);
238 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
239 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
241 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
242 Props.create(new DelegatingShardCreator(creator)),
243 "testRegisterChangeListenerWhenNotLeaderInitially");
245 // Write initial data into the in-memory store.
246 YangInstanceIdentifier path = TestModel.TEST_PATH;
247 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
249 // Wait until the shard receives the first ElectionTimeout message.
250 assertEquals("Got first ElectionTimeout", true,
251 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
253 // Now send the RegisterChangeListener and wait for the reply.
254 shard.tell(new RegisterChangeListener(path, dclActor.path(),
255 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
257 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
258 RegisterChangeListenerReply.class);
259 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
261 // Sanity check - verify the shard is not the leader yet.
262 shard.tell(new FindLeader(), getRef());
263 FindLeaderReply findLeadeReply =
264 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
265 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
267 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
268 // with the election process.
269 onChangeListenerRegistered.countDown();
271 // Wait for the shard to become the leader and notify our listener with the existing
272 // data in the store.
273 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
275 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
276 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Sends a serialized read-only CreateTransaction ("txn-1") to a leader shard and checks that the
// reply's transaction actor path contains the "shard-txn-1" child of the shard actor.
281 public void testCreateTransaction(){
282 new ShardTestKit(getSystem()) {{
283 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
285 waitUntilLeader(shard);
287 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// The transaction type is passed as the ordinal of TransactionType.READ_ONLY.
289 shard.tell(new CreateTransaction("txn-1",
290 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
292 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
293 CreateTransactionReply.class);
295 String path = reply.getTransactionActorPath().toString();
296 assertTrue("Unexpected transaction path " + path,
297 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
// Stop the shard actor.
299 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Same check as testCreateTransaction, but the CreateTransaction carries a third argument "foobar"
// (presumably the transaction chain id, going by the test name - confirm).
304 public void testCreateTransactionOnChain(){
305 new ShardTestKit(getSystem()) {{
306 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
308 waitUntilLeader(shard);
310 shard.tell(new CreateTransaction("txn-1",
311 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
314 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
315 CreateTransactionReply.class);
// The transaction actor must be the "shard-txn-1" child of the shard actor.
317 String path = reply.getTransactionActorPath().toString();
318 assertTrue("Unexpected transaction path " + path,
319 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
// Stop the shard actor.
321 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Checks that, after recovery has completed, a PeerAddressResolved message stores the given
// address for the peer in the peer-address map of the shard's RaftActorContext.
325 @SuppressWarnings("serial")
327 public void testPeerAddressResolved() throws Exception {
328 new ShardTestKit(getSystem()) {{
329 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Shard subclass that starts with shardID mapped to a null address, exposes the peer-address map
// and counts down recoveryComplete after the base onRecoveryComplete() has run.
330 class TestShard extends Shard {
332 super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
333 newDatastoreContext(), SCHEMA_CONTEXT);
336 Map<String, String> getPeerAddresses() {
337 return getRaftActorContext().getPeerAddresses();
341 protected void onRecoveryComplete() {
343 super.onRecoveryComplete();
345 recoveryComplete.countDown();
350 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
351 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
353 public TestShard create() throws Exception {
354 return new TestShard();
356 })), "testPeerAddressResolved");
// The test waits (up to 5 seconds) for recovery to complete instead of waiting for leadership.
358 //waitUntilLeader(shard);
359 assertEquals("Recovery complete", true,
360 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// Deliver the message by calling onReceiveCommand directly on the underlying actor.
362 String address = "akka://foobar";
363 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
365 assertEquals("getPeerAddresses", address,
366 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
// Stop the shard actor.
368 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Applies a snapshot, serialized with SerializationUtils from a reference data store, to the
// shard and checks that the root node read back from the shard equals the reference root.
373 public void testApplySnapshot() throws Exception {
374 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
375 "testApplySnapshot");
// Reference store that produces the expected root node.
377 InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
378 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
380 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// An identifier built with no path arguments addresses the root of the tree.
382 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
383 NormalizedNode<?,?> expected = readStore(store, root);
385 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
386 SerializationUtils.serializeNormalizedNode(expected),
387 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
// Deliver the message by calling onReceiveCommand directly on the underlying actor.
389 shard.underlyingActor().onReceiveCommand(applySnapshot);
391 NormalizedNode<?,?> actual = readStore(shard, root);
393 assertEquals("Root node", expected, actual);
// Stop the shard actor.
395 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Applies a snapshot whose bytes were produced by NormalizedNodeToNodeCodec (presumably the
// Helium-2 snapshot encoding, going by the test name) and checks that the root node read back
// from the shard equals the root of the reference store.
399 public void testApplyHelium2VersionSnapshot() throws Exception {
// The actor name is unique to this test: reusing "testApplySnapshot" can collide with the actor
// of testApplySnapshot, which is only stopped asynchronously via PoisonPill.
400 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
401 "testApplyHelium2VersionSnapshot");
403 NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
// Reference store that produces the expected root node.
405 InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
406 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
408 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
410 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
411 NormalizedNode<?,?> expected = readStore(store, root);
// Encode the root with the codec and use the encoded node's bytes as the snapshot state.
413 NormalizedNodeMessages.Container encode = codec.encode(expected);
415 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
416 encode.getNormalizedNode().toByteString().toByteArray(),
417 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
// Deliver the message by calling onReceiveCommand directly on the underlying actor.
419 shard.underlyingActor().onReceiveCommand(applySnapshot);
421 NormalizedNode<?,?> actual = readStore(shard, root);
423 assertEquals("Root node", expected, actual);
// Stop the shard actor.
425 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Delivers an ApplyState whose log entry carries a ModificationPayload writing a container at
// TEST_PATH, then checks that the node read back from the shard at TEST_PATH equals that container.
429 public void testApplyState() throws Exception {
431 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
433 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
435 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
436 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
// Deliver the message by calling onReceiveCommand directly on the underlying actor.
438 shard.underlyingActor().onReceiveCommand(applyState);
440 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
441 assertEquals("Applied state", node, actual);
// Stop the shard actor.
443 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Same check as testApplyState, but the log entry carries the legacy
// CompositeModificationByteStringPayload built by newLegacyByteStringPayload.
447 public void testApplyStateLegacy() throws Exception {
449 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
451 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
453 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
454 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
// Deliver the message by calling onReceiveCommand directly on the underlying actor.
456 shard.underlyingActor().onReceiveCommand(applyState);
458 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
459 assertEquals("Applied state", node, actual);
// Stop the shard actor.
461 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Seeds the in-memory snapshot store and journal for shardID, then delegates to
// testRecovery(Set) to start a shard and verify the recovered state. The journal holds a legacy
// payload at index 0 that writes an empty outer list, 16 ModificationPayload entries that each
// merge one list entry, and a final ApplyJournalEntries message.
465 public void testRecovery() throws Exception {
467 // Set up the InMemorySnapshotStore.
469 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
470 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
472 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// The snapshot state is the serialized root of the reference store.
474 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
476 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
477 SerializationUtils.serializeNormalizedNode(root),
478 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
480 // Set up the InMemoryJournal.
482 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
483 new WriteModification(TestModel.OUTER_LIST_PATH,
484 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
// Keys of the list entries that recovery is expected to produce.
486 int nListEntries = 16;
487 Set<Integer> listEntryKeys = new HashSet<>();
489 // Add some ModificationPayload entries
490 for(int i = 1; i <= nListEntries; i++) {
491 listEntryKeys.add(Integer.valueOf(i));
492 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
493 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
494 Modification mod = new MergeModification(path,
495 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
496 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
497 newModificationPayload(mod)));
// The last journal entry is an ApplyJournalEntries message for index nListEntries.
500 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
501 new ApplyJournalEntries(nListEntries));
503 testRecovery(listEntryKeys);
// Same recovery scenario as testRecovery(), seeded with older encodings: the snapshot bytes come
// from NormalizedNodeToNodeCodec, the journal entries use CompositeModificationPayload and
// CompositeModificationByteStringPayload, and the final message is ApplyLogEntries.
507 public void testHelium2VersionRecovery() throws Exception {
509 // Set up the InMemorySnapshotStore.
511 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
512 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
514 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
516 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
518 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
519 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
520 getNormalizedNode().toByteString().toByteArray(),
521 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
523 // Set up the InMemoryJournal.
525 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
526 new WriteModification(TestModel.OUTER_LIST_PATH,
527 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
// Keys of the list entries that recovery is expected to produce.
529 int nListEntries = 16;
530 Set<Integer> listEntryKeys = new HashSet<>();
533 // Add some CompositeModificationPayload entries
535 listEntryKeys.add(Integer.valueOf(i));
536 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
537 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
538 Modification mod = new MergeModification(path,
539 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
540 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
541 newLegacyPayload(mod)));
544 // Add some CompositeModificationByteStringPayload entries
// This loop continues from the current value of i up to nListEntries.
545 for(; i <= nListEntries; i++) {
546 listEntryKeys.add(Integer.valueOf(i));
547 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
548 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
549 Modification mod = new MergeModification(path,
550 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
551 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
552 newLegacyByteStringPayload(mod)));
// The last journal entry is an ApplyLogEntries message for index nListEntries.
555 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
557 testRecovery(listEntryKeys);
/**
 * Starts a shard for shardID, waits for its recovery to complete and verifies the recovered data.
 *
 * Every entry of the outer list must be a MapEntryNode whose id leaf is one of the expected keys,
 * no expected key may be left over, and the shard MBean's last log index, commit index and last
 * applied index must all equal the number of expected keys.
 *
 * @param listEntryKeys ids expected in the recovered outer list; matched keys are removed from
 *        this set, so the caller's set is modified
 * @throws Exception if creating the shard or waiting for recovery fails
 */
560 private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
561 // Create the actor and wait for recovery complete.
// Capture the expected count now, because keys are removed from the set during verification.
563 int nListEntries = listEntryKeys.size();
565 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Shard subclass that counts down recoveryComplete after the base onRecoveryComplete() has run.
567 @SuppressWarnings("serial")
568 Creator<Shard> creator = new Creator<Shard>() {
570 public Shard create() throws Exception {
571 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
572 newDatastoreContext(), SCHEMA_CONTEXT) {
574 protected void onRecoveryComplete() {
576 super.onRecoveryComplete();
578 recoveryComplete.countDown();
585 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
586 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
588 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
590 // Verify data in the data store.
592 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
593 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
594 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
595 outerList.getValue() instanceof Iterable);
596 for(Object entry: (Iterable<?>) outerList.getValue()) {
597 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
598 entry instanceof MapEntryNode);
599 MapEntryNode mapEntry = (MapEntryNode)entry;
600 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
601 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
602 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
// remove() returns false for an id that was not expected or was already seen, failing the assert.
603 Object value = idLeaf.get().getValue();
604 assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
605 listEntryKeys.remove(value));
// Any key still in the set was expected but not found in the recovered list.
608 if(!listEntryKeys.isEmpty()) {
609 fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
613 assertEquals("Last log index", nListEntries,
614 shard.underlyingActor().getShardMBean().getLastLogIndex());
615 assertEquals("Commit index", nListEntries,
616 shard.underlyingActor().getShardMBean().getCommitIndex());
617 assertEquals("Last applied", nListEntries,
618 shard.underlyingActor().getShardMBean().getLastApplied());
// Stop the shard actor.
620 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Collects the given modifications, in order, into a MutableCompositeModification and wraps its
// serializable form in a CompositeModificationPayload.
623 private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
624 MutableCompositeModification compMod = new MutableCompositeModification();
625 for(Modification mod: mods) {
626 compMod.addModification(mod);
629 return new CompositeModificationPayload(compMod.toSerializable());
// Collects the given modifications, in order, into a MutableCompositeModification and wraps its
// serializable form in a CompositeModificationByteStringPayload.
632 private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
633 MutableCompositeModification compMod = new MutableCompositeModification();
634 for(Modification mod: mods) {
635 compMod.addModification(mod);
638 return new CompositeModificationByteStringPayload(compMod.toSerializable());
// Collects the given modifications, in order, into a MutableCompositeModification and wraps the
// composite itself (not its serializable form) in a ModificationPayload. Declared to throw IOException.
641 private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
642 MutableCompositeModification compMod = new MutableCompositeModification();
643 for(Modification mod: mods) {
644 compMod.addModification(mod);
647 return new ModificationPayload(compMod);
// Convenience overload: delegates to the six-argument setupMockWriteTransaction with a null
// preCommit function.
650 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
651 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
652 final MutableCompositeModification modification) {
653 return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
/**
 * Readies a real write-only transaction on {@code dataStore} that writes {@code data} at
 * {@code path}, and builds a Mockito mock cohort named {@code cohortName} whose canCommit,
 * preCommit, commit and abort calls are answered by that real cohort.
 *
 * A WriteModification for the same path and data is also added to {@code modification}.
 * NOTE(review): the return statement is not visible here; presumably the mock cohort is returned.
 *
 * @param cohortName name given to the mock cohort
 * @param dataStore store on which the real transaction is created
 * @param path location of the write
 * @param data node written at {@code path}
 * @param modification composite that receives the matching WriteModification
 * @param preCommit if non-null, applied to the real cohort to produce the preCommit result
 *        instead of calling the real cohort's preCommit()
 */
656 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
657 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
658 final MutableCompositeModification modification,
659 final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
661 DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
662 tx.write(path, data);
663 final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
664 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
// canCommit is answered by the real cohort.
666 doAnswer(new Answer<ListenableFuture<Boolean>>() {
668 public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
669 return realCohort.canCommit();
671 }).when(cohort).canCommit();
// preCommit is answered by the caller's function when one is given, otherwise by the real cohort.
673 doAnswer(new Answer<ListenableFuture<Void>>() {
675 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
676 if(preCommit != null) {
677 return preCommit.apply(realCohort);
679 return realCohort.preCommit();
682 }).when(cohort).preCommit();
// commit is answered by the real cohort.
684 doAnswer(new Answer<ListenableFuture<Void>>() {
686 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
687 return realCohort.commit();
689 }).when(cohort).commit();
// abort is answered by the real cohort.
691 doAnswer(new Answer<ListenableFuture<Void>>() {
693 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
694 return realCohort.abort();
696 }).when(cohort).abort();
// Record the same write in the caller-supplied composite modification.
698 modification.addModification(new WriteModification(path, data));
703 @SuppressWarnings({ "unchecked" })
705 public void testConcurrentThreePhaseCommits() throws Throwable {
706 new ShardTestKit(getSystem()) {{
707 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
708 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
709 "testConcurrentThreePhaseCommits");
711 waitUntilLeader(shard);
713 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
715 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
717 String transactionID1 = "tx1";
718 MutableCompositeModification modification1 = new MutableCompositeModification();
719 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
720 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
722 String transactionID2 = "tx2";
723 MutableCompositeModification modification2 = new MutableCompositeModification();
724 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
725 TestModel.OUTER_LIST_PATH,
726 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
729 String transactionID3 = "tx3";
730 MutableCompositeModification modification3 = new MutableCompositeModification();
731 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
732 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
733 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
734 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
738 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
739 final Timeout timeout = new Timeout(duration);
741 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
742 // by the ShardTransaction.
744 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
745 cohort1, modification1, true), getRef());
746 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
747 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
748 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
750 // Send the CanCommitTransaction message for the first Tx.
752 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
753 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
754 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
755 assertEquals("Can commit", true, canCommitReply.getCanCommit());
757 // Send the ForwardedReadyTransaction for the next 2 Tx's.
759 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
760 cohort2, modification2, true), getRef());
761 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
763 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
764 cohort3, modification3, true), getRef());
765 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
767 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
768 // processed after the first Tx completes.
770 Future<Object> canCommitFuture1 = Patterns.ask(shard,
771 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
773 Future<Object> canCommitFuture2 = Patterns.ask(shard,
774 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
776 // Send the CommitTransaction message for the first Tx. After it completes, it should
777 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
779 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
780 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
782 // Wait for the next 2 Tx's to complete.
784 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
785 final CountDownLatch commitLatch = new CountDownLatch(2);
787 class OnFutureComplete extends OnComplete<Object> {
788 private final Class<?> expRespType;
790 OnFutureComplete(final Class<?> expRespType) {
791 this.expRespType = expRespType;
795 public void onComplete(final Throwable error, final Object resp) {
797 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
800 assertEquals("Commit response type", expRespType, resp.getClass());
802 } catch (Exception e) {
808 void onSuccess(final Object resp) throws Exception {
812 class OnCommitFutureComplete extends OnFutureComplete {
813 OnCommitFutureComplete() {
814 super(CommitTransactionReply.SERIALIZABLE_CLASS);
818 public void onComplete(final Throwable error, final Object resp) {
819 super.onComplete(error, resp);
820 commitLatch.countDown();
824 class OnCanCommitFutureComplete extends OnFutureComplete {
825 private final String transactionID;
827 OnCanCommitFutureComplete(final String transactionID) {
828 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
829 this.transactionID = transactionID;
833 void onSuccess(final Object resp) throws Exception {
834 CanCommitTransactionReply canCommitReply =
835 CanCommitTransactionReply.fromSerializable(resp);
836 assertEquals("Can commit", true, canCommitReply.getCanCommit());
838 Future<Object> commitFuture = Patterns.ask(shard,
839 new CommitTransaction(transactionID).toSerializable(), timeout);
840 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
844 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
845 getSystem().dispatcher());
847 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
848 getSystem().dispatcher());
850 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
852 if(caughtEx.get() != null) {
853 throw caughtEx.get();
856 assertEquals("Commits complete", true, done);
858 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
859 inOrder.verify(cohort1).canCommit();
860 inOrder.verify(cohort1).preCommit();
861 inOrder.verify(cohort1).commit();
862 inOrder.verify(cohort2).canCommit();
863 inOrder.verify(cohort2).preCommit();
864 inOrder.verify(cohort2).commit();
865 inOrder.verify(cohort3).canCommit();
866 inOrder.verify(cohort3).preCommit();
867 inOrder.verify(cohort3).commit();
869 // Verify data in the data store.
871 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
872 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
873 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
874 outerList.getValue() instanceof Iterable);
875 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
876 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
877 entry instanceof MapEntryNode);
878 MapEntryNode mapEntry = (MapEntryNode)entry;
879 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
880 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
881 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
882 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
884 verifyLastLogIndex(shard, 2);
886 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
890 private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
891 for(int i = 0; i < 20 * 5; i++) {
892 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
893 if(lastLogIndex == expectedValue) {
896 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
899 assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
903 public void testCommitWithPersistenceDisabled() throws Throwable {
904 dataStoreContextBuilder.persistent(false);
905 new ShardTestKit(getSystem()) {{
906 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
907 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
908 "testCommitPhaseFailure");
910 waitUntilLeader(shard);
912 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
914 // Setup a simulated transactions with a mock cohort.
916 String transactionID = "tx";
917 MutableCompositeModification modification = new MutableCompositeModification();
918 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
919 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
920 TestModel.TEST_PATH, containerNode, modification);
922 FiniteDuration duration = duration("5 seconds");
924 // Simulate the ForwardedReadyTransaction messages that would be sent
925 // by the ShardTransaction.
927 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
928 cohort, modification, true), getRef());
929 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
931 // Send the CanCommitTransaction message.
933 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
934 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
935 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
936 assertEquals("Can commit", true, canCommitReply.getCanCommit());
938 // Send the CanCommitTransaction message.
940 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
941 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
943 InOrder inOrder = inOrder(cohort);
944 inOrder.verify(cohort).canCommit();
945 inOrder.verify(cohort).preCommit();
946 inOrder.verify(cohort).commit();
948 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
949 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
951 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
956 public void testCommitPhaseFailure() throws Throwable {
957 new ShardTestKit(getSystem()) {{
958 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
959 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
960 "testCommitPhaseFailure");
962 waitUntilLeader(shard);
964 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
967 String transactionID1 = "tx1";
968 MutableCompositeModification modification1 = new MutableCompositeModification();
969 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
970 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
971 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
972 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
974 String transactionID2 = "tx2";
975 MutableCompositeModification modification2 = new MutableCompositeModification();
976 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
977 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
979 FiniteDuration duration = duration("5 seconds");
980 final Timeout timeout = new Timeout(duration);
982 // Simulate the ForwardedReadyTransaction messages that would be sent
983 // by the ShardTransaction.
985 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
986 cohort1, modification1, true), getRef());
987 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
989 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
990 cohort2, modification2, true), getRef());
991 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
993 // Send the CanCommitTransaction message for the first Tx.
995 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
996 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
997 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
998 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1000 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1001 // processed after the first Tx completes.
1003 Future<Object> canCommitFuture = Patterns.ask(shard,
1004 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1006 // Send the CommitTransaction message for the first Tx. This should send back an error
1007 // and trigger the 2nd Tx to proceed.
1009 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1010 expectMsgClass(duration, akka.actor.Status.Failure.class);
1012 // Wait for the 2nd Tx to complete the canCommit phase.
1014 final CountDownLatch latch = new CountDownLatch(1);
1015 canCommitFuture.onComplete(new OnComplete<Object>() {
1017 public void onComplete(final Throwable t, final Object resp) {
1020 }, getSystem().dispatcher());
1022 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1024 InOrder inOrder = inOrder(cohort1, cohort2);
1025 inOrder.verify(cohort1).canCommit();
1026 inOrder.verify(cohort1).preCommit();
1027 inOrder.verify(cohort1).commit();
1028 inOrder.verify(cohort2).canCommit();
1030 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1035 public void testPreCommitPhaseFailure() throws Throwable {
1036 new ShardTestKit(getSystem()) {{
1037 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1038 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1039 "testPreCommitPhaseFailure");
1041 waitUntilLeader(shard);
1043 String transactionID = "tx1";
1044 MutableCompositeModification modification = new MutableCompositeModification();
1045 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1046 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1047 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1049 FiniteDuration duration = duration("5 seconds");
1051 // Simulate the ForwardedReadyTransaction messages that would be sent
1052 // by the ShardTransaction.
1054 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1055 cohort, modification, true), getRef());
1056 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1058 // Send the CanCommitTransaction message.
1060 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1061 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1062 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1063 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1065 // Send the CommitTransaction message. This should send back an error
1066 // for preCommit failure.
1068 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1069 expectMsgClass(duration, akka.actor.Status.Failure.class);
1071 InOrder inOrder = inOrder(cohort);
1072 inOrder.verify(cohort).canCommit();
1073 inOrder.verify(cohort).preCommit();
1075 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1080 public void testCanCommitPhaseFailure() throws Throwable {
1081 new ShardTestKit(getSystem()) {{
1082 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1083 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1084 "testCanCommitPhaseFailure");
1086 waitUntilLeader(shard);
1088 final FiniteDuration duration = duration("5 seconds");
1090 String transactionID = "tx1";
1091 MutableCompositeModification modification = new MutableCompositeModification();
1092 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1093 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1095 // Simulate the ForwardedReadyTransaction messages that would be sent
1096 // by the ShardTransaction.
1098 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1099 cohort, modification, true), getRef());
1100 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1102 // Send the CanCommitTransaction message.
1104 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1105 expectMsgClass(duration, akka.actor.Status.Failure.class);
1107 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1112 public void testAbortBeforeFinishCommit() throws Throwable {
1113 new ShardTestKit(getSystem()) {{
1114 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1115 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1116 "testAbortBeforeFinishCommit");
1118 waitUntilLeader(shard);
1120 final FiniteDuration duration = duration("5 seconds");
1121 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1123 final String transactionID = "tx1";
1124 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1125 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1127 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1128 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1130 // Simulate an AbortTransaction message occurring during replication, after
1131 // persisting and before finishing the commit to the in-memory store.
1132 // We have no followers so due to optimizations in the RaftActor, it does not
1133 // attempt replication and thus we can't send an AbortTransaction message b/c
1134 // it would be processed too late after CommitTransaction completes. So we'll
1135 // simulate an AbortTransaction message occurring during replication by calling
1136 // the shard directly.
1138 shard.underlyingActor().doAbortTransaction(transactionID, null);
1140 return preCommitFuture;
1144 MutableCompositeModification modification = new MutableCompositeModification();
1145 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1146 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1147 modification, preCommit);
1149 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1150 cohort, modification, true), getRef());
1151 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1153 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1154 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1155 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1156 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1158 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1159 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1161 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1163 // Since we're simulating an abort occurring during replication and before finish commit,
1164 // the data should still get written to the in-memory store since we've gotten past
1165 // canCommit and preCommit and persisted the data.
1166 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1168 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1173 public void testTransactionCommitTimeout() throws Throwable {
1174 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1176 new ShardTestKit(getSystem()) {{
1177 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1178 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1179 "testTransactionCommitTimeout");
1181 waitUntilLeader(shard);
1183 final FiniteDuration duration = duration("5 seconds");
1185 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1187 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1188 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1189 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1191 // Create 1st Tx - will timeout
1193 String transactionID1 = "tx1";
1194 MutableCompositeModification modification1 = new MutableCompositeModification();
1195 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1196 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1197 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1198 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1203 String transactionID2 = "tx3";
1204 MutableCompositeModification modification2 = new MutableCompositeModification();
1205 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1206 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1207 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1209 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1214 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1215 cohort1, modification1, true), getRef());
1216 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1218 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1219 cohort2, modification2, true), getRef());
1220 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1222 // canCommit 1st Tx. We don't send the commit so it should timeout.
1224 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1225 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1227 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1229 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1230 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1232 // Commit the 2nd Tx.
1234 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1235 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1237 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1238 assertNotNull(listNodePath + " not found", node);
1240 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1245 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1246 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1248 new ShardTestKit(getSystem()) {{
1249 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1250 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1251 "testTransactionCommitQueueCapacityExceeded");
1253 waitUntilLeader(shard);
1255 final FiniteDuration duration = duration("5 seconds");
1257 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1259 String transactionID1 = "tx1";
1260 MutableCompositeModification modification1 = new MutableCompositeModification();
1261 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1262 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1264 String transactionID2 = "tx2";
1265 MutableCompositeModification modification2 = new MutableCompositeModification();
1266 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1267 TestModel.OUTER_LIST_PATH,
1268 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1271 String transactionID3 = "tx3";
1272 MutableCompositeModification modification3 = new MutableCompositeModification();
1273 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1274 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1278 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1279 cohort1, modification1, true), getRef());
1280 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1282 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1283 cohort2, modification2, true), getRef());
1284 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1286 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1287 cohort3, modification3, true), getRef());
1288 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1290 // canCommit 1st Tx.
1292 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1293 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1295 // canCommit the 2nd Tx - it should get queued.
1297 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1299 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1301 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1302 expectMsgClass(duration, akka.actor.Status.Failure.class);
1304 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1309 public void testCanCommitBeforeReadyFailure() throws Throwable {
1310 new ShardTestKit(getSystem()) {{
1311 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1312 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1313 "testCanCommitBeforeReadyFailure");
1315 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1316 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1318 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1323 public void testAbortTransaction() throws Throwable {
1324 new ShardTestKit(getSystem()) {{
1325 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1326 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1327 "testAbortTransaction");
1329 waitUntilLeader(shard);
1331 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1333 String transactionID1 = "tx1";
1334 MutableCompositeModification modification1 = new MutableCompositeModification();
1335 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1336 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1337 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1339 String transactionID2 = "tx2";
1340 MutableCompositeModification modification2 = new MutableCompositeModification();
1341 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1342 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1344 FiniteDuration duration = duration("5 seconds");
1345 final Timeout timeout = new Timeout(duration);
1347 // Simulate the ForwardedReadyTransaction messages that would be sent
1348 // by the ShardTransaction.
1350 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1351 cohort1, modification1, true), getRef());
1352 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1354 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1355 cohort2, modification2, true), getRef());
1356 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1358 // Send the CanCommitTransaction message for the first Tx.
1360 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1361 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1362 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1363 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1365 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1366 // processed after the first Tx completes.
1368 Future<Object> canCommitFuture = Patterns.ask(shard,
1369 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1371 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1374 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1375 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1377 // Wait for the 2nd Tx to complete the canCommit phase.
1379 Await.ready(canCommitFuture, duration);
1381 InOrder inOrder = inOrder(cohort1, cohort2);
1382 inOrder.verify(cohort1).canCommit();
1383 inOrder.verify(cohort2).canCommit();
1385 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1390 public void testCreateSnapshot() throws Exception {
1391 testCreateSnapshot(true, "testCreateSnapshot");
1395 public void testCreateSnapshotWithNonPersistentData() throws Exception {
1396 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1399 @SuppressWarnings("serial")
1400 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1402 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1403 class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1404 DataPersistenceProvider delegate;
1406 DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1407 this.delegate = delegate;
1411 public boolean isRecoveryApplicable() {
1412 return delegate.isRecoveryApplicable();
1416 public <T> void persist(T o, Procedure<T> procedure) {
1417 delegate.persist(o, procedure);
1421 public void saveSnapshot(Object o) {
1422 savedSnapshot.set(o);
1423 delegate.saveSnapshot(o);
1427 public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1428 delegate.deleteSnapshots(criteria);
1432 public void deleteMessages(long sequenceNumber) {
1433 delegate.deleteMessages(sequenceNumber);
1437 dataStoreContextBuilder.persistent(persistent);
1439 new ShardTestKit(getSystem()) {{
1440 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1441 Creator<Shard> creator = new Creator<Shard>() {
1443 public Shard create() throws Exception {
1444 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1445 newDatastoreContext(), SCHEMA_CONTEXT) {
1447 DelegatingPersistentDataProvider delegating;
1450 protected DataPersistenceProvider persistence() {
1451 if(delegating == null) {
1452 delegating = new DelegatingPersistentDataProvider(super.persistence());
1459 protected void commitSnapshot(final long sequenceNumber) {
1460 super.commitSnapshot(sequenceNumber);
1461 latch.get().countDown();
1467 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1468 Props.create(new DelegatingShardCreator(creator)), shardActorName);
1470 waitUntilLeader(shard);
1472 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1474 NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1476 CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
1477 shard.tell(capture, getRef());
1479 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1481 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1482 savedSnapshot.get() instanceof Snapshot);
1484 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1486 latch.set(new CountDownLatch(1));
1487 savedSnapshot.set(null);
1489 shard.tell(capture, getRef());
1491 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1493 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1494 savedSnapshot.get() instanceof Snapshot);
1496 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1498 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1501 private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1503 NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1504 assertEquals("Root node", expectedRoot, actual);
1510 * This test simply verifies that the applySnapShot logic will work
1511 * @throws ReadFailedException
1514 public void testInMemoryDataStoreRestore() throws ReadFailedException {
1515 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1517 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1519 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1520 putTransaction.write(TestModel.TEST_PATH,
1521 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1522 commitTransaction(putTransaction);
1525 NormalizedNode<?, ?> expected = readStore(store);
1527 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1529 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1530 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1532 commitTransaction(writeTransaction);
1534 NormalizedNode<?, ?> actual = readStore(store);
1536 assertEquals(expected, actual);
1540 public void testRecoveryApplicable(){
1542 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1543 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1545 final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1546 persistentContext, SCHEMA_CONTEXT);
1548 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1549 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1551 final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1552 nonPersistentContext, SCHEMA_CONTEXT);
1554 new ShardTestKit(getSystem()) {{
1555 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1556 persistentProps, "testPersistence1");
1558 assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1560 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1562 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1563 nonPersistentProps, "testPersistence2");
1565 assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1567 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a running Shard follows the "persistent" flag of a datastore context delivered to it
// as a message: isRecoveryApplicable() is expected to be true, then false, then true again as the
// flag is flipped.
1574 public void testOnDatastoreContext() {
1575 new ShardTestKit(getSystem()) {{
// Start with persistence enabled (presumably newShardProps() builds from this builder -- TODO confirm).
1576 dataStoreContextBuilder.persistent(true);
1578 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1580 assertEquals("isRecoveryApplicable", true,
1581 shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1583 waitUntilLeader(shard);
// Send a context built with persistent(false). The assertion directly after the tell assumes the
// message has already been handled -- NOTE(review): relies on TestActorRef processing synchronously; confirm.
1585 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1587 assertEquals("isRecoveryApplicable", false,
1588 shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
// Switch back to persistent(true) and expect the provider to report recovery as applicable again.
1590 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1592 assertEquals("isRecoveryApplicable", true,
1593 shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
// Stop the shard actor at the end of the test.
1595 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Sends a RegisterRoleChangeListener message to a shard, with a MessageCollectorActor as the sender,
// and checks that the collector ends up holding exactly one RegisterRoleChangeListenerReply.
1600 public void testRegisterRoleChangeListener() throws Exception {
1601 new ShardTestKit(getSystem()) {
// The shard props are explicitly switched to the default dispatcher for this test.
1603 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1604 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1605 "testRegisterRoleChangeListener");
1607 waitUntilLeader(shard);
// Actor that records the messages it receives so they can be inspected below.
1609 TestActorRef<MessageCollectorActor> listener =
1610 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
// The listener is passed as the sender, so any reply from the shard is addressed to it.
1612 shard.tell(new RegisterRoleChangeListener(), listener);
1614 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1615 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
// (The TODO above refers to the fixed 100 ms pause on the next line, taken before the collector is read.)
1617 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1619 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
// Exactly one reply is expected for the single registration sent above.
1621 assertEquals(1, allMatching.size());
// Feeds FollowerInitialSyncUpStatus messages straight into the shard's onReceiveCommand() and checks
// that the shard MBean's follower-initial-sync status reports the same boolean each time.
1627 public void testFollowerInitialSyncStatus() throws Exception {
1628 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1629 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1630 "testFollowerInitialSyncStatus");
// The handler is invoked directly on the underlying actor instead of sending through the actor ref.
1632 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1634 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
// Same message with the flag set to true; the MBean value is expected to flip accordingly.
1636 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1638 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
// Stop the shard actor at the end of the test.
1640 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Reads from the given store at the path produced by {@code YangInstanceIdentifier.builder().build()}
 * (no path arguments are added, so this is presumably the data tree root -- TODO confirm).
 *
 * @param store the in-memory data store to read from
 * @return the node returned by the read; never {@code null}, because {@code Optional.get()} is called
 *         without a presence check and an absent result raises an exception instead
 * @throws ReadFailedException if the read fails (propagated from {@code checkedGet()})
 */
1644 private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1645 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1646 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1647 transaction.read(YangInstanceIdentifier.builder().build());
// Blocks until the read completes; a failure is thrown here as ReadFailedException.
1649 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
// NOTE(review): unchecked get() -- the two-argument readStore overload checks isPresent() instead.
1651 NormalizedNode<?, ?> normalizedNode = optional.get();
// NOTE(review): close() is only reached on the success path; no try/finally is visible around the read.
1653 transaction.close();
1655 return normalizedNode;
/**
 * Readies the given write transaction and drives the resulting commit cohort through
 * {@code preCommit()} and then {@code commit()}.
 *
 * @param transaction the write transaction to ready and commit
 */
1658 private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1659 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1660 ListenableFuture<Void> future =
1661 commitCohort.preCommit();
// The same variable is reused for the commit-phase future.
1664 future = commitCohort.commit();
// NOTE(review): no handling is visible for this catch; if its body is empty, pre-commit/commit failures
// and interrupts are silently ignored by callers of this helper -- confirm that is intended.
1666 } catch (InterruptedException | ExecutionException e) {
/**
 * Creates a new anonymous {@code AsyncDataChangeListener} for tests that need a listener instance.
 * NOTE(review): the method name indicates the callback does nothing; its body is not visible here -- confirm.
 *
 * @return a freshly created listener instance
 */
1670 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1671 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1673 public void onDataChanged(
1674 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
/**
 * Convenience overload: reads {@code id} from the data store of the shard behind the given test actor ref
 * by delegating to {@code readStore(InMemoryDOMDataStore, YangInstanceIdentifier)}.
 *
 * @param shard test actor ref whose underlying Shard supplies the data store
 * @param id path of the node to read
 * @return whatever the delegated overload returns
 * @throws ExecutionException propagated from the delegated read
 * @throws InterruptedException propagated from the delegated read
 */
1680 static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1681 throws ExecutionException, InterruptedException {
1682 return readStore(shard.underlyingActor().getDataStore(), id);
/**
 * Reads the node at {@code id} from the given store using a new read-only transaction.
 * An absent result is mapped to {@code null} rather than raising an exception.
 *
 * @param store the in-memory data store to read from
 * @param id path of the node to read
 * @return presumably the node computed below ({@code null} when absent) -- the return statement itself is
 *         not visible in this excerpt; TODO confirm
 * @throws ExecutionException if waiting on the read future fails
 * @throws InterruptedException if the wait on the read future is interrupted
 */
1685 public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
1686 throws ExecutionException, InterruptedException {
1687 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1689 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1690 transaction.read(id);
// Blocks until the read completes.
1692 Optional<NormalizedNode<?, ?>> optional = future.get();
// Absent data becomes null here.
1693 NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
// NOTE(review): close() is only reached when get() above did not throw.
1695 transaction.close();
/**
 * Convenience overload: writes {@code node} at {@code id} into the data store of the shard behind the given
 * test actor ref by delegating to {@code writeToStore(InMemoryDOMDataStore, YangInstanceIdentifier, NormalizedNode)}.
 *
 * @param shard test actor ref whose underlying Shard supplies the data store
 * @param id path to write at
 * @param node data to write
 * @throws ExecutionException propagated from the delegated write
 * @throws InterruptedException propagated from the delegated write
 */
1700 static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
1701 final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1702 writeToStore(shard.underlyingActor().getDataStore(), id, node);
/**
 * Writes {@code node} at {@code id} into the given store using a new write-only transaction, then readies
 * the transaction and waits for the cohort's {@code preCommit()} and {@code commit()} futures in turn.
 *
 * @param store the in-memory data store to write to
 * @param id path to write at
 * @param node data to write
 * @throws ExecutionException if the pre-commit or commit future fails
 * @throws InterruptedException if waiting on either future is interrupted
 */
1705 public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
1706 final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1707 DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
1709 transaction.write(id, node);
// Only preCommit() and commit() are invoked on the cohort here; each is awaited before moving on.
1711 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1712 commitCohort.preCommit().get();
1713 commitCohort.commit().get();
/**
 * {@code Creator<Shard>} that wraps another creator and forwards {@code create()} to it.
 * NOTE(review): the "serial" suppression suggests {@code Creator} is serializable and no
 * serialVersionUID is declared here -- confirm.
 */
1716 @SuppressWarnings("serial")
1717 private static final class DelegatingShardCreator implements Creator<Shard> {
// The wrapped creator that actually builds the Shard.
1718 private final Creator<Shard> delegate;
1720 DelegatingShardCreator(final Creator<Shard> delegate) {
1721 this.delegate = delegate;
// Forwards creation to the wrapped creator; any exception it throws propagates unchanged.
1725 public Shard create() throws Exception {
1726 return delegate.create();