2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import com.google.common.base.Optional;
12 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
13 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
14 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
16 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
17 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message.MessageType;
18 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
19 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
20 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
21 import org.opendaylight.controller.sal.core.api.RpcImplementation;
22 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.common.RpcResult;
25 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.zeromq.ZMQ;
30 import java.io.IOException;
31 import java.util.HashSet;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
39 * ZeroMQ-based implementation of RpcRouter. TODO: 1. Make rpc request handling
40 * async and non-blocking (note: a zmq socket is not thread safe). 2. Read properties
41 * from a config file using the existing(?) ODL properties framework.
43 public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, Set> {
// NOTE(review): `Set` above is a raw type argument (RouteChangeListener<String, Set>).
// Logger used by every log statement in this class.
45 private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
// Executor that runs the receive() task; assigned by setServerPool(...) and also
// re-created as a single-thread executor by the startup code.
47 private ExecutorService serverPool;
49 // private RoutingTable<RpcRouter.RouteIdentifier, String> routingTable;
// Provider from which RpcListener obtains the routing table when adding/removing global routes.
50 private RoutingTableProvider routingTable;
// Names consulted by RpcListener to ignore registrations that bounced back from the broker.
51 private Set<QName> remoteServices;
// Broker session used to listen for RPC registrations, invoke RPCs and add RPC implementations.
52 private ProviderSession brokerSession;
// ZeroMQ context from which the reply socket is created.
53 private ZMQ.Context context;
// ZMQ.REP socket on which requests are received and responses are sent.
54 private ZMQ.Socket replySocket;
// Listener registered with the broker session for RPC registration events.
56 private final RpcListener listener = new RpcListener();
// Local URI: used as the sender of responses and as the value of global routes.
58 private final String localUri = Context.getInstance().getLocalUri();
// Port passed to the constructor. NOTE(review): no visible line reads this field.
60 private final int rpcPort;
// RPC implementation registered with the broker when a route update arrives.
62 private RpcImplementation client;
// Accessor paired with setClient(...). Only the signature is visible in this excerpt;
// presumably it returns the `client` field — confirm against the full file.
64 public RpcImplementation getClient() {
// Mutator for the RpcImplementation that onRouteUpdated(...) registers with the broker session.
// NOTE(review): the assignment itself is not visible in this excerpt — presumably
// `this.client = client`; confirm against the full file.
68 public void setClient(RpcImplementation client) {
72 // Creates a server instance and records the RPC port it was given.
// (The constructor is public, so it does not prevent instantiation.)
// NOTE(review): no visible line reads the rpcPort field afterwards; receive() binds to
// Context.getInstance().getRpcPort() instead — confirm which port source is intended.
73 public ServerImpl(int rpcPort) {
74 this.rpcPort = rpcPort;
// Injects the broker session. Other code in this class dereferences it to register the
// RPC registration listener, list supported RPCs, invoke RPCs and add RPC implementations,
// so it has to be set before the server is started.
77 public void setBrokerSession(ProviderSession session) {
78 this.brokerSession = session;
// Accessor for the executor service. Only the signature is visible in this excerpt;
// presumably it returns the `serverPool` field — confirm against the full file.
81 public ExecutorService getServerPool() {
// Replaces the executor service held by this server.
// NOTE(review): the startup code assigns a fresh single-thread executor to the same field,
// so a pool supplied here before startup appears to be overwritten — confirm.
85 public void setServerPool(ExecutorService serverPool) {
86 this.serverPool = serverPool;
// Startup sequence. NOTE(review): the enclosing method header is not visible in this excerpt.
// Create the ZeroMQ context, a single-thread executor and an empty set of remote services.
90 context = ZMQ.context(1);
91 serverPool = Executors.newSingleThreadExecutor();
92 remoteServices = new HashSet<QName>();
94 // Start listening rpc requests
95 serverPool.execute(receive());
// Subscribe to RPC registration events; brokerSession is dereferenced here, so it must be set.
97 brokerSession.addRpcRegistrationListener(listener);
98 // routingTable.registerRouteChangeListener(routeChangeListener);
// Replay the RPCs the broker already supports through the listener, which adds a global
// route for each of them.
100 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
101 for (QName rpc : currentlySupported) {
102 listener.onRpcImplementationAdded(rpc);
105 _logger.debug("RPC Server started [{}]", localUri);
// Shutdown sequence. NOTE(review): the enclosing method header is not visible in this excerpt.
109 // TODO: un-subscribe
// The ZeroMQ context termination below is commented out; no visible line terminates the context.
111 // if (context != null)
114 // _logger.debug("ZMQ Context is terminated.");
// Ask the executor to shut down, if one exists.
// NOTE(review): ExecutorService.shutdown() does not interrupt a task that is already running,
// while the receive loop only exits when its thread is interrupted — confirm the loop
// actually stops (shutdownNow() would interrupt it).
116 if (serverPool != null)
117 serverPool.shutdown();
119 _logger.debug("Thread pool is closed.");
// Builds the Runnable that serves RPC requests on the ZeroMQ reply socket.
// NOTE(review): several lines of this method are missing from this excerpt (for example the
// run() header and the body of the polling branch); the comments cover visible statements only.
122 private Runnable receive() {
123 return new Runnable() {
126 // Bind to RPC reply socket
// NOTE(review): the port comes from Context.getInstance().getRpcPort(), not from the rpcPort
// field set in the constructor — confirm the two agree.
127 replySocket = context.socket(ZMQ.REP);
128 replySocket.bind("tcp://*:" + Context.getInstance().getRpcPort());
130 // Poller enables listening on multiple sockets using a single
// Only the reply socket is registered, for incoming-message (POLLIN) events.
132 ZMQ.Poller poller = new ZMQ.Poller(1);
133 poller.register(replySocket, ZMQ.Poller.POLLIN);
135 // TODO: Add code to restart the thread after exception
// Keep serving until the executing thread is interrupted.
136 while (!Thread.currentThread().isInterrupted()) {
// Index 0 — presumably the reply socket, the only socket registered above.
140 if (poller.pollin(0)) {
144 } catch (Exception e) {
// NOTE(review): with the exception as the only argument this call likely resolves to the
// (String, Throwable) overload, so the "{}" placeholder is probably left unsubstituted — confirm.
146 _logger.error("Unhandled exception [{}]", e);
// Remove the reply socket from the poller.
148 poller.unregister(replySocket);
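157 * Declares no checked exceptions: failures of the broker call (including
158 * InterruptedException and ExecutionException) are caught and logged in the method body.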
160 private void handleRpcCall() {
// Handles one RPC request: reads it from the reply socket, forwards it to the broker and
// sends a RESPONSE message back on the same socket.
// NOTE(review): if parseMessage(...) can return null (its return path is not visible in this
// excerpt), the request.getRoute() calls below would throw — confirm.
162 Message request = parseMessage(replySocket);
164 _logger.debug("Received rpc request [{}]", request);
166 // Call broker to process the message then reply
167 Future<RpcResult<CompositeNode>> rpc = null;
168 RpcResult<CompositeNode> result = null;
// The route type is cast to the RPC QName; the String payload is converted from XML into a CompositeNode.
170 rpc = brokerSession.rpc((QName) request.getRoute().getType(),
171 XmlUtils.xmlToCompositeNode((String) request.getPayload()));
// Blocks until the broker's future completes; a null future yields a null result.
173 result = (rpc != null) ? rpc.get() : null;
175 } catch (Exception e) {
// The failure is only logged at debug level; processing continues with whatever `result` holds.
176 _logger.debug("Broker threw [{}]", e);
// A missing result produces a null payload in the response.
179 CompositeNode payload = (result != null) ? result.getResult() : null;
// The response echoes the request's route and names this server (localUri) as the sender.
181 Message response = new Message.MessageBuilder().type(MessageType.RESPONSE).sender(localUri)
182 .route(request.getRoute()).payload(XmlUtils.compositeNodeToXml(payload)).build();
184 _logger.debug("Sending rpc response [{}]", response);
187 replySocket.send(Message.serialize(response));
188 } catch (Exception e) {
// Send/serialization failures are logged at debug level and not propagated.
189 _logger.debug("rpc response send failed for message [{}]", response);
190 _logger.debug("{}", e);
// Receives bytes from the given socket and deserializes them into a Message.
// NOTE(review): the declaration of `msg`, the catch body and the return statement are not
// visible in this excerpt, so what is returned on failure cannot be stated from here.
199 private Message parseMessage(ZMQ.Socket socket) {
203 byte[] bytes = socket.recv();
204 _logger.debug("Received bytes:[{}]", bytes.length);
205 msg = (Message) Message.deserialize(bytes);
// Catches Throwable, i.e. Errors as well as Exceptions.
206 } catch (Throwable t) {
// Route-change callback: registers `client` with the broker session as the RPC implementation
// for the route type parsed from `key`. `values` is used only in the debug log line.
// NOTE(review): `Set` is a raw type in this signature.
213 public void onRouteUpdated(String key, Set values) {
214 RouteIdentifierImpl rId = new RouteIdentifierImpl();
216 _logger.debug("Updating key/value {}-{}", key, values);
// The key is parsed back into a route identifier; its type is cast to the RPC QName.
217 brokerSession.addRpcImplementation((QName) rId.fromString(key).getType(), client);
219 } catch (Exception e) {
// Any failure is logged at info level.
220 _logger.info("Route update failed {}", e);
// Route-deletion callback: not supported; the visible body unconditionally throws
// UnsupportedOperationException.
225 public void onRouteDeleted(String key) {
226 // TODO: Broker session needs to be updated to support this
227 throw new UnsupportedOperationException();
231 * Listener for rpc registrations
233 private class RpcListener implements RpcRegistrationListener {
// Non-static inner class: it uses the enclosing ServerImpl's remoteServices, routingTable,
// localUri and _logger fields.
// NOTE(review): some lines of this class (try headers, closing braces, the statement guarded
// by the `if` below) are missing from this excerpt.

// Adds a global route mapping the RPC's route identifier string to this server's localUri.
238 public void onRpcImplementationAdded(QName name) {
240 // if the service name exists in the set, this notice
241 // has bounced back from the broker. It should be ignored
// NOTE(review): no visible line ever adds to remoteServices, so this check may never match — confirm.
242 if (remoteServices.contains(name))
245 _logger.debug("Adding registration for [{}]", name);
246 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
247 routeId.setType(name);
// NOTE(review): getRoutingTable() is dereferenced with .get() and no presence check —
// presumably an Optional (Guava's Optional is imported); confirm it cannot be absent here.
250 routingTable.getRoutingTable().get().addGlobalRoute(routeId.toString(), localUri);
251 _logger.debug("Route added [{}-{}]", name, localUri);
252 } catch (RoutingTableException | SystemException e) {
253 // TODO: This can be thrown when route already exists in the
255 // needs to handle this.
// The failure is logged at error level and not rethrown.
256 _logger.error("Unhandled exception while adding global route to routing table [{}]", e);

// Removes the global route keyed by the RPC's route identifier string.
262 public void onRpcImplementationRemoved(QName name) {
264 _logger.debug("Removing registration for [{}]", name);
265 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
266 routeId.setType(name);
// Same unchecked .get() on the routing table as in onRpcImplementationAdded(...).
269 routingTable.getRoutingTable().get().removeGlobalRoute(routeId.toString());
270 } catch (RoutingTableException | SystemException e) {
// The failure is logged at error level and not rethrown.
271 _logger.error("Route delete failed {}", e);
// Close hook; declared to throw Exception.
// NOTE(review): the body is not visible in this excerpt, so what it releases cannot be stated from here.
277 public void close() throws Exception {
// Injects the routing table provider. RpcListener dereferences it when adding or removing
// global routes, so it has to be set before RPC registration events arrive.
281 public void setRoutingTableProvider(RoutingTableProvider provider) {
282 this.routingTable = provider;