1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.event.Logging;
6 import akka.testkit.JavaTestKit;
7 import junit.framework.Assert;
9 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
10 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
11 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
12 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
13 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
14 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
15 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
16 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
17 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
18 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
22 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
23 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
24 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
26 import java.util.Collections;
27 import java.util.HashMap;
30 import static junit.framework.Assert.assertFalse;
31 import static org.junit.Assert.assertEquals;
32 import static org.junit.Assert.assertTrue;
34 public class ShardTest extends AbstractActorTest {
36 public void testOnReceiveCreateTransactionChain() throws Exception {
37 new JavaTestKit(getSystem()) {{
38 final Props props = Shard.props("config", Collections.EMPTY_MAP);
39 final ActorRef subject =
40 getSystem().actorOf(props, "testCreateTransactionChain");
43 // Wait for a specific log message to show up
44 final boolean result =
45 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
47 protected Boolean run() {
50 }.from(subject.path().toString())
51 .message("Switching from state Candidate to Leader")
52 .occurrences(1).exec();
54 Assert.assertEquals(true, result);
56 new Within(duration("1 seconds")) {
57 protected void run() {
59 subject.tell(new CreateTransactionChain().toSerializable(), getRef());
61 final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
62 // do not put code outside this method, will run afterwards
63 protected String match(Object in) {
64 if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
65 CreateTransactionChainReply reply =
66 CreateTransactionChainReply.fromSerializable(getSystem(),in);
67 return reply.getTransactionChainPath()
73 }.get(); // this extracts the received message
75 assertEquals("Unexpected transaction path " + out,
76 "akka://test/user/testCreateTransactionChain/$a",
88 public void testOnReceiveRegisterListener() throws Exception {
89 new JavaTestKit(getSystem()) {{
90 final Props props = Shard.props("config", Collections.EMPTY_MAP);
91 final ActorRef subject =
92 getSystem().actorOf(props, "testRegisterChangeListener");
94 new Within(duration("1 seconds")) {
95 protected void run() {
98 new UpdateSchemaContext(SchemaContextHelper.full()),
101 subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
102 getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
105 final Boolean notificationEnabled = new ExpectMsg<Boolean>("enable notification") {
106 // do not put code outside this method, will run afterwards
107 protected Boolean match(Object in) {
108 if(in instanceof EnableNotification){
109 return ((EnableNotification) in).isEnabled();
114 }.get(); // this extracts the received message
116 assertFalse(notificationEnabled);
118 final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
119 // do not put code outside this method, will run afterwards
120 protected String match(Object in) {
121 if (in.getClass().equals(RegisterChangeListenerReply.class)) {
122 RegisterChangeListenerReply reply =
123 (RegisterChangeListenerReply) in;
124 return reply.getListenerRegistrationPath()
130 }.get(); // this extracts the received message
132 assertTrue(out.matches(
133 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
142 public void testCreateTransaction(){
143 new JavaTestKit(getSystem()) {{
144 final Props props = Shard.props("config", Collections.EMPTY_MAP);
145 final ActorRef subject =
146 getSystem().actorOf(props, "testCreateTransaction");
149 // Wait for a specific log message to show up
150 final boolean result =
151 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
153 protected Boolean run() {
156 }.from(subject.path().toString())
157 .message("Switching from state Candidate to Leader")
158 .occurrences(1).exec();
160 Assert.assertEquals(true, result);
162 new Within(duration("1 seconds")) {
163 protected void run() {
166 new UpdateSchemaContext(TestModel.createTestContext()),
169 subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
172 final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
173 // do not put code outside this method, will run afterwards
174 protected String match(Object in) {
175 if (in instanceof CreateTransactionReply) {
176 CreateTransactionReply reply =
177 (CreateTransactionReply) in;
178 return reply.getTransactionActorPath()
184 }.get(); // this extracts the received message
186 assertTrue("Unexpected transaction path " + out,
187 out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
197 public void testPeerAddressResolved(){
198 new JavaTestKit(getSystem()) {{
199 Map<String, String> peerAddresses = new HashMap<>();
200 peerAddresses.put("member-2", null);
201 final Props props = Shard.props("config", peerAddresses);
202 final ActorRef subject =
203 getSystem().actorOf(props, "testPeerAddressResolved");
205 new Within(duration("1 seconds")) {
206 protected void run() {
209 new PeerAddressResolved("member-2", "akka://foobar"),
220 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
221 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
223 public void onDataChanged(
224 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {