1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.event.Logging;
7 import akka.testkit.JavaTestKit;
8 import akka.testkit.TestActorRef;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.CheckedFuture;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import org.junit.Assert;
14 import org.junit.Test;
15 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
16 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
18 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
19 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
20 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
21 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
22 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
23 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
24 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
25 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
26 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
27 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
31 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
32 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
33 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
34 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
42 import java.io.IOException;
43 import java.util.Collections;
44 import java.util.HashMap;
46 import java.util.concurrent.ExecutionException;
48 import static org.junit.Assert.assertEquals;
49 import static org.junit.Assert.assertFalse;
50 import static org.junit.Assert.assertTrue;
// Tests for the Shard actor: transaction-chain and transaction creation, change-listener
// registration, peer address resolution, and snapshot capture/apply.
52 public class ShardTest extends AbstractActorTest {
// Datastore context passed to Shard.props(...) by every test in this class that creates a shard.
54 private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
// Verifies that a shard answers a serialized CreateTransactionChain request with a
// CreateTransactionChainReply, and checks the transaction chain path carried by that reply.
57 public void testOnReceiveCreateTransactionChain() throws Exception {
58 new JavaTestKit(getSystem()) {{
59 final ShardIdentifier identifier =
60 ShardIdentifier.builder().memberName("member-1")
61 .shardName("inventory").type("config").build();
// The shard is created with an empty peer-address map.
63 final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
// The actor name chosen here is part of the path asserted at the end of the test.
64 final ActorRef subject =
65 getSystem().actorOf(props, "testCreateTransactionChain");
68 // Wait for a specific log message to show up
69 final boolean result =
70 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
73 protected Boolean run() {
76 }.from(subject.path().toString())
77 .message("Switching from state Candidate to Leader")
78 .occurrences(1).exec();
80 Assert.assertEquals(true, result);
// The request and the reply check run inside a 3 second window.
82 new Within(duration("3 seconds")) {
84 protected void run() {
// getRef() is the test kit's own actor, so the shard's reply comes back to this test.
86 subject.tell(new CreateTransactionChain().toSerializable(), getRef());
88 final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
89 // do not put code outside this method, will run afterwards
91 protected String match(Object in) {
// Only messages of the serializable reply class are converted back into a reply object.
92 if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
93 CreateTransactionChainReply reply =
94 CreateTransactionChainReply.fromSerializable(getSystem(),in);
95 return reply.getTransactionChainPath()
101 }.get(); // this extracts the received message
// NOTE(review): the "$a" suffix is presumably Akka's generated name for the first
// unnamed child of the shard actor — confirm against Akka's child naming scheme.
103 assertEquals("Unexpected transaction path " + out,
104 "akka://test/user/testCreateTransactionChain/$a",
// Verifies change-listener registration on a shard: after a RegisterChangeListener request the
// test actor receives an EnableNotification (expected to be disabled) followed by a
// RegisterChangeListenerReply whose registration path lies under the shard actor.
116 public void testOnReceiveRegisterListener() throws Exception {
117 new JavaTestKit(getSystem()) {{
118 final ShardIdentifier identifier =
119 ShardIdentifier.builder().memberName("member-1")
120 .shardName("inventory").type("config").build();
// The shard is created with an empty peer-address map.
122 final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
123 final ActorRef subject =
124 getSystem().actorOf(props, "testRegisterChangeListener");
// NOTE(review): unlike the transaction tests in this class, no wait for the
// "Candidate to Leader" log message is visible here before messages are sent.
126 new Within(duration("3 seconds")) {
128 protected void run() {
131 new UpdateSchemaContext(SchemaContextHelper.full()),
// The listener is identified by the test actor's path and registered with BASE scope on TEST_PATH.
134 subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
135 getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
// First expected message: an EnableNotification; its enabled flag is extracted for the assertion below.
138 final Boolean notificationEnabled = new ExpectMsg<Boolean>(
139 duration("3 seconds"), "enable notification") {
140 // do not put code outside this method, will run afterwards
142 protected Boolean match(Object in) {
143 if(in instanceof EnableNotification){
144 return ((EnableNotification) in).isEnabled();
149 }.get(); // this extracts the received message
// NOTE(review): notifications are expected to be disabled here, presumably because the
// shard has not been confirmed as leader at this point — verify against Shard.
151 assertFalse(notificationEnabled);
// Second expected message: the registration reply carrying the listener registration path.
153 final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
154 // do not put code outside this method, will run afterwards
156 protected String match(Object in) {
157 if (in.getClass().equals(RegisterChangeListenerReply.class)) {
158 RegisterChangeListenerReply reply =
159 (RegisterChangeListenerReply) in;
160 return reply.getListenerRegistrationPath()
166 }.get(); // this extracts the received message
// The registration path must be a "$"-prefixed child of the shard actor created above.
168 assertTrue(out.matches(
169 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// Verifies that a shard answers a serialized read-only CreateTransaction request for "txn-1"
// with a CreateTransactionReply whose transaction actor path lies under the shard actor.
178 public void testCreateTransaction(){
179 new JavaTestKit(getSystem()) {{
180 final ShardIdentifier identifier =
181 ShardIdentifier.builder().memberName("member-1")
182 .shardName("inventory").type("config").build();
// The shard is created with an empty peer-address map.
184 final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
185 final ActorRef subject =
186 getSystem().actorOf(props, "testCreateTransaction");
188 // Wait for a specific log message to show up
189 final boolean result =
190 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
193 protected Boolean run() {
196 }.from(subject.path().toString())
197 .message("Switching from state Candidate to Leader")
198 .occurrences(1).exec();
200 Assert.assertEquals(true, result);
// The schema update, the request and the reply check run inside a 3 second window.
202 new Within(duration("3 seconds")) {
204 protected void run() {
207 new UpdateSchemaContext(TestModel.createTestContext()),
// The transaction type is transmitted as the ordinal of the READ_ONLY enum constant.
210 subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
213 final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
214 // do not put code outside this method, will run afterwards
216 protected String match(Object in) {
217 if (in instanceof CreateTransactionReply) {
218 CreateTransactionReply reply =
219 (CreateTransactionReply) in;
220 return reply.getTransactionActorPath()
226 }.get(); // this extracts the received message
// The check is a substring match on the shard path followed by "shard-txn-1", not an exact comparison.
228 assertTrue("Unexpected transaction path " + out,
229 out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
// Starts a shard whose peer-address map holds an entry with a null address and then sends it a
// PeerAddressResolved message supplying an address for that entry.
237 public void testPeerAddressResolved(){
238 new JavaTestKit(getSystem()) {{
239 Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
241 final ShardIdentifier identifier =
242 ShardIdentifier.builder().memberName("member-1")
243 .shardName("inventory").type("config").build();
// The map entry is keyed by the same identifier that is used for the shard itself,
// and its address is left unresolved (null).
245 peerAddresses.put(identifier, null);
246 final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
247 final ActorRef subject =
248 getSystem().actorOf(props, "testPeerAddressResolved");
250 new Within(duration("3 seconds")) {
252 protected void run() {
// "akka://foobar" is the address delivered for the previously unresolved entry.
255 new PeerAddressResolved(identifier, "akka://foobar"),
// Checks that applying a snapshot built from the shard's own store contents leaves the
// store contents equal to what they were before the snapshot was applied.
265 public void testApplySnapshot() throws ExecutionException, InterruptedException {
266 Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
268 final ShardIdentifier identifier =
269 ShardIdentifier.builder().memberName("member-1")
270 .shardName("inventory").type("config").build();
272 peerAddresses.put(identifier, null);
273 final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
// A TestActorRef exposes the underlying Shard instance, so its methods are called directly
// below instead of going through actor messages.
275 TestActorRef<Shard> ref = TestActorRef.create(getSystem(), props);
277 ref.underlyingActor().updateSchemaContext(TestModel.createTestContext());
279 NormalizedNodeToNodeCodec codec =
280 new NormalizedNodeToNodeCodec(TestModel.createTestContext());
// Seed the store with the test container, then capture the store contents as the expected state.
282 ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
284 NormalizedNode expected = ref.underlyingActor().readStore();
// Encode the captured contents against an empty YangInstanceIdentifier (no path arguments).
286 NormalizedNodeMessages.Container encode = codec
287 .encode(YangInstanceIdentifier.builder().build(), expected);
// The snapshot handed to the shard is the byte-string form of the encoded node.
290 ref.underlyingActor().applySnapshot(encode.getNormalizedNode().toByteString());
292 NormalizedNode actual = ref.underlyingActor().readStore();
294 assertEquals(expected, actual);
// JavaTestKit subclass that adds a helper for waiting on log output from an actor.
297 private static class ShardTestKit extends JavaTestKit {
299 private ShardTestKit(ActorSystem actorSystem) {
// Runs an EventFilter for events of the given log level that originate from the subject's
// actor path, expecting one occurrence, and asserts that the filter's result is true.
// NOTE(review): logLevel is a raw Class; callers in this file pass Logging.Info.class
// and Logging.Debug.class.
303 protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
304 // Wait for a specific log message to show up
305 final boolean result =
306 new JavaTestKit.EventFilter<Boolean>(logLevel
309 protected Boolean run() {
312 }.from(subject.path().toString())
314 .occurrences(1).exec();
316 Assert.assertEquals(true, result);
// Sends a CaptureSnapshot message to a shard that has become leader and waits for the shard
// to log, at debug level, that it received the CaptureSnapshotReply.
323 public void testCreateSnapshot() throws IOException, InterruptedException {
324 new ShardTestKit(getSystem()) {{
325 final ShardIdentifier identifier =
326 ShardIdentifier.builder().memberName("member-1")
327 .shardName("inventory").type("config").build();
// The shard is created with an empty peer-address map.
329 final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
330 final ActorRef subject =
331 getSystem().actorOf(props, "testCreateSnapshot");
333 // Wait for a specific log message to show up
334 this.waitForLogMessage(Logging.Info.class, subject, "Switching from state Candidate to Leader");
337 new Within(duration("3 seconds")) {
339 protected void run() {
342 new UpdateSchemaContext(TestModel.createTestContext()),
// All four CaptureSnapshot constructor arguments are -1 in this test.
345 subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
// The test passes once the shard logs this debug message; no reply message is inspected here.
348 waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor");
// NOTE(review): deletePersistenceFiles() is not defined in the visible code; presumably it
// removes files written by Akka persistence during the test — confirm at its definition.
353 deletePersistenceFiles();
358 * This test simply verifies that the applySnapshot logic will work
359 * @throws ReadFailedException
362 public void testInMemoryDataStoreRestore() throws ReadFailedException {
// Both executors handed to the store are same-thread executors, so store work runs on the calling thread.
363 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
364 MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
366 store.onGlobalContextUpdated(TestModel.createTestContext());
// Seed the store with the test container and commit it.
368 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
369 putTransaction.write(TestModel.TEST_PATH,
370 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
371 commitTransaction(putTransaction);
// Capture the store contents as the expected state.
374 NormalizedNode expected = readStore(store);
// In a single transaction, delete at the empty identifier and write the captured contents
// back at the same identifier — the delete-then-write restore sequence under test.
376 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
378 writeTransaction.delete(YangInstanceIdentifier.builder().build());
379 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
381 commitTransaction(writeTransaction);
// After the restore the store must hold the same contents that were captured before it.
383 NormalizedNode actual = readStore(store);
385 assertEquals(expected, actual);
// Reads the node stored at an empty YangInstanceIdentifier (built with no path arguments;
// presumably the data tree root) through a new read-only transaction and returns it.
// A failed read surfaces as ReadFailedException via checkedGet().
389 private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
390 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
391 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
392 transaction.read(YangInstanceIdentifier.builder().build());
394 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
// Optional.get() is called directly on the read result.
396 NormalizedNode<?, ?> normalizedNode = optional.get();
400 return normalizedNode;
// Takes the given write transaction through ready(), then preCommit() and commit() on the
// resulting three-phase commit cohort.
403 private void commitTransaction(DOMStoreWriteTransaction transaction) {
404 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
405 ListenableFuture<Void> future =
406 commitCohort.preCommit();
// The same variable is reused for the commit phase's future.
409 future = commitCohort.commit();
// NOTE(review): InterruptedException and ExecutionException (presumably raised while waiting
// on the cohort futures) are caught here rather than propagated — confirm the handler makes
// a failed commit visible to the test.
411 } catch (InterruptedException | ExecutionException e) {
415 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
416 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
418 public void onDataChanged(
419 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {