d9f6dfdb40c87e469999865b1c9c8619dc08c68e
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / jsonrpc / JsonRpcEndpoint.java
1 /*
2  * Copyright (c) 2013, 2015 EBay Software Foundation and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.lib.jsonrpc;
10
11 import com.fasterxml.jackson.core.JsonProcessingException;
12 import com.fasterxml.jackson.databind.JavaType;
13 import com.fasterxml.jackson.databind.JsonNode;
14 import com.fasterxml.jackson.databind.ObjectMapper;
15 import com.fasterxml.jackson.databind.type.TypeFactory;
16 import com.google.common.collect.Maps;
17 import com.google.common.reflect.Invokable;
18 import com.google.common.reflect.Reflection;
19 import com.google.common.reflect.TypeToken;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import com.google.common.util.concurrent.ThreadFactoryBuilder;
23 import io.netty.channel.Channel;
24 import java.lang.reflect.InvocationHandler;
25 import java.lang.reflect.InvocationTargetException;
26 import java.lang.reflect.Method;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.UUID;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.TimeUnit;
34
35 import org.opendaylight.ovsdb.lib.error.UnexpectedResultException;
36 import org.opendaylight.ovsdb.lib.error.UnsupportedArgumentException;
37 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 public class JsonRpcEndpoint {
42
43     private static final Logger LOG = LoggerFactory.getLogger(JsonRpcEndpoint.class);
44     private static final int REAPER_INTERVAL = 300;
45     private static final int REAPER_THREADS = 3;
46     private static final ThreadFactory FUTURE_REAPER_THREAD_FACTORY = new ThreadFactoryBuilder()
47             .setNameFormat("OVSDB-Lib-Future-Reaper-%d").build();
48     private static final ScheduledExecutorService FUTURE_REAPER_SERVICE
49             = Executors.newScheduledThreadPool(REAPER_THREADS, FUTURE_REAPER_THREAD_FACTORY);
50
51     public class CallContext {
52         Method method;
53         JsonRpc10Request request;
54         SettableFuture<Object> future;
55
56         public CallContext(JsonRpc10Request request, Method method, SettableFuture<Object> future) {
57             this.method = method;
58             this.request = request;
59             this.future = future;
60         }
61
62         public Method getMethod() {
63             return method;
64         }
65
66         public JsonRpc10Request getRequest() {
67             return request;
68         }
69
70         public SettableFuture<Object> getFuture() {
71             return future;
72         }
73     }
74
75     ObjectMapper objectMapper;
76     Channel nettyChannel;
77     Map<String, CallContext> methodContext = Maps.newHashMap();
78     Map<Object, OvsdbRPC.Callback> requestCallbacks = Maps.newHashMap();
79
80     public JsonRpcEndpoint(ObjectMapper objectMapper, Channel channel) {
81         this.objectMapper = objectMapper;
82         this.nettyChannel = channel;
83     }
84
85     public <T> T getClient(final Object context, Class<T> klazz) {
86
87         return Reflection.newProxy(klazz, new InvocationHandler() {
88             @Override
89             public Object invoke(Object proxy, Method method, Object[] args) throws Exception {
90                 if (method.getName().equals(OvsdbRPC.REGISTER_CALLBACK_METHOD)) {
91                     if ((args == null) || args.length != 1 || !(args[0] instanceof OvsdbRPC.Callback)) {
92                         return false;
93                     }
94                     requestCallbacks.put(context, (OvsdbRPC.Callback)args[0]);
95                     return true;
96                 }
97
98                 JsonRpc10Request request = new JsonRpc10Request(UUID.randomUUID().toString());
99                 request.setMethod(method.getName());
100
101                 if (args != null && args.length != 0) {
102                     List<Object> params = null;
103
104                     if (args.length == 1) {
105                         if (args[0] instanceof Params) {
106                             params = ((Params) args[0]).params();
107                         } else if (args[0] instanceof List) {
108                             params = (List<Object>) args[0];
109                         }
110
111                         if (params == null) {
112                             throw new UnsupportedArgumentException("do not understand this argument yet");
113                         }
114                         request.setParams(params);
115                     }
116                 }
117
118                 String requestString = objectMapper.writeValueAsString(request);
119                 LOG.trace("getClient Request : {}", requestString);
120
121                 SettableFuture<Object> sf = SettableFuture.create();
122                 methodContext.put(request.getId(), new CallContext(request, method, sf));
123                 FUTURE_REAPER_SERVICE.schedule(new Runnable() {
124                     @Override
125                     public void run() {
126                         CallContext cc = methodContext.remove(request.getId());
127                         if ( cc != null) {
128                             if ( cc.getFuture().isDone() || cc.getFuture().isCancelled()) {
129                                 return;
130                             }
131                             cc.getFuture().cancel(false);
132                         }
133                     }
134                 },REAPER_INTERVAL, TimeUnit.MILLISECONDS);
135
136                 nettyChannel.writeAndFlush(requestString);
137
138                 return sf;
139             }
140         }
141         );
142     }
143
144     public void processResult(JsonNode response) throws NoSuchMethodException {
145
146         LOG.trace("Response : {}", response.toString());
147         CallContext returnCtxt = methodContext.remove(response.get("id").asText());
148         if (returnCtxt == null) {
149             return;
150         }
151
152         if (ListenableFuture.class == returnCtxt.getMethod().getReturnType()) {
153             TypeToken<?> retType = TypeToken.of(
154                     returnCtxt.getMethod().getGenericReturnType())
155                     .resolveType(ListenableFuture.class.getMethod("get").getGenericReturnType());
156             JavaType javaType =  TypeFactory.defaultInstance().constructType(retType.getType());
157
158             JsonNode result = response.get("result");
159             Object result1 = objectMapper.convertValue(result, javaType);
160             JsonNode error = response.get("error");
161             if (error != null && !error.isNull()) {
162                 LOG.error("Error : {}", error.toString());
163             }
164
165             returnCtxt.getFuture().set(result1);
166
167         } else {
168             throw new UnexpectedResultException("Don't know how to handle this");
169         }
170     }
171
172     public void processRequest(Object context, JsonNode requestJson) {
173         JsonRpc10Request request = new JsonRpc10Request(requestJson.get("id").asText());
174         request.setMethod(requestJson.get("method").asText());
175         LOG.trace("Request : {} {} {}", requestJson.get("id"), requestJson.get("method"),
176                 requestJson.get("params"));
177         OvsdbRPC.Callback callback = requestCallbacks.get(context);
178         if (callback != null) {
179             Method[] methods = callback.getClass().getDeclaredMethods();
180             for (Method method : methods) {
181                 if (method.getName().equals(request.getMethod())) {
182                     Class<?>[] parameters = method.getParameterTypes();
183                     JsonNode params = requestJson.get("params");
184                     Object param = objectMapper.convertValue(params, parameters[1]);
185                     try {
186                         Invokable from = Invokable.from(method);
187                         from.setAccessible(true);
188                         from.invoke(callback, context, param);
189                     } catch (IllegalAccessException | InvocationTargetException e) {
190                         LOG.error("Unable to invoke callback {}", method.getName(), e);
191                     }
192                     return;
193                 }
194             }
195         }
196
197         // Echo dont need any special processing. hence handling it internally.
198
199         if (request.getMethod().equals("echo")) {
200             JsonRpc10Response response = new JsonRpc10Response(request.getId());
201             response.setError(null);
202             String jsonString = null;
203             try {
204                 jsonString = objectMapper.writeValueAsString(response);
205                 nettyChannel.writeAndFlush(jsonString);
206             } catch (JsonProcessingException e) {
207                 LOG.error("Exception while processing JSON string {}", jsonString, e);
208             }
209             return;
210         }
211
212         // send a null response for list_dbs
213         if (request.getMethod().equals("list_dbs")) {
214             JsonRpc10Response response = new JsonRpc10Response(request.getId());
215             response.setError(null);
216             String jsonString = null;
217             try {
218                 jsonString = objectMapper.writeValueAsString(response);
219                 nettyChannel.writeAndFlush(jsonString);
220             } catch (JsonProcessingException e) {
221                 LOG.error("Exception while processing JSON string {}", jsonString, e);
222             }
223             return;
224         }
225
226         LOG.error("No handler for Request : {} on {}", requestJson.toString(), context);
227     }
228
229     public Map<String, CallContext> getMethodContext() {
230         return methodContext;
231     }
232 }