Tune replication and stabilize tests
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.event.Logging;
6 import akka.testkit.JavaTestKit;
7 import junit.framework.Assert;
8 import org.junit.Test;
9 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
10 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
11 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
12 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
13 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
14 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
15 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
16 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
17 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
18 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
22 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
23 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
24 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
25
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.Map;
29
30 import static junit.framework.Assert.assertFalse;
31 import static org.junit.Assert.assertEquals;
32 import static org.junit.Assert.assertTrue;
33
34 public class ShardTest extends AbstractActorTest {
35     @Test
36     public void testOnReceiveCreateTransactionChain() throws Exception {
37         new JavaTestKit(getSystem()) {{
38             final Props props = Shard.props("config", Collections.EMPTY_MAP);
39             final ActorRef subject =
40                 getSystem().actorOf(props, "testCreateTransactionChain");
41
42
43             // Wait for a specific log message to show up
44             final boolean result =
45                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
46                 ) {
47                     protected Boolean run() {
48                         return true;
49                     }
50                 }.from(subject.path().toString())
51                     .message("Switching from state Candidate to Leader")
52                     .occurrences(1).exec();
53
54             Assert.assertEquals(true, result);
55
56             new Within(duration("1 seconds")) {
57                 protected void run() {
58
59                     subject.tell(new CreateTransactionChain().toSerializable(), getRef());
60
61                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
62                         // do not put code outside this method, will run afterwards
63                         protected String match(Object in) {
64                             if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
65                                 CreateTransactionChainReply reply =
66                                     CreateTransactionChainReply.fromSerializable(getSystem(),in);
67                                 return reply.getTransactionChainPath()
68                                     .toString();
69                             } else {
70                                 throw noMatch();
71                             }
72                         }
73                     }.get(); // this extracts the received message
74
75                     assertEquals("Unexpected transaction path " + out,
76                         "akka://test/user/testCreateTransactionChain/$a",
77                         out);
78
79                     expectNoMsg();
80                 }
81
82
83             };
84         }};
85     }
86
87     @Test
88     public void testOnReceiveRegisterListener() throws Exception {
89         new JavaTestKit(getSystem()) {{
90             final Props props = Shard.props("config", Collections.EMPTY_MAP);
91             final ActorRef subject =
92                 getSystem().actorOf(props, "testRegisterChangeListener");
93
94             new Within(duration("1 seconds")) {
95                 protected void run() {
96
97                     subject.tell(
98                         new UpdateSchemaContext(SchemaContextHelper.full()),
99                         getRef());
100
101                     subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
102                         getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
103                         getRef());
104
105                     final Boolean notificationEnabled = new ExpectMsg<Boolean>("enable notification") {
106                         // do not put code outside this method, will run afterwards
107                         protected Boolean match(Object in) {
108                             if(in instanceof EnableNotification){
109                                 return ((EnableNotification) in).isEnabled();
110                             } else {
111                                 throw noMatch();
112                             }
113                         }
114                     }.get(); // this extracts the received message
115
116                     assertFalse(notificationEnabled);
117
118                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
119                         // do not put code outside this method, will run afterwards
120                         protected String match(Object in) {
121                             if (in.getClass().equals(RegisterChangeListenerReply.class)) {
122                                 RegisterChangeListenerReply reply =
123                                     (RegisterChangeListenerReply) in;
124                                 return reply.getListenerRegistrationPath()
125                                     .toString();
126                             } else {
127                                 throw noMatch();
128                             }
129                         }
130                     }.get(); // this extracts the received message
131
132                     assertTrue(out.matches(
133                         "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
134                 }
135
136
137             };
138         }};
139     }
140
141     @Test
142     public void testCreateTransaction(){
143         new JavaTestKit(getSystem()) {{
144             final Props props = Shard.props("config", Collections.EMPTY_MAP);
145             final ActorRef subject =
146                 getSystem().actorOf(props, "testCreateTransaction");
147
148
149             // Wait for a specific log message to show up
150             final boolean result =
151                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
152                 ) {
153                     protected Boolean run() {
154                         return true;
155                     }
156                 }.from(subject.path().toString())
157                     .message("Switching from state Candidate to Leader")
158                     .occurrences(1).exec();
159
160             Assert.assertEquals(true, result);
161
162             new Within(duration("1 seconds")) {
163                 protected void run() {
164
165                     subject.tell(
166                         new UpdateSchemaContext(TestModel.createTestContext()),
167                         getRef());
168
169                     subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
170                         getRef());
171
172                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
173                         // do not put code outside this method, will run afterwards
174                         protected String match(Object in) {
175                             if (in instanceof CreateTransactionReply) {
176                                 CreateTransactionReply reply =
177                                     (CreateTransactionReply) in;
178                                 return reply.getTransactionActorPath()
179                                     .toString();
180                             } else {
181                                 throw noMatch();
182                             }
183                         }
184                     }.get(); // this extracts the received message
185
186                     assertTrue("Unexpected transaction path " + out,
187                         out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
188                     expectNoMsg();
189                 }
190
191
192             };
193         }};
194     }
195
196     @Test
197     public void testPeerAddressResolved(){
198         new JavaTestKit(getSystem()) {{
199             Map<String, String> peerAddresses = new HashMap<>();
200             peerAddresses.put("member-2", null);
201             final Props props = Shard.props("config", peerAddresses);
202             final ActorRef subject =
203                 getSystem().actorOf(props, "testPeerAddressResolved");
204
205             new Within(duration("1 seconds")) {
206                 protected void run() {
207
208                     subject.tell(
209                         new PeerAddressResolved("member-2", "akka://foobar"),
210                         getRef());
211
212                     expectNoMsg();
213                 }
214
215
216             };
217         }};
218     }
219
220     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
221         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
222             @Override
223             public void onDataChanged(
224                 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
225
226             }
227         };
228     }
229 }