2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.connector.remoterpc;
10 import com.google.common.base.Optional;
12 import org.opendaylight.controller.sal.common.util.RpcErrors;
13 import org.opendaylight.controller.sal.common.util.Rpcs;
14 import org.opendaylight.controller.sal.connector.api.RpcRouter;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
16 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
17 import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
18 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
19 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
20 import org.opendaylight.controller.sal.core.api.RpcImplementation;
21 import org.opendaylight.yangtools.yang.common.QName;
22 import org.opendaylight.yangtools.yang.common.RpcError;
23 import org.opendaylight.yangtools.yang.common.RpcResult;
24 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import org.zeromq.ZMQ;
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.List;
34 import java.util.concurrent.*;
36 import static com.google.common.base.Preconditions.*;
39 * An implementation of {@link RpcImplementation} that makes remote RPC calls
41 public class Client implements RemoteRpcClient {
43 private final Logger _logger = LoggerFactory.getLogger(Client.class);
45 private final LinkedBlockingQueue<MessageWrapper> requestQueue = new LinkedBlockingQueue<MessageWrapper>(100);
47 private final ExecutorService pool = Executors.newSingleThreadExecutor();
48 private final long TIMEOUT = 5000; // in ms
50 private RoutingTableProvider routingTableProvider;
52 public RoutingTableProvider getRoutingTableProvider() {
53 return routingTableProvider;
56 public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
57 this.routingTableProvider = routingTableProvider;
/**
 * Accessor for the client's request queue.
 * NOTE(review): the method body is not visible in this view; presumably it returns
 * the {@code requestQueue} field - confirm.
 */
60 public LinkedBlockingQueue<MessageWrapper> getRequestQueue() {
65 public Set<QName> getSupportedRpcs() {
66 // TODO: Find the entries from routing table
67 return Collections.emptySet();
// Fragment of the start-up routine (the enclosing method header is not visible here):
// submits a new Sender bound to this client to the single-threaded executor.
71 pool.execute(new Sender(this));
// Fragment of the shutdown routine (the enclosing method header is not visible here).
77 _logger.debug("Client stopping...");
// NOTE(review): the ZMQ context is terminated before the executor is shut down.
// ZeroMQ documents context termination as blocking until all sockets are closed -
// confirm the Sender releases its sockets so this call cannot hang.
78 Context.getInstance().getZmqContext().term();
79 _logger.debug("ZMQ context terminated");
81 pool.shutdown(); // initiate shutdown
// Wait up to 10 seconds for the submitted task to finish.
83 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
// Wait a further 10 seconds; log an error if the pool still has not terminated.
85 if (!pool.awaitTermination(10, TimeUnit.SECONDS))
86 _logger.error("Client thread pool did not shut down");
88 } catch (InterruptedException e) {
89 // (Re-)Cancel if current thread also interrupted
91 // Preserve interrupt status
92 Thread.currentThread().interrupt();
94 _logger.debug("Client stopped");
98 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
100 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
101 routeId.setType(rpc);
103 String address = lookupRemoteAddress(routeId);
105 Message request = new Message.MessageBuilder().type(Message.MessageType.REQUEST)
106 .sender(Context.getInstance().getLocalUri()).recipient(address).route(routeId)
107 .payload(XmlUtils.compositeNodeToXml(input)).build();
109 List<RpcError> errors = new ArrayList<RpcError>();
111 try (SocketPair pair = new SocketPair()) {
113 MessageWrapper messageWrapper = new MessageWrapper(request, pair.getSender());
114 process(messageWrapper);
115 Message response = parseMessage(pair.getReceiver());
117 CompositeNode payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
119 return Rpcs.getRpcResult(true, payload, errors);
121 } catch (Exception e) {
122 collectErrors(e, errors);
123 return Rpcs.getRpcResult(false, null, errors);
128 public void process(MessageWrapper msg) throws TimeoutException, InterruptedException {
129 _logger.debug("Processing message [{}]", msg);
131 boolean success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS);
133 throw new TimeoutException("Queue is full");
137 * Block on socket for reply
142 private Message parseMessage(ZMQ.Socket receiver) throws IOException, ClassNotFoundException {
143 return (Message) Message.deserialize(receiver.recv());
147 * Find address for the given route identifier in routing table
151 * @return remote network address
153 private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId) {
154 checkNotNull(routeId, "route must not be null");
156 Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
157 checkNotNull(routingTable.isPresent(), "Routing table is null");
159 Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
160 checkNotNull(addresses, "Address not found for route [%s]", routeId);
161 checkState(addresses.size() == 1, "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); // its
166 String address = addresses.iterator().next();
167 checkNotNull(address, "Address not found for route [%s]", routeId);
172 private void collectErrors(Exception e, List<RpcError> errors) {
176 errors = new ArrayList<RpcError>();
178 errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
179 for (Throwable t : e.getSuppressed()) {
180 errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
185 public void close() throws Exception {