1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.event.Logging;
6 import akka.testkit.JavaTestKit;
8 import org.junit.Assert;
10 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
11 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
12 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
13 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
14 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
15 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
16 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
17 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
18 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
19 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
20 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
22 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
24 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
28 import java.util.Collections;
29 import java.util.HashMap;
32 import static org.junit.Assert.assertFalse;
33 import static org.junit.Assert.assertEquals;
34 import static org.junit.Assert.assertTrue;
36 public class ShardTest extends AbstractActorTest {
38 private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
41 public void testOnReceiveCreateTransactionChain() throws Exception {
42 new JavaTestKit(getSystem()) {{
43 final ShardIdentifier identifier =
44 ShardIdentifier.builder().memberName("member-1")
45 .shardName("inventory").type("config").build();
47 final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT);
48 final ActorRef subject =
49 getSystem().actorOf(props, "testCreateTransactionChain");
52 // Wait for a specific log message to show up
53 final boolean result =
54 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
57 protected Boolean run() {
60 }.from(subject.path().toString())
61 .message("Switching from state Candidate to Leader")
62 .occurrences(1).exec();
64 Assert.assertEquals(true, result);
66 new Within(duration("3 seconds")) {
68 protected void run() {
70 subject.tell(new CreateTransactionChain().toSerializable(), getRef());
72 final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
73 // do not put code outside this method, will run afterwards
75 protected String match(Object in) {
76 if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
77 CreateTransactionChainReply reply =
78 CreateTransactionChainReply.fromSerializable(getSystem(),in);
79 return reply.getTransactionChainPath()
85 }.get(); // this extracts the received message
87 assertEquals("Unexpected transaction path " + out,
88 "akka://test/user/testCreateTransactionChain/$a",
100 public void testOnReceiveRegisterListener() throws Exception {
101 new JavaTestKit(getSystem()) {{
102 final ShardIdentifier identifier =
103 ShardIdentifier.builder().memberName("member-1")
104 .shardName("inventory").type("config").build();
106 final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT);
107 final ActorRef subject =
108 getSystem().actorOf(props, "testRegisterChangeListener");
110 new Within(duration("3 seconds")) {
112 protected void run() {
115 new UpdateSchemaContext(SchemaContextHelper.full()),
118 subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
119 getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
122 final Boolean notificationEnabled = new ExpectMsg<Boolean>(
123 duration("3 seconds"), "enable notification") {
124 // do not put code outside this method, will run afterwards
126 protected Boolean match(Object in) {
127 if(in instanceof EnableNotification){
128 return ((EnableNotification) in).isEnabled();
133 }.get(); // this extracts the received message
135 assertFalse(notificationEnabled);
137 final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
138 // do not put code outside this method, will run afterwards
140 protected String match(Object in) {
141 if (in.getClass().equals(RegisterChangeListenerReply.class)) {
142 RegisterChangeListenerReply reply =
143 (RegisterChangeListenerReply) in;
144 return reply.getListenerRegistrationPath()
150 }.get(); // this extracts the received message
152 assertTrue(out.matches(
153 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
162 public void testCreateTransaction(){
163 new JavaTestKit(getSystem()) {{
164 final ShardIdentifier identifier =
165 ShardIdentifier.builder().memberName("member-1")
166 .shardName("inventory").type("config").build();
168 final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT);
169 final ActorRef subject =
170 getSystem().actorOf(props, "testCreateTransaction");
172 // Wait for a specific log message to show up
173 final boolean result =
174 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
177 protected Boolean run() {
180 }.from(subject.path().toString())
181 .message("Switching from state Candidate to Leader")
182 .occurrences(1).exec();
184 Assert.assertEquals(true, result);
186 new Within(duration("3 seconds")) {
188 protected void run() {
191 new UpdateSchemaContext(TestModel.createTestContext()),
194 subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
197 final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
198 // do not put code outside this method, will run afterwards
200 protected String match(Object in) {
201 if (in instanceof CreateTransactionReply) {
202 CreateTransactionReply reply =
203 (CreateTransactionReply) in;
204 return reply.getTransactionActorPath()
210 }.get(); // this extracts the received message
212 assertTrue("Unexpected transaction path " + out,
213 out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
221 public void testPeerAddressResolved(){
222 new JavaTestKit(getSystem()) {{
223 Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
225 final ShardIdentifier identifier =
226 ShardIdentifier.builder().memberName("member-1")
227 .shardName("inventory").type("config").build();
229 peerAddresses.put(identifier, null);
230 final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT);
231 final ActorRef subject =
232 getSystem().actorOf(props, "testPeerAddressResolved");
234 new Within(duration("3 seconds")) {
236 protected void run() {
239 new PeerAddressResolved(identifier, "akka://foobar"),
248 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
249 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
251 public void onDataChanged(
252 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {