Feature uses features-parent as parent
[groupbasedpolicy.git] / renderers / opflex / src / main / java / org / opendaylight / groupbasedpolicy / renderer / opflex / jsonrpc / RpcServer.java
1 /*
2  * Copyright (C) 2014 Cisco Systems, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Thomas Bachman
9  */
10 package org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc;
11
12 import io.netty.bootstrap.ServerBootstrap;
13 import io.netty.channel.AdaptiveRecvByteBufAllocator;
14 import io.netty.channel.Channel;
15 import io.netty.channel.ChannelFuture;
16 import io.netty.channel.ChannelInitializer;
17 import io.netty.channel.ChannelOption;
18 import io.netty.channel.EventLoopGroup;
19 import io.netty.channel.nio.NioEventLoopGroup;
20 import io.netty.channel.socket.SocketChannel;
21 import io.netty.channel.socket.nio.NioServerSocketChannel;
22 import io.netty.handler.codec.string.StringEncoder;
23 import io.netty.handler.logging.LogLevel;
24 import io.netty.handler.logging.LoggingHandler;
25 import io.netty.util.CharsetUtil;
26
27 import java.net.InetAddress;
28 import java.util.List;
29 import java.util.concurrent.ExecutionException;
30
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import com.fasterxml.jackson.databind.DeserializationFeature;
35 import com.fasterxml.jackson.databind.ObjectMapper;
36 import com.fasterxml.jackson.databind.SerializationFeature;
37
38 /**
39  * A (soon-to-be) generic RPC server. It creates {@link JsonRpcEndpoint} objects
40  * for each new connection. The RpcServer has a set of {@link RpcMessage}
41  * types that it supports, and it passes on these supported messages
42  * to the {@link JsonRpcEndpoint} objects that it creates.
43  *
44  * TODO: add serialization type, and refactor so serialization determines
45  *       concrete RpcEndpoint object (only JsonRpcEndpoint right now).
46  * TODO: This and other classes are tightly coupled to netty -- make abstraction?
47  */
48 public class RpcServer {
49     protected static final Logger logger =
50             LoggerFactory.getLogger(RpcServer.class);
51
52     final String identity;
53     final int listenPort;
54     Channel channel;
55     Object context;
56     private final RpcMessageMap messageMap = new RpcMessageMap();
57     ConnectionService connectionService;
58     RpcBroker broker;
59
60     public RpcServer(String identity, int port) {
61         this.listenPort = port;
62         this.identity = identity;
63     }
64
65     public Object getContext() {
66         return context;
67     }
68
69     public void setContext(Object context) {
70         this.context = context;
71     }
72
73     public void addMessage(RpcMessage message) {
74         this.messageMap.add(message);
75     }
76
77     public void addMessageList(List<RpcMessage> messageList) {
78         this.messageMap.addList(messageList);
79     }
80
81     public void setConnectionService(ConnectionService connectionService) {
82         this.connectionService = connectionService;
83     }
84
85     public void setRpcBroker(RpcBroker broker) {
86         this.broker = broker;
87     }
88
89     public void setChannel(Channel channel) {
90         this.channel = channel;
91     }
92
93     public Channel getChannel() {
94         return this.channel;
95     }
96
97     void handleNewConnection(String identifier, Channel newChannel)
98             throws InterruptedException, ExecutionException {
99
100         ObjectMapper objectMapper = new ObjectMapper();
101         objectMapper.configure(
102                 DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
103         objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
104
105         JsonRpcEndpoint endpoint = new JsonRpcEndpoint(identifier, connectionService,
106                 objectMapper, newChannel, messageMap, broker);
107         endpoint.setContext(context);
108         JsonRpcServiceBinderHandler binderHandler =
109                 new JsonRpcServiceBinderHandler(endpoint);
110         newChannel.pipeline().addLast(binderHandler);
111
112         connectionService.addConnection(endpoint);
113
114         ChannelFuture closeFuture = newChannel.closeFuture();
115         closeFuture.addListener(endpoint);
116     }
117
118     public void start() {
119         EventLoopGroup bossGroup = new NioEventLoopGroup();
120         EventLoopGroup workerGroup = new NioEventLoopGroup();
121         try {
122             ServerBootstrap b = new ServerBootstrap();
123             b.group(bossGroup, workerGroup)
124                     .channel(NioServerSocketChannel.class)
125                     .option(ChannelOption.SO_BACKLOG, 100)
126                     .handler(new LoggingHandler(LogLevel.INFO))
127                     .childHandler(new ChannelInitializer<SocketChannel>() {
128                         @Override
129                         public void initChannel(SocketChannel ch)
130                                 throws Exception {
131                             logger.debug("New Passive channel created : "
132                                     + ch.toString());
133                             InetAddress address = ch.remoteAddress()
134                                     .getAddress();
135                             int port = ch.remoteAddress().getPort();
136                             String identifier = address.getHostAddress() + ":"
137                                     + port;
138                             ch.pipeline().addLast(
139                                     new LoggingHandler(LogLevel.INFO),
140                                     new JsonRpcDecoder(100000),
141                                     new StringEncoder(CharsetUtil.UTF_8));
142
143                             handleNewConnection(identifier, ch);
144                             logger.trace("Connected Node : " + identifier);
145                         }
146                     });
147             b.option(ChannelOption.TCP_NODELAY, true);
148             b.option(ChannelOption.RCVBUF_ALLOCATOR,
149                     new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
150             // Start the server.
151             ChannelFuture f = b.bind(identity, listenPort).sync();
152             String id = f.channel().localAddress().toString();
153             logger.trace("Connected Node : " + id);
154
155             this.channel = f.channel();
156
157             // Wait until the server socket is closed.
158             f.channel().closeFuture().sync();
159         } catch (InterruptedException e) {
160             logger.error("Thread interrupted", e);
161         } finally {
162             // Shut down all event loops to terminate all threads.
163             bossGroup.shutdownGracefully();
164             workerGroup.shutdownGracefully();
165         }
166     }
167 }