2 * Copyright (C) 2014 Cisco Systems, Inc.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 * Authors : Thomas Bachman
10 package org.opendaylight.groupbasedpolicy.renderer.opflex.jsonrpc;
12 import io.netty.bootstrap.ServerBootstrap;
13 import io.netty.channel.AdaptiveRecvByteBufAllocator;
14 import io.netty.channel.Channel;
15 import io.netty.channel.ChannelFuture;
16 import io.netty.channel.ChannelInitializer;
17 import io.netty.channel.ChannelOption;
18 import io.netty.channel.EventLoopGroup;
19 import io.netty.channel.nio.NioEventLoopGroup;
20 import io.netty.channel.socket.SocketChannel;
21 import io.netty.channel.socket.nio.NioServerSocketChannel;
22 import io.netty.handler.codec.string.StringEncoder;
23 import io.netty.handler.logging.LogLevel;
24 import io.netty.handler.logging.LoggingHandler;
25 import io.netty.util.CharsetUtil;
27 import java.net.InetAddress;
28 import java.util.List;
29 import java.util.concurrent.ExecutionException;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
34 import com.fasterxml.jackson.databind.DeserializationFeature;
35 import com.fasterxml.jackson.databind.ObjectMapper;
36 import com.fasterxml.jackson.databind.SerializationFeature;
39 * A (soon-to-be) generic RPC server. It creates {@link JsonRpcEndpoint} objects
40 * for each new connection. The RpcServer has a set of {@link RpcMessage}
41 * types that it supports, and it passes on these supported messages
42 * to the {@link JsonRpcEndpoint} objects that it creates.
44 * TODO: add serialization type, and refactor so serialization determines
45 * concrete RpcEndpoint object (only JsonRpcEndpoint right now).
46 * TODO: This and other classes are tightly coupled to netty -- make abstraction?
48 public class RpcServer {
49 protected static final Logger logger =
50 LoggerFactory.getLogger(RpcServer.class);
52 final String identity;
56 private final RpcMessageMap messageMap = new RpcMessageMap();
57 ConnectionService connectionService;
60 public RpcServer(String identity, int port) {
61 this.listenPort = port;
62 this.identity = identity;
65 public Object getContext() {
69 public void setContext(Object context) {
70 this.context = context;
73 public void addMessage(RpcMessage message) {
74 this.messageMap.add(message);
77 public void addMessageList(List<RpcMessage> messageList) {
78 this.messageMap.addList(messageList);
81 public void setConnectionService(ConnectionService connectionService) {
82 this.connectionService = connectionService;
85 public void setRpcBroker(RpcBroker broker) {
89 public void setChannel(Channel channel) {
90 this.channel = channel;
93 public Channel getChannel() {
97 void handleNewConnection(String identifier, Channel newChannel)
98 throws InterruptedException, ExecutionException {
100 ObjectMapper objectMapper = new ObjectMapper();
101 objectMapper.configure(
102 DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
103 objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
105 JsonRpcEndpoint endpoint = new JsonRpcEndpoint(identifier, connectionService,
106 objectMapper, newChannel, messageMap, broker);
107 endpoint.setContext(context);
108 JsonRpcServiceBinderHandler binderHandler =
109 new JsonRpcServiceBinderHandler(endpoint);
110 newChannel.pipeline().addLast(binderHandler);
112 connectionService.addConnection(endpoint);
114 ChannelFuture closeFuture = newChannel.closeFuture();
115 closeFuture.addListener(endpoint);
118 public void start() {
119 EventLoopGroup bossGroup = new NioEventLoopGroup();
120 EventLoopGroup workerGroup = new NioEventLoopGroup();
122 ServerBootstrap b = new ServerBootstrap();
123 b.group(bossGroup, workerGroup)
124 .channel(NioServerSocketChannel.class)
125 .option(ChannelOption.SO_BACKLOG, 100)
126 .handler(new LoggingHandler(LogLevel.INFO))
127 .childHandler(new ChannelInitializer<SocketChannel>() {
129 public void initChannel(SocketChannel ch)
131 logger.debug("New Passive channel created : "
133 InetAddress address = ch.remoteAddress()
135 int port = ch.remoteAddress().getPort();
136 String identifier = address.getHostAddress() + ":"
138 ch.pipeline().addLast(
139 new LoggingHandler(LogLevel.INFO),
140 new JsonRpcDecoder(100000),
141 new StringEncoder(CharsetUtil.UTF_8));
143 handleNewConnection(identifier, ch);
144 logger.trace("Connected Node : " + identifier);
147 b.option(ChannelOption.TCP_NODELAY, true);
148 b.option(ChannelOption.RCVBUF_ALLOCATOR,
149 new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
151 ChannelFuture f = b.bind(identity, listenPort).sync();
152 String id = f.channel().localAddress().toString();
153 logger.trace("Connected Node : " + id);
155 this.channel = f.channel();
157 // Wait until the server socket is closed.
158 f.channel().closeFuture().sync();
159 } catch (InterruptedException e) {
160 logger.error("Thread interrupted", e);
162 // Shut down all event loops to terminate all threads.
163 bossGroup.shutdownGracefully();
164 workerGroup.shutdownGracefully();