Turn JsonRpcEndpoint into a proper OvsdbRPC implementation 27/86127/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 28 Nov 2019 13:39:59 +0000 (14:39 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 3 Dec 2019 09:46:48 +0000 (10:46 +0100)
The Netty wiring and invocation handling is quite arcane here, relying
mostly on reflection to perform dispatch.

There is only a single direct user JsonRpcEndpoint, which expects
a OvsdbRPC to be exposed from it. Furthermore it is obvious that
JsonRpcEndpoint has a 1:1 correspondence with JsonRpcServiceBinderHandler
and that the invocation context used to pass request processing relies
on Netty Channel instance -- to which the JsonRpcEndpoint is already
bound.

This integrates JsonRpcServiceBinderHandler into JsonRpcEndpoint,
thus making the fusion obvious, eliminating the need to pass Object
context between the two.

Furthermore we eliminate JsonRpcEndpoint.getClient() and opt to make
JsonRpcEndpoint implement OvsdbRPC, providing a very clear connection
between the two and completely eliminating use of reflection in the
process.

Change-Id: I1cfcf784330fdf6e30a55fcaff6f5bd716d987e3
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit fc3bf4c16d4bd71512ca991fcfc95d804c73c05e)

library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java
library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/JsonRpcEndpoint.java
library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/JsonRpcServiceBinderHandler.java [deleted file]

index b968c278154411e68b6349de33a5acafebf6c5c6..a79bc74ac8678845d66cc5dd2268f9d929f2a40b 100644 (file)
@@ -66,8 +66,6 @@ import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
 import org.opendaylight.ovsdb.lib.jsonrpc.ExceptionHandler;
 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
-import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
-import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -232,17 +230,13 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     private static OvsdbClient getChannelClient(final Channel channel, final ConnectionType type,
             final SocketConnectionType socketConnType) {
 
-        JsonRpcEndpoint factory = new JsonRpcEndpoint(OBJECT_MAPPER, channel);
-        JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
-        binderHandler.setContext(channel);
-        channel.pipeline().addLast(binderHandler);
+        JsonRpcEndpoint endpoint = new JsonRpcEndpoint(OBJECT_MAPPER, channel);
+        channel.pipeline().addLast(endpoint);
 
-        OvsdbRPC rpc = factory.getClient(channel, OvsdbRPC.class);
-        OvsdbClientImpl client = new OvsdbClientImpl(rpc, channel, type, socketConnType);
+        OvsdbClientImpl client = new OvsdbClientImpl(endpoint, channel, type, socketConnType);
         client.setConnectionPublished(true);
         CONNECTIONS.put(client, channel);
-        ChannelFuture closeFuture = channel.closeFuture();
-        closeFuture.addListener(new ChannelConnectionHandler(client));
+        channel.closeFuture().addListener(new ChannelConnectionHandler(client));
         return client;
     }
 
index 4922ac34efa6e158f187a59acea8ba9382497a10..44ac281383fc76868474c42ed87025fe34a9bd58 100644 (file)
@@ -5,24 +5,23 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.ovsdb.lib.jsonrpc;
 
+import static java.util.Objects.requireNonNull;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.type.TypeFactory;
-import com.google.common.reflect.Invokable;
-import com.google.common.reflect.Reflection;
-import com.google.common.reflect.TypeToken;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.channel.Channel;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -31,13 +30,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.ovsdb.lib.error.UnexpectedResultException;
 import org.opendaylight.ovsdb.lib.error.UnsupportedArgumentException;
 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
