Merge "Fixed deserialization of IdentityRefs in Restconf URI."
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import com.google.common.base.Optional;
11
12 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
13 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
14 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
16 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
17 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message.MessageType;
18 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
19 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
20 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
21 import org.opendaylight.controller.sal.core.api.RpcImplementation;
22 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.common.RpcResult;
25 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.zeromq.ZMQ;
29
30 import java.io.IOException;
31 import java.util.HashSet;
32 import java.util.Set;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37
38 /**
39  * ZeroMq based implementation of RpcRouter TODO: 1. Make rpc request handling
40  * async and non-blocking. Note zmq socket is not thread safe 2. Read properties
41  * from config file using existing(?) ODL properties framework
42  */
43 public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, Set> {
44
45     private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
46
47     private ExecutorService serverPool;
48
49     // private RoutingTable<RpcRouter.RouteIdentifier, String> routingTable;
50     private RoutingTableProvider routingTable;
51     private Set<QName> remoteServices;
52     private ProviderSession brokerSession;
53     private ZMQ.Context context;
54     private ZMQ.Socket replySocket;
55
56     private final RpcListener listener = new RpcListener();
57
58     private final String localUri = Context.getInstance().getLocalUri();
59
60     private final int rpcPort;
61
62     private RpcImplementation client;
63
64     public RpcImplementation getClient() {
65         return client;
66     }
67
68     public void setClient(RpcImplementation client) {
69         this.client = client;
70     }
71
72     // Prevent instantiation
73     public ServerImpl(int rpcPort) {
74         this.rpcPort = rpcPort;
75     }
76
77     public void setBrokerSession(ProviderSession session) {
78         this.brokerSession = session;
79     }
80
81     public ExecutorService getServerPool() {
82         return serverPool;
83     }
84
85     public void setServerPool(ExecutorService serverPool) {
86         this.serverPool = serverPool;
87     }
88
89     public void start() {
90         context = ZMQ.context(1);
91         serverPool = Executors.newSingleThreadExecutor();
92         remoteServices = new HashSet<QName>();
93
94         // Start listening rpc requests
95         serverPool.execute(receive());
96
97         brokerSession.addRpcRegistrationListener(listener);
98         // routingTable.registerRouteChangeListener(routeChangeListener);
99
100         Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
101         for (QName rpc : currentlySupported) {
102             listener.onRpcImplementationAdded(rpc);
103         }
104
105         _logger.debug("RPC Server started [{}]", localUri);
106     }
107
108     public void stop() {
109         // TODO: un-subscribe
110
111         // if (context != null)
112         // context.term();
113         //
114         // _logger.debug("ZMQ Context is terminated.");
115
116         if (serverPool != null)
117             serverPool.shutdown();
118
119         _logger.debug("Thread pool is closed.");
120     }
121
122     private Runnable receive() {
123         return new Runnable() {
124             public void run() {
125
126                 // Bind to RPC reply socket
127                 replySocket = context.socket(ZMQ.REP);
128                 replySocket.bind("tcp://*:" + Context.getInstance().getRpcPort());
129
130                 // Poller enables listening on multiple sockets using a single
131                 // thread
132                 ZMQ.Poller poller = new ZMQ.Poller(1);
133                 poller.register(replySocket, ZMQ.Poller.POLLIN);
134                 try {
135                     // TODO: Add code to restart the thread after exception
136                     while (!Thread.currentThread().isInterrupted()) {
137
138                         poller.poll();
139
140                         if (poller.pollin(0)) {
141                             handleRpcCall();
142                         }
143                     }
144                 } catch (Exception e) {
145                     // log and continue
146                     _logger.error("Unhandled exception [{}]", e);
147                 } finally {
148                     poller.unregister(replySocket);
149                     replySocket.close();
150                 }
151
152             }
153         };
154     }
155
156     /**
157      * @throws InterruptedException
158      * @throws ExecutionException
159      */
160     private void handleRpcCall() {
161
162         Message request = parseMessage(replySocket);
163
164         _logger.debug("Received rpc request [{}]", request);
165
166         // Call broker to process the message then reply
167         Future<RpcResult<CompositeNode>> rpc = null;
168         RpcResult<CompositeNode> result = null;
169         try {
170             rpc = brokerSession.rpc((QName) request.getRoute().getType(),
171                     XmlUtils.xmlToCompositeNode((String) request.getPayload()));
172
173             result = (rpc != null) ? rpc.get() : null;
174
175         } catch (Exception e) {
176             _logger.debug("Broker threw  [{}]", e);
177         }
178
179         CompositeNode payload = (result != null) ? result.getResult() : null;
180
181         Message response = new Message.MessageBuilder().type(MessageType.RESPONSE).sender(localUri)
182                 .route(request.getRoute()).payload(XmlUtils.compositeNodeToXml(payload)).build();
183
184         _logger.debug("Sending rpc response [{}]", response);
185
186         try {
187             replySocket.send(Message.serialize(response));
188         } catch (Exception e) {
189             _logger.debug("rpc response send failed for message [{}]", response);
190             _logger.debug("{}", e);
191         }
192
193     }
194
195     /**
196      * @param socket
197      * @return
198      */
199     private Message parseMessage(ZMQ.Socket socket) {
200
201         Message msg = null;
202         try {
203             byte[] bytes = socket.recv();
204             _logger.debug("Received bytes:[{}]", bytes.length);
205             msg = (Message) Message.deserialize(bytes);
206         } catch (Throwable t) {
207             t.printStackTrace();
208         }
209         return msg;
210     }
211
212     @Override
213     public void onRouteUpdated(String key, Set values) {
214         RouteIdentifierImpl rId = new RouteIdentifierImpl();
215         try {
216             _logger.debug("Updating key/value {}-{}", key, values);
217             brokerSession.addRpcImplementation((QName) rId.fromString(key).getType(), client);
218
219         } catch (Exception e) {
220             _logger.info("Route update failed {}", e);
221         }
222     }
223
224     @Override
225     public void onRouteDeleted(String key) {
226         // TODO: Broker session needs to be updated to support this
227         throw new UnsupportedOperationException();
228     }
229     
230     /**
231      * Listener for rpc registrations
232      */
233     private class RpcListener implements RpcRegistrationListener {
234
235         
236
237         @Override
238         public void onRpcImplementationAdded(QName name) {
239
240             // if the service name exists in the set, this notice
241             // has bounced back from the broker. It should be ignored
242             if (remoteServices.contains(name))
243                 return;
244
245             _logger.debug("Adding registration for [{}]", name);
246             RouteIdentifierImpl routeId = new RouteIdentifierImpl();
247             routeId.setType(name);
248
249             try {
250                 routingTable.getRoutingTable().get().addGlobalRoute(routeId.toString(), localUri);
251                 _logger.debug("Route added [{}-{}]", name, localUri);
252             } catch (RoutingTableException | SystemException e) {
253                 // TODO: This can be thrown when route already exists in the
254                 // table. Broker
255                 // needs to handle this.
256                 _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
257
258             }
259         }
260
261         @Override
262         public void onRpcImplementationRemoved(QName name) {
263
264             _logger.debug("Removing registration for [{}]", name);
265             RouteIdentifierImpl routeId = new RouteIdentifierImpl();
266             routeId.setType(name);
267
268             try {
269                 routingTable.getRoutingTable().get().removeGlobalRoute(routeId.toString());
270             } catch (RoutingTableException | SystemException e) {
271                 _logger.error("Route delete failed {}", e);
272             }
273         }
274     }
275
276     @Override
277     public void close() throws Exception {
278         stop();
279     }
280
281     public void setRoutingTableProvider(RoutingTableProvider provider) {
282         this.routingTable = provider;
283     }
284
285 }