2 * Copyright (c) 2014 Contextream, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.lispflowmapping.southbound;
11 import static io.netty.buffer.Unpooled.wrappedBuffer;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.bootstrap.Bootstrap;
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.EventLoopGroup;
23 import io.netty.channel.epoll.Epoll;
24 import io.netty.channel.epoll.EpollDatagramChannel;
25 import io.netty.channel.epoll.EpollEventLoopGroup;
26 import io.netty.channel.nio.NioEventLoopGroup;
27 import io.netty.channel.socket.DatagramPacket;
28 import io.netty.channel.socket.nio.NioDatagramChannel;
29 import io.netty.util.concurrent.DefaultThreadFactory;
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.UnknownHostException;
33 import java.nio.ByteBuffer;
34 import java.util.concurrent.ThreadFactory;
35 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
36 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
37 import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
38 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler;
39 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
40 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
42 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
43 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, ClusterSingletonService {
51 protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
52 public static final String LISPFLOWMAPPING_ENTITY_NAME = "lispflowmapping";
53 public static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER = ServiceGroupIdentifier.create(
54 LISPFLOWMAPPING_ENTITY_NAME);
56 private volatile String bindingAddress;
57 private boolean mapRegisterCacheEnabled;
58 private long mapRegisterCacheTimeout;
60 private static Object startLock = new Object();
61 private final ClusterSingletonServiceProvider clusterSingletonService;
62 private LispSouthboundHandler lispSouthboundHandler;
63 private LispXtrSouthboundHandler lispXtrSouthboundHandler;
64 private NotificationPublishService notificationPublishService;
65 private Channel channel;
66 private Channel xtrChannel;
67 private Class channelType;
68 private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
69 private volatile boolean listenOnXtrPort = false;
70 private LispSouthboundStats statistics = new LispSouthboundStats();
71 private Bootstrap bootstrap = new Bootstrap();
72 private Bootstrap xtrBootstrap = new Bootstrap();
73 private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
74 private EventLoopGroup eventLoopGroup;
75 private DataBroker dataBroker;
77 public LispSouthboundPlugin(final DataBroker dataBroker,
78 final NotificationPublishService notificationPublishService,
79 final ClusterSingletonServiceProvider clusterSingletonService) {
80 this.dataBroker = dataBroker;
81 this.notificationPublishService = notificationPublishService;
82 this.clusterSingletonService = clusterSingletonService;
83 this.clusterSingletonService.registerClusterSingletonService(this);
87 LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
88 synchronized (startLock) {
89 lispSouthboundHandler = new LispSouthboundHandler(this);
90 lispSouthboundHandler.setDataBroker(dataBroker);
91 lispSouthboundHandler.setNotificationProvider(this.notificationPublishService);
92 lispSouthboundHandler.setMapRegisterCacheEnabled(mapRegisterCacheEnabled);
93 lispSouthboundHandler.setMapRegisterCacheTimeout(mapRegisterCacheTimeout);
94 lispSouthboundHandler.init();
95 lispSouthboundHandler.restoreDaoFromDatastore();
97 lispXtrSouthboundHandler = new LispXtrSouthboundHandler();
98 lispXtrSouthboundHandler.setNotificationProvider(this.notificationPublishService);
100 if (Epoll.isAvailable()) {
101 eventLoopGroup = new EpollEventLoopGroup(0, threadFactory);
102 channelType = EpollDatagramChannel.class;
103 LOG.debug("Using Netty Epoll for UDP sockets");
105 eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
106 channelType = NioDatagramChannel.class;
107 LOG.debug("Using Netty I/O (non-Epoll) for UDP sockets");
110 bootstrap.group(eventLoopGroup);
111 bootstrap.channel(channelType);
112 bootstrap.handler(lispSouthboundHandler);
114 xtrBootstrap.group(eventLoopGroup);
115 xtrBootstrap.channel(channelType);
116 xtrBootstrap.handler(lispXtrSouthboundHandler);
121 LOG.info("LISP (RFC6830) Southbound Plugin is up!");
125 @SuppressWarnings("checkstyle:IllegalCatch")
126 private void start() {
128 channel = bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
129 LOG.debug("Binding LISP UDP listening socket to {}:{}", bindingAddress, LispMessage.PORT_NUM);
130 } catch (Exception e) {
131 LOG.error("Failed to open main socket ", e);
135 @SuppressWarnings("checkstyle:IllegalCatch")
136 private void startXtr() {
137 if (listenOnXtrPort) {
139 xtrChannel = xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel();
140 LOG.debug("Binding LISP xTR UDP listening socket to {}:{}", bindingAddress, xtrPort);
141 } catch (Exception e) {
142 LOG.error("Failed to open xTR socket ", e);
147 @SuppressWarnings("checkstyle:IllegalCatch")
148 private void stop() {
150 channel.close().sync();
152 } catch (Exception e) {
153 LOG.error("Failed to close main socket ", e);
157 @SuppressWarnings("checkstyle:IllegalCatch")
158 private void stopXtr() {
159 if (listenOnXtrPort) {
161 xtrChannel.close().sync();
163 } catch (Exception e) {
164 LOG.error("Failed to close xTR socket ", e);
169 private void restart() {
170 LOG.info("Reloading");
175 private void restartXtr() {
176 LOG.info("Reloading xTR");
181 private void unloadActions() {
182 lispSouthboundHandler = null;
183 lispXtrSouthboundHandler = null;
188 LOG.info("LISP (RFC6830) Southbound Plugin is down!");
191 public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer,
192 final MessageType packetType) {
193 InetAddress ip = getInetAddress(address);
194 handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue());
197 public void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer,
198 final MessageType packetType, final int portNumber) {
199 InetSocketAddress recipient = new InetSocketAddress(address, portNumber);
200 outBuffer.position(0);
201 ByteBuf data = wrappedBuffer(outBuffer);
202 DatagramPacket packet = new DatagramPacket(data, recipient);
203 LOG.debug("Sending {} on port {} to address: {}", packetType, portNumber, address);
204 if (LOG.isTraceEnabled()) {
205 LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
207 channel.write(packet).addListener(new ChannelFutureListener() {
209 public void operationComplete(ChannelFuture future) {
210 if (future.isSuccess()) {
211 LOG.trace("Success");
212 statistics.incrementTx(packetType.getIntValue());
214 LOG.warn("Failed to send packet");
215 statistics.incrementTxErrors();
222 private InetAddress getInetAddress(TransportAddress address) {
223 Preconditions.checkNotNull(address, "TransportAddress must not be null");
224 IpAddressBinary ip = address.getIpAddress();
226 if (ip.getIpv4AddressBinary() != null) {
227 return InetAddress.getByAddress(ip.getIpv4AddressBinary().getValue());
228 } else if (ip.getIpv6AddressBinary() != null) {
229 return InetAddress.getByAddress(ip.getIpv6AddressBinary().getValue());
231 } catch (UnknownHostException e) {
232 LOG.debug("Could not convert TransportAddress {} to InetAddress", address, e);
237 public LispSouthboundStats getStats() {
242 @SuppressWarnings("checkstyle:IllegalCatch")
243 public void setLispAddress(String address) {
244 synchronized (startLock) {
245 if (bindingAddress.equals(address)) {
246 LOG.debug("Configured LISP binding address didn't change.");
248 LOG.debug("Setting LISP binding address to {}", address);
249 bindingAddress = address;
250 if (channel != null) {
254 } catch (Exception e) {
255 LOG.error("Failed to set LISP binding address: ", e);
263 public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
264 listenOnXtrPort = shouldListenOnXtrPort;
265 if (listenOnXtrPort) {
268 LOG.info("Shutting down xTR");
274 public void setXtrPort(int port) {
276 if (listenOnXtrPort) {
281 public void setMapRegisterCacheEnabled(final boolean mapRegisterCacheEnabled) {
282 this.mapRegisterCacheEnabled = mapRegisterCacheEnabled;
283 if (mapRegisterCacheEnabled) {
284 LOG.info("Enabling Map-Register cache");
286 LOG.info("Disabling Map-Register cache");
290 public void setMapRegisterCacheTimeout(long mapRegisterCacheTimeout) {
291 this.mapRegisterCacheTimeout = mapRegisterCacheTimeout;
294 public void setBindingAddress(String bindingAddress) {
295 this.bindingAddress = bindingAddress;
299 public void close() throws Exception {
300 eventLoopGroup.shutdownGracefully();
301 lispSouthboundHandler.close();
303 clusterSingletonService.close();
307 public void instantiateServiceInstance() {
308 if (lispSouthboundHandler != null) {
309 lispSouthboundHandler.setNotificationProvider(notificationPublishService);
310 lispSouthboundHandler.restoreDaoFromDatastore();
311 lispSouthboundHandler.setIsMaster(true);
313 if (lispXtrSouthboundHandler != null) {
314 lispXtrSouthboundHandler.setNotificationProvider(notificationPublishService);
319 public ListenableFuture<Void> closeServiceInstance() {
320 if (lispSouthboundHandler != null) {
321 lispSouthboundHandler.setNotificationProvider(null);
322 lispSouthboundHandler.setIsMaster(false);
324 if (lispXtrSouthboundHandler != null) {
325 lispXtrSouthboundHandler.setNotificationProvider(null);
327 return Futures.<Void>immediateFuture(null);
331 public ServiceGroupIdentifier getIdentifier() {
332 return SERVICE_GROUP_IDENTIFIER;