1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.event.Logging;
7 import akka.japi.Creator;
8 import akka.testkit.JavaTestKit;
9 import akka.testkit.TestActorRef;
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import org.junit.After;
16 import org.junit.Assert;
17 import org.junit.Before;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
20 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
23 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
25 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
26 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
27 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
28 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
29 import org.opendaylight.controller.cluster.datastore.modification.Modification;
30 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
31 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
32 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
33 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
34 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
35 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
37 import org.opendaylight.controller.cluster.raft.Snapshot;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
41 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
42 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
43 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
44 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
45 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
46 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
47 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
48 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
49 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
50 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
51 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
52 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
53 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
54 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
55 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
59 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
60 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
62 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64 import scala.concurrent.duration.Duration;
65 import java.io.IOException;
66 import java.util.Collections;
67 import java.util.HashSet;
69 import java.util.concurrent.CountDownLatch;
70 import java.util.concurrent.ExecutionException;
71 import java.util.concurrent.TimeUnit;
72 import static org.junit.Assert.assertEquals;
73 import static org.junit.Assert.assertNotNull;
74 import static org.junit.Assert.assertTrue;
75 import static org.junit.Assert.fail;
76 import static org.mockito.Mockito.mock;
77 import static org.mockito.Mockito.doReturn;
78 import static org.mockito.Mockito.verify;
80 public class ShardTest extends AbstractActorTest {
82 private static final DatastoreContext DATA_STORE_CONTEXT =
83 new DatastoreContext("", null, Duration.create(10, TimeUnit.MINUTES), 5, 3, 5000, 500);
85 private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
87 private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
88 .shardName("inventory").type("config").build();
// NOTE(review): the header of the enclosing method is not visible in this listing;
// presumably this is the body of the @Before fixture -- confirm against the full file.
// Sets the "shard.persistent" system property to "false".
92 System.setProperty("shard.persistent", "false");
// Empties the in-memory snapshot store and the in-memory journal.
94 InMemorySnapshotStore.clear();
95 InMemoryJournal.clear();
99 public void tearDown() {
100 InMemorySnapshotStore.clear();
101 InMemoryJournal.clear();
104 private Props newShardProps() {
105 return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
106 DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
110 public void testOnReceiveRegisterListener() throws Exception {
111 new JavaTestKit(getSystem()) {{
112 ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener");
114 subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef());
116 subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
117 getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
119 EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class);
120 assertEquals("isEnabled", false, enable.isEnabled());
122 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
123 RegisterChangeListenerReply.class);
124 assertTrue(reply.getListenerRegistrationPath().toString().matches(
125 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
130 public void testCreateTransaction(){
131 new ShardTestKit(getSystem()) {{
132 ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction");
134 waitUntilLeader(subject);
136 subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
138 subject.tell(new CreateTransaction("txn-1",
139 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
141 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
142 CreateTransactionReply.class);
144 String path = reply.getTransactionActorPath().toString();
145 assertTrue("Unexpected transaction path " + path,
146 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
152 public void testCreateTransactionOnChain(){
153 new ShardTestKit(getSystem()) {{
154 final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
156 waitUntilLeader(subject);
158 subject.tell(new CreateTransaction("txn-1",
159 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
162 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
163 CreateTransactionReply.class);
165 String path = reply.getTransactionActorPath().toString();
166 assertTrue("Unexpected transaction path " + path,
167 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
// Creates a shard and builds a PeerAddressResolved message for it inside a 3 second window.
// NOTE(review): several lines of this method are absent from this listing (the statement
// that encloses the PeerAddressResolved expression, the remainder of run(), and all closing
// braces), so what is asserted after the message is built cannot be determined from here.
173 public void testPeerAddressResolved(){
174 new JavaTestKit(getSystem()) {{
175 final ShardIdentifier identifier =
176 ShardIdentifier.builder().memberName("member-1")
177 .shardName("inventory").type("config").build();
// The map handed to the shard contains a single entry: this identifier mapped to null.
179 Props props = Shard.props(identifier,
180 Collections.<ShardIdentifier, String>singletonMap(identifier, null),
181 DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
182 final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
184 new Within(duration("3 seconds")) {
186 protected void run() {
// NOTE(review): the call this argument belongs to is not visible; presumably
// subject.tell(..., getRef()) as in the other tests -- confirm.
189 new PeerAddressResolved(identifier, "akka://foobar"),
199 public void testApplySnapshot() throws ExecutionException, InterruptedException {
200 TestActorRef<Shard> ref = TestActorRef.create(getSystem(), newShardProps());
202 NormalizedNodeToNodeCodec codec =
203 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
205 ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(
206 TestModel.TEST_QNAME));
208 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
209 NormalizedNode<?,?> expected = ref.underlyingActor().readStore(root);
211 NormalizedNodeMessages.Container encode = codec.encode(expected);
213 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
214 encode.getNormalizedNode().toByteString().toByteArray(),
215 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
217 ref.underlyingActor().onReceiveCommand(applySnapshot);
219 NormalizedNode<?,?> actual = ref.underlyingActor().readStore(root);
221 assertEquals(expected, actual);
225 public void testApplyState() throws Exception {
227 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
229 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
231 MutableCompositeModification compMod = new MutableCompositeModification();
232 compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
233 Payload payload = new CompositeModificationPayload(compMod.toSerializable());
234 ApplyState applyState = new ApplyState(null, "test",
235 new ReplicatedLogImplEntry(1, 2, payload));
237 shard.underlyingActor().onReceiveCommand(applyState);
239 NormalizedNode<?,?> actual = shard.underlyingActor().readStore(TestModel.TEST_PATH);
240 assertEquals("Applied state", node, actual);
243 @SuppressWarnings("serial")
245 public void testRecovery() throws Exception {
247 // Set up the InMemorySnapshotStore.
249 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
250 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
252 DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
253 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
254 DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
255 commitCohort.preCommit().get();
256 commitCohort.commit().get();
258 DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
259 NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
261 InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create(
262 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
264 getNormalizedNode().toByteString().toByteArray(),
265 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
267 // Set up the InMemoryJournal.
269 InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
270 new WriteModification(TestModel.OUTER_LIST_PATH,
271 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
274 int nListEntries = 11;
275 Set<Integer> listEntryKeys = new HashSet<>();
276 for(int i = 1; i <= nListEntries; i++) {
277 listEntryKeys.add(Integer.valueOf(i));
278 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
279 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
280 Modification mod = new MergeModification(path,
281 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
283 InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1,
287 InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1,
288 new ApplyLogEntries(nListEntries));
290 // Create the actor and wait for recovery complete.
292 final CountDownLatch recoveryComplete = new CountDownLatch(1);
294 Creator<Shard> creator = new Creator<Shard>() {
296 public Shard create() throws Exception {
297 return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
298 DATA_STORE_CONTEXT, SCHEMA_CONTEXT) {
300 protected void onRecoveryComplete() {
302 super.onRecoveryComplete();
304 recoveryComplete.countDown();
311 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
312 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
314 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
316 // Verify data in the data store.
318 NormalizedNode<?, ?> outerList = shard.underlyingActor().readStore(TestModel.OUTER_LIST_PATH);
319 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
320 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
321 outerList.getValue() instanceof Iterable);
322 for(Object entry: (Iterable<?>) outerList.getValue()) {
323 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
324 entry instanceof MapEntryNode);
325 MapEntryNode mapEntry = (MapEntryNode)entry;
326 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
327 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
328 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
329 Object value = idLeaf.get().getValue();
330 assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
331 listEntryKeys.remove(value));
334 if(!listEntryKeys.isEmpty()) {
335 fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
339 assertEquals("Last log index", nListEntries,
340 shard.underlyingActor().getShardMBean().getLastLogIndex());
341 assertEquals("Commit index", nListEntries,
342 shard.underlyingActor().getShardMBean().getCommitIndex());
343 assertEquals("Last applied", nListEntries,
344 shard.underlyingActor().getShardMBean().getLastApplied());
347 private CompositeModificationPayload newPayload(Modification... mods) {
348 MutableCompositeModification compMod = new MutableCompositeModification();
349 for(Modification mod: mods) {
350 compMod.addModification(mod);
353 return new CompositeModificationPayload(compMod.toSerializable());
356 @SuppressWarnings("unchecked")
358 public void testForwardedCommitTransactionWithPersistence() throws IOException {
359 System.setProperty("shard.persistent", "true");
361 new ShardTestKit(getSystem()) {{
362 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
364 waitUntilLeader(shard);
366 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
368 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class);
369 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
371 MutableCompositeModification modification = new MutableCompositeModification();
372 modification.addModification(new WriteModification(TestModel.TEST_PATH, node,
375 shard.tell(new ForwardedCommitTransaction(cohort, modification), getRef());
377 expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
379 verify(cohort).commit();
381 assertEquals("Last log index", 0, shard.underlyingActor().getShardMBean().getLastLogIndex());
386 public void testCreateSnapshot() throws IOException, InterruptedException {
387 new ShardTestKit(getSystem()) {{
388 final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateSnapshot");
390 waitUntilLeader(subject);
392 subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
394 waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
396 subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
398 waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
403 * This test simply verifies that the applySnapShot logic will work
404 * @throws ReadFailedException
407 public void testInMemoryDataStoreRestore() throws ReadFailedException {
408 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
409 MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
411 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
413 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
414 putTransaction.write(TestModel.TEST_PATH,
415 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
416 commitTransaction(putTransaction);
419 NormalizedNode expected = readStore(store);
421 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
423 writeTransaction.delete(YangInstanceIdentifier.builder().build());
424 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
426 commitTransaction(writeTransaction);
428 NormalizedNode actual = readStore(store);
430 assertEquals(expected, actual);
434 private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
435 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
436 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
437 transaction.read(YangInstanceIdentifier.builder().build());
439 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
441 NormalizedNode<?, ?> normalizedNode = optional.get();
445 return normalizedNode;
448 private void commitTransaction(DOMStoreWriteTransaction transaction) {
449 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
450 ListenableFuture<Void> future =
451 commitCohort.preCommit();
454 future = commitCohort.commit();
456 } catch (InterruptedException | ExecutionException e) {
460 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
461 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
463 public void onDataChanged(
464 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
470 private static final class DelegatingShardCreator implements Creator<Shard> {
471 private final Creator<Shard> delegate;
473 DelegatingShardCreator(Creator<Shard> delegate) {
474 this.delegate = delegate;
478 public Shard create() throws Exception {
479 return delegate.create();
483 private static class ShardTestKit extends JavaTestKit {
485 private ShardTestKit(ActorSystem actorSystem) {
489 protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
490 // Wait for a specific log message to show up
491 final boolean result =
492 new JavaTestKit.EventFilter<Boolean>(logLevel
495 protected Boolean run() {
498 }.from(subject.path().toString())
500 .occurrences(1).exec();
502 Assert.assertEquals(true, result);
506 protected void waitUntilLeader(ActorRef subject) {
507 waitForLogMessage(Logging.Info.class, subject,
508 "Switching from state Candidate to Leader");