Feature uses features-parent as parent
[groupbasedpolicy.git] / renderers / opflex / src / main / java / org / opendaylight / groupbasedpolicy / renderer / opflex / jsonrpc / JsonRpcEndpoint.java
1 /*
2  * Copyright (C) 2013 EBay Software Foundation
3  * Copyright (C) 2014 Cisco Systems, Inc.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Authors : Ashwin Raveendran, Madhu Venugopal, Thomas Bachman
10  */
11 package org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc;
12
13 import io.netty.channel.Channel;
14 import io.netty.channel.ChannelFuture;
15 import io.netty.channel.ChannelFutureListener;
16
17 import java.util.Map;
18 import java.util.UUID;
19
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.fasterxml.jackson.databind.JsonNode;
25 import com.fasterxml.jackson.databind.ObjectMapper;
26 import com.google.common.collect.Maps;
27 import com.google.common.util.concurrent.SettableFuture;
28
29 /**
30  *
31  * This represents a JSONRPC connection between a {@link RpcServer}
32  * and some client. The clients may connect and disconnect, so one
33  * possible role that the JSONRPC endpoint can serve is to keep a long-lived
34  * notion of a client, while maintaining connectivity as it comes and goes.
35  *
36  * TODO: The current implementation uses Jackson Full data binding serialization,
37  * using JSON that has already been parsed using Jackson's Tree Model.
38  * This will be changed to streaming-mode serialization later.
39  *
40  * @author tbachman
41  *
42  */
43 public class JsonRpcEndpoint implements ChannelFutureListener {
44
45     protected static final Logger logger = LoggerFactory.getLogger(JsonRpcEndpoint.class);
46
47     private static class CallContext {
48         private final String method;
49         private final SettableFuture<Object> future;
50
51         public CallContext(String method, SettableFuture<Object> future) {
52             this.method = method;
53             this.future = future;
54         }
55
56         public String getMethod() {
57             return method;
58         }
59
60         public SettableFuture<Object> getFuture() {
61             return future;
62         }
63     }
64
65     private final String identifier;
66     private Object context;
67     private final ObjectMapper objectMapper;
68     private final Channel nettyChannel;
69     private final Map<String, CallContext> methodContext = Maps.newHashMap();
70     private final RpcMessageMap messageMap;
71     private final RpcBroker broker;
72     private final ConnectionService connectionService;
73
74     public String getIdentifier() {
75         return identifier;
76     }
77
78     public Object getContext() {
79         return context;
80     }
81
82     public void setContext(Object context) {
83         this.context = context;
84     }
85
86     public Channel getChannel() {
87         return nettyChannel;
88     }
89
90     public JsonRpcEndpoint(String identifier, ConnectionService connectionService,
91             ObjectMapper objectMapper, Channel channel,
92             RpcMessageMap messageMap, RpcBroker broker) {
93         this.identifier = identifier;
94         this.connectionService = connectionService;
95         this.objectMapper = objectMapper;
96         this.nettyChannel = channel;
97         this.messageMap = messageMap;
98         this.broker = broker;
99     }
100
101     /**
102      *
103      * Send a concrete {@link RpcMessage} to the RPC endpoint.
104      *
105      * @param message The concrete {@link RpcMessage} to send
106      * @return SettableFuture&lt;Object&gt; The caller can use the returned
107      * object to wait for the response (currently no timeout)
108      * @throws Exception The concrete message couldn't be serialized and sent
109      */
110     public SettableFuture<Object> sendRequest(RpcMessage message) throws Exception {
111         if (messageMap.get(message.getName()) == null) {
112                 return null;
113         }
114         JsonNode jn = objectMapper.getNodeFactory().textNode(UUID.randomUUID().toString());
115         message.setId(jn);
116
117         String s = objectMapper.writeValueAsString(message) + "\0";
118         logger.trace("invoke: {}", s);
119
120         SettableFuture<Object> sf = SettableFuture.create();
121         methodContext.put(message.getId().asText(), new CallContext(message.getName(), sf));
122
123         nettyChannel.writeAndFlush(s);
124
125         return sf;
126     }
127
128     /**
129      *
130      * Send a response to a previous {@link RpcMessage}request
131      *
132      * @param message The concrete {@link RpcMessage}
133      * @throws Exception The concrete message couldn't be serialized and sent
134      */
135     public void  sendResponse (RpcMessage message) throws Exception {
136
137         String s = objectMapper.writeValueAsString(message) + "\0";
138         logger.trace("sendResponse: {}", s);
139
140         nettyChannel.writeAndFlush(s);
141     }
142
143     /**
144      *
145      * Handle an {@link RpcMessage} response from the peer.
146      *
147      * @param response A fully parsed Jackson Tree-Mode JsonNode
148      * @throws NoSuchMethodException Internal error
149      */
150     public void processResult(JsonNode response) throws NoSuchMethodException {
151
152         logger.trace("Response : {}", response.toString());
153         CallContext returnCtxt = methodContext.get(response.get("id").asText());
154         if (returnCtxt == null) return;
155         RpcMessage message = messageMap.get(returnCtxt.getMethod());
156         if (message != null) {
157             try {
158                 RpcMessage handler = objectMapper.treeToValue(response, message.getClass());
159
160                 JsonNode error = response.get("error");
161                 if (error != null && !error.isNull()) {
162                     logger.error("Error : {}", error.toString());
163                 }
164
165                 returnCtxt.getFuture().set(handler);
166             } catch (JsonProcessingException  e) {
167                 logger.error("Unable to handle " + returnCtxt.getMethod(), e);
168             }
169         } else {
170             throw new RuntimeException("The response to " + returnCtxt.getMethod() +
171                     "sent is unsupported");
172         }
173     }
174
175     /**
176      *
177      * Handle incoming {@link RpcMessage} requests. The supported messages
178      * are defined by the endpoint's message map.
179      *
180      * @param requestJson A Jackson JsonNode that has had full Tree-Mode parsing
181      */
182     public void processRequest(JsonNode requestJson) {
183         RpcMessage message;
184         RpcMessage callback = messageMap.get(requestJson.get("method").asText());
185         if (callback != null) {
186             try {
187                 logger.trace("Request : {} {}", requestJson.get("method"), requestJson.get("params"));
188
189                 message = objectMapper.treeToValue(requestJson, callback.getClass());
190                 message.setId(requestJson.get("id"));
191
192                 broker.publish(this, message);
193             } catch (JsonProcessingException  e) {
194                 logger.error("Unable to invoke callback " + callback.getName(), e);
195             }
196             return;
197         }
198
199         // Echo dont need any special processing. hence handling it internally.
200
201         if (requestJson.get("method").asText().equals("echo")) {
202             JsonRpc10Response response = new JsonRpc10Response(requestJson.get("id").asText());
203             response.setError(null);
204             String s = null;
205             try {
206                 s = objectMapper.writeValueAsString(response) + "\0";
207                 nettyChannel.writeAndFlush(s);
208             } catch (JsonProcessingException e) {
209                 logger.error("Exception while processing JSON string " + s, e );
210             }
211             return;
212         }
213
214         logger.error("No handler for Request : {}",requestJson.toString());
215     }
216
217     @Override
218     public void operationComplete(ChannelFuture arg0) throws Exception {
219         connectionService.channelClosed(this);
220     }
221 }