Do not use Foo.toString() when logging
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / jsonrpc / JsonRpcEndpoint.java
1 /*
2  * Copyright © 2013, 2017 EBay Software Foundation and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.lib.jsonrpc;
10
11 import com.fasterxml.jackson.core.JsonProcessingException;
12 import com.fasterxml.jackson.databind.JavaType;
13 import com.fasterxml.jackson.databind.JsonNode;
14 import com.fasterxml.jackson.databind.ObjectMapper;
15 import com.fasterxml.jackson.databind.type.TypeFactory;
16 import com.google.common.reflect.Invokable;
17 import com.google.common.reflect.Reflection;
18 import com.google.common.reflect.TypeToken;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import io.netty.channel.Channel;
23 import java.lang.reflect.InvocationTargetException;
24 import java.lang.reflect.Method;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.UUID;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.TimeUnit;
34 import org.opendaylight.ovsdb.lib.error.UnexpectedResultException;
35 import org.opendaylight.ovsdb.lib.error.UnsupportedArgumentException;
36 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 public class JsonRpcEndpoint {
41
42     private static final Logger LOG = LoggerFactory.getLogger(JsonRpcEndpoint.class);
43     private static final int REAPER_THREADS = 3;
44     private static final ThreadFactory FUTURE_REAPER_THREAD_FACTORY = new ThreadFactoryBuilder()
45             .setNameFormat("OVSDB-Lib-Future-Reaper-%d")
46             .setDaemon(true).build();
47     private static final ScheduledExecutorService FUTURE_REAPER_SERVICE
48             = Executors.newScheduledThreadPool(REAPER_THREADS, FUTURE_REAPER_THREAD_FACTORY);
49
50     private static int reaperInterval = 1000;
51
52     public static class CallContext {
53         Method method;
54         JsonRpc10Request request;
55         SettableFuture<Object> future;
56
57         public CallContext(JsonRpc10Request request, Method method, SettableFuture<Object> future) {
58             this.method = method;
59             this.request = request;
60             this.future = future;
61         }
62
63         public Method getMethod() {
64             return method;
65         }
66
67         public JsonRpc10Request getRequest() {
68             return request;
69         }
70
71         public SettableFuture<Object> getFuture() {
72             return future;
73         }
74     }
75
76     ObjectMapper objectMapper;
77     Channel nettyChannel;
78     Map<String, CallContext> methodContext = new ConcurrentHashMap<>();
79     Map<Object, OvsdbRPC.Callback> requestCallbacks = new HashMap<>();
80
81     public JsonRpcEndpoint(ObjectMapper objectMapper, Channel channel) {
82         this.objectMapper = objectMapper;
83         this.nettyChannel = channel;
84     }
85
86     public <T> T getClient(final Object context, Class<T> klazz) {
87
88         return Reflection.newProxy(klazz, (proxy, method, args) -> {
89             if (method.getName().equals(OvsdbRPC.REGISTER_CALLBACK_METHOD)) {
90                 if (args == null || args.length != 1 || !(args[0] instanceof OvsdbRPC.Callback)) {
91                     return false;
92                 }
93                 requestCallbacks.put(context, (OvsdbRPC.Callback)args[0]);
94                 return true;
95             }
96
97             JsonRpc10Request request = new JsonRpc10Request(UUID.randomUUID().toString());
98             request.setMethod(method.getName());
99
100             if (args != null && args.length != 0) {
101                 List<Object> params = null;
102
103                 if (args.length == 1) {
104                     if (args[0] instanceof Params) {
105                         params = ((Params) args[0]).params();
106                     } else if (args[0] instanceof List) {
107                         params = (List<Object>) args[0];
108                     }
109
110                     if (params == null) {
111                         throw new UnsupportedArgumentException("do not understand this argument yet");
112                     }
113                     request.setParams(params);
114                 }
115             }
116
117             String requestString = objectMapper.writeValueAsString(request);
118             LOG.trace("getClient Request : {}", requestString);
119
120             SettableFuture<Object> sf = SettableFuture.create();
121             methodContext.put(request.getId(), new CallContext(request, method, sf));
122             FUTURE_REAPER_SERVICE.schedule(() -> {
123                 CallContext cc = methodContext.remove(request.getId());
124                 if (cc != null) {
125                     if (cc.getFuture().isDone() || cc.getFuture().isCancelled()) {
126                         return;
127                     }
128                     cc.getFuture().cancel(false);
129                 }
130             }, reaperInterval, TimeUnit.MILLISECONDS);
131
132             nettyChannel.writeAndFlush(requestString);
133
134             return sf;
135         }
136         );
137     }
138
139     public void processResult(JsonNode response) throws NoSuchMethodException {
140
141         LOG.trace("Response : {}", response);
142         CallContext returnCtxt = methodContext.remove(response.get("id").asText());
143         if (returnCtxt == null) {
144             return;
145         }
146
147         if (ListenableFuture.class == returnCtxt.getMethod().getReturnType()) {
148             TypeToken<?> retType = TypeToken.of(
149                     returnCtxt.getMethod().getGenericReturnType())
150                     .resolveType(ListenableFuture.class.getMethod("get").getGenericReturnType());
151             JavaType javaType =  TypeFactory.defaultInstance().constructType(retType.getType());
152
153             JsonNode result = response.get("result");
154             Object result1 = objectMapper.convertValue(result, javaType);
155             JsonNode error = response.get("error");
156             if (error != null && !error.isNull()) {
157                 LOG.error("Error : {}", error);
158             }
159
160             returnCtxt.getFuture().set(result1);
161
162         } else {
163             throw new UnexpectedResultException("Don't know how to handle this");
164         }
165     }
166
167     public void processRequest(Object context, JsonNode requestJson) {
168         JsonRpc10Request request = new JsonRpc10Request(requestJson.get("id").asText());
169         request.setMethod(requestJson.get("method").asText());
170         LOG.trace("Request : {} {} {}", requestJson.get("id"), requestJson.get("method"),
171                 requestJson.get("params"));
172         OvsdbRPC.Callback callback = requestCallbacks.get(context);
173         if (callback != null) {
174             Method[] methods = callback.getClass().getDeclaredMethods();
175             for (Method method : methods) {
176                 if (method.getName().equals(request.getMethod())) {
177                     Class<?>[] parameters = method.getParameterTypes();
178                     JsonNode params = requestJson.get("params");
179                     Object param = objectMapper.convertValue(params, parameters[1]);
180                     try {
181                         Invokable from = Invokable.from(method);
182                         from.setAccessible(true);
183                         from.invoke(callback, context, param);
184                     } catch (IllegalAccessException | InvocationTargetException e) {
185                         LOG.error("Unable to invoke callback {}", method.getName(), e);
186                     }
187                     return;
188                 }
189             }
190         }
191
192         // Echo dont need any special processing. hence handling it internally.
193
194         if (request.getMethod().equals("echo")) {
195             JsonRpc10Response response = new JsonRpc10Response(request.getId());
196             response.setError(null);
197             try {
198                 String jsonString = objectMapper.writeValueAsString(response);
199                 nettyChannel.writeAndFlush(jsonString);
200             } catch (JsonProcessingException e) {
201                 LOG.error("Exception while processing JSON response {}", response, e);
202             }
203             return;
204         }
205
206         // send a null response for list_dbs
207         if (request.getMethod().equals("list_dbs")) {
208             JsonRpc10Response response = new JsonRpc10Response(request.getId());
209             response.setError(null);
210             try {
211                 String jsonString = objectMapper.writeValueAsString(response);
212                 nettyChannel.writeAndFlush(jsonString);
213             } catch (JsonProcessingException e) {
214                 LOG.error("Exception while processing JSON response {}", response, e);
215             }
216             return;
217         }
218
219         LOG.error("No handler for Request : {} on {}", requestJson, context);
220     }
221
222     public Map<String, CallContext> getMethodContext() {
223         return methodContext;
224     }
225
226     public static void setReaperInterval(int interval) {
227         reaperInterval = interval;
228         LOG.info("Ovsdb Rpc Task interval is set to {} millisecond", reaperInterval);
229     }
230
231     public static void close() {
232         LOG.info("Shutting down reaper executor service");
233         FUTURE_REAPER_SERVICE.shutdownNow();
234     }
235 }