2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import com.google.common.base.Optional;
11 import org.opendaylight.controller.sal.common.util.RpcErrors;
12 import org.opendaylight.controller.sal.common.util.Rpcs;
13 import org.opendaylight.controller.sal.connector.api.RpcRouter;
14 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
17 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
18 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
19 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
20 import org.opendaylight.controller.sal.core.api.RpcImplementation;
21 import org.opendaylight.yangtools.yang.common.QName;
22 import org.opendaylight.yangtools.yang.common.RpcError;
23 import org.opendaylight.yangtools.yang.common.RpcResult;
24 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
25 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.zeromq.ZMQ;
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.FutureTask;
35 import java.util.concurrent.TimeUnit;
37 import static com.google.common.base.Preconditions.checkNotNull;
38 import static com.google.common.base.Preconditions.checkState;
41 * An implementation of {@link RpcImplementation} that makes
44 public class ClientImpl implements RemoteRpcClient {
46 private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
48 private ZMQ.Context context = ZMQ.context(1);
49 private ClientRequestHandler handler;
50 private RoutingTableProvider routingTableProvider;
53 handler = new ClientRequestHandler(context);
57 public ClientImpl(ClientRequestHandler handler){
58 this.handler = handler;
62 public RoutingTableProvider getRoutingTableProvider() {
63 return routingTableProvider;
66 public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
67 this.routingTableProvider = routingTableProvider;
71 public void start() {/*NOOPS*/}
77 _logger.info("Stopped");
86 * Finds remote server that can execute this rpc and sends a message to it
87 * requesting execution.
88 * The call blocks until a response from remote server is received. Its upto
89 * the client of this API to implement a timeout functionality.
91 * @param rpc remote service to be executed
92 * @param input payload for the remote service
95 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
96 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
99 String address = lookupRemoteAddressForGlobalRpc(routeId);
100 return sendMessage(input, routeId, address);
104 * Finds remote server that can execute this routed rpc and sends a message to it
105 * requesting execution.
106 * The call blocks until a response from remote server is received. Its upto
107 * the client of this API to implement a timeout functionality.
112 * instance identifier on which rpc is to be executed
117 public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
119 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
120 routeId.setType(rpc);
121 routeId.setRoute(identifier);
123 String address = lookupRemoteAddressForRpc(routeId);
125 return sendMessage(input, routeId, address);
128 private RpcResult<CompositeNode> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
129 Message request = new Message.MessageBuilder()
130 .type(Message.MessageType.REQUEST)
131 .sender(Context.getInstance().getLocalUri())
134 .payload(XmlUtils.compositeNodeToXml(input))
137 List<RpcError> errors = new ArrayList<RpcError>();
140 Message response = handler.handle(request);
141 CompositeNode payload = null;
143 if ( response != null )
144 payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
146 return Rpcs.getRpcResult(true, payload, errors);
148 } catch (Exception e){
149 collectErrors(e, errors);
150 return Rpcs.getRpcResult(false, null, errors);
155 * Find address for the given route identifier in routing table
156 * @param routeId route identifier
157 * @return remote network address
159 private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier routeId){
160 checkNotNull(routeId, "route must not be null");
162 Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
163 checkNotNull(routingTable.isPresent(), "Routing table is null");
165 String address = null;
167 address = routingTable.get().getGlobalRoute(routeId);
168 } catch (RoutingTableException|SystemException e) {
169 _logger.error("Exception caught while looking up remote address " + e);
171 checkState(address != null, "Address not found for route [%s]", routeId);
177 * Find address for the given route identifier in routing table
178 * @param routeId route identifier
179 * @return remote network address
181 private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier routeId){
182 checkNotNull(routeId, "route must not be null");
184 Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
185 checkNotNull(routingTable.isPresent(), "Routing table is null");
187 String address = routingTable.get().getLastAddedRoute(routeId);
188 checkState(address != null, "Address not found for route [%s]", routeId);
193 private void collectErrors(Exception e, List<RpcError> errors){
194 if (e == null) return;
195 if (errors == null) errors = new ArrayList<RpcError>();
197 errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
198 for (Throwable t : e.getSuppressed()) {
199 errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
204 * Closes ZMQ Context. It tries to gracefully terminate the context. If
205 * termination takes more than a second, its forcefully shutdown.
207 private void closeZmqContext() {
208 ExecutorService exec = Executors.newSingleThreadExecutor();
209 FutureTask zmqTermination = new FutureTask(new Runnable() {
216 _logger.debug("ZMQ Context terminated");
217 } catch (Exception e) {
218 _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
223 exec.execute(zmqTermination);
226 zmqTermination.get(1L, TimeUnit.SECONDS);
227 } catch (Exception e) {/*ignore and continue with shutdown*/}