1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.testkit.JavaTestKit;
7 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
8 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
9 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
10 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
11 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
12 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
13 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
14 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
15 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
16 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
20 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
21 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
22 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
24 import java.util.Collections;
25 import java.util.HashMap;
28 import static junit.framework.Assert.assertFalse;
29 import static org.junit.Assert.assertEquals;
30 import static org.junit.Assert.assertTrue;
32 public class ShardTest extends AbstractActorTest {
// Checks that a Shard replies to a serialized CreateTransactionChain message with a
// CreateTransactionChainReply, and that the transaction chain path taken from that reply
// is compared against "akka://test/user/testCreateTransactionChain/$a".
// NOTE(review): parts of this method are not visible here (the try block before the
// catch, the fall-through of match(), the final assertEquals argument) - confirm
// against the full file before relying on these comments.
34 public void testOnReceiveCreateTransactionChain() throws Exception {
35 new JavaTestKit(getSystem()) {{
// Shard under test: created for "config" with an empty map, under a fixed actor name
// that the expected path below depends on.
36 final Props props = Shard.props("config", Collections.EMPTY_MAP);
37 final ActorRef subject =
38 getSystem().actorOf(props, "testCreateTransactionChain");
41 // Wait for Shard to become a Leader
44 } catch (InterruptedException e) {
// The request/reply exchange below runs inside a Within block built from duration("1 seconds").
48 new Within(duration("1 seconds")) {
49 protected void run() {
// The test kit's own ref is passed as sender, so the reply is delivered to this kit.
51 subject.tell(new CreateTransactionChain().toSerializable(), getRef());
53 final String out = new ExpectMsg<String>("match hint") {
54 // do not put code outside this method, will run afterwards
55 protected String match(Object in) {
// The reply arrives in serialized form, so it is recognised by its serializable class
// and converted back with fromSerializable before the path is read.
56 if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
57 CreateTransactionChainReply reply =
58 CreateTransactionChainReply.fromSerializable(getSystem(),in);
59 return reply.getTransactionChainPath()
65 }.get(); // this extracts the received message
// Expected path is a child of the shard actor named "$a"
// (presumably the first auto-named child - TODO confirm).
67 assertEquals("Unexpected transaction path " + out,
68 "akka://test/user/testCreateTransactionChain/$a",
// Checks the messages produced when a change listener is registered on a Shard:
// first an EnableNotification whose isEnabled() is false, then a serialized
// RegisterChangeListenerReply whose listener registration path lies under the shard actor.
// NOTE(review): parts of this method are not visible here (the call that delivers
// UpdateSchemaContext, the sender argument of the RegisterChangeListener tell, the
// fall-through of both match() methods) - confirm against the full file.
80 public void testOnReceiveRegisterListener() throws Exception {
81 new JavaTestKit(getSystem()) {{
// Shard under test: created for "config" with an empty map, under a fixed actor name
// that the path pattern asserted at the end depends on.
82 final Props props = Shard.props("config", Collections.EMPTY_MAP);
83 final ActorRef subject =
84 getSystem().actorOf(props, "testRegisterChangeListener");
// The exchange below runs inside a Within block built from duration("1 seconds").
86 new Within(duration("1 seconds")) {
87 protected void run() {
// A schema context built from SchemaContextHelper.full() is wrapped in UpdateSchemaContext;
// presumably it is sent to the shard before registration - the enclosing call is not visible.
90 new UpdateSchemaContext(SchemaContextHelper.full()),
// Register a listener on TestModel.TEST_PATH with BASE scope; the listener is identified
// by the path of the test kit's own ref.
93 subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
94 getRef().path(), AsyncDataBroker.DataChangeScope.BASE).toSerializable(),
// First expected message: EnableNotification, from which only the enabled flag is kept.
97 final Boolean notificationEnabled = new ExpectMsg<Boolean>("enable notification") {
98 // do not put code outside this method, will run afterwards
99 protected Boolean match(Object in) {
100 if(in instanceof EnableNotification){
101 return ((EnableNotification) in).isEnabled();
106 }.get(); // this extracts the received message
// The flag is required to be false at this point in the test.
108 assertFalse(notificationEnabled);
// Second expected message: the serialized registration reply, recognised by its
// serializable class and converted back before the registration path is read.
110 final String out = new ExpectMsg<String>("match hint") {
111 // do not put code outside this method, will run afterwards
112 protected String match(Object in) {
113 if (in.getClass().equals(RegisterChangeListenerReply.SERIALIZABLE_CLASS)) {
114 RegisterChangeListenerReply reply =
115 RegisterChangeListenerReply.fromSerializable(getSystem(),in);
116 return reply.getListenerRegistrationPath()
122 }.get(); // this extracts the received message
// Only the parent path and a leading '$' in the child name are checked; the rest of the
// child name is left open by the trailing ".*".
124 assertTrue(out.matches(
125 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// Checks that a Shard answers a serialized CreateTransaction request for "txn-1" with a
// CreateTransactionReply whose transaction actor path contains
// "akka://test/user/testCreateTransaction/shard-txn-1".
// NOTE(review): parts of this method are not visible here (the try block before the
// catch, the call that delivers UpdateSchemaContext, the sender argument of the
// CreateTransaction tell, the fall-through of match()) - confirm against the full file.
134 public void testCreateTransaction(){
135 new JavaTestKit(getSystem()) {{
// Shard under test: created for "config" with an empty map, under a fixed actor name
// that the expected path fragment below depends on.
136 final Props props = Shard.props("config", Collections.EMPTY_MAP);
137 final ActorRef subject =
138 getSystem().actorOf(props, "testCreateTransaction");
141 // Wait for Shard to become a Leader
144 } catch (InterruptedException e) {
// The exchange below runs inside a Within block built from duration("1 seconds").
149 new Within(duration("1 seconds")) {
150 protected void run() {
// A schema context from TestModel.createTestContext() is wrapped in UpdateSchemaContext;
// presumably it is sent to the shard first - the enclosing call is not visible.
153 new UpdateSchemaContext(TestModel.createTestContext()),
// Request a transaction named "txn-1". The transaction type is passed as the ordinal of
// READ_ONLY. NOTE(review): this ties the message to the enum's declaration order - verify
// that the receiving side decodes it the same way.
156 subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
159 final String out = new ExpectMsg<String>("match hint") {
160 // do not put code outside this method, will run afterwards
161 protected String match(Object in) {
// Here the reply is matched directly by type (no fromSerializable step) and only the
// transaction actor path is kept.
162 if (in instanceof CreateTransactionReply) {
163 CreateTransactionReply reply =
164 (CreateTransactionReply) in;
165 return reply.getTransactionActorPath()
171 }.get(); // this extracts the received message
// A substring check, not an exact match: the path only has to contain the expected fragment.
173 assertTrue("Unexpected transaction path " + out,
174 out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
// Builds a Shard that knows peer "member-2" without an address, then constructs a
// PeerAddressResolved message giving that peer the address "akka://foobar".
// NOTE(review): the call that delivers the message and any assertion that follows are
// not visible here - confirm against the full file what this test actually verifies.
184 public void testPeerAddressResolved(){
185 new JavaTestKit(getSystem()) {{
// The peer is registered with a null address, i.e. known by name but not yet resolved.
186 Map<String, String> peerAddresses = new HashMap<>();
187 peerAddresses.put("member-2", null);
188 final Props props = Shard.props("config", peerAddresses);
189 final ActorRef subject =
190 getSystem().actorOf(props, "testPeerAddressResolved");
// The exchange below runs inside a Within block built from duration("1 seconds").
192 new Within(duration("1 seconds")) {
193 protected void run() {
// Same peer id as the map entry above, now paired with a concrete address.
196 new PeerAddressResolved("member-2", "akka://foobar"),
207 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
208 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
210 public void onDataChanged(
211 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {