2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import com.google.common.base.Optional;
11 import org.opendaylight.controller.sal.common.util.RpcErrors;
12 import org.opendaylight.controller.sal.common.util.Rpcs;
13 import org.opendaylight.controller.sal.connector.api.RpcRouter;
14 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
17 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
18 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
19 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
20 import org.opendaylight.controller.sal.core.api.RpcImplementation;
21 import org.opendaylight.yangtools.yang.common.QName;
22 import org.opendaylight.yangtools.yang.common.RpcError;
23 import org.opendaylight.yangtools.yang.common.RpcResult;
24 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
25 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.zeromq.ZMQ;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.List;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.FutureTask;
36 import java.util.concurrent.TimeUnit;
38 import static com.google.common.base.Preconditions.checkNotNull;
39 import static com.google.common.base.Preconditions.checkState;
42 * An implementation of {@link RpcImplementation} that makes
45 public class ClientImpl implements RemoteRpcClient {
47 private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
49 private ZMQ.Context context = ZMQ.context(1);
50 private ClientRequestHandler handler;
51 private RoutingTableProvider routingTableProvider;
54 handler = new ClientRequestHandler(context);
58 public ClientImpl(ClientRequestHandler handler){
59 this.handler = handler;
63 public RoutingTableProvider getRoutingTableProvider() {
64 return routingTableProvider;
67 public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
68 this.routingTableProvider = routingTableProvider;
72 public void start() {/*NOOPS*/}
78 _logger.info("Stopped");
87 * Finds remote server that can execute this rpc and sends a message to it
88 * requesting execution.
89 * The call blocks until a response from remote server is received. Its upto
90 * the client of this API to implement a timeout functionality.
92 * @param rpc remote service to be executed
93 * @param input payload for the remote service
96 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
97 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
100 String address = lookupRemoteAddressForGlobalRpc(routeId);
101 return sendMessage(input, routeId, address);
105 * Finds remote server that can execute this routed rpc and sends a message to it
106 * requesting execution.
107 * The call blocks until a response from remote server is received. Its upto
108 * the client of this API to implement a timeout functionality.
113 * instance identifier on which rpc is to be executed
118 public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
120 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
121 routeId.setType(rpc);
122 routeId.setRoute(identifier);
124 String address = lookupRemoteAddressForRpc(routeId);
126 return sendMessage(input, routeId, address);
129 private RpcResult<CompositeNode> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
130 Message request = new Message.MessageBuilder()
131 .type(Message.MessageType.REQUEST)
132 .sender(Context.getInstance().getLocalUri())
135 .payload(XmlUtils.compositeNodeToXml(input))
138 List<RpcError> errors = new ArrayList<RpcError>();
141 Message response = handler.handle(request);
142 CompositeNode payload = null;
144 if ( response != null ) {
146 _logger.info("Received response [{}]", response);
148 Object rawPayload = response.getPayload();
149 switch (response.getType()) {
151 if ( rawPayload instanceof List )
152 errors = (List) rawPayload;
156 payload = XmlUtils.xmlToCompositeNode((String) rawPayload);
161 RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null)
167 return Rpcs.getRpcResult(true, payload, errors);
169 } catch (Exception e){
170 collectErrors(e, errors);
171 return Rpcs.getRpcResult(false, null, errors);
176 * Find address for the given route identifier in routing table
177 * @param routeId route identifier
178 * @return remote network address
180 private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier routeId){
181 checkNotNull(routeId, "route must not be null");
183 Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
184 checkNotNull(routingTable.isPresent(), "Routing table is null");
186 String address = null;
188 address = routingTable.get().getGlobalRoute(routeId);
189 } catch (RoutingTableException|SystemException e) {
190 _logger.error("Exception caught while looking up remote address " + e);
192 checkState(address != null, "Address not found for route [%s]", routeId);
198 * Find address for the given route identifier in routing table
199 * @param routeId route identifier
200 * @return remote network address
202 private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier routeId){
203 checkNotNull(routeId, "route must not be null");
205 Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
206 checkNotNull(routingTable.isPresent(), "Routing table is null");
208 String address = routingTable.get().getLastAddedRoute(routeId);
209 checkState(address != null, "Address not found for route [%s]", routeId);
214 private void collectErrors(Exception e, List<RpcError> errors){
215 if (e == null) return;
216 if (errors == null) errors = new ArrayList<RpcError>();
218 errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
219 for (Throwable t : e.getSuppressed()) {
220 errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
225 * Closes ZMQ Context. It tries to gracefully terminate the context. If
226 * termination takes more than a second, its forcefully shutdown.
228 private void closeZmqContext() {
229 ExecutorService exec = Executors.newSingleThreadExecutor();
230 FutureTask zmqTermination = new FutureTask(new Runnable() {
237 _logger.debug("ZMQ Context terminated");
238 } catch (Exception e) {
239 _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
244 exec.execute(zmqTermination);
247 zmqTermination.get(1L, TimeUnit.SECONDS);
248 } catch (Exception e) {/*ignore and continue with shutdown*/}