Enhancements to remote rpc server. Using zmq router-dealer bridge to make the server...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / test / java / org / opendaylight / controller / sal / connector / remoterpc / ServerImplTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10
11 import com.google.common.base.Optional;
12 import junit.framework.Assert;
13 import org.junit.*;
14 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
15 import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
16 import org.opendaylight.controller.sal.core.api.Broker;
17 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
18 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
19 import org.zeromq.ZMQ;
20 import zmq.Ctx;
21 import zmq.SocketBase;
22
23 import java.lang.reflect.Field;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.ThreadPoolExecutor;
29
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.when;
32
33 public class ServerImplTest {
34
35   private static ZMQ.Context context;
36   private ServerImpl server;
37   private Broker.ProviderSession brokerSession;
38   private RoutingTableProvider routingTableProvider;
39   private RpcRegistrationListener listener;
40
41   ExecutorService pool;
42
43   //Server configuration
44   private final int HANDLER_COUNT = 2;
45   private final int HWM = 200;
46   private final int port = 5554;
47   //server address
48   private final String SERVER_ADDRESS = "tcp://localhost:5554";
49
50   //@BeforeClass
51   public static void init() {
52     context = ZMQ.context(1);
53   }
54
55   //@AfterClass
56   public static void destroy() {
57     MessagingUtil.closeZmqContext(context);
58   }
59
60   @Before
61   public void setup() throws InterruptedException {
62     context = ZMQ.context(1);
63     brokerSession = mock(Broker.ProviderSession.class);
64     routingTableProvider = mock(RoutingTableProvider.class);
65     listener = mock(RpcRegistrationListener.class);
66
67     server = new ServerImpl(port);
68     server.setBrokerSession(brokerSession);
69     server.setRoutingTableProvider(routingTableProvider);
70
71     RoutingTable<String, String> mockRoutingTable = new MockRoutingTable<String, String>();
72     Optional<RoutingTable<String, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
73     when(routingTableProvider.getRoutingTable()).thenReturn(optionalRoutingTable);
74
75     when(brokerSession.addRpcRegistrationListener(listener)).thenReturn(null);
76     when(brokerSession.getSupportedRpcs()).thenReturn(Collections.EMPTY_SET);
77     when(brokerSession.rpc(null, mock(CompositeNode.class))).thenReturn(null);
78     server.start();
79     Thread.sleep(5000);//wait for server to start
80   }
81
82   @After
83   public void tearDown() throws InterruptedException {
84
85     if (pool != null)
86       pool.shutdown();
87
88     if (server != null)
89       server.stop();
90
91     MessagingUtil.closeZmqContext(context);
92
93     Thread.sleep(5000);//wait for server to stop
94     Assert.assertEquals(ServerImpl.State.STOPPED, server.getStatus());
95   }
96
97   @Test
98   public void getRoutingTableProvider_Call_ShouldReturnRoutingTable() throws Exception {
99     Assert.assertNotNull(server.getRoutingTableProvider());
100   }
101
102   @Test
103   public void getBrokerSession_Call_ShouldReturnBrokerSession() throws Exception {
104     Optional<Broker.ProviderSession> mayBeBroker = server.getBrokerSession();
105
106     if (mayBeBroker.isPresent())
107       Assert.assertEquals(brokerSession, mayBeBroker.get());
108     else
109       Assert.fail("Broker does not exist in Remote RPC Server");
110
111   }
112
113   @Test
114   public void start_Call_ShouldSetServerStatusToStarted() throws Exception {
115     Assert.assertEquals(ServerImpl.State.STARTED, server.getStatus());
116
117   }
118
119   @Test
120   public void start_Call_ShouldCreateNZmqSockets() throws Exception {
121     final int EXPECTED_COUNT = 2 + HANDLER_COUNT; //1 ROUTER + 1 DEALER + HANDLER_COUNT
122
123     Optional<ZMQ.Context> mayBeContext = server.getZmqContext();
124     if (mayBeContext.isPresent())
125       Assert.assertEquals(EXPECTED_COUNT, findSocketCount(mayBeContext.get()));
126     else
127       Assert.fail("ZMQ Context does not exist in Remote RPC Server");
128   }
129
130   @Test
131   public void start_Call_ShouldCreate1ServerThread() {
132     final String SERVER_THREAD_NAME = "remote-rpc-server";
133     final int EXPECTED_COUNT = 1;
134     List<Thread> serverThreads = findThreadsWithName(SERVER_THREAD_NAME);
135     Assert.assertEquals(EXPECTED_COUNT, serverThreads.size());
136   }
137
138   @Test
139   public void start_Call_ShouldCreateNHandlerThreads() {
140     //final String WORKER_THREAD_NAME = "remote-rpc-worker";
141     final int EXPECTED_COUNT = HANDLER_COUNT;
142
143     Optional<ServerRequestHandler> serverRequestHandlerOptional = server.getHandler();
144     if (serverRequestHandlerOptional.isPresent()){
145       ServerRequestHandler handler = serverRequestHandlerOptional.get();
146       ThreadPoolExecutor workerPool = handler.getWorkerPool();
147       Assert.assertEquals(EXPECTED_COUNT, workerPool.getPoolSize());
148     } else {
149       Assert.fail("Server is in illegal state. ServerHandler does not exist");
150     }
151
152   }
153
154   @Test
155   public void testStop() throws Exception {
156
157   }
158
159   @Test
160   public void testOnRouteUpdated() throws Exception {
161
162   }
163
164   @Test
165   public void testOnRouteDeleted() throws Exception {
166
167   }
168
169   private int findSocketCount(ZMQ.Context context)
170       throws NoSuchFieldException, IllegalAccessException {
171     Field ctxField = context.getClass().getDeclaredField("ctx");
172     ctxField.setAccessible(true);
173     Ctx ctx = Ctx.class.cast(ctxField.get(context));
174
175     Field socketListField = ctx.getClass().getDeclaredField("sockets");
176     socketListField.setAccessible(true);
177     List<SocketBase> sockets = List.class.cast(socketListField.get(ctx));
178
179     return sockets.size();
180   }
181
182   private List<Thread> findThreadsWithName(String name) {
183     Thread[] threads = new Thread[Thread.activeCount()];
184     Thread.enumerate(threads);
185
186     List<Thread> foundThreads = new ArrayList();
187     for (Thread t : threads) {
188       if (t.getName().startsWith(name))
189         foundThreads.add(t);
190     }
191
192     return foundThreads;
193   }
194 }