Clustering - adding to LispSouthboundPlugin.
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
index 4a260f2201f2f8871974f1180b1e7c3a3b4964d6..4557bb387cc097e9fa0d289b52ce342bff57ce23 100644 (file)
@@ -9,6 +9,9 @@
 package org.opendaylight.lispflowmapping.southbound;
 
 import static io.netty.buffer.Unpooled.wrappedBuffer;
+
+import com.google.common.base.Preconditions;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
@@ -22,68 +25,94 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ThreadFactory;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.lispflowmapping.clustering.ClusterNodeModulSwitcherImpl;
+import org.opendaylight.lispflowmapping.clustering.api.ClusterNodeModuleSwitcher;
 import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundHandler;
 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundHandler;
 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.inet.binary.types.rev160303.IpAddressBinary;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.sb.rev150904.OdlLispSbService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.lisp.sb.config.rev150517.LispSbConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.net.InetAddresses;
-
-public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable {
+public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, ClusterNodeModuleSwitcher {
     protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
 
     private static Object startLock = new Object();
+    private final ClusterNodeModulSwitcherImpl clusterNodeModulSwitcher;
     private LispSouthboundHandler lispSouthboundHandler;
     private LispXtrSouthboundHandler lispXtrSouthboundHandler;
     private NotificationPublishService notificationPublishService;
-    private RpcProviderRegistry rpcRegistry;
     private NioDatagramChannel channel;
     private volatile String bindingAddress = "0.0.0.0";
     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
     private volatile boolean listenOnXtrPort = false;
+    private boolean mapRegisterCacheEnabled = true;
     private RpcRegistration<OdlLispSbService> sbRpcRegistration;
     private NioDatagramChannel xtrChannel;
     private LispSouthboundStats statistics = new LispSouthboundStats();
+    private Bootstrap bootstrap = new Bootstrap();
+    private Bootstrap xtrBootstrap = new Bootstrap();
     private ThreadFactory threadFactory = new DefaultThreadFactory("lisp-sb");
     private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(0, threadFactory);
+    private DataBroker dataBroker;
 
+    public LispSouthboundPlugin(final DataBroker dataBroker,
+            final NotificationPublishService notificationPublishService,
+            final LispSbConfig lispSbConfig, final EntityOwnershipService entityOwnershipService) {
+        this.dataBroker = dataBroker;
+        this.notificationPublishService = notificationPublishService;
+        this.bindingAddress = lispSbConfig.getBindAddress();
+        this.mapRegisterCacheEnabled = lispSbConfig.isMapRegisterCache();
+        clusterNodeModulSwitcher = new ClusterNodeModulSwitcherImpl(entityOwnershipService);
+        clusterNodeModulSwitcher.setModule(this);
+    }
 
     public void init() {
         LOG.info("LISP (RFC6830) Southbound Plugin is initializing...");
-        final LispSouthboundRPC sbRpcHandler = new LispSouthboundRPC(this);
-
-        sbRpcRegistration = rpcRegistry.addRpcImplementation(OdlLispSbService.class, sbRpcHandler);
-
         synchronized (startLock) {
             lispSouthboundHandler = new LispSouthboundHandler(this);
-            lispXtrSouthboundHandler = new LispXtrSouthboundHandler();
+            lispSouthboundHandler.setDataBroker(dataBroker);
             lispSouthboundHandler.setNotificationProvider(this.notificationPublishService);
+            lispSouthboundHandler.setMapRegisterCacheEnabled(mapRegisterCacheEnabled);
+            lispSouthboundHandler.init();
+            lispSouthboundHandler.restoreDaoFromDatastore();
+
+            lispXtrSouthboundHandler = new LispXtrSouthboundHandler();
             lispXtrSouthboundHandler.setNotificationProvider(this.notificationPublishService);
 
+            bootstrap.group(eventLoopGroup);
+            bootstrap.channel(NioDatagramChannel.class);
+            bootstrap.handler(lispSouthboundHandler);
+
+            xtrBootstrap.group(eventLoopGroup);
+            xtrBootstrap.channel(NioDatagramChannel.class);
+            xtrBootstrap.handler(lispXtrSouthboundHandler);
+
             start();
             startXtr();
 
             LOG.info("LISP (RFC6830) Southbound Plugin is up!");
         }
+        clusterNodeModulSwitcher.switchModuleByEntityOwnership();
     }
 
     private void start() {
         try {
-            Bootstrap bootstrap = new Bootstrap();
-            bootstrap.group(eventLoopGroup);
-            bootstrap.channel(NioDatagramChannel.class);
-            bootstrap.handler(lispSouthboundHandler);
             channel = (NioDatagramChannel) bootstrap.bind(bindingAddress, LispMessage.PORT_NUM).sync().channel();
+            LOG.debug("Binding LISP UDP listening socket to {}:{}", bindingAddress, LispMessage.PORT_NUM);
         } catch (Exception e) {
             LOG.error("Failed to open main socket ", e);
         }
@@ -92,11 +121,8 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl
     private void startXtr() {
         if (listenOnXtrPort) {
             try {
-                Bootstrap xtrBootstrap = new Bootstrap();
-                xtrBootstrap.group(eventLoopGroup);
-                xtrBootstrap.channel(NioDatagramChannel.class);
-                xtrBootstrap.handler(lispXtrSouthboundHandler);
                 xtrChannel = (NioDatagramChannel) xtrBootstrap.bind(bindingAddress, xtrPort).sync().channel();
+                LOG.debug("Binding LISP xTR UDP listening socket to {}:{}", bindingAddress, xtrPort);
             } catch (Exception e) {
                 LOG.error("Failed to open xTR socket ", e);
             }
@@ -135,18 +161,9 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl
         startXtr();
     }
 
-    public void setNotificationPublishService(NotificationPublishService notificationService) {
-        this.notificationPublishService = notificationService;
-    }
-
-    public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) {
-        this.rpcRegistry = rpcRegistry;
-    }
-
     private void unloadActions() {
         lispSouthboundHandler = null;
         lispXtrSouthboundHandler = null;
-        bindingAddress = "0.0.0.0";
 
         stop();
         stopXtr();
@@ -156,12 +173,17 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl
 
     public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer,
             final MessageType packetType) {
-        InetAddress ip = InetAddresses.forString(new String(address.getIpAddress().getValue()));
-        InetSocketAddress recipient = new InetSocketAddress(ip, address.getPort().getValue());
-        // the wrappedBuffer() method doesn't copy data, so this conversion shouldn't hurt performance
-        ByteBuf data = wrappedBuffer(outBuffer.array());
+        InetAddress ip = getInetAddress(address);
+        handleSerializedLispBuffer(ip, outBuffer, packetType, address.getPort().getValue());
+    }
+
+    public void handleSerializedLispBuffer(InetAddress address, ByteBuffer outBuffer,
+            final MessageType packetType, final int portNumber) {
+        InetSocketAddress recipient = new InetSocketAddress(address, portNumber);
+        outBuffer.position(0);
+        ByteBuf data = wrappedBuffer(outBuffer);
         DatagramPacket packet = new DatagramPacket(data, recipient);
-        LOG.debug("Sending {} on port {} to address: {}", packetType, address.getPort().getValue(), ip);
+        LOG.debug("Sending {} on port {} to address: {}", packetType, portNumber, address);
         if (LOG.isTraceEnabled()) {
             LOG.trace("Buffer:\n{}", ByteBufUtil.prettyHexDump(data));
         }
@@ -180,6 +202,21 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl
         channel.flush();
     }
 
+    private InetAddress getInetAddress(TransportAddress address) {
+        Preconditions.checkNotNull(address, "TransportAddress must not be null");
+        IpAddressBinary ip = address.getIpAddress();
+        try {
+            if (ip.getIpv4AddressBinary() != null) {
+                return InetAddress.getByAddress(ip.getIpv4AddressBinary().getValue());
+            } else if (ip.getIpv6AddressBinary() != null) {
+                return InetAddress.getByAddress(ip.getIpv6AddressBinary().getValue());
+            }
+        } catch (UnknownHostException e) {
+            LOG.debug("Could not convert TransportAddress {} to InetAddress", address, e);
+        }
+        return null;
+    }
+
     public LispSouthboundStats getStats() {
         return statistics;
     }
@@ -192,11 +229,13 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl
             } else {
                 LOG.debug("Setting LISP binding address to {}", address);
                 bindingAddress = address;
-                try {
-                    restart();
-                    restartXtr();
-                } catch (Exception e) {
-                    LOG.error("Failed to set LISP binding address: ", e);
+                if (channel != null) {
+                    try {
+                        restart();
+                        restartXtr();
+                    } catch (Exception e) {
+                        LOG.error("Failed to set LISP binding address: ", e);
+                    }
                 }
             }
         }
@@ -221,10 +260,43 @@ public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCl
         }
     }
 
+    public void setMapRegisterCacheEnabled(final boolean mapRegisterCacheEnabled) {
+        this.mapRegisterCacheEnabled = mapRegisterCacheEnabled;
+        if (mapRegisterCacheEnabled) {
+            LOG.info("Enabling Map-Register cache");
+        } else {
+            LOG.info("Disabling Map-Register cache");
+        }
+    }
+
     @Override
     public void close() throws Exception {
-        unloadActions();
         eventLoopGroup.shutdownGracefully();
         sbRpcRegistration.close();
+        lispSouthboundHandler.close();
+        unloadActions();
+    }
+
+    @Override
+    public void stopModule() {
+        if (lispSouthboundHandler != null) {
+            lispSouthboundHandler.setNotificationProvider(null);
+            lispSouthboundHandler.setIsReadFromChannelEnabled(false);
+        }
+        if (lispXtrSouthboundHandler != null) {
+            lispXtrSouthboundHandler.setNotificationProvider(null);
+        }
+    }
+
+    @Override
+    public void startModule() {
+        if (lispSouthboundHandler != null) {
+            lispSouthboundHandler.setNotificationProvider(notificationPublishService);
+            lispSouthboundHandler.restoreDaoFromDatastore();
+            lispSouthboundHandler.setIsReadFromChannelEnabled(true);
+        }
+        if (lispXtrSouthboundHandler != null) {
+            lispXtrSouthboundHandler.setNotificationProvider(notificationPublishService);
+        }
     }
 }