+import org.opendaylight.ovsdb.lib.message.Response;
+import org.opendaylight.ovsdb.lib.message.TransactBuilder;
+import org.opendaylight.ovsdb.lib.message.UpdateNotification;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JsonRpcEndpoint {
+public class JsonRpcEndpoint extends ChannelInboundHandlerAdapter implements OvsdbRPC {
 
     private static final Logger LOG = LoggerFactory.getLogger(JsonRpcEndpoint.class);
     private static final int REAPER_THREADS = 3;
@@ -47,189 +48,304 @@ public class JsonRpcEndpoint {
     private static final ScheduledExecutorService FUTURE_REAPER_SERVICE
             = Executors.newScheduledThreadPool(REAPER_THREADS, FUTURE_REAPER_THREAD_FACTORY);
 
+    private static final JavaType JT_OBJECT = TypeFactory.defaultInstance().constructType(Object.class);
+    private static final JavaType JT_JSON_NODE = TypeFactory.defaultInstance().constructType(JsonNode.class);
+    private static final JavaType JT_LIST_JSON_NODE = TypeFactory.defaultInstance().constructParametricType(
+        List.class, JsonNode.class);
+    private static final JavaType JT_LIST_STRING = TypeFactory.defaultInstance().constructParametricType(
+        List.class, String.class);
+
     private static int reaperInterval = 1000;
 
-    public static class CallContext {
-        Method method;
-        JsonRpc10Request request;
-        SettableFuture<Object> future;
+    private static final class CallContext {
+        final JavaType resultType;
+        final SettableFuture future;
 
-        public CallContext(JsonRpc10Request request, Method method, SettableFuture<Object> future) {
-            this.method = method;
-            this.request = request;
+        CallContext(final JavaType resultType, final SettableFuture future) {
+            this.resultType = resultType;
             this.future = future;
         }
+    }
 
-        public Method getMethod() {
-            return method;
-        }
+    private final Map<String, CallContext> methodContext = new ConcurrentHashMap<>();
+    private final ObjectMapper objectMapper;
+    private final Channel nettyChannel;
 
-        public JsonRpc10Request getRequest() {
-            return request;
-        }
+    private volatile Callback currentCallback = null;
 
-        public SettableFuture<Object> getFuture() {
-            return future;
-        }
+    public JsonRpcEndpoint(final ObjectMapper objectMapper, final Channel channel) {
+        this.objectMapper = requireNonNull(objectMapper);
+        this.nettyChannel = requireNonNull(channel);
     }
 
-    ObjectMapper objectMapper;
-    Channel nettyChannel;
-    Map<String, CallContext> methodContext = new ConcurrentHashMap<>();
-    Map<Object, OvsdbRPC.Callback> requestCallbacks = new HashMap<>();
+    // FIXME: the reaper service should probably be split out
+    public static void setReaperInterval(final int interval) {
+        reaperInterval = interval;
+        LOG.info("Ovsdb Rpc Task interval is set to {} millisecond", reaperInterval);
+    }
 
-    public JsonRpcEndpoint(ObjectMapper objectMapper, Channel channel) {
-        this.objectMapper = objectMapper;
-        this.nettyChannel = channel;
+    public static void close() {
+        LOG.info("Shutting down reaper executor service");
+        FUTURE_REAPER_SERVICE.shutdownNow();
     }
 
-    public <T> T getClient(final Object context, Class<T> klazz) {
+    @Override
+    public ListenableFuture<JsonNode> get_schema(final List<String> dbNames) {
+        return sendRequest(JT_JSON_NODE, "get_schema", dbNames);
+    }
 
-        return Reflection.newProxy(klazz, (proxy, method, args) -> {
-            if (method.getName().equals(OvsdbRPC.REGISTER_CALLBACK_METHOD)) {
-                if (args == null || args.length != 1 || !(args[0] instanceof OvsdbRPC.Callback)) {
-                    return false;
-                }
-                requestCallbacks.put(context, (OvsdbRPC.Callback)args[0]);
-                return true;
-            }
+    @Override
+    public ListenableFuture<List<String>> echo() {
+        return sendRequest(JT_LIST_STRING, "echo");
+    }
 
-            JsonRpc10Request request = new JsonRpc10Request(UUID.randomUUID().toString());
-            request.setMethod(method.getName());
+    @Override
+    public ListenableFuture<JsonNode> monitor(final Params equest) {
+        return sendRequest(JT_JSON_NODE, "monitor", equest);
+    }
 
-            if (args != null && args.length != 0) {
-                List<Object> params = null;
+    @Override
+    public ListenableFuture<List<String>> list_dbs() {
+        return sendRequest(JT_LIST_STRING, "list_dbs");
+    }
 
-                if (args.length == 1) {
-                    if (args[0] instanceof Params) {
-                        params = ((Params) args[0]).params();
-                    } else if (args[0] instanceof List) {
-                        params = (List<Object>) args[0];
-                    }
+    @Override
+    public ListenableFuture<List<JsonNode>> transact(final TransactBuilder transact) {
+        return sendRequest(JT_LIST_JSON_NODE, "transact", transact);
+    }
 
-                    if (params == null) {
-                        throw new UnsupportedArgumentException("do not understand this argument yet");
-                    }
-                    request.setParams(params);
-                }
-            }
+    @Override
+    public ListenableFuture<Response> cancel(final String id) {
+        // FIXME: reflection-based access did not handle this, this keeps equivalent functionality
+        throw new UnsupportedArgumentException("do not understand this argument yet");
+    }
 
-            String requestString = objectMapper.writeValueAsString(request);
-            LOG.trace("getClient Request : {}", requestString);
-
-            SettableFuture<Object> sf = SettableFuture.create();
-            methodContext.put(request.getId(), new CallContext(request, method, sf));
-            FUTURE_REAPER_SERVICE.schedule(() -> {
-                CallContext cc = methodContext.remove(request.getId());
-                if (cc != null) {
-                    if (cc.getFuture().isDone() || cc.getFuture().isCancelled()) {
-                        return;
-                    }
-                    cc.getFuture().cancel(false);
-                }
-            }, reaperInterval, TimeUnit.MILLISECONDS);
+    @Override
+    public ListenableFuture<JsonNode> monitor_cancel(final Params jsonValue) {
+        return sendRequest(JT_JSON_NODE, "monitor_cancel", jsonValue);
+    }
 
-            nettyChannel.writeAndFlush(requestString);
+    @Override
+    public ListenableFuture<Object> lock(final List<String> id) {
+        return sendRequest(JT_OBJECT, "lock", id);
+    }
+
+    @Override
+    public ListenableFuture<Object> steal(final List<String> id) {
+        return sendRequest(JT_OBJECT, "steal", id);
+    }
 
-            return sf;
+    @Override
+    public ListenableFuture<Object> unlock(final List<String> id) {
+        return sendRequest(JT_OBJECT, "unlock", id);
+    }
+
+    @Override
+    public boolean registerCallback(final Callback callback) {
+        if (callback == null) {
+            return false;
         }
-        );
+        this.currentCallback = callback;
+        return true;
     }
 
-    public void processResult(JsonNode response) throws NoSuchMethodException {
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
+        if (!(msg instanceof JsonNode)) {
+            LOG.debug("Unexpected message {}, closing channel {}", msg, nettyChannel);
+            ctx.channel().close();
+            return;
+        }
 
-        LOG.trace("Response : {}", response);
-        CallContext returnCtxt = methodContext.remove(response.get("id").asText());
-        if (returnCtxt == null) {
+        final JsonNode jsonNode = (JsonNode) msg;
+        final JsonNode result = jsonNode.get("result");
+        if (result != null) {
+            handleResponse(jsonNode, result);
+            return;
+        }
+        final JsonNode method = jsonNode.get("method");
+        if (method != null && !method.isNull()) {
+            handleRequest(jsonNode, method);
             return;
         }
 
-        if (ListenableFuture.class == returnCtxt.getMethod().getReturnType()) {
-            TypeToken<?> retType = TypeToken.of(
-                    returnCtxt.getMethod().getGenericReturnType())
-                    .resolveType(ListenableFuture.class.getMethod("get").getGenericReturnType());
-            JavaType javaType =  TypeFactory.defaultInstance().constructType(retType.getType());
-
-            JsonNode result = response.get("result");
-            Object result1 = objectMapper.convertValue(result, javaType);
-            JsonNode error = response.get("error");
-            if (error != null && !error.isNull()) {
-                LOG.error("Error : {}", error);
-            }
+        LOG.debug("Ignoring message {} on channel {}", jsonNode, nettyChannel);
+    }
 
-            returnCtxt.getFuture().set(result1);
+    @Override
+    public void channelReadComplete(final ChannelHandlerContext ctx) {
+        ctx.flush();
+    }
 
-        } else {
-            throw new UnexpectedResultException("Don't know how to handle this");
+    private void handleRequest(final JsonNode jsonRequest, final JsonNode jsonMethod) {
+        final JsonNode id = jsonRequest.get("id");
+        final JsonNode params = jsonRequest.get("params");
+        if (id == null) {
+            LOG.debug("Ignoring request with non-existent id field: {} {}", jsonMethod, params);
+            return;
         }
+
+        final String requestId = id.asText();
+        if (Strings.isNullOrEmpty(requestId)) {
+            LOG.debug("Ignoring equest with null or empty id field: {} {}", jsonMethod, params);
+            return;
+        }
+
+        LOG.trace("Request : {} {} {}", id, jsonMethod, params);
+
+        final String method = jsonMethod.asText();
+        switch (method) {
+            case "echo":
+                // Echo does not need any special processing. hence handling it internally.
+                sendEmptyResponse(requestId);
+                return;
+            case "list_dbs":
+                // send a null response for list_dbs
+                sendEmptyResponse(requestId);
+                return;
+            default:
+                if (!handleCallbackRequest(currentCallback, requestId, method, params)) {
+                    LOG.error("No handler for Request : {} on {}", jsonRequest, nettyChannel);
+                }
+        }
+
     }
 
-    public void processRequest(Object context, JsonNode requestJson) {
-        JsonRpc10Request request = new JsonRpc10Request(requestJson.get("id").asText());
-        request.setMethod(requestJson.get("method").asText());
-        LOG.trace("Request : {} {} {}", requestJson.get("id"), requestJson.get("method"),
-                requestJson.get("params"));
-        OvsdbRPC.Callback callback = requestCallbacks.get(context);
-        if (callback != null) {
-            Method[] methods = callback.getClass().getDeclaredMethods();
-            for (Method method : methods) {
-                if (method.getName().equals(request.getMethod())) {
-                    Class<?>[] parameters = method.getParameterTypes();
-                    JsonNode params = requestJson.get("params");
-                    Object param = objectMapper.convertValue(params, parameters[1]);
-                    try {
-                        Invokable from = Invokable.from(method);
-                        from.setAccessible(true);
-                        from.invoke(callback, context, param);
-                    } catch (IllegalAccessException | InvocationTargetException e) {
-                        LOG.error("Unable to invoke callback {}", method.getName(), e);
-                    }
-                    return;
+    private boolean handleCallbackRequest(final Callback callback, final String requestId, final String method,
+            final JsonNode params) {
+        if (callback == null) {
+            // No callback registered: bail out
+            return false;
+        }
+
+        switch (method) {
+            case "update": {
+                final UpdateNotification arg;
+                try {
+                    arg = objectMapper.convertValue(params, UpdateNotification.class);
+                } catch (IllegalArgumentException e) {
+                    return reportedMalformedParameters(requestId, e);
                 }
+
+                callback.update(nettyChannel, arg);
+                return true;
             }
-        }
+            case "locked": {
+                final List<String> arg;
+                try {
+                    arg = objectMapper.convertValue(params, JT_LIST_STRING);
+                } catch (IllegalArgumentException e) {
+                    return reportedMalformedParameters(requestId, e);
+                }
 
-        // Echo dont need any special processing. hence handling it internally.
+                callback.locked(nettyChannel, arg);
+                return true;
+            }
+            case "stolen": {
+                final List<String> arg;
+                try {
+                    arg = objectMapper.convertValue(params, JT_LIST_STRING);
+                } catch (IllegalArgumentException e) {
+                    return reportedMalformedParameters(requestId, e);
+                }
 
-        if (request.getMethod().equals("echo")) {
-            JsonRpc10Response response = new JsonRpc10Response(request.getId());
-            response.setError(null);
-            try {
-                String jsonString = objectMapper.writeValueAsString(response);
-                nettyChannel.writeAndFlush(jsonString);
-            } catch (JsonProcessingException e) {
-                LOG.error("Exception while processing JSON response {}", response, e);
+                callback.stolen(nettyChannel, arg);
+                return true;
             }
+            default:
+                return false;
+        }
+    }
+
+    private boolean reportedMalformedParameters(final String requestId, final Exception cause) {
+        LOG.debug("Request {} failed to map parameters", requestId, cause);
+        sendErrorResponse(requestId, cause.getMessage());
+        return true;
+    }
+
+    private void sendEmptyResponse(final String requestId) {
+        sendErrorResponse(requestId, null);
+    }
+
+    private void sendErrorResponse(final String requestId, final String error) {
+        JsonRpc10Response response = new JsonRpc10Response(requestId);
+        response.setError(error);
+
+        final String jsonString;
+        try {
+            jsonString = objectMapper.writeValueAsString(response);
+        } catch (JsonProcessingException e) {
+            LOG.error("Exception while processing JSON response {}", response, e);
             return;
         }
 
-        // send a null response for list_dbs
-        if (request.getMethod().equals("list_dbs")) {
-            JsonRpc10Response response = new JsonRpc10Response(request.getId());
-            response.setError(null);
-            try {
-                String jsonString = objectMapper.writeValueAsString(response);
-                nettyChannel.writeAndFlush(jsonString);
-            } catch (JsonProcessingException e) {
-                LOG.error("Exception while processing JSON response {}", response, e);
-            }
+        nettyChannel.writeAndFlush(jsonString);
+    }
+
+    private void handleResponse(final JsonNode response, final JsonNode result) {
+        LOG.trace("Response : {}", response);
+        final String requestId = response.get("id").asText();
+        final CallContext returnCtxt = methodContext.remove(requestId);
+        if (returnCtxt == null) {
+            LOG.debug("Ignoring response for unknown request {}", requestId);
             return;
         }
 
-        LOG.error("No handler for Request : {} on {}", requestJson, context);
+        final JsonNode error = response.get("error");
+        if (error != null && !error.isNull()) {
+            LOG.error("Request {} failed with error {}", requestId, error);
+        }
+
+        final Object mappedResult = objectMapper.convertValue(result, returnCtxt.resultType);
+        if (!returnCtxt.future.set(mappedResult)) {
+            LOG.debug("Request {} did not accept result {}", requestId, mappedResult);
+        }
+    }
+
+    private <T> ListenableFuture<T> sendRequest(final JsonRpc10Request request, final JavaType resultType) {
+        final String requestString;
+        try {
+            requestString = objectMapper.writeValueAsString(request);
+        } catch (JsonProcessingException e) {
+            return Futures.immediateFailedFuture(e);
+        }
+        LOG.trace("getClient Request : {}", requestString);
+
+        final SettableFuture<T> sf = SettableFuture.create();
+        methodContext.put(request.getId(), new CallContext(resultType, sf));
+        FUTURE_REAPER_SERVICE.schedule(() -> {
+            CallContext cc = methodContext.remove(request.getId());
+            if (cc != null) {
+                if (cc.future.isDone() || cc.future.isCancelled()) {
+                    return;
+                }
+                cc.future.cancel(false);
+            }
+        }, reaperInterval, TimeUnit.MILLISECONDS);
+
+        nettyChannel.writeAndFlush(requestString);
+        return sf;
     }
 
-    public Map<String, CallContext> getMethodContext() {
-        return methodContext;
+    private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method) {
+        return sendRequest(createRequest(method), resultType);
     }
 
-    public static void setReaperInterval(int interval) {
-        reaperInterval = interval;
-        LOG.info("Ovsdb Rpc Task interval is set to {} millisecond", reaperInterval);
+    private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method, final List params) {
+        final JsonRpc10Request request = createRequest(method);
+        request.setParams(params);
+        return sendRequest(request, resultType);
     }
 
-    public static void close() {
-        LOG.info("Shutting down reaper executor service");
-        FUTURE_REAPER_SERVICE.shutdownNow();
+    private <T> ListenableFuture<T> sendRequest(final JavaType resultType, final String method, final Params params) {
+        final JsonRpc10Request request = createRequest(method);
+        request.setParams(params.params());
+        return sendRequest(request, resultType);
+    }
+
+    private static JsonRpc10Request createRequest(final String method) {
+        JsonRpc10Request request = new JsonRpc10Request(UUID.randomUUID().toString());
+        request.setMethod(method);
+        return request;
     }
 }
diff --git a/library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/JsonRpcServiceBinderHandler.java b/library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/JsonRpcServiceBinderHandler.java
deleted file mode 100644 (file)
index b589314..0000000
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2013, 2015 EBay Software Foundation and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.ovsdb.lib.jsonrpc;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.base.Strings;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JsonRpcServiceBinderHandler extends ChannelInboundHandlerAdapter {
-    private static final Logger LOG = LoggerFactory.getLogger(JsonRpcServiceBinderHandler.class);
-    JsonRpcEndpoint factory = null;
-    Object context = null;
-
-    public Object getContext() {
-        return context;
-    }
-
-    public void setContext(Object context) {
-        this.context = context;
-    }
-
-    public JsonRpcServiceBinderHandler(JsonRpcEndpoint factory) {
-        this.factory = factory;
-    }
-
-    @Override
-    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
-        if (msg instanceof JsonNode) {
-            JsonNode jsonNode = (JsonNode) msg;
-            if (jsonNode.has("result")) {
-                try {
-                    factory.processResult(jsonNode);
-                } catch (NoSuchMethodException e) {
-                     /*
-                       ChannelRead is a method invoked during Netty message receive event.
-                       The only sane thing we can do is to print a meaningful error message.
-                     */
-                    LOG.error("NoSuchMethodException when handling {}", msg, e);
-                }
-            } else if (jsonNode.hasNonNull("method")) {
-                if (jsonNode.has("id") && !Strings.isNullOrEmpty(jsonNode.get("id").asText())) {
-                    factory.processRequest(context, jsonNode);
-                } else {
-                    LOG.debug("Request with null or empty id field: {} {}", jsonNode.get("method"),
-                            jsonNode.get("params"));
-                }
-            }
-
-            return;
-        }
-        ctx.channel().close();
-    }
-
-    @Override
-    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
-        ctx.flush();
-    }
-}