Do not allow TableSchema columns to be directly set
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / jsonrpc / JsonRpcEndpoint.java
1 /*
2  * Copyright © 2013, 2017 EBay Software Foundation and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.lib.jsonrpc;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.fasterxml.jackson.annotation.JsonInclude.Include;
13 import com.fasterxml.jackson.core.JsonProcessingException;
14 import com.fasterxml.jackson.databind.DeserializationFeature;
15 import com.fasterxml.jackson.databind.JavaType;
16 import com.fasterxml.jackson.databind.JsonNode;
17 import com.fasterxml.jackson.databind.ObjectMapper;
18 import com.fasterxml.jackson.databind.type.TypeFactory;
19 import com.google.common.base.Strings;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.SettableFuture;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelInboundHandlerAdapter;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.UUID;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ScheduledExecutorService;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.TimeUnit;
35 import org.opendaylight.ovsdb.lib.error.UnsupportedArgumentException;
36 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
37 import org.opendaylight.ovsdb.lib.message.Response;
38 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
39 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 public class JsonRpcEndpoint extends ChannelInboundHandlerAdapter implements OvsdbRPC {
44
45     private static final Logger LOG = LoggerFactory.getLogger(JsonRpcEndpoint.class);
46     private static final int REAPER_THREADS = 3;
47     private static final ThreadFactory FUTURE_REAPER_THREAD_FACTORY = new ThreadFactoryBuilder()
48             .setNameFormat("OVSDB-Lib-Future-Reaper-%d")
49             .setDaemon(true).build();
50     private static final ScheduledExecutorService FUTURE_REAPER_SERVICE
51             = Executors.newScheduledThreadPool(REAPER_THREADS, FUTURE_REAPER_THREAD_FACTORY);
52
53     private static final JavaType JT_OBJECT = TypeFactory.defaultInstance().constructType(Object.class);
54     private static final JavaType JT_JSON_NODE = TypeFactory.defaultInstance().constructType(JsonNode.class);
55     private static final JavaType JT_LIST_JSON_NODE = TypeFactory.defaultInstance().constructParametricType(
56         List.class, JsonNode.class);
57     private static final JavaType JT_LIST_STRING = TypeFactory.defaultInstance().constructParametricType(
58         List.class, String.class);
59     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
60             .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
61             .setSerializationInclusion(Include.NON_NULL);
62
63     private static int reaperInterval = 1000;
64
65     private static final class CallContext {
66         final JavaType resultType;
67         final SettableFuture future;
68
69         CallContext(final JavaType resultType, final SettableFuture future) {
70             this.resultType = resultType;
71             this.future = future;
72         }
73     }
74
75     private final Map<String, CallContext> methodContext = new ConcurrentHashMap<>();
76     private final Channel nettyChannel;
77
78     private volatile Callback currentCallback = null;
79
80     public JsonRpcEndpoint(final Channel channel) {
81         this.nettyChannel = requireNonNull(channel);
82     }
83
84     // FIXME: the reaper service should probably be split out
85     public static void setReaperInterval(final int interval) {
86         reaperInterval = interval;
87         LOG.info("Ovsdb Rpc Task interval is set to {} millisecond", reaperInterval);
88     }
89
90     public static void close() {
91         LOG.info("Shutting down reaper executor service");
92         FUTURE_REAPER_SERVICE.shutdownNow();
93     }
94
95     @Override
96     public ListenableFuture<JsonNode> get_schema(final List<String> dbNames) {
97         return sendRequest(JT_JSON_NODE, "get_schema", dbNames);
98     }
99
100     @Override
101     public ListenableFuture<List<String>> echo() {
102         return sendRequest(JT_LIST_STRING, "echo");
103     }
104
105     @Override
106     public ListenableFuture<JsonNode> monitor(final Params equest) {
107         return sendRequest(JT_JSON_NODE, "monitor", equest);
108     }
109
110     @Override
111     public ListenableFuture<List<String>> list_dbs() {
112         return sendRequest(JT_LIST_STRING, "list_dbs");
113     }
114
115     @Override
116     public ListenableFuture<List<JsonNode>> transact(final TransactBuilder transact) {
117         return sendRequest(JT_LIST_JSON_NODE, "transact", transact);
118     }
119
120     @Override
121     public ListenableFuture<Response> cancel(final String id) {
122         // FIXME: reflection-based access did not handle this, this keeps equivalent functionality
123         throw new UnsupportedArgumentException("do not understand this argument yet");
124     }
125
126     @Override
127     public ListenableFuture<JsonNode> monitor_cancel(final Params jsonValue) {
128         return sendRequest(JT_JSON_NODE, "monitor_cancel", jsonValue);
129     }
130
131     @Override
132     public ListenableFuture<Object> lock(final List<String> id) {
133         return sendRequest(JT_OBJECT, "lock", id);
134     }
135
136     @Override
137     public ListenableFuture<Object> steal(final List<String> id) {
138         return sendRequest(JT_OBJECT, "steal", id);
139     }
140
141     @Override
142     public ListenableFuture<Object> unlock(final List<String> id) {
143         return sendRequest(JT_OBJECT, "unlock", id);
144     }
145
146     @Override
147     public boolean registerCallback(final Callback callback) {
148         if (callback == null) {
149             return false;
150         }
151         this.currentCallback = callback;
152         return true;
153     }
154
155     @Override
156     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
157         if (!(msg instanceof JsonNode)) {
158             LOG.debug("Unexpected message {}, closing channel {}", msg, nettyChannel);
159             ctx.channel().close();
160             return;
161         }
162
163         final JsonNode jsonNode = (JsonNode) msg;
164         final JsonNode result = jsonNode.get("result");
165         if (result != null) {
166             handleResponse(jsonNode, result);
167             return;
168         }
169         final JsonNode method = jsonNode.get("method");
170         if (method != null && !method.isNull()) {
171             handleRequest(jsonNode, method);
172             return;
173         }
174
175         LOG.debug("Ignoring message {} on channel {}", jsonNode, nettyChannel);
176     }
177
178     @Override
179     public void channelReadComplete(final ChannelHandlerContext ctx) {
180         ctx.flush();
181     }
182
183     private void handleRequest(final JsonNode jsonRequest, final JsonNode jsonMethod) {
184         final JsonNode id = jsonRequest.get("id");
185         final JsonNode params = jsonRequest.get("params");
186         if (id == null) {
187             LOG.debug("Ignoring request with non-existent id field: {} {}", jsonMethod, params);
188             return;
189         }
190
191         final String requestId = id.asText();
192         if (Strings.isNullOrEmpty(requestId)) {
193             LOG.debug("Ignoring equest with null or empty id field: {} {}", jsonMethod, params);
194             return;
195         }
196
197         LOG.trace("Request : {} {} {}", id, jsonMethod, params);
198
199         final String method = jsonMethod.asText();
200         switch (method) {
201             case "echo":
202                 // Echo does not need any special processing. hence handling it internally.
203                 sendEmptyResponse(requestId);
204                 return;
205             case "list_dbs":
206                 // send a null response for list_dbs
207                 sendEmptyResponse(requestId);
208                 return;
209             default:
210                 if (!handleCallbackRequest(currentCallback, requestId, method, params)) {
211                     LOG.error("No handler for Request : {} on {}", jsonRequest, nettyChannel);
212                 }
213         }
214
215     }
216
217     private boolean handleCallbackRequest(final Callback callback, final String requestId, final String method,
218             final JsonNode params) {
219         if (callback == null) {
220             // No callback registered: bail out
221             return false;
222         }
223
224         switch (method) {
225             case "update": {
226                 final UpdateNotification arg;
227                 try {
228                     arg = OBJECT_MAPPER.convertValue(params, UpdateNotification.class);
229                 } catch (IllegalArgumentException e) {
230                     return reportedMalformedParameters(requestId, e);
231                 }
232
233                 callback.update(nettyChannel, arg);
234                 return true;
235             }
236             case "locked": {
237                 final List<String> arg;
238                 try {
239                     arg = OBJECT_MAPPER.convertValue(params, JT_LIST_STRING);
240                 } catch (IllegalArgumentException e) {
241                     return reportedMalformedParameters(requestId, e);
242                 }
243
244                 callback.locked(nettyChannel, arg);
245                 return true;
246             }
247             case "stolen": {
248                 final List<String> arg;
249                 try {
250                     arg = OBJECT_MAPPER.convertValue(params, JT_LIST_STRING);
251                 } catch (IllegalArgumentException e) {
252                     return reportedMalformedParameters(requestId, e);
253                 }
254
255                 callback.stolen(nettyChannel, arg);
256                 return true;
257             }
258             default:
259                 return false;
260         }
261     }
262
263     private boolean reportedMalformedParameters(final String requestId, final Exception cause) {
264         LOG.debug("Request {} failed to map parameters", requestId, cause);
265         sendErrorResponse(requestId, cause.getMessage());
266         return true;
267     }
268
269     private void sendEmptyResponse(final String requestId) {
270         sendErrorResponse(requestId, null);
271     }
272
273     private void sendErrorResponse(final String requestId, final String error) {
274         JsonRpc10Response response = new JsonRpc10Response(requestId);
275         response.setError(error);
276
277         final String jsonString;
278         try {
279             jsonString = OBJECT_MAPPER.writeValueAsString(response);
280         } catch (JsonProcessingException e) {
281             LOG.error("Exception while processing JSON response {}", response, e);
282             return;
283         }
284
285         nettyChannel.writeAndFlush(jsonString);
286     }
287
288     private void handleResponse(final JsonNode response, final JsonNode result) {
289         LOG.trace("Response : {}", response);
290         final String requestId = response.get("id").asText();
291         final CallContext returnCtxt = methodContext.remove(requestId);
292         if (returnCtxt == null) {
293             LOG.debug("Ignoring response for unknown request {}", requestId);
294             return;
295         }
296
297         final JsonNode error = response.get("error");
298         if (error != null && !error.isNull()) {
299             LOG.error("Request {} failed with error {}", requestId, error);
300         }
301
302         final Object mappedResult = OBJECT_MAPPER.convertValue(result, returnCtxt.resultType);
303         if (!returnCtxt.future.set(mappedResult)) {
304             LOG.debug("Request {} did not accept result {}", requestId, mappedResult);
305         }
306     }
307
308     private <T> ListenableFuture<T> sendRequest(final JsonRpc10Request request, final JavaType resultType) {
309         final String requestString;
310         try {
311             requestString = OBJECT_MAPPER.writeValueAsString(request);
312         } catch (JsonProcessingException e) {
313             return Futures.immediateFailedFuture(e);
314         }
315         LOG.trace("getClient Request : {}", requestString);
316
317         final SettableFuture<T> sf = SettableFuture.create();
318         methodContext.put(request.getId(), new CallContext(resultType, sf));
319         FUTURE_REAPER_SERVICE.schedule(() -> {
320             CallContext cc = methodContext.remove(request.getId());
321             if (cc != null) {
322                 if (cc.future.isDone() || cc.future.isCancelled()) {
323                     return;
324                 }
325                 cc.future.cancel(false);
326             }
327         }, reaperInterval, TimeUnit.MILLISECONDS);
328
329         nettyChannel.writeAndFlush(requestString);
330         return sf;
331     }
332
333     private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method) {
334         return sendRequest(createRequest(method), resultType);
335     }
336
337     private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method, final List params) {
338         final JsonRpc10Request request = createRequest(method);
339         request.setParams(params);
340         return sendRequest(request, resultType);
341     }
342
343     private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method, final Params params) {
344         final JsonRpc10Request request = createRequest(method);
345         request.setParams(params.params());
346         return sendRequest(request, resultType);
347     }
348
349     private static JsonRpc10Request createRequest(final String method) {
350         JsonRpc10Request request = new JsonRpc10Request(UUID.randomUUID().toString());
351         request.setMethod(method);
352         return request;
353     }
354 }