Bug 5047: Improve soutbound performance
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
1 /*
2  * Copyright (c) 2014 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lispflowmapping.southbound;
10
11 import static io.netty.buffer.Unpooled.wrappedBuffer;
12 import io.netty.bootstrap.Bootstrap;
13 import io.netty.buffer.ByteBuf;
14 import io.netty.buffer.ByteBufUtil;
15 import io.netty.channel.ChannelFuture;
16 import io.netty.channel.ChannelFutureListener;
17 import io.netty.channel.EventLoopGroup;
18 import io.netty.channel.nio.NioEventLoopGroup;
19 import io.netty.channel.socket.DatagramPacket;
20 import io.netty.channel.socket.nio.NioDatagramChannel;
21 import io.netty.util.concurrent.DefaultThreadFactory;
22
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.nio.ByteBuffer;
26 import java.util.concurrent.ThreadFactory;
27 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
28 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
29 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
31 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
32 import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
33 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler;
34 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
35 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.sb.rev150904.OdlLispSbService;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import com.google.common.net.InetAddresses;
43
44 public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, BindingAwareProvider {
45     protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
46
47     private static Object startLock = new Object();
48     private LispSouthboundHandler lispSouthboundHandler;
49     private LispXtrSouthboundHandler lispXtrSouthboundHandler;
50     private NotificationPublishService notificationPublishService;
51     private RpcProviderRegistry rpcRegistry;
52     private BindingAwareBroker broker;
53     private NioDatagramChannel channel;
54     private volatile String bindingAddress = "0.0.0.0";
55     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
56     private volatile boolean listenOnXtrPort = false;
57     private BindingAwareBroker.RpcRegistration<OdlLispSbService> sbRpcRegistration;
58     private NioDatagramChannel xtrChannel;
59     private LispSouthboundStats statistics = new LispSouthboundStats();
60     private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
61     private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
62
63
64     public void init() {
65         LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
66         final LispSouthboundRPC sbRpcHandler = new LispSouthboundRPC(this);
67
68         sbRpcRegistration = rpcRegistry.addRpcImplementation(OdlLispSbService.class, sbRpcHandler);
69         broker.registerProvider(this);
70
71         synchronized (startLock) {
72             lispSouthboundHandler = new LispSouthboundHandler(this);
73             lispXtrSouthboundHandler = new LispXtrSouthboundHandler();
74             lispSouthboundHandler.setNotificationProvider(this.notificationPublishService);
75             lispXtrSouthboundHandler.setNotificationProvider(this.notificationPublishService);
76
77             start();
78             startXtr();
79
80             LOG.info("LISP (RFC6830) Southbound Plugin is up!");
81         }
82     }
83
84     private void start() {
85         try {
86             Bootstrap bootstrap = new Bootstrap();
87             bootstrap.group(eventLoopGroup);
88             bootstrap.channel(NioDatagramChannel.class);
89             bootstrap.handler(lispSouthboundHandler);
90             channel = (NioDatagramChannel) bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
91         } catch (Exception e) {
92             LOG.error("Failed to open main socket ", e);
93         }
94     }
95
96     private void startXtr() {
97         if (listenOnXtrPort) {
98             try {
99                 Bootstrap xtrBootstrap = new Bootstrap();
100                 xtrBootstrap.group(eventLoopGroup);
101                 xtrBootstrap.channel(NioDatagramChannel.class);
102                 xtrBootstrap.handler(lispXtrSouthboundHandler);
103                 xtrChannel = (NioDatagramChannel) xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel();
104             } catch (Exception e) {
105                 LOG.error("Failed to open xTR socket ", e);
106             }
107         }
108     }
109
110     private void stop() {
111         try {
112             channel.close().sync();
113             channel = null;
114         } catch (Exception e) {
115             LOG.error("Failed to close main socket ", e);
116         }
117     }
118
119     private void stopXtr() {
120         if (listenOnXtrPort) {
121             try {
122                 xtrChannel.close().sync();
123                 xtrChannel = null;
124             } catch (Exception e) {
125                 LOG.error("Failed to close xTR socket ", e);
126             }
127         }
128     }
129
130     private void restart() {
131         LOG.info("Reloading");
132         stop();
133         start();
134     }
135
136     private void restartXtr() {
137         LOG.info("Reloading xTR");
138         stopXtr();
139         startXtr();
140     }
141
142     public void setNotificationPublishService(NotificationPublishService notificationService) {
143         this.notificationPublishService = notificationService;
144     }
145
146     public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) {
147         this.rpcRegistry = rpcRegistry;
148     }
149
150     public void setBindingAwareBroker(BindingAwareBroker broker) {
151         this.broker = broker;
152     }
153
154     private void unloadActions() {
155         lispSouthboundHandler = null;
156         lispXtrSouthboundHandler = null;
157         bindingAddress = "0.0.0.0";
158
159         stop();
160         stopXtr();
161
162         LOG.info("LISP (RFC6830) Southbound Plugin is down!");
163     }
164
165     public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer,
166             final MessageType packetType) {
167         InetAddress ip = InetAddresses.forString(new String(address.getIpAddress().getValue()));
168         InetSocketAddress recipient = new InetSocketAddress(ip, address.getPort().getValue());
169         // the wrappedBuffer() method doesn't copy data, so this conversion shouldn't hurt performance
170         ByteBuf data = wrappedBuffer(outBuffer.array());
171         DatagramPacket packet = new DatagramPacket(data, recipient);
172         LOG.debug("Sending {} on port {} to address: {}", packetType, address.getPort().getValue(), ip);
173         if (LOG.isTraceEnabled()) {
174             LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
175         }
176         channel.write(packet).addListener(new ChannelFutureListener() {
177             @Override
178             public void operationComplete(ChannelFuture future) {
179                 if (future.isSuccess()) {
180                     LOG.trace("Success");
181                     statistics.incrementTx(packetType.getIntValue());
182                 } else {
183                     LOG.warn("Failed to send packet");
184                     statistics.incrementTxErrors();
185                 }
186             }
187         });
188         channel.flush();
189     }
190
191     public LispSouthboundStats getStats() {
192         return statistics;
193     }
194
195     @Override
196     public void setLispAddress(String address) {
197         synchronized (startLock) {
198             if (bindingAddress.equals(address)) {
199                 LOG.debug("Configured LISP binding address didn't change.");
200             } else {
201                 LOG.debug("Setting LISP binding address to {}", address);
202                 bindingAddress = address;
203                 try {
204                     restart();
205                     restartXtr();
206                 } catch (Exception e) {
207                     LOG.error("Failed to set LISP binding address: ", e);
208                 }
209             }
210         }
211     }
212
213     @Override
214     public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
215         listenOnXtrPort = shouldListenOnXtrPort;
216         if (listenOnXtrPort) {
217             restartXtr();
218         } else {
219             LOG.info("Shutting down xTR");
220             stopXtr();
221         }
222     }
223
224     @Override
225     public void setXtrPort(int port) {
226         this.xtrPort = port;
227         if (listenOnXtrPort) {
228             restartXtr();
229         }
230     }
231
232     @Override
233     public void close() throws Exception {
234         unloadActions();
235         eventLoopGroup.shutdownGracefully();
236         sbRpcRegistration.close();
237     }
238
239     @Override
240     public void onSessionInitiated(ProviderContext session) {
241         LOG.debug("LispSouthboundPlugin Provider Session Initiated");
242     }
243 }