2 * Copyright © 2013, 2017 EBay Software Foundation and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.lib.jsonrpc;
11 import com.fasterxml.jackson.core.JsonProcessingException;
12 import com.fasterxml.jackson.databind.JavaType;
13 import com.fasterxml.jackson.databind.JsonNode;
14 import com.fasterxml.jackson.databind.ObjectMapper;
15 import com.fasterxml.jackson.databind.type.TypeFactory;
16 import com.google.common.reflect.Invokable;
17 import com.google.common.reflect.Reflection;
18 import com.google.common.reflect.TypeToken;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import io.netty.channel.Channel;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.UUID;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ScheduledExecutorService;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.TimeUnit;
36 import org.opendaylight.ovsdb.lib.error.UnexpectedResultException;
37 import org.opendaylight.ovsdb.lib.error.UnsupportedArgumentException;
38 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
/**
 * JSON-RPC endpoint that writes outgoing messages to a netty {@link Channel}.
 *
 * <p>Outgoing calls are issued through dynamic proxies created by {@code getClient}; each call is
 * recorded in {@code methodContext} until {@code processResult} removes it or a reaper task scheduled
 * on {@code FUTURE_REAPER_SERVICE} removes it first. Incoming requests are handled by
 * {@code processRequest}.
 */
42 public class JsonRpcEndpoint {
44 private static final Logger LOG = LoggerFactory.getLogger(JsonRpcEndpoint.class);
// Number of threads in the scheduler that runs the reaper tasks.
45 private static final int REAPER_THREADS = 3;
// Reaper threads are daemon threads named "OVSDB-Lib-Future-Reaper-<n>".
46 private static final ThreadFactory FUTURE_REAPER_THREAD_FACTORY = new ThreadFactoryBuilder()
47 .setNameFormat("OVSDB-Lib-Future-Reaper-%d")
48 .setDaemon(true).build();
// Static scheduler, so it is shared by every endpoint instance; close() shuts it down.
49 private static final ScheduledExecutorService FUTURE_REAPER_SERVICE
50 = Executors.newScheduledThreadPool(REAPER_THREADS, FUTURE_REAPER_THREAD_FACTORY);
// Delay in milliseconds passed to the scheduler for each reaper task; changed via setReaperInterval().
52 private static int reaperInterval = 1000;
/**
 * Book-keeping for one outstanding call: the request that was sent and the future that tracks it.
 *
 * <p>NOTE(review): the constructor parameter and {@code getMethod()} imply a {@code Method} field as
 * well, and the constructor/getter bodies look truncated in this view of the file - confirm against
 * the complete source before editing.
 */
54 public class CallContext {
// Request object that was serialised and sent for this call.
56 JsonRpc10Request request;
// Future stored alongside the request; processResult() sets it and the reaper task cancels it.
57 SettableFuture<Object> future;
/**
 * Creates the context for one call.
 *
 * @param request the request being sent
 * @param method the proxied Java method that triggered the request
 * @param future the future associated with the call
 */
59 public CallContext(JsonRpc10Request request, Method method, SettableFuture<Object> future) {
61 this.request = request;
/** Accessor for the proxied method; processResult() reads its return type. */
65 public Method getMethod() {
/** Accessor for the request of this call. */
69 public JsonRpc10Request getRequest() {
/** Accessor for the future of this call. */
73 public SettableFuture<Object> getFuture() {
// Jackson mapper used to serialise outgoing messages and to convert incoming JSON payloads.
78 ObjectMapper objectMapper;
// Pending outgoing calls keyed by request id; entries are removed by processResult() or by the reaper task.
80 Map<String, CallContext> methodContext = new ConcurrentHashMap<>();
// Callback registered per client context by the getClient() proxy; looked up by processRequest().
// NOTE(review): this is a plain HashMap - confirm registration and lookup never race across threads.
81 Map<Object, OvsdbRPC.Callback> requestCallbacks = new HashMap<>();
/**
 * Creates an endpoint that converts JSON with the given mapper and writes to the given channel.
 *
 * @param objectMapper mapper stored for JSON serialisation and conversion
 * @param channel netty channel that outgoing messages are written to
 */
83 public JsonRpcEndpoint(ObjectMapper objectMapper, Channel channel) {
84 this.objectMapper = objectMapper;
85 this.nettyChannel = channel;
/**
 * Creates a dynamic proxy for {@code klazz} whose method calls are turned into JSON-RPC requests.
 *
 * <p>A call to the method named {@code OvsdbRPC.REGISTER_CALLBACK_METHOD} stores the supplied callback
 * under {@code context} in {@code requestCallbacks}. Any other call builds a request whose id is a
 * random UUID and whose method is the Java method name, serialises it, records it in
 * {@code methodContext}, schedules a reaper task and writes the JSON string to the channel. The proxy
 * throws {@code UnsupportedArgumentException} when a single argument is neither a {@code Params} nor
 * a {@code List}.
 *
 * <p>NOTE(review): several lines of this method (branch bodies, return statements, closing braces)
 * appear to be missing from this view of the file - confirm against the complete source.
 *
 * @param context key under which a registered callback is stored
 * @param klazz interface to proxy
 * @param <T> type of the proxied interface
 * @return a proxy implementing {@code klazz}
 */
88 public <T> T getClient(final Object context, Class<T> klazz) {
90 return Reflection.newProxy(klazz, (proxy, method, args) -> {
// Callback registration is handled locally.
91 if (method.getName().equals(OvsdbRPC.REGISTER_CALLBACK_METHOD)) {
// Guard: anything other than exactly one OvsdbRPC.Callback argument is not a valid registration.
92 if ((args == null) || args.length != 1 || !(args[0] instanceof OvsdbRPC.Callback)) {
95 requestCallbacks.put(context, (OvsdbRPC.Callback)args[0]);
// Every other method becomes a request: fresh UUID as id, Java method name as RPC method.
99 JsonRpc10Request request = new JsonRpc10Request(UUID.randomUUID().toString());
100 request.setMethod(method.getName());
102 if (args != null && args.length != 0) {
103 List<Object> params = null;
// Only the single-argument form is translated into params: a Params wrapper or a List.
105 if (args.length == 1) {
106 if (args[0] instanceof Params) {
107 params = ((Params) args[0]).params();
108 } else if (args[0] instanceof List) {
109 params = (List<Object>) args[0];
112 if (params == null) {
113 throw new UnsupportedArgumentException("do not understand this argument yet");
115 request.setParams(params);
119 String requestString = objectMapper.writeValueAsString(request);
120 LOG.trace("getClient Request : {}", requestString);
// Record the call under its id so processResult() can find the future when the response arrives.
122 SettableFuture<Object> sf = SettableFuture.create();
123 methodContext.put(request.getId(), new CallContext(request, method, sf));
// Reaper task: after reaperInterval milliseconds it removes the entry and cancels the future.
// NOTE(review): the done/cancelled check below presumably skips the cancel - body not visible here.
124 FUTURE_REAPER_SERVICE.schedule(() -> {
125 CallContext cc = methodContext.remove(request.getId());
127 if (cc.getFuture().isDone() || cc.getFuture().isCancelled()) {
130 cc.getFuture().cancel(false);
132 }, reaperInterval, TimeUnit.MILLISECONDS);
134 nettyChannel.writeAndFlush(requestString);
141 public void processResult(JsonNode response) throws NoSuchMethodException {
143 LOG.trace("Response : {}", response.toString());
144 CallContext returnCtxt = methodContext.remove(response.get("id").asText());
145 if (returnCtxt == null) {
149 if (ListenableFuture.class == returnCtxt.getMethod().getReturnType()) {
150 TypeToken<?> retType = TypeToken.of(
151 returnCtxt.getMethod().getGenericReturnType())
152 .resolveType(ListenableFuture.class.getMethod("get").getGenericReturnType());
153 JavaType javaType = TypeFactory.defaultInstance().constructType(retType.getType());
155 JsonNode result = response.get("result");
156 Object result1 = objectMapper.convertValue(result, javaType);
157 JsonNode error = response.get("error");
158 if (error != null && !error.isNull()) {
159 LOG.error("Error : {}", error.toString());
162 returnCtxt.getFuture().set(result1);
165 throw new UnexpectedResultException("Don't know how to handle this");
169 public void processRequest(Object context, JsonNode requestJson) {
170 JsonRpc10Request request = new JsonRpc10Request(requestJson.get("id").asText());
171 request.setMethod(requestJson.get("method").asText());
172 LOG.trace("Request : {} {} {}", requestJson.get("id"), requestJson.get("method"),
173 requestJson.get("params"));
174 OvsdbRPC.Callback callback = requestCallbacks.get(context);
175 if (callback != null) {
176 Method[] methods = callback.getClass().getDeclaredMethods();
177 for (Method method : methods) {
178 if (method.getName().equals(request.getMethod())) {
179 Class<?>[] parameters = method.getParameterTypes();
180 JsonNode params = requestJson.get("params");
181 Object param = objectMapper.convertValue(params, parameters[1]);
183 Invokable from = Invokable.from(method);
184 from.setAccessible(true);
185 from.invoke(callback, context, param);
186 } catch (IllegalAccessException | InvocationTargetException e) {
187 LOG.error("Unable to invoke callback {}", method.getName(), e);
194 // Echo dont need any special processing. hence handling it internally.
196 if (request.getMethod().equals("echo")) {
197 JsonRpc10Response response = new JsonRpc10Response(request.getId());
198 response.setError(null);
199 String jsonString = null;
201 jsonString = objectMapper.writeValueAsString(response);
202 nettyChannel.writeAndFlush(jsonString);
203 } catch (JsonProcessingException e) {
204 LOG.error("Exception while processing JSON string {}", jsonString, e);
209 // send a null response for list_dbs
210 if (request.getMethod().equals("list_dbs")) {
211 JsonRpc10Response response = new JsonRpc10Response(request.getId());
212 response.setError(null);
213 String jsonString = null;
215 jsonString = objectMapper.writeValueAsString(response);
216 nettyChannel.writeAndFlush(jsonString);
217 } catch (JsonProcessingException e) {
218 LOG.error("Exception while processing JSON string {}", jsonString, e);
223 LOG.error("No handler for Request : {} on {}", requestJson.toString(), context);
/**
 * Exposes the map of pending calls, keyed by request id.
 *
 * @return the {@code methodContext} map itself, not a copy
 */
226 public Map<String, CallContext> getMethodContext() {
227 return methodContext;
/**
 * Sets the delay handed to the scheduler for reaper tasks and logs the new value.
 *
 * <p>The field is static, so the value applies to every endpoint instance. It is read when a task
 * is scheduled in {@code getClient}.
 *
 * @param interval delay in milliseconds
 */
230 public static void setReaperInterval(int interval) {
231 reaperInterval = interval;
232 LOG.info("Ovsdb Rpc Task interval is set to {} millisecond", reaperInterval);
/**
 * Shuts down the reaper executor by calling {@code shutdownNow()} on it.
 *
 * <p>NOTE(review): the executor is a static final field shared by all endpoint instances and is
 * never re-created in the code visible here - confirm this is only called at final shutdown.
 */
235 public static void close() {
236 LOG.info("Shutting down reaper executor service");
237 FUTURE_REAPER_SERVICE.shutdownNow();