2 * Copyright © 2013, 2017 EBay Software Foundation and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.ovsdb.lib.jsonrpc;
10 import static java.util.Objects.requireNonNull;
12 import com.fasterxml.jackson.annotation.JsonInclude.Include;
13 import com.fasterxml.jackson.core.JsonProcessingException;
14 import com.fasterxml.jackson.databind.DeserializationFeature;
15 import com.fasterxml.jackson.databind.JavaType;
16 import com.fasterxml.jackson.databind.JsonNode;
17 import com.fasterxml.jackson.databind.ObjectMapper;
18 import com.fasterxml.jackson.databind.type.TypeFactory;
19 import com.google.common.base.Strings;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.SettableFuture;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelInboundHandlerAdapter;
27 import java.util.List;
29 import java.util.UUID;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ScheduledExecutorService;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.TimeUnit;
35 import org.opendaylight.ovsdb.lib.error.UnsupportedArgumentException;
36 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
37 import org.opendaylight.ovsdb.lib.message.Response;
38 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
39 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
43 public class JsonRpcEndpoint extends ChannelInboundHandlerAdapter implements OvsdbRPC {
45 private static final Logger LOG = LoggerFactory.getLogger(JsonRpcEndpoint.class);
46 private static final int REAPER_THREADS = 3;
47 private static final ThreadFactory FUTURE_REAPER_THREAD_FACTORY = new ThreadFactoryBuilder()
48 .setNameFormat("OVSDB-Lib-Future-Reaper-%d")
49 .setDaemon(true).build();
50 private static final ScheduledExecutorService FUTURE_REAPER_SERVICE
51 = Executors.newScheduledThreadPool(REAPER_THREADS, FUTURE_REAPER_THREAD_FACTORY);
53 private static final JavaType JT_OBJECT = TypeFactory.defaultInstance().constructType(Object.class);
54 private static final JavaType JT_JSON_NODE = TypeFactory.defaultInstance().constructType(JsonNode.class);
55 private static final JavaType JT_LIST_JSON_NODE = TypeFactory.defaultInstance().constructParametricType(
56 List.class, JsonNode.class);
57 private static final JavaType JT_LIST_STRING = TypeFactory.defaultInstance().constructParametricType(
58 List.class, String.class);
59 private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
60 .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
61 .setSerializationInclusion(Include.NON_NULL);
63 private static int reaperInterval = 1000;
65 private static final class CallContext {
66 final JavaType resultType;
67 final SettableFuture future;
69 CallContext(final JavaType resultType, final SettableFuture future) {
70 this.resultType = resultType;
75 private final Map<String, CallContext> methodContext = new ConcurrentHashMap<>();
76 private final Channel nettyChannel;
78 private volatile Callback currentCallback = null;
80 public JsonRpcEndpoint(final Channel channel) {
81 this.nettyChannel = requireNonNull(channel);
84 // FIXME: the reaper service should probably be split out
85 public static void setReaperInterval(final int interval) {
86 reaperInterval = interval;
87 LOG.info("Ovsdb Rpc Task interval is set to {} millisecond", reaperInterval);
90 public static void close() {
91 LOG.info("Shutting down reaper executor service");
92 FUTURE_REAPER_SERVICE.shutdownNow();
96 public ListenableFuture<JsonNode> get_schema(final List<String> dbNames) {
97 return sendRequest(JT_JSON_NODE, "get_schema", dbNames);
101 public ListenableFuture<List<String>> echo() {
102 return sendRequest(JT_LIST_STRING, "echo");
106 public ListenableFuture<JsonNode> monitor(final Params equest) {
107 return sendRequest(JT_JSON_NODE, "monitor", equest);
111 public ListenableFuture<List<String>> list_dbs() {
112 return sendRequest(JT_LIST_STRING, "list_dbs");
116 public ListenableFuture<List<JsonNode>> transact(final TransactBuilder transact) {
117 return sendRequest(JT_LIST_JSON_NODE, "transact", transact);
121 public ListenableFuture<Response> cancel(final String id) {
122 // FIXME: reflection-based access did not handle this, this keeps equivalent functionality
123 throw new UnsupportedArgumentException("do not understand this argument yet");
127 public ListenableFuture<JsonNode> monitor_cancel(final Params jsonValue) {
128 return sendRequest(JT_JSON_NODE, "monitor_cancel", jsonValue);
132 public ListenableFuture<Object> lock(final List<String> id) {
133 return sendRequest(JT_OBJECT, "lock", id);
137 public ListenableFuture<Object> steal(final List<String> id) {
138 return sendRequest(JT_OBJECT, "steal", id);
142 public ListenableFuture<Object> unlock(final List<String> id) {
143 return sendRequest(JT_OBJECT, "unlock", id);
147 public boolean registerCallback(final Callback callback) {
148 if (callback == null) {
151 this.currentCallback = callback;
156 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
157 if (!(msg instanceof JsonNode)) {
158 LOG.debug("Unexpected message {}, closing channel {}", msg, nettyChannel);
159 ctx.channel().close();
163 final JsonNode jsonNode = (JsonNode) msg;
164 final JsonNode result = jsonNode.get("result");
165 if (result != null) {
166 handleResponse(jsonNode, result);
169 final JsonNode method = jsonNode.get("method");
170 if (method != null && !method.isNull()) {
171 handleRequest(jsonNode, method);
175 LOG.debug("Ignoring message {} on channel {}", jsonNode, nettyChannel);
179 public void channelReadComplete(final ChannelHandlerContext ctx) {
183 private void handleRequest(final JsonNode jsonRequest, final JsonNode jsonMethod) {
184 final JsonNode id = jsonRequest.get("id");
185 final JsonNode params = jsonRequest.get("params");
187 LOG.debug("Ignoring request with non-existent id field: {} {}", jsonMethod, params);
191 final String requestId = id.asText();
192 if (Strings.isNullOrEmpty(requestId)) {
193 LOG.debug("Ignoring equest with null or empty id field: {} {}", jsonMethod, params);
197 LOG.trace("Request : {} {} {}", id, jsonMethod, params);
199 final String method = jsonMethod.asText();
202 // Echo does not need any special processing. hence handling it internally.
203 sendEmptyResponse(requestId);
206 // send a null response for list_dbs
207 sendEmptyResponse(requestId);
210 if (!handleCallbackRequest(currentCallback, requestId, method, params)) {
211 LOG.error("No handler for Request : {} on {}", jsonRequest, nettyChannel);
217 private boolean handleCallbackRequest(final Callback callback, final String requestId, final String method,
218 final JsonNode params) {
219 if (callback == null) {
220 // No callback registered: bail out
226 final UpdateNotification arg;
228 arg = OBJECT_MAPPER.convertValue(params, UpdateNotification.class);
229 } catch (IllegalArgumentException e) {
230 return reportedMalformedParameters(requestId, e);
233 callback.update(nettyChannel, arg);
237 final List<String> arg;
239 arg = OBJECT_MAPPER.convertValue(params, JT_LIST_STRING);
240 } catch (IllegalArgumentException e) {
241 return reportedMalformedParameters(requestId, e);
244 callback.locked(nettyChannel, arg);
248 final List<String> arg;
250 arg = OBJECT_MAPPER.convertValue(params, JT_LIST_STRING);
251 } catch (IllegalArgumentException e) {
252 return reportedMalformedParameters(requestId, e);
255 callback.stolen(nettyChannel, arg);
263 private boolean reportedMalformedParameters(final String requestId, final Exception cause) {
264 LOG.debug("Request {} failed to map parameters", requestId, cause);
265 sendErrorResponse(requestId, cause.getMessage());
269 private void sendEmptyResponse(final String requestId) {
270 sendErrorResponse(requestId, null);
273 private void sendErrorResponse(final String requestId, final String error) {
274 JsonRpc10Response response = new JsonRpc10Response(requestId);
275 response.setError(error);
277 final String jsonString;
279 jsonString = OBJECT_MAPPER.writeValueAsString(response);
280 } catch (JsonProcessingException e) {
281 LOG.error("Exception while processing JSON response {}", response, e);
285 nettyChannel.writeAndFlush(jsonString);
288 private void handleResponse(final JsonNode response, final JsonNode result) {
289 LOG.trace("Response : {}", response);
290 final String requestId = response.get("id").asText();
291 final CallContext returnCtxt = methodContext.remove(requestId);
292 if (returnCtxt == null) {
293 LOG.debug("Ignoring response for unknown request {}", requestId);
297 final JsonNode error = response.get("error");
298 if (error != null && !error.isNull()) {
299 LOG.error("Request {} failed with error {}", requestId, error);
302 final Object mappedResult = OBJECT_MAPPER.convertValue(result, returnCtxt.resultType);
303 if (!returnCtxt.future.set(mappedResult)) {
304 LOG.debug("Request {} did not accept result {}", requestId, mappedResult);
308 private <T> ListenableFuture<T> sendRequest(final JsonRpc10Request request, final JavaType resultType) {
309 final String requestString;
311 requestString = OBJECT_MAPPER.writeValueAsString(request);
312 } catch (JsonProcessingException e) {
313 return Futures.immediateFailedFuture(e);
315 LOG.trace("getClient Request : {}", requestString);
317 final SettableFuture<T> sf = SettableFuture.create();
318 methodContext.put(request.getId(), new CallContext(resultType, sf));
319 FUTURE_REAPER_SERVICE.schedule(() -> {
320 CallContext cc = methodContext.remove(request.getId());
322 if (cc.future.isDone() || cc.future.isCancelled()) {
325 cc.future.cancel(false);
327 }, reaperInterval, TimeUnit.MILLISECONDS);
329 nettyChannel.writeAndFlush(requestString);
333 private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method) {
334 return sendRequest(createRequest(method), resultType);
337 private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method, final List params) {
338 final JsonRpc10Request request = createRequest(method);
339 request.setParams(params);
340 return sendRequest(request, resultType);
343 private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method, final Params params) {
344 final JsonRpc10Request request = createRequest(method);
345 request.setParams(params.params());
346 return sendRequest(request, resultType);
349 private static JsonRpc10Request createRequest(final String method) {
350 JsonRpc10Request request = new JsonRpc10Request(UUID.randomUUID().toString());
351 request.setMethod(method);