Merge "Fixed deserialization of IdentityRefs in Restconf URI."
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / Client.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import com.google.common.base.Optional;
11
12 import org.opendaylight.controller.sal.common.util.RpcErrors;
13 import org.opendaylight.controller.sal.common.util.Rpcs;
14 import org.opendaylight.controller.sal.connector.api.RpcRouter;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
16 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
17 import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
18 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
19 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
20 import org.opendaylight.controller.sal.core.api.RpcImplementation;
21 import org.opendaylight.yangtools.yang.common.QName;
22 import org.opendaylight.yangtools.yang.common.RpcError;
23 import org.opendaylight.yangtools.yang.common.RpcResult;
24 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import org.zeromq.ZMQ;
28
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.Set;
34 import java.util.concurrent.*;
35
36 import static com.google.common.base.Preconditions.*;
37
38 /**
39  * An implementation of {@link RpcImplementation} that makes remote RPC calls
40  */
41 public class Client implements RemoteRpcClient {
42
43     private final Logger _logger = LoggerFactory.getLogger(Client.class);
44
45     private final LinkedBlockingQueue<MessageWrapper> requestQueue = new LinkedBlockingQueue<MessageWrapper>(100);
46
47     private final ExecutorService pool = Executors.newSingleThreadExecutor();
48     private final long TIMEOUT = 5000; // in ms
49
50     private  RoutingTableProvider routingTableProvider;
51
52     public RoutingTableProvider getRoutingTableProvider() {
53         return routingTableProvider;
54     }
55
56     public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
57         this.routingTableProvider = routingTableProvider;
58     }
59
60     public LinkedBlockingQueue<MessageWrapper> getRequestQueue() {
61         return requestQueue;
62     }
63
64     @Override
65     public Set<QName> getSupportedRpcs() {
66         // TODO: Find the entries from routing table
67         return Collections.emptySet();
68     }
69
70     public void start() {
71         pool.execute(new Sender(this));
72
73     }
74
75     public void stop() {
76
77         _logger.debug("Client stopping...");
78         Context.getInstance().getZmqContext().term();
79         _logger.debug("ZMQ context terminated");
80
81         pool.shutdown(); // intiate shutdown
82         try {
83             if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
84                 pool.shutdownNow();
85                 if (!pool.awaitTermination(10, TimeUnit.SECONDS))
86                     _logger.error("Client thread pool did not shut down");
87             }
88         } catch (InterruptedException e) {
89             // (Re-)Cancel if current thread also interrupted
90             pool.shutdownNow();
91             // Preserve interrupt status
92             Thread.currentThread().interrupt();
93         }
94         _logger.debug("Client stopped");
95     }
96
97     @Override
98     public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
99
100         RouteIdentifierImpl routeId = new RouteIdentifierImpl();
101         routeId.setType(rpc);
102
103         String address = lookupRemoteAddress(routeId);
104
105         Message request = new Message.MessageBuilder().type(Message.MessageType.REQUEST)
106                 .sender(Context.getInstance().getLocalUri()).recipient(address).route(routeId)
107                 .payload(XmlUtils.compositeNodeToXml(input)).build();
108
109         List<RpcError> errors = new ArrayList<RpcError>();
110
111         try (SocketPair pair = new SocketPair()) {
112
113             MessageWrapper messageWrapper = new MessageWrapper(request, pair.getSender());
114             process(messageWrapper);
115             Message response = parseMessage(pair.getReceiver());
116
117             CompositeNode payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
118
119             return Rpcs.getRpcResult(true, payload, errors);
120
121         } catch (Exception e) {
122             collectErrors(e, errors);
123             return Rpcs.getRpcResult(false, null, errors);
124         }
125
126     }
127
128     public void process(MessageWrapper msg) throws TimeoutException, InterruptedException {
129         _logger.debug("Processing message [{}]", msg);
130
131         boolean success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS);
132         if (!success)
133             throw new TimeoutException("Queue is full");
134     }
135
136     /**
137      * Block on socket for reply
138      * 
139      * @param receiver
140      * @return
141      */
142     private Message parseMessage(ZMQ.Socket receiver) throws IOException, ClassNotFoundException {
143         return (Message) Message.deserialize(receiver.recv());
144     }
145
146     /**
147      * Find address for the given route identifier in routing table
148      * 
149      * @param routeId
150      *            route identifier
151      * @return remote network address
152      */
153     private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId) {
154         checkNotNull(routeId, "route must not be null");
155
156         Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
157         checkNotNull(routingTable.isPresent(), "Routing table is null");
158
159         Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
160         checkNotNull(addresses, "Address not found for route [%s]", routeId);
161         checkState(addresses.size() == 1, "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); // its
162                                                                                                                          // a
163                                                                                                                          // global
164                                                                                                                          // service.
165
166         String address = addresses.iterator().next();
167         checkNotNull(address, "Address not found for route [%s]", routeId);
168
169         return address;
170     }
171
172     private void collectErrors(Exception e, List<RpcError> errors) {
173         if (e == null)
174             return;
175         if (errors == null)
176             errors = new ArrayList<RpcError>();
177
178         errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
179         for (Throwable t : e.getSuppressed()) {
180             errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
181         }
182     }
183
184     @Override
185     public void close() throws Exception {
186         stop();
187     }
188 }