Turn JsonRpcEndpoint into a proper OvsdbRPC implementation
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / jsonrpc / JsonRpcEndpoint.java
1 /*
2  * Copyright © 2013, 2017 EBay Software Foundation and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.lib.jsonrpc;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.fasterxml.jackson.core.JsonProcessingException;
13 import com.fasterxml.jackson.databind.JavaType;
14 import com.fasterxml.jackson.databind.JsonNode;
15 import com.fasterxml.jackson.databind.ObjectMapper;
16 import com.fasterxml.jackson.databind.type.TypeFactory;
17 import com.google.common.base.Strings;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.UUID;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import org.opendaylight.ovsdb.lib.error.UnsupportedArgumentException;
34 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
35 import org.opendaylight.ovsdb.lib.message.Response;
36 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
37 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 public class JsonRpcEndpoint extends ChannelInboundHandlerAdapter implements OvsdbRPC {
42
43     private static final Logger LOG = LoggerFactory.getLogger(JsonRpcEndpoint.class);
44     private static final int REAPER_THREADS = 3;
45     private static final ThreadFactory FUTURE_REAPER_THREAD_FACTORY = new ThreadFactoryBuilder()
46             .setNameFormat("OVSDB-Lib-Future-Reaper-%d")
47             .setDaemon(true).build();
48     private static final ScheduledExecutorService FUTURE_REAPER_SERVICE
49             = Executors.newScheduledThreadPool(REAPER_THREADS, FUTURE_REAPER_THREAD_FACTORY);
50
51     private static final JavaType JT_OBJECT = TypeFactory.defaultInstance().constructType(Object.class);
52     private static final JavaType JT_JSON_NODE = TypeFactory.defaultInstance().constructType(JsonNode.class);
53     private static final JavaType JT_LIST_JSON_NODE = TypeFactory.defaultInstance().constructParametricType(
54         List.class, JsonNode.class);
55     private static final JavaType JT_LIST_STRING = TypeFactory.defaultInstance().constructParametricType(
56         List.class, String.class);
57
58     private static int reaperInterval = 1000;
59
60     private static final class CallContext {
61         final JavaType resultType;
62         final SettableFuture future;
63
64         CallContext(final JavaType resultType, final SettableFuture future) {
65             this.resultType = resultType;
66             this.future = future;
67         }
68     }
69
70     private final Map<String, CallContext> methodContext = new ConcurrentHashMap<>();
71     private final ObjectMapper objectMapper;
72     private final Channel nettyChannel;
73
74     private volatile Callback currentCallback = null;
75
76     public JsonRpcEndpoint(final ObjectMapper objectMapper, final Channel channel) {
77         this.objectMapper = requireNonNull(objectMapper);
78         this.nettyChannel = requireNonNull(channel);
79     }
80
81     // FIXME: the reaper service should probably be split out
82     public static void setReaperInterval(final int interval) {
83         reaperInterval = interval;
84         LOG.info("Ovsdb Rpc Task interval is set to {} millisecond", reaperInterval);
85     }
86
87     public static void close() {
88         LOG.info("Shutting down reaper executor service");
89         FUTURE_REAPER_SERVICE.shutdownNow();
90     }
91
92     @Override
93     public ListenableFuture<JsonNode> get_schema(final List<String> dbNames) {
94         return sendRequest(JT_JSON_NODE, "get_schema", dbNames);
95     }
96
97     @Override
98     public ListenableFuture<List<String>> echo() {
99         return sendRequest(JT_LIST_STRING, "echo");
100     }
101
102     @Override
103     public ListenableFuture<JsonNode> monitor(final Params equest) {
104         return sendRequest(JT_JSON_NODE, "monitor", equest);
105     }
106
107     @Override
108     public ListenableFuture<List<String>> list_dbs() {
109         return sendRequest(JT_LIST_STRING, "list_dbs");
110     }
111
112     @Override
113     public ListenableFuture<List<JsonNode>> transact(final TransactBuilder transact) {
114         return sendRequest(JT_LIST_JSON_NODE, "transact", transact);
115     }
116
117     @Override
118     public ListenableFuture<Response> cancel(final String id) {
119         // FIXME: reflection-based access did not handle this, this keeps equivalent functionality
120         throw new UnsupportedArgumentException("do not understand this argument yet");
121     }
122
123     @Override
124     public ListenableFuture<JsonNode> monitor_cancel(final Params jsonValue) {
125         return sendRequest(JT_JSON_NODE, "monitor_cancel", jsonValue);
126     }
127
128     @Override
129     public ListenableFuture<Object> lock(final List<String> id) {
130         return sendRequest(JT_OBJECT, "lock", id);
131     }
132
133     @Override
134     public ListenableFuture<Object> steal(final List<String> id) {
135         return sendRequest(JT_OBJECT, "steal", id);
136     }
137
138     @Override
139     public ListenableFuture<Object> unlock(final List<String> id) {
140         return sendRequest(JT_OBJECT, "unlock", id);
141     }
142
143     @Override
144     public boolean registerCallback(final Callback callback) {
145         if (callback == null) {
146             return false;
147         }
148         this.currentCallback = callback;
149         return true;
150     }
151
152     @Override
153     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
154         if (!(msg instanceof JsonNode)) {
155             LOG.debug("Unexpected message {}, closing channel {}", msg, nettyChannel);
156             ctx.channel().close();
157             return;
158         }
159
160         final JsonNode jsonNode = (JsonNode) msg;
161         final JsonNode result = jsonNode.get("result");
162         if (result != null) {
163             handleResponse(jsonNode, result);
164             return;
165         }
166         final JsonNode method = jsonNode.get("method");
167         if (method != null && !method.isNull()) {
168             handleRequest(jsonNode, method);
169             return;
170         }
171
172         LOG.debug("Ignoring message {} on channel {}", jsonNode, nettyChannel);
173     }
174
175     @Override
176     public void channelReadComplete(final ChannelHandlerContext ctx) {
177         ctx.flush();
178     }
179
180     private void handleRequest(final JsonNode jsonRequest, final JsonNode jsonMethod) {
181         final JsonNode id = jsonRequest.get("id");
182         final JsonNode params = jsonRequest.get("params");
183         if (id == null) {
184             LOG.debug("Ignoring request with non-existent id field: {} {}", jsonMethod, params);
185             return;
186         }
187
188         final String requestId = id.asText();
189         if (Strings.isNullOrEmpty(requestId)) {
190             LOG.debug("Ignoring equest with null or empty id field: {} {}", jsonMethod, params);
191             return;
192         }
193
194         LOG.trace("Request : {} {} {}", id, jsonMethod, params);
195
196         final String method = jsonMethod.asText();
197         switch (method) {
198             case "echo":
199                 // Echo does not need any special processing. hence handling it internally.
200                 sendEmptyResponse(requestId);
201                 return;
202             case "list_dbs":
203                 // send a null response for list_dbs
204                 sendEmptyResponse(requestId);
205                 return;
206             default:
207                 if (!handleCallbackRequest(currentCallback, requestId, method, params)) {
208                     LOG.error("No handler for Request : {} on {}", jsonRequest, nettyChannel);
209                 }
210         }
211
212     }
213
214     private boolean handleCallbackRequest(final Callback callback, final String requestId, final String method,
215             final JsonNode params) {
216         if (callback == null) {
217             // No callback registered: bail out
218             return false;
219         }
220
221         switch (method) {
222             case "update": {
223                 final UpdateNotification arg;
224                 try {
225                     arg = objectMapper.convertValue(params, UpdateNotification.class);
226                 } catch (IllegalArgumentException e) {
227                     return reportedMalformedParameters(requestId, e);
228                 }
229
230                 callback.update(nettyChannel, arg);
231                 return true;
232             }
233             case "locked": {
234                 final List<String> arg;
235                 try {
236                     arg = objectMapper.convertValue(params, JT_LIST_STRING);
237                 } catch (IllegalArgumentException e) {
238                     return reportedMalformedParameters(requestId, e);
239                 }
240
241                 callback.locked(nettyChannel, arg);
242                 return true;
243             }
244             case "stolen": {
245                 final List<String> arg;
246                 try {
247                     arg = objectMapper.convertValue(params, JT_LIST_STRING);
248                 } catch (IllegalArgumentException e) {
249                     return reportedMalformedParameters(requestId, e);
250                 }
251
252                 callback.stolen(nettyChannel, arg);
253                 return true;
254             }
255             default:
256                 return false;
257         }
258     }
259
260     private boolean reportedMalformedParameters(final String requestId, final Exception cause) {
261         LOG.debug("Request {} failed to map parameters", requestId, cause);
262         sendErrorResponse(requestId, cause.getMessage());
263         return true;
264     }
265
266     private void sendEmptyResponse(final String requestId) {
267         sendErrorResponse(requestId, null);
268     }
269
270     private void sendErrorResponse(final String requestId, final String error) {
271         JsonRpc10Response response = new JsonRpc10Response(requestId);
272         response.setError(error);
273
274         final String jsonString;
275         try {
276             jsonString = objectMapper.writeValueAsString(response);
277         } catch (JsonProcessingException e) {
278             LOG.error("Exception while processing JSON response {}", response, e);
279             return;
280         }
281
282         nettyChannel.writeAndFlush(jsonString);
283     }
284
285     private void handleResponse(final JsonNode response, final JsonNode result) {
286         LOG.trace("Response : {}", response);
287         final String requestId = response.get("id").asText();
288         final CallContext returnCtxt = methodContext.remove(requestId);
289         if (returnCtxt == null) {
290             LOG.debug("Ignoring response for unknown request {}", requestId);
291             return;
292         }
293
294         final JsonNode error = response.get("error");
295         if (error != null && !error.isNull()) {
296             LOG.error("Request {} failed with error {}", requestId, error);
297         }
298
299         final Object mappedResult = objectMapper.convertValue(result, returnCtxt.resultType);
300         if (!returnCtxt.future.set(mappedResult)) {
301             LOG.debug("Request {} did not accept result {}", requestId, mappedResult);
302         }
303     }
304
305     private <T> ListenableFuture<T> sendRequest(final JsonRpc10Request request, final JavaType resultType) {
306         final String requestString;
307         try {
308             requestString = objectMapper.writeValueAsString(request);
309         } catch (JsonProcessingException e) {
310             return Futures.immediateFailedFuture(e);
311         }
312         LOG.trace("getClient Request : {}", requestString);
313
314         final SettableFuture<T> sf = SettableFuture.create();
315         methodContext.put(request.getId(), new CallContext(resultType, sf));
316         FUTURE_REAPER_SERVICE.schedule(() -> {
317             CallContext cc = methodContext.remove(request.getId());
318             if (cc != null) {
319                 if (cc.future.isDone() || cc.future.isCancelled()) {
320                     return;
321                 }
322                 cc.future.cancel(false);
323             }
324         }, reaperInterval, TimeUnit.MILLISECONDS);
325
326         nettyChannel.writeAndFlush(requestString);
327         return sf;
328     }
329
330     private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method) {
331         return sendRequest(createRequest(method), resultType);
332     }
333
334     private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method, final List params) {
335         final JsonRpc10Request request = createRequest(method);
336         request.setParams(params);
337         return sendRequest(request, resultType);
338     }
339
340     private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method, final Params params) {
341         final JsonRpc10Request request = createRequest(method);
342         request.setParams(params.params());
343         return sendRequest(request, resultType);
344     }
345
346     private static JsonRpc10Request createRequest(final String method) {
347         JsonRpc10Request request = new JsonRpc10Request(UUID.randomUUID().toString());
348         request.setMethod(method);
349         return request;
350     }
351 }