2 * Copyright (C) 2013 EBay Software Foundation
3 * Copyright (C) 2014 Cisco Systems, Inc.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 * Authors : Ashwin Raveendran, Madhu Venugopal, Thomas Bachman
11 package org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc;
13 import io.netty.channel.Channel;
14 import io.netty.channel.ChannelFuture;
15 import io.netty.channel.ChannelFutureListener;
18 import java.util.UUID;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.fasterxml.jackson.databind.JsonNode;
25 import com.fasterxml.jackson.databind.ObjectMapper;
26 import com.google.common.collect.Maps;
27 import com.google.common.util.concurrent.SettableFuture;
31 * This represents a JSONRPC connection between a {@link RpcServer}
32 * and some client. The clients may connect and disconnect, so one
33 * possible role that the JSONRPC endpoint can serve is to keep a long-lived
34 * notion of a client, while maintaining connectivity as it comes and goes.
36 * TODO: The current implementation uses Jackson Full data binding serialization,
37 * using JSON that has already been parsed using Jackson's Tree Model.
38 * This will be changed to streaming-mode serialization later.
43 public class JsonRpcEndpoint implements ChannelFutureListener {
45 protected static final Logger logger = LoggerFactory.getLogger(JsonRpcEndpoint.class);
47 private static class CallContext {
48 private final String method;
49 private final SettableFuture<Object> future;
51 public CallContext(String method, SettableFuture<Object> future) {
56 public String getMethod() {
60 public SettableFuture<Object> getFuture() {
65 private final String identifier;
66 private Object context;
67 private final ObjectMapper objectMapper;
68 private final Channel nettyChannel;
69 private final Map<String, CallContext> methodContext = Maps.newHashMap();
70 private final RpcMessageMap messageMap;
71 private final RpcBroker broker;
72 private final ConnectionService connectionService;
74 public String getIdentifier() {
78 public Object getContext() {
82 public void setContext(Object context) {
83 this.context = context;
86 public Channel getChannel() {
90 public JsonRpcEndpoint(String identifier, ConnectionService connectionService,
91 ObjectMapper objectMapper, Channel channel,
92 RpcMessageMap messageMap, RpcBroker broker) {
93 this.identifier = identifier;
94 this.connectionService = connectionService;
95 this.objectMapper = objectMapper;
96 this.nettyChannel = channel;
97 this.messageMap = messageMap;
103 * Send a concrete {@link RpcMessage} to the RPC endpoint.
105 * @param message The concrete {@link RpcMessage} to send
106 * @return SettableFuture<Object> The caller can use the returned
107 * object to wait for the response (currently no timeout)
108 * @throws Exception The concrete message couldn't be serialized and sent
110 public SettableFuture<Object> sendRequest(RpcMessage message) throws Exception {
111 if (messageMap.get(message.getName()) == null) {
114 JsonNode jn = objectMapper.getNodeFactory().textNode(UUID.randomUUID().toString());
117 String s = objectMapper.writeValueAsString(message) + "\0";
118 logger.trace("invoke: {}", s);
120 SettableFuture<Object> sf = SettableFuture.create();
121 methodContext.put(message.getId().asText(), new CallContext(message.getName(), sf));
123 nettyChannel.writeAndFlush(s);
130 * Send a response to a previous {@link RpcMessage}request
132 * @param message The concrete {@link RpcMessage}
133 * @throws Exception The concrete message couldn't be serialized and sent
135 public void sendResponse (RpcMessage message) throws Exception {
137 String s = objectMapper.writeValueAsString(message) + "\0";
138 logger.trace("sendResponse: {}", s);
140 nettyChannel.writeAndFlush(s);
145 * Handle an {@link RpcMessage} response from the peer.
147 * @param response A fully parsed Jackson Tree-Mode JsonNode
148 * @throws NoSuchMethodException Internal error
150 public void processResult(JsonNode response) throws NoSuchMethodException {
152 logger.trace("Response : {}", response.toString());
153 CallContext returnCtxt = methodContext.get(response.get("id").asText());
154 if (returnCtxt == null) return;
155 RpcMessage message = messageMap.get(returnCtxt.getMethod());
156 if (message != null) {
158 RpcMessage handler = objectMapper.treeToValue(response, message.getClass());
160 JsonNode error = response.get("error");
161 if (error != null && !error.isNull()) {
162 logger.error("Error : {}", error.toString());
165 returnCtxt.getFuture().set(handler);
166 } catch (JsonProcessingException e) {
167 logger.error("Unable to handle " + returnCtxt.getMethod(), e);
170 throw new RuntimeException("The response to " + returnCtxt.getMethod() +
171 "sent is unsupported");
177 * Handle incoming {@link RpcMessage} requests. The supported messages
178 * are defined by the endpoint's message map.
180 * @param requestJson A Jackson JsonNode that has had full Tree-Mode parsing
182 public void processRequest(JsonNode requestJson) {
184 RpcMessage callback = messageMap.get(requestJson.get("method").asText());
185 if (callback != null) {
187 logger.trace("Request : {} {}", requestJson.get("method"), requestJson.get("params"));
189 message = objectMapper.treeToValue(requestJson, callback.getClass());
190 message.setId(requestJson.get("id"));
192 broker.publish(this, message);
193 } catch (JsonProcessingException e) {
194 logger.error("Unable to invoke callback " + callback.getName(), e);
199 // Echo dont need any special processing. hence handling it internally.
201 if (requestJson.get("method").asText().equals("echo")) {
202 JsonRpc10Response response = new JsonRpc10Response(requestJson.get("id").asText());
203 response.setError(null);
206 s = objectMapper.writeValueAsString(response) + "\0";
207 nettyChannel.writeAndFlush(s);
208 } catch (JsonProcessingException e) {
209 logger.error("Exception while processing JSON string " + s, e );
214 logger.error("No handler for Request : {}",requestJson.toString());
218 public void operationComplete(ChannelFuture arg0) throws Exception {
219 connectionService.channelClosed(this);