Clustering - adding to LispSouthboundPlugin.
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
1 /*
2  * Copyright (c) 2014 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lispflowmapping.southbound;
10
11 import static io.netty.buffer.Unpooled.wrappedBuffer;
12
13 import com.google.common.base.Preconditions;
14
15 import io.netty.bootstrap.Bootstrap;
16 import io.netty.buffer.ByteBuf;
17 import io.netty.buffer.ByteBufUtil;
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.ChannelFutureListener;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.channel.nio.NioEventLoopGroup;
22 import io.netty.channel.socket.DatagramPacket;
23 import io.netty.channel.socket.nio.NioDatagramChannel;
24 import io.netty.util.concurrent.DefaultThreadFactory;
25
26 import java.net.InetAddress;
27 import java.net.InetSocketAddress;
28 import java.net.UnknownHostException;
29 import java.nio.ByteBuffer;
30 import java.util.concurrent.ThreadFactory;
31
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
34 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
35 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
36 import org.opendaylight.lispflowmapping.clustering.ClusterNodeModulSwitcherImpl;
37 import org.opendaylight.lispflowmapping.clustering.api.ClusterNodeModuleSwitcher;
38 import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
39 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler;
40 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
41 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.sb.rev150904.OdlLispSbService;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.lisp.sb.config.rev150517.LispSbConfig;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, ClusterNodeModuleSwitcher {
51     protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
52
53     private static Object startLock = new Object();
54     private final ClusterNodeModulSwitcherImpl clusterNodeModulSwitcher;
55     private LispSouthboundHandler lispSouthboundHandler;
56     private LispXtrSouthboundHandler lispXtrSouthboundHandler;
57     private NotificationPublishService notificationPublishService;
58     private NioDatagramChannel channel;
59     private volatile String bindingAddress = "0.0.0.0";
60     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
61     private volatile boolean listenOnXtrPort = false;
62     private boolean mapRegisterCacheEnabled = true;
63     private RpcRegistration<OdlLispSbService> sbRpcRegistration;
64     private NioDatagramChannel xtrChannel;
65     private LispSouthboundStats statistics = new LispSouthboundStats();
66     private Bootstrap bootstrap = new Bootstrap();
67     private Bootstrap xtrBootstrap = new Bootstrap();
68     private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
69     private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
70     private DataBroker dataBroker;
71
72     public LispSouthboundPlugin(final DataBroker dataBroker,
73             final NotificationPublishService notificationPublishService,
74             final LispSbConfig lispSbConfig, final EntityOwnershipService entityOwnershipService) {
75         this.dataBroker = dataBroker;
76         this.notificationPublishService = notificationPublishService;
77         this.bindingAddress = lispSbConfig.getBindAddress();
78         this.mapRegisterCacheEnabled = lispSbConfig.isMapRegisterCache();
79         clusterNodeModulSwitcher = new ClusterNodeModulSwitcherImpl(entityOwnershipService);
80         clusterNodeModulSwitcher.setModule(this);
81     }
82
83     public void init() {
84         LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
85         synchronized (startLock) {
86             lispSouthboundHandler = new LispSouthboundHandler(this);
87             lispSouthboundHandler.setDataBroker(dataBroker);
88             lispSouthboundHandler.setNotificationProvider(this.notificationPublishService);
89             lispSouthboundHandler.setMapRegisterCacheEnabled(mapRegisterCacheEnabled);
90             lispSouthboundHandler.init();
91             lispSouthboundHandler.restoreDaoFromDatastore();
92
93             lispXtrSouthboundHandler = new LispXtrSouthboundHandler();
94             lispXtrSouthboundHandler.setNotificationProvider(this.notificationPublishService);
95
96             bootstrap.group(eventLoopGroup);
97             bootstrap.channel(NioDatagramChannel.class);
98             bootstrap.handler(lispSouthboundHandler);
99
100             xtrBootstrap.group(eventLoopGroup);
101             xtrBootstrap.channel(NioDatagramChannel.class);
102             xtrBootstrap.handler(lispXtrSouthboundHandler);
103
104             start();
105             startXtr();
106
107             LOG.info("LISP (RFC6830) Southbound Plugin is up!");
108         }
109         clusterNodeModulSwitcher.switchModuleByEntityOwnership();
110     }
111
112     private void start() {
113         try {
114             channel = (NioDatagramChannel) bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
115             LOG.debug("Binding LISP UDP listening socket to {}:{}", bindingAddress, LispMessage.PORT_NUM);
116         } catch (Exception e) {
117             LOG.error("Failed to open main socket ", e);
118         }
119     }
120
121     private void startXtr() {
122         if (listenOnXtrPort) {
123             try {
124                 xtrChannel = (NioDatagramChannel) xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel();
125                 LOG.debug("Binding LISP xTR UDP listening socket to {}:{}", bindingAddress, xtrPort);
126             } catch (Exception e) {
127                 LOG.error("Failed to open xTR socket ", e);
128             }
129         }
130     }
131
132     private void stop() {
133         try {
134             channel.close().sync();
135             channel = null;
136         } catch (Exception e) {
137             LOG.error("Failed to close main socket ", e);
138         }
139     }
140
141     private void stopXtr() {
142         if (listenOnXtrPort) {
143             try {
144                 xtrChannel.close().sync();
145                 xtrChannel = null;
146             } catch (Exception e) {
147                 LOG.error("Failed to close xTR socket ", e);
148             }
149         }
150     }
151
152     private void restart() {
153         LOG.info("Reloading");
154         stop();
155         start();
156     }
157
158     private void restartXtr() {
159         LOG.info("Reloading xTR");
160         stopXtr();
161         startXtr();
162     }
163
164     private void unloadActions() {
165         lispSouthboundHandler = null;
166         lispXtrSouthboundHandler = null;
167
168         stop();
169         stopXtr();
170
171         LOG.info("LISP (RFC6830) Southbound Plugin is down!");
172     }
173
174     public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer,
175             final MessageType packetType) {
176         InetAddress ip = getInetAddress(address);
177         handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue());
178     }
179
180     public void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer,
181             final MessageType packetType, final int portNumber) {
182         InetSocketAddress recipient = new InetSocketAddress(address, portNumber);
183         outBuffer.position(0);
184         ByteBuf data = wrappedBuffer(outBuffer);
185         DatagramPacket packet = new DatagramPacket(data, recipient);
186         LOG.debug("Sending {} on port {} to address: {}", packetType, portNumber, address);
187         if (LOG.isTraceEnabled()) {
188             LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
189         }
190         channel.write(packet).addListener(new ChannelFutureListener() {
191             @Override
192             public void operationComplete(ChannelFuture future) {
193                 if (future.isSuccess()) {
194                     LOG.trace("Success");
195                     statistics.incrementTx(packetType.getIntValue());
196                 } else {
197                     LOG.warn("Failed to send packet");
198                     statistics.incrementTxErrors();
199                 }
200             }
201         });
202         channel.flush();
203     }
204
205     private InetAddress getInetAddress(TransportAddress address) {
206         Preconditions.checkNotNull(address, "TransportAddress must not be null");
207         IpAddressBinary ip = address.getIpAddress();
208         try {
209             if (ip.getIpv4AddressBinary() != null) {
210                 return InetAddress.getByAddress(ip.getIpv4AddressBinary().getValue());
211             } else if (ip.getIpv6AddressBinary() != null) {
212                 return InetAddress.getByAddress(ip.getIpv6AddressBinary().getValue());
213             }
214         } catch (UnknownHostException e) {
215             LOG.debug("Could not convert TransportAddress {} to InetAddress", address, e);
216         }
217         return null;
218     }
219
220     public LispSouthboundStats getStats() {
221         return statistics;
222     }
223
224     @Override
225     public void setLispAddress(String address) {
226         synchronized (startLock) {
227             if (bindingAddress.equals(address)) {
228                 LOG.debug("Configured LISP binding address didn't change.");
229             } else {
230                 LOG.debug("Setting LISP binding address to {}", address);
231                 bindingAddress = address;
232                 if (channel != null) {
233                     try {
234                         restart();
235                         restartXtr();
236                     } catch (Exception e) {
237                         LOG.error("Failed to set LISP binding address: ", e);
238                     }
239                 }
240             }
241         }
242     }
243
244     @Override
245     public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
246         listenOnXtrPort = shouldListenOnXtrPort;
247         if (listenOnXtrPort) {
248             restartXtr();
249         } else {
250             LOG.info("Shutting down xTR");
251             stopXtr();
252         }
253     }
254
255     @Override
256     public void setXtrPort(int port) {
257         this.xtrPort = port;
258         if (listenOnXtrPort) {
259             restartXtr();
260         }
261     }
262
263     public void setMapRegisterCacheEnabled(final boolean mapRegisterCacheEnabled) {
264         this.mapRegisterCacheEnabled = mapRegisterCacheEnabled;
265         if (mapRegisterCacheEnabled) {
266             LOG.info("Enabling Map-Register cache");
267         } else {
268             LOG.info("Disabling Map-Register cache");
269         }
270     }
271
272     @Override
273     public void close() throws Exception {
274         eventLoopGroup.shutdownGracefully();
275         sbRpcRegistration.close();
276         lispSouthboundHandler.close();
277         unloadActions();
278     }
279
280     @Override
281     public void stopModule() {
282         if (lispSouthboundHandler != null) {
283             lispSouthboundHandler.setNotificationProvider(null);
284             lispSouthboundHandler.setIsReadFromChannelEnabled(false);
285         }
286         if (lispXtrSouthboundHandler != null) {
287             lispXtrSouthboundHandler.setNotificationProvider(null);
288         }
289     }
290
291     @Override
292     public void startModule() {
293         if (lispSouthboundHandler != null) {
294             lispSouthboundHandler.setNotificationProvider(notificationPublishService);
295             lispSouthboundHandler.restoreDaoFromDatastore();
296             lispSouthboundHandler.setIsReadFromChannelEnabled(true);
297         }
298         if (lispXtrSouthboundHandler != null) {
299             lispXtrSouthboundHandler.setNotificationProvider(notificationPublishService);
300         }
301     }
302 }