/*
 * Copyright © 2013, 2017 EBay Software Foundation and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.ovsdb.lib.jsonrpc;
10 import static java.util.Objects.requireNonNull;
12 import com.fasterxml.jackson.core.JsonProcessingException;
13 import com.fasterxml.jackson.databind.JavaType;
14 import com.fasterxml.jackson.databind.JsonNode;
15 import com.fasterxml.jackson.databind.ObjectMapper;
16 import com.fasterxml.jackson.databind.type.TypeFactory;
17 import com.google.common.base.Strings;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import java.util.List;
27 import java.util.UUID;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import org.opendaylight.ovsdb.lib.error.UnsupportedArgumentException;
34 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
35 import org.opendaylight.ovsdb.lib.message.Response;
36 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
37 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 public class JsonRpcEndpoint extends ChannelInboundHandlerAdapter implements OvsdbRPC {
43 private static final Logger LOG = LoggerFactory.getLogger(JsonRpcEndpoint.class);
44 private static final int REAPER_THREADS = 3;
45 private static final ThreadFactory FUTURE_REAPER_THREAD_FACTORY = new ThreadFactoryBuilder()
46 .setNameFormat("OVSDB-Lib-Future-Reaper-%d")
47 .setDaemon(true).build();
48 private static final ScheduledExecutorService FUTURE_REAPER_SERVICE
49 = Executors.newScheduledThreadPool(REAPER_THREADS, FUTURE_REAPER_THREAD_FACTORY);
51 private static final JavaType JT_OBJECT = TypeFactory.defaultInstance().constructType(Object.class);
52 private static final JavaType JT_JSON_NODE = TypeFactory.defaultInstance().constructType(JsonNode.class);
53 private static final JavaType JT_LIST_JSON_NODE = TypeFactory.defaultInstance().constructParametricType(
54 List.class, JsonNode.class);
55 private static final JavaType JT_LIST_STRING = TypeFactory.defaultInstance().constructParametricType(
56 List.class, String.class);
58 private static int reaperInterval = 1000;
60 private static final class CallContext {
61 final JavaType resultType;
62 final SettableFuture future;
64 CallContext(final JavaType resultType, final SettableFuture future) {
65 this.resultType = resultType;
70 private final Map<String, CallContext> methodContext = new ConcurrentHashMap<>();
71 private final ObjectMapper objectMapper;
72 private final Channel nettyChannel;
74 private volatile Callback currentCallback = null;
/**
 * Creates an endpoint that talks over the given channel.
 *
 * @param objectMapper mapper used for all JSON serialisation and conversion; must not be {@code null}
 * @param channel channel that outgoing messages are written to; must not be {@code null}
 * @throws NullPointerException if either argument is {@code null}
 */
