1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.japi.Procedure;
21 import akka.pattern.Patterns;
22 import akka.persistence.SnapshotSelectionCriteria;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.HashSet;
35 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.concurrent.atomic.AtomicReference;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.mockito.invocation.InvocationOnMock;
48 import org.mockito.stubbing.Answer;
49 import org.opendaylight.controller.cluster.DataPersistenceProvider;
50 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
51 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
52 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
61 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
65 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
66 import org.opendaylight.controller.cluster.datastore.modification.Modification;
67 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
68 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
69 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
70 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
71 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
74 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
75 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
76 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
77 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
78 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
79 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
80 import org.opendaylight.controller.cluster.raft.Snapshot;
81 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
82 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
84 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
85 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
86 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
88 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
89 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
90 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
91 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
92 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
93 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
94 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
95 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
96 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
97 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
98 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
99 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
100 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
101 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
102 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
103 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
104 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
105 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
106 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
107 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
108 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
109 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
110 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
111 import scala.concurrent.Await;
112 import scala.concurrent.Future;
113 import scala.concurrent.duration.FiniteDuration;
115 public class ShardTest extends AbstractActorTest {
117 private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
119 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
121 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
122 .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
124 private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
125 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
126 shardHeartbeatIntervalInMillis(100);
129 public void setUp() {
130 Builder newBuilder = DatastoreContext.newBuilder();
131 InMemorySnapshotStore.clear();
132 InMemoryJournal.clear();
136 public void tearDown() {
137 InMemorySnapshotStore.clear();
138 InMemoryJournal.clear();
141 private DatastoreContext newDatastoreContext() {
142 return dataStoreContextBuilder.build();
145 private Props newShardProps() {
146 return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
147 newDatastoreContext(), SCHEMA_CONTEXT);
151 public void testRegisterChangeListener() throws Exception {
152 new ShardTestKit(getSystem()) {{
153 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
154 newShardProps(), "testRegisterChangeListener");
156 waitUntilLeader(shard);
158 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
160 MockDataChangeListener listener = new MockDataChangeListener(1);
161 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
162 "testRegisterChangeListener-DataChangeListener");
164 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
165 dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
167 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
168 RegisterChangeListenerReply.class);
169 String replyPath = reply.getListenerRegistrationPath().toString();
170 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
171 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
173 YangInstanceIdentifier path = TestModel.TEST_PATH;
174 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
176 listener.waitForChangeEvents(path);
178 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
179 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
183 @SuppressWarnings("serial")
185 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
186 // This test tests the timing window in which a change listener is registered before the
187 // shard becomes the leader. We verify that the listener is registered and notified of the
188 // existing data when the shard becomes the leader.
189 new ShardTestKit(getSystem()) {{
190 // For this test, we want to send the RegisterChangeListener message after the shard
191 // has recovered from persistence and before it becomes the leader. So we subclass
192 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
193 // we know that the shard has been initialized to a follower and has started the
194 // election process. The following 2 CountDownLatches are used to coordinate the
195 // ElectionTimeout with the sending of the RegisterChangeListener message.
196 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
197 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
198 Creator<Shard> creator = new Creator<Shard>() {
199 boolean firstElectionTimeout = true;
202 public Shard create() throws Exception {
203 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
204 newDatastoreContext(), SCHEMA_CONTEXT) {
206 public void onReceiveCommand(final Object message) throws Exception {
207 if(message instanceof ElectionTimeout && firstElectionTimeout) {
208 // Got the first ElectionTimeout. We don't forward it to the
209 // base Shard yet until we've sent the RegisterChangeListener
210 // message. So we signal the onFirstElectionTimeout latch to tell
211 // the main thread to send the RegisterChangeListener message and
212 // start a thread to wait on the onChangeListenerRegistered latch,
213 // which the main thread signals after it has sent the message.
214 // After the onChangeListenerRegistered is triggered, we send the
215 // original ElectionTimeout message to proceed with the election.
216 firstElectionTimeout = false;
217 final ActorRef self = getSelf();
221 Uninterruptibles.awaitUninterruptibly(
222 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
223 self.tell(message, self);
227 onFirstElectionTimeout.countDown();
229 super.onReceiveCommand(message);
236 MockDataChangeListener listener = new MockDataChangeListener(1);
237 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
238 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
240 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
241 Props.create(new DelegatingShardCreator(creator)),
242 "testRegisterChangeListenerWhenNotLeaderInitially");
244 // Write initial data into the in-memory store.
245 YangInstanceIdentifier path = TestModel.TEST_PATH;
246 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
248 // Wait until the shard receives the first ElectionTimeout message.
249 assertEquals("Got first ElectionTimeout", true,
250 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
252 // Now send the RegisterChangeListener and wait for the reply.
253 shard.tell(new RegisterChangeListener(path, dclActor.path(),
254 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
256 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
257 RegisterChangeListenerReply.class);
258 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
260 // Sanity check - verify the shard is not the leader yet.
261 shard.tell(new FindLeader(), getRef());
262 FindLeaderReply findLeadeReply =
263 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
264 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
266 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
267 // with the election process.
268 onChangeListenerRegistered.countDown();
270 // Wait for the shard to become the leader and notify our listener with the existing
271 // data in the store.
272 listener.waitForChangeEvents(path);
274 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
275 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
280 public void testCreateTransaction(){
281 new ShardTestKit(getSystem()) {{
282 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
284 waitUntilLeader(shard);
286 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
288 shard.tell(new CreateTransaction("txn-1",
289 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
291 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
292 CreateTransactionReply.class);
294 String path = reply.getTransactionActorPath().toString();
295 assertTrue("Unexpected transaction path " + path,
296 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
298 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
303 public void testCreateTransactionOnChain(){
304 new ShardTestKit(getSystem()) {{
305 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
307 waitUntilLeader(shard);
309 shard.tell(new CreateTransaction("txn-1",
310 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
313 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
314 CreateTransactionReply.class);
316 String path = reply.getTransactionActorPath().toString();
317 assertTrue("Unexpected transaction path " + path,
318 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
320 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
324 @SuppressWarnings("serial")
326 public void testPeerAddressResolved() throws Exception {
327 new ShardTestKit(getSystem()) {{
328 final CountDownLatch recoveryComplete = new CountDownLatch(1);
329 class TestShard extends Shard {
331 super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
332 newDatastoreContext(), SCHEMA_CONTEXT);
335 Map<String, String> getPeerAddresses() {
336 return getRaftActorContext().getPeerAddresses();
340 protected void onRecoveryComplete() {
342 super.onRecoveryComplete();
344 recoveryComplete.countDown();
349 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
350 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
352 public TestShard create() throws Exception {
353 return new TestShard();
355 })), "testPeerAddressResolved");
357 //waitUntilLeader(shard);
358 assertEquals("Recovery complete", true,
359 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
361 String address = "akka://foobar";
362 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
364 assertEquals("getPeerAddresses", address,
365 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
367 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
372 public void testApplySnapshot() throws Exception {
373 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
374 "testApplySnapshot");
376 InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
377 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
379 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
381 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
382 NormalizedNode<?,?> expected = readStore(store, root);
384 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
385 SerializationUtils.serializeNormalizedNode(expected),
386 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
388 shard.underlyingActor().onReceiveCommand(applySnapshot);
390 NormalizedNode<?,?> actual = readStore(shard, root);
392 assertEquals("Root node", expected, actual);
394 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
398 public void testApplyHelium2VersionSnapshot() throws Exception {
399 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
400 "testApplySnapshot");
402 NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
404 InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
405 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
407 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
409 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
410 NormalizedNode<?,?> expected = readStore(store, root);
412 NormalizedNodeMessages.Container encode = codec.encode(expected);
414 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
415 encode.getNormalizedNode().toByteString().toByteArray(),
416 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
418 shard.underlyingActor().onReceiveCommand(applySnapshot);
420 NormalizedNode<?,?> actual = readStore(shard, root);
422 assertEquals("Root node", expected, actual);
424 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
428 public void testApplyState() throws Exception {
430 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
432 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
434 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
435 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
437 shard.underlyingActor().onReceiveCommand(applyState);
439 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
440 assertEquals("Applied state", node, actual);
442 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
446 public void testApplyStateLegacy() throws Exception {
448 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
450 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
452 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
453 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
455 shard.underlyingActor().onReceiveCommand(applyState);
457 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
458 assertEquals("Applied state", node, actual);
460 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
464 public void testRecovery() throws Exception {
466 // Set up the InMemorySnapshotStore.
468 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
469 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
471 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
473 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
475 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
476 SerializationUtils.serializeNormalizedNode(root),
477 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
479 // Set up the InMemoryJournal.
481 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
482 new WriteModification(TestModel.OUTER_LIST_PATH,
483 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
485 int nListEntries = 16;
486 Set<Integer> listEntryKeys = new HashSet<>();
488 // Add some ModificationPayload entries
489 for(int i = 1; i <= nListEntries; i++) {
490 listEntryKeys.add(Integer.valueOf(i));
491 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
492 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
493 Modification mod = new MergeModification(path,
494 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
495 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
496 newModificationPayload(mod)));
499 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
500 new ApplyJournalEntries(nListEntries));
502 testRecovery(listEntryKeys);
// Seeds the in-memory snapshot store and journal with entries in older payload formats
// (CompositeModificationPayload and CompositeModificationByteStringPayload) and then runs
// the shared recovery verification via testRecovery(listEntryKeys).
// NOTE(review): in this excerpt the declaration of the loop counter `i` and the header of
// the first loop are not visible; confirm both against the complete file before editing.
506 public void testHelium2VersionRecovery() throws Exception {
508 // Set up the InMemorySnapshotStore.
510 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
511 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
513 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
515 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
// The snapshot state is the root node encoded with NormalizedNodeToNodeCodec and converted
// to a byte array (rather than being produced by SerializationUtils).
517 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
518 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
519 getNormalizedNode().toByteString().toByteArray(),
520 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
522 // Set up the InMemoryJournal.
// Journal entry 0 writes an empty outer list using a CompositeModificationPayload.
524 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
525 new WriteModification(TestModel.OUTER_LIST_PATH,
526 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
528 int nListEntries = 16;
// Keys of every list entry added to the journal; handed to testRecovery(Set) at the end.
529 Set<Integer> listEntryKeys = new HashSet<>();
532 // Add some CompositeModificationPayload entries
// Each iteration records the key and journals a merge of the corresponding outer-list
// entry, wrapped with newLegacyPayload.
534 listEntryKeys.add(Integer.valueOf(i));
535 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
536 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
537 Modification mod = new MergeModification(path,
538 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
539 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
540 newLegacyPayload(mod)));
543 // Add some CompositeModificationByteStringPayload entries
// Continues from the current value of i up to nListEntries, this time wrapping each merge
// with newLegacyByteStringPayload.
544 for(; i <= nListEntries; i++) {
545 listEntryKeys.add(Integer.valueOf(i));
546 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
547 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
548 Modification mod = new MergeModification(path,
549 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
550 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
551 newLegacyByteStringPayload(mod)));
// The final journal entry is an ApplyLogEntries message for index nListEntries
// (testRecovery() uses ApplyJournalEntries at the same position).
554 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
556 testRecovery(listEntryKeys);
559 private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
560 // Create the actor and wait for recovery complete.
562 int nListEntries = listEntryKeys.size();
564 final CountDownLatch recoveryComplete = new CountDownLatch(1);
566 @SuppressWarnings("serial")
567 Creator<Shard> creator = new Creator<Shard>() {
569 public Shard create() throws Exception {
570 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
571 newDatastoreContext(), SCHEMA_CONTEXT) {
573 protected void onRecoveryComplete() {
575 super.onRecoveryComplete();
577 recoveryComplete.countDown();
584 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
585 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
587 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
589 // Verify data in the data store.
591 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
592 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
593 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
594 outerList.getValue() instanceof Iterable);
595 for(Object entry: (Iterable<?>) outerList.getValue()) {
596 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
597 entry instanceof MapEntryNode);
598 MapEntryNode mapEntry = (MapEntryNode)entry;
599 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
600 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
601 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
602 Object value = idLeaf.get().getValue();
603 assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
604 listEntryKeys.remove(value));
607 if(!listEntryKeys.isEmpty()) {
608 fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
612 assertEquals("Last log index", nListEntries,
613 shard.underlyingActor().getShardMBean().getLastLogIndex());
614 assertEquals("Commit index", nListEntries,
615 shard.underlyingActor().getShardMBean().getCommitIndex());
616 assertEquals("Last applied", nListEntries,
617 shard.underlyingActor().getShardMBean().getLastApplied());
619 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
622 private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
623 MutableCompositeModification compMod = new MutableCompositeModification();
624 for(Modification mod: mods) {
625 compMod.addModification(mod);
628 return new CompositeModificationPayload(compMod.toSerializable());
631 private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
632 MutableCompositeModification compMod = new MutableCompositeModification();
633 for(Modification mod: mods) {
634 compMod.addModification(mod);
637 return new CompositeModificationByteStringPayload(compMod.toSerializable());
640 private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
641 MutableCompositeModification compMod = new MutableCompositeModification();
642 for(Modification mod: mods) {
643 compMod.addModification(mod);
646 return new ModificationPayload(compMod);
649 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
650 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
651 final MutableCompositeModification modification) {
652 return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
655 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
656 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
657 final MutableCompositeModification modification,
658 final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
660 DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
661 tx.write(path, data);
662 final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
663 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
665 doAnswer(new Answer<ListenableFuture<Boolean>>() {
667 public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
668 return realCohort.canCommit();
670 }).when(cohort).canCommit();
672 doAnswer(new Answer<ListenableFuture<Void>>() {
674 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
675 if(preCommit != null) {
676 return preCommit.apply(realCohort);
678 return realCohort.preCommit();
681 }).when(cohort).preCommit();
683 doAnswer(new Answer<ListenableFuture<Void>>() {
685 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
686 return realCohort.commit();
688 }).when(cohort).commit();
690 doAnswer(new Answer<ListenableFuture<Void>>() {
692 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
693 return realCohort.abort();
695 }).when(cohort).abort();
697 modification.addModification(new WriteModification(path, data));
702 @SuppressWarnings({ "unchecked" })
704 public void testConcurrentThreePhaseCommits() throws Throwable {
705 new ShardTestKit(getSystem()) {{
706 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
707 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
708 "testConcurrentThreePhaseCommits");
710 waitUntilLeader(shard);
712 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
714 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
716 String transactionID1 = "tx1";
717 MutableCompositeModification modification1 = new MutableCompositeModification();
718 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
719 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
721 String transactionID2 = "tx2";
722 MutableCompositeModification modification2 = new MutableCompositeModification();
723 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
724 TestModel.OUTER_LIST_PATH,
725 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
728 String transactionID3 = "tx3";
729 MutableCompositeModification modification3 = new MutableCompositeModification();
730 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
731 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
732 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
733 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
737 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
738 final Timeout timeout = new Timeout(duration);
740 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
741 // by the ShardTransaction.
743 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
744 cohort1, modification1, true), getRef());
745 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
746 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
747 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
749 // Send the CanCommitTransaction message for the first Tx.
751 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
752 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
753 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
754 assertEquals("Can commit", true, canCommitReply.getCanCommit());
756 // Send the ForwardedReadyTransaction for the next 2 Tx's.
758 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
759 cohort2, modification2, true), getRef());
760 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
762 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
763 cohort3, modification3, true), getRef());
764 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
766 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
767 // processed after the first Tx completes.
769 Future<Object> canCommitFuture1 = Patterns.ask(shard,
770 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
772 Future<Object> canCommitFuture2 = Patterns.ask(shard,
773 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
775 // Send the CommitTransaction message for the first Tx. After it completes, it should
776 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
778 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
779 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
781 // Wait for the next 2 Tx's to complete.
783 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
784 final CountDownLatch commitLatch = new CountDownLatch(2);
786 class OnFutureComplete extends OnComplete<Object> {
787 private final Class<?> expRespType;
789 OnFutureComplete(final Class<?> expRespType) {
790 this.expRespType = expRespType;
794 public void onComplete(final Throwable error, final Object resp) {
796 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
799 assertEquals("Commit response type", expRespType, resp.getClass());
801 } catch (Exception e) {
807 void onSuccess(final Object resp) throws Exception {
811 class OnCommitFutureComplete extends OnFutureComplete {
812 OnCommitFutureComplete() {
813 super(CommitTransactionReply.SERIALIZABLE_CLASS);
817 public void onComplete(final Throwable error, final Object resp) {
818 super.onComplete(error, resp);
819 commitLatch.countDown();
823 class OnCanCommitFutureComplete extends OnFutureComplete {
824 private final String transactionID;
826 OnCanCommitFutureComplete(final String transactionID) {
827 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
828 this.transactionID = transactionID;
832 void onSuccess(final Object resp) throws Exception {
833 CanCommitTransactionReply canCommitReply =
834 CanCommitTransactionReply.fromSerializable(resp);
835 assertEquals("Can commit", true, canCommitReply.getCanCommit());
837 Future<Object> commitFuture = Patterns.ask(shard,
838 new CommitTransaction(transactionID).toSerializable(), timeout);
839 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
843 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
844 getSystem().dispatcher());
846 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
847 getSystem().dispatcher());
849 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
851 if(caughtEx.get() != null) {
852 throw caughtEx.get();
855 assertEquals("Commits complete", true, done);
857 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
858 inOrder.verify(cohort1).canCommit();
859 inOrder.verify(cohort1).preCommit();
860 inOrder.verify(cohort1).commit();
861 inOrder.verify(cohort2).canCommit();
862 inOrder.verify(cohort2).preCommit();
863 inOrder.verify(cohort2).commit();
864 inOrder.verify(cohort3).canCommit();
865 inOrder.verify(cohort3).preCommit();
866 inOrder.verify(cohort3).commit();
868 // Verify data in the data store.
870 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
871 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
872 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
873 outerList.getValue() instanceof Iterable);
874 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
875 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
876 entry instanceof MapEntryNode);
877 MapEntryNode mapEntry = (MapEntryNode)entry;
878 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
879 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
880 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
881 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
883 verifyLastLogIndex(shard, 2);
885 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
889 private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
890 for(int i = 0; i < 20 * 5; i++) {
891 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
892 if(lastLogIndex == expectedValue) {
895 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
898 assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
902 public void testCommitWithPersistenceDisabled() throws Throwable {
903 dataStoreContextBuilder.persistent(false);
904 new ShardTestKit(getSystem()) {{
905 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
906 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
907 "testCommitPhaseFailure");
909 waitUntilLeader(shard);
911 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
913 // Setup a simulated transactions with a mock cohort.
915 String transactionID = "tx";
916 MutableCompositeModification modification = new MutableCompositeModification();
917 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
918 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
919 TestModel.TEST_PATH, containerNode, modification);
921 FiniteDuration duration = duration("5 seconds");
923 // Simulate the ForwardedReadyTransaction messages that would be sent
924 // by the ShardTransaction.
926 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
927 cohort, modification, true), getRef());
928 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
930 // Send the CanCommitTransaction message.
932 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
933 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
934 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
935 assertEquals("Can commit", true, canCommitReply.getCanCommit());
937 // Send the CanCommitTransaction message.
939 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
940 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
942 InOrder inOrder = inOrder(cohort);
943 inOrder.verify(cohort).canCommit();
944 inOrder.verify(cohort).preCommit();
945 inOrder.verify(cohort).commit();
947 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
948 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
950 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
955 public void testCommitPhaseFailure() throws Throwable {
956 new ShardTestKit(getSystem()) {{
957 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
958 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
959 "testCommitPhaseFailure");
961 waitUntilLeader(shard);
963 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
966 String transactionID1 = "tx1";
967 MutableCompositeModification modification1 = new MutableCompositeModification();
968 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
969 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
970 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
971 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
973 String transactionID2 = "tx2";
974 MutableCompositeModification modification2 = new MutableCompositeModification();
975 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
976 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
978 FiniteDuration duration = duration("5 seconds");
979 final Timeout timeout = new Timeout(duration);
981 // Simulate the ForwardedReadyTransaction messages that would be sent
982 // by the ShardTransaction.
984 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
985 cohort1, modification1, true), getRef());
986 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
988 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
989 cohort2, modification2, true), getRef());
990 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
992 // Send the CanCommitTransaction message for the first Tx.
994 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
995 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
996 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
997 assertEquals("Can commit", true, canCommitReply.getCanCommit());
999 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1000 // processed after the first Tx completes.
1002 Future<Object> canCommitFuture = Patterns.ask(shard,
1003 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1005 // Send the CommitTransaction message for the first Tx. This should send back an error
1006 // and trigger the 2nd Tx to proceed.
1008 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1009 expectMsgClass(duration, akka.actor.Status.Failure.class);
1011 // Wait for the 2nd Tx to complete the canCommit phase.
1013 final CountDownLatch latch = new CountDownLatch(1);
1014 canCommitFuture.onComplete(new OnComplete<Object>() {
1016 public void onComplete(final Throwable t, final Object resp) {
1019 }, getSystem().dispatcher());
1021 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1023 InOrder inOrder = inOrder(cohort1, cohort2);
1024 inOrder.verify(cohort1).canCommit();
1025 inOrder.verify(cohort1).preCommit();
1026 inOrder.verify(cohort1).commit();
1027 inOrder.verify(cohort2).canCommit();
1029 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1034 public void testPreCommitPhaseFailure() throws Throwable {
1035 new ShardTestKit(getSystem()) {{
1036 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1037 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1038 "testPreCommitPhaseFailure");
1040 waitUntilLeader(shard);
1042 String transactionID = "tx1";
1043 MutableCompositeModification modification = new MutableCompositeModification();
1044 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1045 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1046 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1048 FiniteDuration duration = duration("5 seconds");
1050 // Simulate the ForwardedReadyTransaction messages that would be sent
1051 // by the ShardTransaction.
1053 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1054 cohort, modification, true), getRef());
1055 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1057 // Send the CanCommitTransaction message.
1059 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1060 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1061 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1062 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1064 // Send the CommitTransaction message. This should send back an error
1065 // for preCommit failure.
1067 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1068 expectMsgClass(duration, akka.actor.Status.Failure.class);
1070 InOrder inOrder = inOrder(cohort);
1071 inOrder.verify(cohort).canCommit();
1072 inOrder.verify(cohort).preCommit();
1074 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1079 public void testCanCommitPhaseFailure() throws Throwable {
1080 new ShardTestKit(getSystem()) {{
1081 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1082 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1083 "testCanCommitPhaseFailure");
1085 waitUntilLeader(shard);
1087 final FiniteDuration duration = duration("5 seconds");
1089 String transactionID = "tx1";
1090 MutableCompositeModification modification = new MutableCompositeModification();
1091 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1092 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1094 // Simulate the ForwardedReadyTransaction messages that would be sent
1095 // by the ShardTransaction.
1097 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1098 cohort, modification, true), getRef());
1099 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1101 // Send the CanCommitTransaction message.
1103 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1104 expectMsgClass(duration, akka.actor.Status.Failure.class);
1106 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1111 public void testAbortBeforeFinishCommit() throws Throwable {
1112 new ShardTestKit(getSystem()) {{
1113 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1114 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1115 "testAbortBeforeFinishCommit");
1117 waitUntilLeader(shard);
1119 final FiniteDuration duration = duration("5 seconds");
1120 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1122 final String transactionID = "tx1";
1123 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1124 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1126 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1127 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1129 // Simulate an AbortTransaction message occurring during replication, after
1130 // persisting and before finishing the commit to the in-memory store.
1131 // We have no followers so due to optimizations in the RaftActor, it does not
1132 // attempt replication and thus we can't send an AbortTransaction message b/c
1133 // it would be processed too late after CommitTransaction completes. So we'll
1134 // simulate an AbortTransaction message occurring during replication by calling
1135 // the shard directly.
1137 shard.underlyingActor().doAbortTransaction(transactionID, null);
1139 return preCommitFuture;
1143 MutableCompositeModification modification = new MutableCompositeModification();
1144 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1145 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1146 modification, preCommit);
1148 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1149 cohort, modification, true), getRef());
1150 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1152 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1153 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1154 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1155 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1157 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1158 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1160 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1162 // Since we're simulating an abort occurring during replication and before finish commit,
1163 // the data should still get written to the in-memory store since we've gotten past
1164 // canCommit and preCommit and persisted the data.
1165 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1167 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1172 public void testTransactionCommitTimeout() throws Throwable {
1173 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1175 new ShardTestKit(getSystem()) {{
1176 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1177 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1178 "testTransactionCommitTimeout");
1180 waitUntilLeader(shard);
1182 final FiniteDuration duration = duration("5 seconds");
1184 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1186 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1187 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1188 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1190 // Create 1st Tx - will timeout
1192 String transactionID1 = "tx1";
1193 MutableCompositeModification modification1 = new MutableCompositeModification();
1194 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1195 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1196 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1197 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1202 String transactionID2 = "tx3";
1203 MutableCompositeModification modification2 = new MutableCompositeModification();
1204 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1205 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1206 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1208 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1213 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1214 cohort1, modification1, true), getRef());
1215 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1217 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1218 cohort2, modification2, true), getRef());
1219 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1221 // canCommit 1st Tx. We don't send the commit so it should timeout.
1223 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1224 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1226 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1228 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1229 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1231 // Commit the 2nd Tx.
1233 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1234 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1236 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1237 assertNotNull(listNodePath + " not found", node);
1239 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1244 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1245 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1247 new ShardTestKit(getSystem()) {{
1248 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1249 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1250 "testTransactionCommitQueueCapacityExceeded");
1252 waitUntilLeader(shard);
1254 final FiniteDuration duration = duration("5 seconds");
1256 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1258 String transactionID1 = "tx1";
1259 MutableCompositeModification modification1 = new MutableCompositeModification();
1260 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1261 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1263 String transactionID2 = "tx2";
1264 MutableCompositeModification modification2 = new MutableCompositeModification();
1265 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1266 TestModel.OUTER_LIST_PATH,
1267 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1270 String transactionID3 = "tx3";
1271 MutableCompositeModification modification3 = new MutableCompositeModification();
1272 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1273 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1277 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1278 cohort1, modification1, true), getRef());
1279 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1281 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1282 cohort2, modification2, true), getRef());
1283 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1285 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1286 cohort3, modification3, true), getRef());
1287 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1289 // canCommit 1st Tx.
1291 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1292 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1294 // canCommit the 2nd Tx - it should get queued.
1296 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1298 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1300 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1301 expectMsgClass(duration, akka.actor.Status.Failure.class);
1303 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1308 public void testCanCommitBeforeReadyFailure() throws Throwable {
1309 new ShardTestKit(getSystem()) {{
1310 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1311 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1312 "testCanCommitBeforeReadyFailure");
1314 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1315 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1317 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1322 public void testAbortTransaction() throws Throwable {
1323 new ShardTestKit(getSystem()) {{
1324 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1325 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1326 "testAbortTransaction");
1328 waitUntilLeader(shard);
1330 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1332 String transactionID1 = "tx1";
1333 MutableCompositeModification modification1 = new MutableCompositeModification();
1334 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1335 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1336 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1338 String transactionID2 = "tx2";
1339 MutableCompositeModification modification2 = new MutableCompositeModification();
1340 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1341 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1343 FiniteDuration duration = duration("5 seconds");
1344 final Timeout timeout = new Timeout(duration);
1346 // Simulate the ForwardedReadyTransaction messages that would be sent
1347 // by the ShardTransaction.
1349 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1350 cohort1, modification1, true), getRef());
1351 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1353 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1354 cohort2, modification2, true), getRef());
1355 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1357 // Send the CanCommitTransaction message for the first Tx.
1359 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1360 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1361 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1362 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1364 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1365 // processed after the first Tx completes.
1367 Future<Object> canCommitFuture = Patterns.ask(shard,
1368 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1370 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1373 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1374 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1376 // Wait for the 2nd Tx to complete the canCommit phase.
1378 Await.ready(canCommitFuture, duration);
1380 InOrder inOrder = inOrder(cohort1, cohort2);
1381 inOrder.verify(cohort1).canCommit();
1382 inOrder.verify(cohort2).canCommit();
1384 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1389 public void testCreateSnapshot() throws Exception {
1390 testCreateSnapshot(true, "testCreateSnapshot");
1394 public void testCreateSnapshotWithNonPersistentData() throws Exception {
1395 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1398 @SuppressWarnings("serial")
1399 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1401 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1402 class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1403 DataPersistenceProvider delegate;
1405 DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1406 this.delegate = delegate;
1410 public boolean isRecoveryApplicable() {
1411 return delegate.isRecoveryApplicable();
1415 public <T> void persist(T o, Procedure<T> procedure) {
1416 delegate.persist(o, procedure);
1420 public void saveSnapshot(Object o) {
1421 savedSnapshot.set(o);
1422 delegate.saveSnapshot(o);
1426 public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1427 delegate.deleteSnapshots(criteria);
1431 public void deleteMessages(long sequenceNumber) {
1432 delegate.deleteMessages(sequenceNumber);
1436 dataStoreContextBuilder.persistent(persistent);
1438 new ShardTestKit(getSystem()) {{
1439 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1440 Creator<Shard> creator = new Creator<Shard>() {
1442 public Shard create() throws Exception {
1443 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1444 newDatastoreContext(), SCHEMA_CONTEXT) {
1446 DelegatingPersistentDataProvider delegating;
1449 protected DataPersistenceProvider persistence() {
1450 if(delegating == null) {
1451 delegating = new DelegatingPersistentDataProvider(super.persistence());
1458 protected void commitSnapshot(final long sequenceNumber) {
1459 super.commitSnapshot(sequenceNumber);
1460 latch.get().countDown();
1466 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1467 Props.create(new DelegatingShardCreator(creator)), shardActorName);
1469 waitUntilLeader(shard);
1471 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1473 NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1475 CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
1476 shard.tell(capture, getRef());
1478 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1480 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1481 savedSnapshot.get() instanceof Snapshot);
1483 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1485 latch.set(new CountDownLatch(1));
1486 savedSnapshot.set(null);
1488 shard.tell(capture, getRef());
1490 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1492 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1493 savedSnapshot.get() instanceof Snapshot);
1495 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1497 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1500 private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1502 NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1503 assertEquals("Root node", expectedRoot, actual);
1509 * This test simply verifies that the applySnapShot logic will work
1510 * @throws ReadFailedException
1513 public void testInMemoryDataStoreRestore() throws ReadFailedException {
1514 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1516 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1518 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1519 putTransaction.write(TestModel.TEST_PATH,
1520 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1521 commitTransaction(putTransaction);
1524 NormalizedNode<?, ?> expected = readStore(store);
1526 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1528 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1529 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1531 commitTransaction(writeTransaction);
1533 NormalizedNode<?, ?> actual = readStore(store);
1535 assertEquals(expected, actual);
1539 public void testRecoveryApplicable(){
1541 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1542 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1544 final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1545 persistentContext, SCHEMA_CONTEXT);
1547 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1548 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1550 final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1551 nonPersistentContext, SCHEMA_CONTEXT);
1553 new ShardTestKit(getSystem()) {{
1554 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1555 persistentProps, "testPersistence1");
1557 assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1559 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1561 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1562 nonPersistentProps, "testPersistence2");
1564 assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1566 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1573 public void testOnDatastoreContext() {
1574 new ShardTestKit(getSystem()) {{
1575 dataStoreContextBuilder.persistent(true);
1577 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1579 assertEquals("isRecoveryApplicable", true,
1580 shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1582 waitUntilLeader(shard);
1584 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1586 assertEquals("isRecoveryApplicable", false,
1587 shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1589 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1591 assertEquals("isRecoveryApplicable", true,
1592 shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1594 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1599 public void testRegisterRoleChangeListener() throws Exception {
1600 new ShardTestKit(getSystem()) {
1602 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1603 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1604 "testRegisterRoleChangeListener");
1606 waitUntilLeader(shard);
1608 TestActorRef<MessageCollectorActor> listener =
1609 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1611 shard.tell(new RegisterRoleChangeListener(), listener);
1613 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1614 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1616 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1618 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1620 assertEquals(1, allMatching.size());
1625 private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1626 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1627 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1628 transaction.read(YangInstanceIdentifier.builder().build());
1630 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1632 NormalizedNode<?, ?> normalizedNode = optional.get();
1634 transaction.close();
1636 return normalizedNode;
1639 private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1640 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1641 ListenableFuture<Void> future =
1642 commitCohort.preCommit();
1645 future = commitCohort.commit();
1647 } catch (InterruptedException | ExecutionException e) {
1651 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1652 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1654 public void onDataChanged(
1655 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1661 static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1662 throws ExecutionException, InterruptedException {
1663 return readStore(shard.underlyingActor().getDataStore(), id);
1666 public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
1667 throws ExecutionException, InterruptedException {
1668 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1670 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1671 transaction.read(id);
1673 Optional<NormalizedNode<?, ?>> optional = future.get();
1674 NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1676 transaction.close();
1681 static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
1682 final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1683 writeToStore(shard.underlyingActor().getDataStore(), id, node);
1686 public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
1687 final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1688 DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
1690 transaction.write(id, node);
1692 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1693 commitCohort.preCommit().get();
1694 commitCohort.commit().get();
1697 @SuppressWarnings("serial")
1698 private static final class DelegatingShardCreator implements Creator<Shard> {
1699 private final Creator<Shard> delegate;
1701 DelegatingShardCreator(final Creator<Shard> delegate) {
1702 this.delegate = delegate;
1706 public Shard create() throws Exception {
1707 return delegate.create();