2 * Copyright (C) 2014 Cisco Systems, Inc.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 * Authors : Thomas Bachman
11 package org.opendaylight.groupbasedpolicy.renderer.opflex.lib;
13 import static io.netty.buffer.Unpooled.copiedBuffer;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.verify;
16 import static org.mockito.Mockito.when;
17 import io.netty.channel.embedded.EmbeddedChannel;
18 import io.netty.util.CharsetUtil;
20 import java.io.IOException;
21 import java.net.ServerSocket;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ScheduledExecutorService;
27 import org.junit.Before;
28 import org.junit.Test;
29 import org.mockito.Mock;
30 import org.mockito.Mockito;
31 import org.mockito.MockitoAnnotations;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
34 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
37 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
38 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.JsonRpcDecoder;
39 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.JsonRpcEndpoint;
40 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.JsonRpcServiceBinderHandler;
41 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcMessageMap;
42 import org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc.RpcServer;
43 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexAgent;
44 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexConnectionService;
45 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.OpflexRpcServer;
46 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.Role;
47 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.IdentityResponse;
48 import org.opendaylight.groupbasedpolicy.renderer.opflex.lib.messages.OpflexMessageTest;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitions;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.DiscoveryDefinitionsBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistry;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.EndpointRegistryBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.Observer;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.ObserverBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepository;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.opflex.rev140528.discovery.definitions.PolicyRepositoryBuilder;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
60 import com.fasterxml.jackson.databind.DeserializationFeature;
61 import com.fasterxml.jackson.databind.ObjectMapper;
62 import com.google.common.base.Optional;
63 import com.google.common.util.concurrent.CheckedFuture;
67 * Test the serialization and deserialization of RPC Messages,
68 * and check against expected structure and values.
70 public class OpflexConnectionServiceTest {
71 protected static final Logger logger = LoggerFactory.getLogger(OpflexMessageTest.class);
73 static private final String TEST_EP_UUID = "85d53c32-47af-4eaf-82fd-ced653ff74da";
74 static public final String TEST_IP = "127.0.0.1";
76 static private final String ID_UUID = "2da9e3d7-0bbe-4099-b343-12783777452f";
77 static private final String SEND_IDENTITY = "send_identity";
78 static private final String DOMAIN_UUID = "default";
79 static private final String NAME = "vm1";
80 static private final String IDENTITY = "192.168.0.1:56732";
81 static private final String opflexIdentityRequest =
82 "{ \"id\": \"" + ID_UUID + "\"," +
83 " \"method\": \"" + SEND_IDENTITY + "\"," +
85 " \"name\": \"" + NAME + "\"," +
86 " \"domain\": \"" + DOMAIN_UUID + "\"," +
87 " \"my_role\": [\"" + Role.POLICY_ELEMENT.toString() + "\"]" +
91 private DataBroker mockDataBroker;
92 private DiscoveryDefinitionsBuilder discoveryBuilder;
93 private EndpointRegistryBuilder eprBuilder;
94 private PolicyRepositoryBuilder prBuilder;
95 private ObserverBuilder oBuilder;
96 private DiscoveryDefinitions dummyDefinitions;
97 private List<EndpointRegistry> registries;
98 private List<PolicyRepository> repositories;
99 private List<Observer> observers;
100 private OpflexConnectionService opflexService;
102 private EmbeddedChannel mockChannel;
104 private JsonRpcEndpoint mockEp;
105 private JsonRpcDecoder decoder;
107 private ReadOnlyTransaction mockRead;
109 private WriteTransaction mockWrite;
111 private CheckedFuture<Optional<DiscoveryDefinitions>, ReadFailedException> mockOption;
113 CheckedFuture<Void, TransactionCommitFailedException> mockStatus;
115 private Optional<DiscoveryDefinitions> mockDao;
117 private OpflexRpcServer mockOpflexServer;
119 private OpflexAgent mockAgent;
121 private ScheduledExecutorService executor;
124 private OpflexRpcServer mockServer;
126 private RpcServer mockRpcServer;
128 private ServerSocket create(int[] ports) throws IOException {
129 for (int port : ports) {
131 return new ServerSocket(port);
132 } catch (IOException ex) {
133 continue; // try next port
137 // if the program gets here, no port in the range was found
138 throw new IOException("no free port found");
141 private int getAvailableServerPort() {
144 ServerSocket s = create(new int[]
145 { 6670, 6671, 6672, 6673, 6674, 6675, 6676, 6677, 6678 });
146 freePort = s.getLocalPort();
149 } catch (IOException ex) {
155 public void setUp() throws Exception {
156 MockitoAnnotations.initMocks(this);
158 int numCPU = Runtime.getRuntime().availableProcessors();
159 executor = Executors.newScheduledThreadPool(numCPU * 2);
164 when(mockDataBroker.newReadOnlyTransaction()).thenReturn(mockRead);
165 when(mockDataBroker.newWriteOnlyTransaction()).thenReturn(mockWrite);
166 when(mockWrite.submit()).thenReturn(mockStatus);
167 when(mockRead.read(LogicalDatastoreType.CONFIGURATION,
168 OpflexConnectionService.DISCOVERY_IID)).thenReturn(mockOption);
169 when(mockOption.get()).thenReturn(mockDao);
170 when(mockDao.get()).thenReturn(dummyDefinitions);
173 * Builders for creating our own discovery definitions
175 discoveryBuilder = new DiscoveryDefinitionsBuilder();
176 eprBuilder = new EndpointRegistryBuilder();
177 prBuilder = new PolicyRepositoryBuilder();
178 oBuilder = new ObserverBuilder();
180 int testPort = getAvailableServerPort();
181 if ( testPort == 0) {
184 System.setProperty(OpflexConnectionService.OPFLEX_LISTENPORT, Integer.toString(testPort));
185 System.setProperty(OpflexConnectionService.OPFLEX_LISTENIP, TEST_IP);
189 public void testNoDefinitions() throws Exception {
191 opflexService = new OpflexConnectionService(mockDataBroker, executor);
192 verify(mockDataBroker).newReadOnlyTransaction();
196 public void testInitialSet() throws Exception {
197 registries = new ArrayList<EndpointRegistry>();
198 repositories = new ArrayList<PolicyRepository>();
199 observers = new ArrayList<Observer>();
200 int serverPort = getAvailableServerPort();
201 EndpointRegistry epr = eprBuilder.setId(TEST_IP)
202 .setPort(serverPort).build();
203 PolicyRepository pr = prBuilder.setId(TEST_IP)
204 .setPort(serverPort).build();
205 Observer o = oBuilder.setId(TEST_IP)
206 .setPort(serverPort).build();
208 repositories.add(pr);
210 dummyDefinitions = discoveryBuilder.setObserver(observers)
211 .setEndpointRegistry(registries)
212 .setPolicyRepository(repositories).build();
213 opflexService = new OpflexConnectionService(mockDataBroker, executor);
215 verify(mockDataBroker).newReadOnlyTransaction();
220 public void testAddConnection() throws Exception {
221 when(mockEp.getIdentifier()).thenReturn(TEST_EP_UUID);
222 when(mockEp.getContext()).thenReturn(mockOpflexServer);
223 when(mockOpflexServer.getDomain()).thenReturn(DOMAIN_UUID);
225 opflexService = new OpflexConnectionService(mockDataBroker, executor);
226 opflexService.addConnection(mockEp);
227 verify(mockEp, Mockito.times(2)).getIdentifier();
231 public void testChannelClosed() throws Exception {
232 when(mockEp.getIdentifier()).thenReturn(TEST_EP_UUID);
233 when(mockEp.getContext()).thenReturn(mockOpflexServer);
235 opflexService = new OpflexConnectionService(mockDataBroker, executor);
236 when(mockOpflexServer.getDomain()).
237 thenReturn(OpflexConnectionService.OPFLEX_DOMAIN);
238 opflexService.addConnection(mockEp);
240 verify(mockEp, Mockito.times(2)).getIdentifier();
242 assertTrue(opflexService.getOpflexAgents().size() > 0);
243 when(mockAgent.getIdentity()).thenReturn(TEST_EP_UUID);
244 opflexService.channelClosed(mockEp);
245 assertTrue(opflexService.getOpflexAgents().size() <=0);
249 public void testPublishSubscribeCallback() throws Exception {
251 List<Role> testRoles = new ArrayList<Role>();
252 testRoles.add(Role.POLICY_REPOSITORY);
253 testRoles.add(Role.ENDPOINT_REGISTRY);
254 testRoles.add(Role.OBSERVER);
257 * This is *far* from UT, but worthwhile for now
259 opflexService = new OpflexConnectionService(mockDataBroker, executor);
261 ObjectMapper objectMapper = new ObjectMapper();
262 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
263 decoder = new JsonRpcDecoder(1000);
264 JsonRpcServiceBinderHandler binderHandler =
265 new JsonRpcServiceBinderHandler(null);
266 EmbeddedChannel channel = new EmbeddedChannel(decoder, binderHandler);
268 RpcMessageMap messageMap = new RpcMessageMap();
269 messageMap.addList(Role.DISCOVERY.getMessages());
271 JsonRpcEndpoint ep = new JsonRpcEndpoint(IDENTITY , opflexService,
272 objectMapper, channel, messageMap, opflexService);
273 ep.setContext(mockOpflexServer);
274 binderHandler.setEndpoint(ep);
276 when(mockOpflexServer.getRoles()).thenReturn(testRoles);
277 when(mockOpflexServer.getDomain()).
278 thenReturn(OpflexConnectionService.OPFLEX_DOMAIN);
279 opflexService.addConnection(ep);
280 channel.writeInbound(copiedBuffer(opflexIdentityRequest, CharsetUtil.UTF_8));
281 Object result = channel.readOutbound();
282 result = channel.readOutbound();
283 assertTrue(result != null);
284 IdentityResponse resp = objectMapper.readValue(result.toString(), IdentityResponse.class);
285 assertTrue(resp != null);
286 assertTrue(resp.getResult().getMy_role()
287 .contains(Role.ENDPOINT_REGISTRY.toString()));
288 assertTrue(resp.getResult().getMy_role()
289 .contains(Role.POLICY_REPOSITORY.toString()));
290 assertTrue(resp.getResult().getMy_role()
291 .contains(Role.OBSERVER.toString()));