From: Robert Varga Date: Thu, 28 Nov 2019 13:39:59 +0000 (+0100) Subject: Turn JsonRpcEndpoint into a proper OvsdbRPC implementation X-Git-Tag: release/neon-sr3~24 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=1c80d0ab09d0841e433b39542198b020ce4c82cd;p=ovsdb.git Turn JsonRpcEndpoint into a proper OvsdbRPC implementation The Netty wiring and invocation handling is quite arcane here, relying mostly on reflection to perform dispatch. There is only a single direct user JsonRpcEndpoint, which expects a OvsdbRPC to be exposed from it. Furthermore it is obvious that JsonRpcEndpoint has a 1:1 correspondence with JsonRpcServiceBinderHandler and that the invocation context used to pass request processing relies on Netty Channel instance -- to which the JsonRpcEndpoint is already bound. This integrates JsonRpcServiceBinderHandler into JsonRpcEndpoint, thus making the fusion obvious, eliminating the need to pass Object context between the two. Furthermore we eliminate JsonRpcEndpoint.getClient() and opt to make JsonRpcEndpoint implement OvsdbRPC, providing a very clear connection between the two and completely eliminating use of reflection in the process. Change-Id: I1cfcf784330fdf6e30a55fcaff6f5bd716d987e3 Signed-off-by: Robert Varga (cherry picked from commit fc3bf4c16d4bd71512ca991fcfc95d804c73c05e) --- diff --git a/library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java b/library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java index b968c2781..a79bc74ac 100644 --- a/library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java +++ b/library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java @@ -66,8 +66,6 @@ import org.opendaylight.ovsdb.lib.OvsdbConnectionListener; import org.opendaylight.ovsdb.lib.jsonrpc.ExceptionHandler; import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder; import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint; -import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler; -import org.opendaylight.ovsdb.lib.message.OvsdbRPC; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -232,17 +230,13 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection { private static OvsdbClient getChannelClient(final Channel channel, final ConnectionType type, final SocketConnectionType socketConnType) { - JsonRpcEndpoint factory = new JsonRpcEndpoint(OBJECT_MAPPER, channel); - JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory); - binderHandler.setContext(channel); - channel.pipeline().addLast(binderHandler); + JsonRpcEndpoint endpoint = new JsonRpcEndpoint(OBJECT_MAPPER, channel); + channel.pipeline().addLast(endpoint); - OvsdbRPC rpc = factory.getClient(channel, OvsdbRPC.class); - OvsdbClientImpl client = new OvsdbClientImpl(rpc, channel, type, socketConnType); + OvsdbClientImpl client = new OvsdbClientImpl(endpoint, channel, type, socketConnType); client.setConnectionPublished(true); CONNECTIONS.put(client, channel); - ChannelFuture closeFuture = channel.closeFuture(); - closeFuture.addListener(new ChannelConnectionHandler(client)); + channel.closeFuture().addListener(new ChannelConnectionHandler(client)); return client; } diff --git a/library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/JsonRpcEndpoint.java b/library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/JsonRpcEndpoint.java index 4922ac34e..44ac28138 100644 --- a/library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/JsonRpcEndpoint.java +++ b/library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/JsonRpcEndpoint.java @@ -5,24 +5,23 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.ovsdb.lib.jsonrpc; +import static java.util.Objects.requireNonNull; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.TypeFactory; -import com.google.common.reflect.Invokable; -import com.google.common.reflect.Reflection; -import com.google.common.reflect.TypeToken; +import com.google.common.base.Strings; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.channel.Channel; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.HashMap; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.List; import java.util.Map; import java.util.UUID; @@ -31,13 +30,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import org.opendaylight.ovsdb.lib.error.UnexpectedResultException; import org.opendaylight.ovsdb.lib.error.UnsupportedArgumentException; import org.opendaylight.ovsdb.lib.message.OvsdbRPC; +import org.opendaylight.ovsdb.lib.message.Response; +import org.opendaylight.ovsdb.lib.message.TransactBuilder; +import org.opendaylight.ovsdb.lib.message.UpdateNotification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JsonRpcEndpoint { +public class JsonRpcEndpoint extends ChannelInboundHandlerAdapter implements OvsdbRPC { private static final Logger LOG = LoggerFactory.getLogger(JsonRpcEndpoint.class); private static final int REAPER_THREADS = 3; @@ -47,189 +48,304 @@ public class JsonRpcEndpoint { private static final ScheduledExecutorService FUTURE_REAPER_SERVICE = Executors.newScheduledThreadPool(REAPER_THREADS, FUTURE_REAPER_THREAD_FACTORY); + private static final JavaType JT_OBJECT = TypeFactory.defaultInstance().constructType(Object.class); + private static final JavaType JT_JSON_NODE = TypeFactory.defaultInstance().constructType(JsonNode.class); + private static final JavaType JT_LIST_JSON_NODE = TypeFactory.defaultInstance().constructParametricType( + List.class, JsonNode.class); + private static final JavaType JT_LIST_STRING = TypeFactory.defaultInstance().constructParametricType( + List.class, String.class); + private static int reaperInterval = 1000; - public static class CallContext { - Method method; - JsonRpc10Request request; - SettableFuture future; + private static final class CallContext { + final JavaType resultType; + final SettableFuture future; - public CallContext(JsonRpc10Request request, Method method, SettableFuture future) { - this.method = method; - this.request = request; + CallContext(final JavaType resultType, final SettableFuture future) { + this.resultType = resultType; this.future = future; } + } - public Method getMethod() { - return method; - } + private final Map methodContext = new ConcurrentHashMap<>(); + private final ObjectMapper objectMapper; + private final Channel nettyChannel; - public JsonRpc10Request getRequest() { - return request; - } + private volatile Callback currentCallback = null; - public SettableFuture getFuture() { - return future; - } + public JsonRpcEndpoint(final ObjectMapper objectMapper, final Channel channel) { + this.objectMapper = requireNonNull(objectMapper); + this.nettyChannel = requireNonNull(channel); } - ObjectMapper objectMapper; - Channel nettyChannel; - Map methodContext = new ConcurrentHashMap<>(); - Map requestCallbacks = new HashMap<>(); + // FIXME: the reaper service should probably be split out + public static void setReaperInterval(final int interval) { + reaperInterval = interval; + LOG.info("Ovsdb Rpc Task interval is set to {} millisecond", reaperInterval); + } - public JsonRpcEndpoint(ObjectMapper objectMapper, Channel channel) { - this.objectMapper = objectMapper; - this.nettyChannel = channel; + public static void close() { + LOG.info("Shutting down reaper executor service"); + FUTURE_REAPER_SERVICE.shutdownNow(); } - public T getClient(final Object context, Class klazz) { + @Override + public ListenableFuture get_schema(final List dbNames) { + return sendRequest(JT_JSON_NODE, "get_schema", dbNames); + } - return Reflection.newProxy(klazz, (proxy, method, args) -> { - if (method.getName().equals(OvsdbRPC.REGISTER_CALLBACK_METHOD)) { - if (args == null || args.length != 1 || !(args[0] instanceof OvsdbRPC.Callback)) { - return false; - } - requestCallbacks.put(context, (OvsdbRPC.Callback)args[0]); - return true; - } + @Override + public ListenableFuture> echo() { + return sendRequest(JT_LIST_STRING, "echo"); + } - JsonRpc10Request request = new JsonRpc10Request(UUID.randomUUID().toString()); - request.setMethod(method.getName()); + @Override + public ListenableFuture monitor(final Params equest) { + return sendRequest(JT_JSON_NODE, "monitor", equest); + } - if (args != null && args.length != 0) { - List params = null; + @Override + public ListenableFuture> list_dbs() { + return sendRequest(JT_LIST_STRING, "list_dbs"); + } - if (args.length == 1) { - if (args[0] instanceof Params) { - params = ((Params) args[0]).params(); - } else if (args[0] instanceof List) { - params = (List) args[0]; - } + @Override + public ListenableFuture> transact(final TransactBuilder transact) { + return sendRequest(JT_LIST_JSON_NODE, "transact", transact); + } - if (params == null) { - throw new UnsupportedArgumentException("do not understand this argument yet"); - } - request.setParams(params); - } - } + @Override + public ListenableFuture cancel(final String id) { + // FIXME: reflection-based access did not handle this, this keeps equivalent functionality + throw new UnsupportedArgumentException("do not understand this argument yet"); + } - String requestString = objectMapper.writeValueAsString(request); - LOG.trace("getClient Request : {}", requestString); - - SettableFuture sf = SettableFuture.create(); - methodContext.put(request.getId(), new CallContext(request, method, sf)); - FUTURE_REAPER_SERVICE.schedule(() -> { - CallContext cc = methodContext.remove(request.getId()); - if (cc != null) { - if (cc.getFuture().isDone() || cc.getFuture().isCancelled()) { - return; - } - cc.getFuture().cancel(false); - } - }, reaperInterval, TimeUnit.MILLISECONDS); + @Override + public ListenableFuture monitor_cancel(final Params jsonValue) { + return sendRequest(JT_JSON_NODE, "monitor_cancel", jsonValue); + } - nettyChannel.writeAndFlush(requestString); + @Override + public ListenableFuture lock(final List id) { + return sendRequest(JT_OBJECT, "lock", id); + } + + @Override + public ListenableFuture steal(final List id) { + return sendRequest(JT_OBJECT, "steal", id); + } - return sf; + @Override + public ListenableFuture unlock(final List id) { + return sendRequest(JT_OBJECT, "unlock", id); + } + + @Override + public boolean registerCallback(final Callback callback) { + if (callback == null) { + return false; } - ); + this.currentCallback = callback; + return true; } - public void processResult(JsonNode response) throws NoSuchMethodException { + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { + if (!(msg instanceof JsonNode)) { + LOG.debug("Unexpected message {}, closing channel {}", msg, nettyChannel); + ctx.channel().close(); + return; + } - LOG.trace("Response : {}", response); - CallContext returnCtxt = methodContext.remove(response.get("id").asText()); - if (returnCtxt == null) { + final JsonNode jsonNode = (JsonNode) msg; + final JsonNode result = jsonNode.get("result"); + if (result != null) { + handleResponse(jsonNode, result); + return; + } + final JsonNode method = jsonNode.get("method"); + if (method != null && !method.isNull()) { + handleRequest(jsonNode, method); return; } - if (ListenableFuture.class == returnCtxt.getMethod().getReturnType()) { - TypeToken retType = TypeToken.of( - returnCtxt.getMethod().getGenericReturnType()) - .resolveType(ListenableFuture.class.getMethod("get").getGenericReturnType()); - JavaType javaType = TypeFactory.defaultInstance().constructType(retType.getType()); - - JsonNode result = response.get("result"); - Object result1 = objectMapper.convertValue(result, javaType); - JsonNode error = response.get("error"); - if (error != null && !error.isNull()) { - LOG.error("Error : {}", error); - } + LOG.debug("Ignoring message {} on channel {}", jsonNode, nettyChannel); + } - returnCtxt.getFuture().set(result1); + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.flush(); + } - } else { - throw new UnexpectedResultException("Don't know how to handle this"); + private void handleRequest(final JsonNode jsonRequest, final JsonNode jsonMethod) { + final JsonNode id = jsonRequest.get("id"); + final JsonNode params = jsonRequest.get("params"); + if (id == null) { + LOG.debug("Ignoring request with non-existent id field: {} {}", jsonMethod, params); + return; } + + final String requestId = id.asText(); + if (Strings.isNullOrEmpty(requestId)) { + LOG.debug("Ignoring equest with null or empty id field: {} {}", jsonMethod, params); + return; + } + + LOG.trace("Request : {} {} {}", id, jsonMethod, params); + + final String method = jsonMethod.asText(); + switch (method) { + case "echo": + // Echo does not need any special processing. hence handling it internally. + sendEmptyResponse(requestId); + return; + case "list_dbs": + // send a null response for list_dbs + sendEmptyResponse(requestId); + return; + default: + if (!handleCallbackRequest(currentCallback, requestId, method, params)) { + LOG.error("No handler for Request : {} on {}", jsonRequest, nettyChannel); + } + } + } - public void processRequest(Object context, JsonNode requestJson) { - JsonRpc10Request request = new JsonRpc10Request(requestJson.get("id").asText()); - request.setMethod(requestJson.get("method").asText()); - LOG.trace("Request : {} {} {}", requestJson.get("id"), requestJson.get("method"), - requestJson.get("params")); - OvsdbRPC.Callback callback = requestCallbacks.get(context); - if (callback != null) { - Method[] methods = callback.getClass().getDeclaredMethods(); - for (Method method : methods) { - if (method.getName().equals(request.getMethod())) { - Class[] parameters = method.getParameterTypes(); - JsonNode params = requestJson.get("params"); - Object param = objectMapper.convertValue(params, parameters[1]); - try { - Invokable from = Invokable.from(method); - from.setAccessible(true); - from.invoke(callback, context, param); - } catch (IllegalAccessException | InvocationTargetException e) { - LOG.error("Unable to invoke callback {}", method.getName(), e); - } - return; + private boolean handleCallbackRequest(final Callback callback, final String requestId, final String method, + final JsonNode params) { + if (callback == null) { + // No callback registered: bail out + return false; + } + + switch (method) { + case "update": { + final UpdateNotification arg; + try { + arg = objectMapper.convertValue(params, UpdateNotification.class); + } catch (IllegalArgumentException e) { + return reportedMalformedParameters(requestId, e); } + + callback.update(nettyChannel, arg); + return true; } - } + case "locked": { + final List arg; + try { + arg = objectMapper.convertValue(params, JT_LIST_STRING); + } catch (IllegalArgumentException e) { + return reportedMalformedParameters(requestId, e); + } - // Echo dont need any special processing. hence handling it internally. + callback.locked(nettyChannel, arg); + return true; + } + case "stolen": { + final List arg; + try { + arg = objectMapper.convertValue(params, JT_LIST_STRING); + } catch (IllegalArgumentException e) { + return reportedMalformedParameters(requestId, e); + } - if (request.getMethod().equals("echo")) { - JsonRpc10Response response = new JsonRpc10Response(request.getId()); - response.setError(null); - try { - String jsonString = objectMapper.writeValueAsString(response); - nettyChannel.writeAndFlush(jsonString); - } catch (JsonProcessingException e) { - LOG.error("Exception while processing JSON response {}", response, e); + callback.stolen(nettyChannel, arg); + return true; } + default: + return false; + } + } + + private boolean reportedMalformedParameters(final String requestId, final Exception cause) { + LOG.debug("Request {} failed to map parameters", requestId, cause); + sendErrorResponse(requestId, cause.getMessage()); + return true; + } + + private void sendEmptyResponse(final String requestId) { + sendErrorResponse(requestId, null); + } + + private void sendErrorResponse(final String requestId, final String error) { + JsonRpc10Response response = new JsonRpc10Response(requestId); + response.setError(error); + + final String jsonString; + try { + jsonString = objectMapper.writeValueAsString(response); + } catch (JsonProcessingException e) { + LOG.error("Exception while processing JSON response {}", response, e); return; } - // send a null response for list_dbs - if (request.getMethod().equals("list_dbs")) { - JsonRpc10Response response = new JsonRpc10Response(request.getId()); - response.setError(null); - try { - String jsonString = objectMapper.writeValueAsString(response); - nettyChannel.writeAndFlush(jsonString); - } catch (JsonProcessingException e) { - LOG.error("Exception while processing JSON response {}", response, e); - } + nettyChannel.writeAndFlush(jsonString); + } + + private void handleResponse(final JsonNode response, final JsonNode result) { + LOG.trace("Response : {}", response); + final String requestId = response.get("id").asText(); + final CallContext returnCtxt = methodContext.remove(requestId); + if (returnCtxt == null) { + LOG.debug("Ignoring response for unknown request {}", requestId); return; } - LOG.error("No handler for Request : {} on {}", requestJson, context); + final JsonNode error = response.get("error"); + if (error != null && !error.isNull()) { + LOG.error("Request {} failed with error {}", requestId, error); + } + + final Object mappedResult = objectMapper.convertValue(result, returnCtxt.resultType); + if (!returnCtxt.future.set(mappedResult)) { + LOG.debug("Request {} did not accept result {}", requestId, mappedResult); + } + } + + private ListenableFuture sendRequest(final JsonRpc10Request request, final JavaType resultType) { + final String requestString; + try { + requestString = objectMapper.writeValueAsString(request); + } catch (JsonProcessingException e) { + return Futures.immediateFailedFuture(e); + } + LOG.trace("getClient Request : {}", requestString); + + final SettableFuture sf = SettableFuture.create(); + methodContext.put(request.getId(), new CallContext(resultType, sf)); + FUTURE_REAPER_SERVICE.schedule(() -> { + CallContext cc = methodContext.remove(request.getId()); + if (cc != null) { + if (cc.future.isDone() || cc.future.isCancelled()) { + return; + } + cc.future.cancel(false); + } + }, reaperInterval, TimeUnit.MILLISECONDS); + + nettyChannel.writeAndFlush(requestString); + return sf; } - public Map getMethodContext() { - return methodContext; + private ListenableFuture sendRequest(final JavaType resultType, final String method) { + return sendRequest(createRequest(method), resultType); } - public static void setReaperInterval(int interval) { - reaperInterval = interval; - LOG.info("Ovsdb Rpc Task interval is set to {} millisecond", reaperInterval); + private ListenableFuture sendRequest(final JavaType resultType, final String method, final List params) { + final JsonRpc10Request request = createRequest(method); + request.setParams(params); + return sendRequest(request, resultType); } - public static void close() { - LOG.info("Shutting down reaper executor service"); - FUTURE_REAPER_SERVICE.shutdownNow(); + private ListenableFuture sendRequest(final JavaType resultType, final String method, final Params params) { + final JsonRpc10Request request = createRequest(method); + request.setParams(params.params()); + return sendRequest(request, resultType); + } + + private static JsonRpc10Request createRequest(final String method) { + JsonRpc10Request request = new JsonRpc10Request(UUID.randomUUID().toString()); + request.setMethod(method); + return request; } } diff --git a/library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/JsonRpcServiceBinderHandler.java b/library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/JsonRpcServiceBinderHandler.java deleted file mode 100644 index b58931442..000000000 --- a/library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/JsonRpcServiceBinderHandler.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2013, 2015 EBay Software Foundation and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.ovsdb.lib.jsonrpc; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.base.Strings; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JsonRpcServiceBinderHandler extends ChannelInboundHandlerAdapter { - private static final Logger LOG = LoggerFactory.getLogger(JsonRpcServiceBinderHandler.class); - JsonRpcEndpoint factory = null; - Object context = null; - - public Object getContext() { - return context; - } - - public void setContext(Object context) { - this.context = context; - } - - public JsonRpcServiceBinderHandler(JsonRpcEndpoint factory) { - this.factory = factory; - } - - @Override - public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { - if (msg instanceof JsonNode) { - JsonNode jsonNode = (JsonNode) msg; - if (jsonNode.has("result")) { - try { - factory.processResult(jsonNode); - } catch (NoSuchMethodException e) { - /* - ChannelRead is a method invoked during Netty message receive event. - The only sane thing we can do is to print a meaningful error message. - */ - LOG.error("NoSuchMethodException when handling {}", msg, e); - } - } else if (jsonNode.hasNonNull("method")) { - if (jsonNode.has("id") && !Strings.isNullOrEmpty(jsonNode.get("id").asText())) { - factory.processRequest(context, jsonNode); - } else { - LOG.debug("Request with null or empty id field: {} {}", jsonNode.get("method"), - jsonNode.get("params")); - } - } - - return; - } - ctx.channel().close(); - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - ctx.flush(); - } -}