public JsonRpcEndpoint(final ObjectMapper objectMapper, final Channel channel) {
    this.objectMapper = requireNonNull(objectMapper);
    this.nettyChannel = requireNonNull(channel);
}
81 // FIXME: the reaper service should probably be split out
82 public static void setReaperInterval(final int interval) {
83 reaperInterval = interval;
84 LOG.info("Ovsdb Rpc Task interval is set to {} millisecond", reaperInterval);
87 public static void close() {
88 LOG.info("Shutting down reaper executor service");
89 FUTURE_REAPER_SERVICE.shutdownNow();
93 public ListenableFuture<JsonNode> get_schema(final List<String> dbNames) {
94 return sendRequest(JT_JSON_NODE, "get_schema", dbNames);
98 public ListenableFuture<List<String>> echo() {
99 return sendRequest(JT_LIST_STRING, "echo");
103 public ListenableFuture<JsonNode> monitor(final Params equest) {
104 return sendRequest(JT_JSON_NODE, "monitor", equest);
108 public ListenableFuture<List<String>> list_dbs() {
109 return sendRequest(JT_LIST_STRING, "list_dbs");
113 public ListenableFuture<List<JsonNode>> transact(final TransactBuilder transact) {
114 return sendRequest(JT_LIST_JSON_NODE, "transact", transact);
118 public ListenableFuture<Response> cancel(final String id) {
119 // FIXME: reflection-based access did not handle this, this keeps equivalent functionality
120 throw new UnsupportedArgumentException("do not understand this argument yet");
124 public ListenableFuture<JsonNode> monitor_cancel(final Params jsonValue) {
125 return sendRequest(JT_JSON_NODE, "monitor_cancel", jsonValue);
129 public ListenableFuture<Object> lock(final List<String> id) {
130 return sendRequest(JT_OBJECT, "lock", id);
134 public ListenableFuture<Object> steal(final List<String> id) {
135 return sendRequest(JT_OBJECT, "steal", id);
139 public ListenableFuture<Object> unlock(final List<String> id) {
140 return sendRequest(JT_OBJECT, "unlock", id);
// Registers the callback that handleRequest passes to handleCallbackRequest for server-initiated
// requests: the supplied callback is assigned to the volatile currentCallback field. A null callback
// takes a separate branch.
// NOTE(review): the body of the null branch and the boolean values returned are not visible in this
// excerpt — confirm what is returned for a null versus a non-null callback.
144 public boolean registerCallback(final Callback callback) {
145 if (callback == null) {
148 this.currentCallback = callback;
153 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
154 if (!(msg instanceof JsonNode)) {
155 LOG.debug("Unexpected message {}, closing channel {}", msg, nettyChannel);
156 ctx.channel().close();
160 final JsonNode jsonNode = (JsonNode) msg;
161 final JsonNode result = jsonNode.get("result");
162 if (result != null) {
163 handleResponse(jsonNode, result);
166 final JsonNode method = jsonNode.get("method");
167 if (method != null && !method.isNull()) {
168 handleRequest(jsonNode, method);
172 LOG.debug("Ignoring message {} on channel {}", jsonNode, nettyChannel);
// Inbound-handler hook; presumably overrides ChannelInboundHandlerAdapter.channelReadComplete, which
// Netty invokes once the current read batch has been delivered.
// NOTE(review): the method body is not visible in this excerpt — confirm what it does with ctx.
176 public void channelReadComplete(final ChannelHandlerContext ctx) {
180 private void handleRequest(final JsonNode jsonRequest, final JsonNode jsonMethod) {
181 final JsonNode id = jsonRequest.get("id");
182 final JsonNode params = jsonRequest.get("params");
184 LOG.debug("Ignoring request with non-existent id field: {} {}", jsonMethod, params);
188 final String requestId = id.asText();
189 if (Strings.isNullOrEmpty(requestId)) {
190 LOG.debug("Ignoring equest with null or empty id field: {} {}", jsonMethod, params);
194 LOG.trace("Request : {} {} {}", id, jsonMethod, params);
196 final String method = jsonMethod.asText();
199 // Echo does not need any special processing. hence handling it internally.
200 sendEmptyResponse(requestId);
203 // send a null response for list_dbs
204 sendEmptyResponse(requestId);
207 if (!handleCallbackRequest(currentCallback, requestId, method, params)) {
208 LOG.error("No handler for Request : {} on {}", jsonRequest, nettyChannel);
// Hands a server-initiated request to the given callback. Three deliveries are visible below:
// callback.update (params converted to an UpdateNotification), callback.locked and callback.stolen
// (params converted to a list of strings). When a conversion throws IllegalArgumentException the
// callback is not invoked and the method returns whatever reportedMalformedParameters returns.
// handleRequest logs "No handler for Request" when this method returns false.
// NOTE(review): the code that selects a delivery based on `method`, and the values returned after a
// delivery, for a null callback or for an unrecognised method, are not visible in this excerpt —
// confirm them before relying on this description.
214 private boolean handleCallbackRequest(final Callback callback, final String requestId, final String method,
215 final JsonNode params) {
216 if (callback == null) {
217 // No callback registered: bail out
// Delivery 1: params -> UpdateNotification -> callback.update
223 final UpdateNotification arg;
225 arg = objectMapper.convertValue(params, UpdateNotification.class);
226 } catch (IllegalArgumentException e) {
227 return reportedMalformedParameters(requestId, e);
230 callback.update(nettyChannel, arg);
// Delivery 2: params -> List<String> -> callback.locked
234 final List<String> arg;
236 arg = objectMapper.convertValue(params, JT_LIST_STRING);
237 } catch (IllegalArgumentException e) {
238 return reportedMalformedParameters(requestId, e);
241 callback.locked(nettyChannel, arg);
// Delivery 3: params -> List<String> -> callback.stolen
245 final List<String> arg;
247 arg = objectMapper.convertValue(params, JT_LIST_STRING);
248 } catch (IllegalArgumentException e) {
249 return reportedMalformedParameters(requestId, e);
252 callback.stolen(nettyChannel, arg);
// Reports a request whose parameters could not be converted: logs the failure at debug level and
// sends a response for the same request id carrying the exception's message as its error.
// NOTE(review): the returned boolean is not visible in this excerpt; handleCallbackRequest passes it
// straight back to its caller — confirm the value.
260 private boolean reportedMalformedParameters(final String requestId, final Exception cause) {
261 LOG.debug("Request {} failed to map parameters", requestId, cause);
262 sendErrorResponse(requestId, cause.getMessage());
266 private void sendEmptyResponse(final String requestId) {
267 sendErrorResponse(requestId, null);
270 private void sendErrorResponse(final String requestId, final String error) {
271 JsonRpc10Response response = new JsonRpc10Response(requestId);
272 response.setError(error);
274 final String jsonString;
276 jsonString = objectMapper.writeValueAsString(response);
277 } catch (JsonProcessingException e) {
278 LOG.error("Exception while processing JSON response {}", response, e);
282 nettyChannel.writeAndFlush(jsonString);
285 private void handleResponse(final JsonNode response, final JsonNode result) {
286 LOG.trace("Response : {}", response);
287 final String requestId = response.get("id").asText();
288 final CallContext returnCtxt = methodContext.remove(requestId);
289 if (returnCtxt == null) {
290 LOG.debug("Ignoring response for unknown request {}", requestId);
294 final JsonNode error = response.get("error");
295 if (error != null && !error.isNull()) {
296 LOG.error("Request {} failed with error {}", requestId, error);
299 final Object mappedResult = objectMapper.convertValue(result, returnCtxt.resultType);
300 if (!returnCtxt.future.set(mappedResult)) {
301 LOG.debug("Request {} did not accept result {}", requestId, mappedResult);
305 private <T> ListenableFuture<T> sendRequest(final JsonRpc10Request request, final JavaType resultType) {
306 final String requestString;
308 requestString = objectMapper.writeValueAsString(request);
309 } catch (JsonProcessingException e) {
310 return Futures.immediateFailedFuture(e);
312 LOG.trace("getClient Request : {}", requestString);
314 final SettableFuture<T> sf = SettableFuture.create();
315 methodContext.put(request.getId(), new CallContext(resultType, sf));
316 FUTURE_REAPER_SERVICE.schedule(() -> {
317 CallContext cc = methodContext.remove(request.getId());
319 if (cc.future.isDone() || cc.future.isCancelled()) {
322 cc.future.cancel(false);
324 }, reaperInterval, TimeUnit.MILLISECONDS);
326 nettyChannel.writeAndFlush(requestString);
330 private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method) {
331 return sendRequest(createRequest(method), resultType);
334 private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method, final List params) {
335 final JsonRpc10Request request = createRequest(method);
336 request.setParams(params);
337 return sendRequest(request, resultType);
340 private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method, final Params params) {
341 final JsonRpc10Request request = createRequest(method);
342 request.setParams(params.params());
343 return sendRequest(request, resultType);
346 private static JsonRpc10Request createRequest(final String method) {
347 JsonRpc10Request request = new JsonRpc10Request(UUID.randomUUID().toString());
348 request.setMethod(method);