1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.testkit.JavaTestKit;
7 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
8 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
9 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
10 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
11 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
12 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
13 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
14 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
15 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
16 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
19 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
20 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
21 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
23 import java.util.Collections;
24 import java.util.HashMap;
27 import static org.junit.Assert.assertEquals;
28 import static org.junit.Assert.assertTrue;
30 public class ShardTest extends AbstractActorTest {
/**
 * Sends a serialized {@link CreateTransactionChain} message to a Shard actor
 * registered under the name "testCreateTransactionChain" and reads back the
 * transaction chain path carried by the {@link CreateTransactionChainReply}.
 *
 * The path is then checked with assertEquals, using
 * "akka://test/user/testCreateTransactionChain/$a" as the expected literal.
 */
32 public void testOnReceiveCreateTransactionChain() throws Exception {
33 new JavaTestKit(getSystem()) {{
// Shard under test: name "config", created with an empty peer-address map.
34 final Props props = Shard.props("config", Collections.EMPTY_MAP);
35 final ActorRef subject =
36 getSystem().actorOf(props, "testCreateTransactionChain");
39 // Wait for Shard to become a Leader
42 } catch (InterruptedException e) {
// Everything inside run() is bounded by the 1 second Within window.
46 new Within(duration("1 seconds")) {
47 protected void run() {
// The test kit's own ref is passed as sender so the reply arrives here.
49 subject.tell(new CreateTransactionChain().toSerializable(), getRef());
51 final String out = new ExpectMsg<String>("match hint") {
52 // do not put code outside this method, will run afterwards
53 protected String match(Object in) {
// Only a message of the reply's serializable class is decoded.
54 if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
55 CreateTransactionChainReply reply =
56 CreateTransactionChainReply.fromSerializable(getSystem(),in);
57 return reply.getTransactionChainPath()
63 }.get(); // this extracts the received message
65 assertEquals("Unexpected transaction path " + out,
66 "akka://test/user/testCreateTransactionChain/$a",
/**
 * Sends a serialized {@link RegisterChangeListener} for
 * {@code TestModel.TEST_PATH} (scope BASE) to a Shard actor registered under
 * the name "testRegisterChangeListener" and reads back the listener
 * registration path carried by the {@link RegisterChangeListenerReply}.
 *
 * The returned path must match the pattern
 * akka://test/user/testRegisterChangeListener/$&lt;anything&gt;.
 */
78 public void testOnReceiveRegisterListener() throws Exception {
79 new JavaTestKit(getSystem()) {{
// Shard under test: name "config", created with an empty peer-address map.
80 final Props props = Shard.props("config", Collections.EMPTY_MAP);
81 final ActorRef subject =
82 getSystem().actorOf(props, "testRegisterChangeListener");
// Everything inside run() is bounded by the 1 second Within window.
84 new Within(duration("1 seconds")) {
85 protected void run() {
// Schema context built from SchemaContextHelper.full();
// presumably delivered to the shard before the registration - TODO confirm.
88 new UpdateSchemaContext(SchemaContextHelper.full()),
// The test kit's own actor path is registered as the listener path.
91 subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
92 getRef().path(), AsyncDataBroker.DataChangeScope.BASE).toSerializable(),
95 final String out = new ExpectMsg<String>("match hint") {
96 // do not put code outside this method, will run afterwards
97 protected String match(Object in) {
// Only a message of the reply's serializable class is decoded.
98 if (in.getClass().equals(RegisterChangeListenerReply.SERIALIZABLE_CLASS)) {
99 RegisterChangeListenerReply reply =
100 RegisterChangeListenerReply.fromSerializable(getSystem(),in);
101 return reply.getListenerRegistrationPath()
107 }.get(); // this extracts the received message
// The last path segment is only required to start with '$'.
109 assertTrue(out.matches(
110 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
111 // Will wait for the rest of the 1 second Within window
/**
 * Sends a serialized {@link CreateTransaction} with transaction id "txn-1" to
 * a Shard actor registered under the name "testCreateTransaction" and reads
 * back the transaction actor path carried by the {@link CreateTransactionReply}.
 *
 * The returned path must contain
 * "akka://test/user/testCreateTransaction/shard-txn-1".
 */
121 public void testCreateTransaction(){
122 new JavaTestKit(getSystem()) {{
// Shard under test: name "config", created with an empty peer-address map.
123 final Props props = Shard.props("config", Collections.EMPTY_MAP);
124 final ActorRef subject =
125 getSystem().actorOf(props, "testCreateTransaction");
128 // Wait for Shard to become a Leader
131 } catch (InterruptedException e) {
// Everything inside run() is bounded by the 1 second Within window.
136 new Within(duration("1 seconds")) {
137 protected void run() {
// Schema context built from TestModel.createTestContext();
// presumably delivered to the shard before the transaction - TODO confirm.
140 new UpdateSchemaContext(TestModel.createTestContext()),
143 subject.tell(new CreateTransaction("txn-1").toSerializable(),
146 final String out = new ExpectMsg<String>("match hint") {
147 // do not put code outside this method, will run afterwards
148 protected String match(Object in) {
// Here the reply is matched by type (the protobuf CreateTransactionReply)
// rather than by a SERIALIZABLE_CLASS comparison.
149 if (in instanceof CreateTransactionReply) {
150 CreateTransactionReply reply =
151 (CreateTransactionReply) in;
152 return reply.getTransactionActorPath()
158 }.get(); // this extracts the received message
// contains() is used, so text around the expected path is tolerated.
160 assertTrue("Unexpected transaction path " + out,
161 out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
/**
 * Creates a Shard actor named "testPeerAddressResolved" whose peer map holds
 * "member-2" with no address yet, then builds a {@link PeerAddressResolved}
 * message giving "member-2" the address "akka://foobar".
 *
 * NOTE(review): presumably the message is sent to the shard and the test only
 * checks that it is handled without failure - confirm against the full method.
 */
171 public void testPeerAddressResolved(){
172 new JavaTestKit(getSystem()) {{
173 Map<String, String> peerAddresses = new HashMap<>();
// The peer is registered with a null address, i.e. not yet resolved.
174 peerAddresses.put("member-2", null);
175 final Props props = Shard.props("config", peerAddresses);
176 final ActorRef subject =
177 getSystem().actorOf(props, "testPeerAddressResolved");
// Everything inside run() is bounded by the 1 second Within window.
179 new Within(duration("1 seconds")) {
180 protected void run() {
// Same peer id as the null entry put into peerAddresses above.
183 new PeerAddressResolved("member-2", "akka://foobar"),
194 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
195 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
197 public void onDataChanged(
198 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {