Bulk merge of l2gw changes
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / listeners / L2GatewayConnectionListener.java
index bf5aceec22331d565e5d1f20419f92c464a8c139..19071cf7b160285a08fc23bafb4a3ad67c46aadd 100644 (file)
@@ -10,13 +10,11 @@ package org.opendaylight.netvirt.elan.l2gw.listeners;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
-import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
 
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.MoreExecutors;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -26,37 +24,39 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.BiPredicate;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
+import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
+import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
-import org.opendaylight.infrautils.metrics.Counter;
-import org.opendaylight.infrautils.metrics.Labeled;
-import org.opendaylight.infrautils.metrics.MetricDescriptor;
-import org.opendaylight.infrautils.metrics.MetricProvider;
 import org.opendaylight.infrautils.utils.concurrent.Executors;
 import org.opendaylight.mdsal.binding.api.DataBroker;
-import org.opendaylight.mdsal.binding.api.DataObjectModification;
-import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.mdsal.binding.util.TypedReadTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.netvirt.elan.cache.ConfigMcastCache;
+import org.opendaylight.netvirt.elan.cache.ItmExternalTunnelCache;
 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayConnectionInstanceRecoveryHandler;
 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
 import org.opendaylight.netvirt.elan.utils.Scheduler;
 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
-import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
 import org.opendaylight.serviceutils.srm.RecoverableListener;
 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.L2gatewayConnections;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.l2gatewayconnections.L2gatewayConnection;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.rev150712.Neutron;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteMcastMacs;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.physical._switch.attributes.TunnelIps;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -66,19 +66,21 @@ import org.slf4j.LoggerFactory;
 @Singleton
 public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeChangeListener<L2gatewayConnection>
         implements RecoverableListener {
+
     private static final Logger LOG = LoggerFactory.getLogger(L2GatewayConnectionListener.class);
     private static final int MAX_READ_TRIALS = 120;
 
     private static final Function<Node, InstanceIdentifier<Node>> TO_GLOBAL_PATH =
-            HwvtepHAUtil::getGlobalNodePathFromPSNode;
+        HwvtepHAUtil::getGlobalNodePathFromPSNode;
 
     private static final Function<Node, InstanceIdentifier<Node>> TO_NODE_PATH =
         (node) -> HwvtepSouthboundUtils.createInstanceIdentifier(node.getNodeId());
 
     private static final Function<InstanceIdentifier<Node>, String> GET_DEVICE_NAME = HwvtepHAUtil::getPsName;
 
-    private static final Predicate<InstanceIdentifier<Node>> IS_PS_NODE = (psIid) ->
-            HwvtepHAUtil.getPsName(psIid) != null;
+    private static final Predicate<InstanceIdentifier<Node>> IS_PS_NODE = (psIid) -> {
+        return HwvtepHAUtil.getPsName(psIid) != null;
+    };
 
     private static final Predicate<Node> IS_HA_PARENT_NODE = (node) -> {
         HwvtepGlobalAugmentation augmentation = node.augmentation(HwvtepGlobalAugmentation.class);
@@ -96,35 +98,73 @@ public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeC
     private final L2GatewayConnectionUtils l2GatewayConnectionUtils;
     private final Scheduler scheduler;
     private final L2GatewayCache l2GatewayCache;
-    private final Labeled<Labeled<Counter>> elanConnectionsCounter;
+    private final ConfigMcastCache configMcastCache;
+    private final L2GatewayListener l2GatewayListener;
+    private final ItmExternalTunnelCache itmExternalTunnelCache;
+    private final HwvtepPhysicalSwitchListener hwvtepPhysicalSwitchListener;
+    private final ManagedNewTransactionRunner txRunner;
+
+    Map<InstanceIdentifier<Node>, Node> allNodes = null;
 
     @Inject
     public L2GatewayConnectionListener(final DataBroker db, L2GatewayConnectionUtils l2GatewayConnectionUtils,
                                        Scheduler scheduler, L2GatewayCache l2GatewayCache,
-                                       MetricProvider metricProvider,
                                        final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
                                        final L2GatewayConnectionInstanceRecoveryHandler l2InstanceRecoveryHandler,
-                                       final ServiceRecoveryRegistry serviceRecoveryRegistry) {
+                                       final ServiceRecoveryRegistry serviceRecoveryRegistry,
+                                       ConfigMcastCache configMcastCache,
+                                       L2GatewayListener l2GatewayListener,
+                                       ItmExternalTunnelCache itmExternalTunnelCache,
+                                       HwvtepPhysicalSwitchListener hwvtepPhysicalSwitchListener) {
         super(db, LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(Neutron.class)
                 .child(L2gatewayConnections.class).child(L2gatewayConnection.class),
-                Executors.newListeningSingleThreadExecutor("L2GatewayConnectionListener", LOG));
+            Executors.newListeningSingleThreadExecutor("L2GatewayConnectionListener", LOG));
+        this.txRunner = new ManagedNewTransactionRunnerImpl(db);
         this.broker = db;
         this.l2GatewayConnectionUtils = l2GatewayConnectionUtils;
         this.scheduler = scheduler;
         this.l2GatewayCache = l2GatewayCache;
-        this.elanConnectionsCounter = metricProvider.newCounter(MetricDescriptor.builder()
-                .anchor(this).project("netvirt").module("l2gw").id("connections").build(), "modification", "elan");
+        this.configMcastCache = configMcastCache;
+        this.l2GatewayListener = l2GatewayListener;
+        this.itmExternalTunnelCache = itmExternalTunnelCache;
+        this.hwvtepPhysicalSwitchListener = hwvtepPhysicalSwitchListener;
         serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
                 this);
         serviceRecoveryRegistry.addRecoverableListener(l2InstanceRecoveryHandler.buildServiceRegistryKey(),
                 this);
-        init();
     }
 
+    @PostConstruct
+    @SuppressWarnings("illegalcatch")
     public void init() {
-        loadL2GwDeviceCache(1);
-        LOG.trace("Loading l2gw connection cache");
-        loadL2GwConnectionCache();
+        ResourceBatchingManager.getInstance().registerDefaultBatchHandlers(this.broker);
+        scheduler.getScheduledExecutorService().schedule(() -> {
+            txRunner.callWithNewReadOnlyTransactionAndClose(CONFIGURATION, tx -> {
+                try {
+                    LOG.trace("Loading l2gw device cache");
+                    loadL2GwDeviceCache(tx);
+                    LOG.trace("Loading l2gw Mcast cache");
+                    fillConfigMcastCache();
+                    LOG.trace("Loading l2gw connection cache");
+                    loadL2GwConnectionCache(tx);
+                } catch (Exception e) {
+                    LOG.error("Failed to load cache", e);
+                } finally {
+                    allNodes.clear();
+                    l2GatewayListener.registerListener();
+                    ///configMcastCache.registerListener(CONFIGURATION, broker);
+                    //itmExternalTunnelCache.registerListener(CONFIGURATION, broker);
+                    registerListener();
+                    hwvtepPhysicalSwitchListener.registerListener();
+                }
+            });
+        }, 1, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public void register() {
+        LOG.info("Registering L2GatewayConnectionListener Override Method");
+        super.register();
     }
 
     @Override
@@ -148,9 +188,7 @@ public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeC
     @Override
     public void add(final InstanceIdentifier<L2gatewayConnection> identifier, final L2gatewayConnection input) {
         LOG.trace("Adding L2gatewayConnection: {}", input);
-        elanConnectionsCounter
-                .label(DataObjectModification.ModificationType.WRITE.name())
-                .label(input.getNetworkId().getValue()).increment();
+
         // Get associated L2GwId from 'input'
         // Create logical switch in each of the L2GwDevices part of L2Gw
         // Logical switch name is network UUID
@@ -161,9 +199,7 @@ public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeC
     @Override
     public void remove(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection input) {
         LOG.trace("Removing L2gatewayConnection: {}", input);
-        elanConnectionsCounter
-                .label(DataObjectModification.ModificationType.DELETE.name())
-                .label(input.getNetworkId().getValue()).increment();
+
         l2GatewayConnectionUtils.deleteL2GatewayConnection(input);
     }
 
@@ -173,42 +209,62 @@ public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeC
         LOG.trace("Updating L2gatewayConnection : original value={}, updated value={}", original, update);
     }
 
-    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
-            justification = "https://github.com/spotbugs/spotbugs/issues/811")
-    private void loadL2GwDeviceCache(final int trialNo) {
-        scheduler.getScheduledExecutorService().schedule(() -> {
-            if (trialNo == MAX_READ_TRIALS) {
-                LOG.error("Failed to read config topology");
-                return;
-            }
-            ReadTransaction tx = broker.newReadOnlyTransaction();
-            InstanceIdentifier<Topology> topoIid = HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier();
-            Futures.addCallback(tx.read(CONFIGURATION, topoIid), new FutureCallback<Optional<Topology>>() {
-                @Override
-                public void onSuccess(Optional<Topology> topologyOptional) {
-                    if (topologyOptional != null && topologyOptional.isPresent()) {
-                        loadL2GwDeviceCache(new ArrayList<Node>(topologyOptional.get().nonnullNode().values()));
-                    }
-                    registerListener();
-                }
-
-                @Override
-                public void onFailure(Throwable throwable) {
-                    loadL2GwDeviceCache(trialNo + 1);
-                }
-            }, MoreExecutors.directExecutor());
-            tx.close();
-        }, 1, TimeUnit.SECONDS);
+    private void addL2DeviceToCache(InstanceIdentifier<Node> psIid, Node globalNode, Node psNode) {
+        LOG.trace("L2GatewayConnectionListener Adding device to cache {}", psNode.getNodeId().getValue());
+        String deviceName = HwvtepHAUtil.getPsName(psIid);
+        List<TunnelIps> tunnelIps = new ArrayList<>(getTunnelIps(psNode));
+        if (tunnelIps != null) {
+            l2GatewayCache.updateL2GatewayCache(deviceName, globalNode.getNodeId().getValue(), tunnelIps);
+            LOG.info("L2GatewayConnectionListener Added device to cache {} {}",
+                    psNode.getNodeId().getValue(), tunnelIps);
+        } else {
+            LOG.error("L2GatewayConnectionListener Could not add device to l2gw cache no tunnel ip found {}",
+                    psNode.getNodeId().getValue());
+        }
     }
 
-    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
-            justification = "https://github.com/spotbugs/spotbugs/issues/811")
-    private void loadL2GwDeviceCache(List<Node> nodes) {
-        if (nodes == null) {
-            LOG.debug("No config topology nodes are present");
+    private void fillConfigMcastCache() {
+        if (allNodes == null) {
             return;
         }
-        Map<InstanceIdentifier<Node>, Node> allNodes = nodes
+        //allNodes.entrySet().stream().map(entry -> entry);
+        allNodes.entrySet().stream()
+                .filter(entry -> entry.getValue().augmentation(HwvtepGlobalAugmentation.class) != null)
+                .filter(entry ->
+                        entry.getValue().augmentation(HwvtepGlobalAugmentation.class).getRemoteMcastMacs() != null)
+                .forEach(entry -> {
+                    entry.getValue().augmentation(HwvtepGlobalAugmentation.class).getRemoteMcastMacs().values().stream()
+                            .forEach(mac -> {
+                                configMcastCache.added(getMacIid(entry.getKey(), mac), mac);
+                            });
+                });
+    }
+
+    private InstanceIdentifier<RemoteMcastMacs> getMacIid(InstanceIdentifier<Node> nodeIid, RemoteMcastMacs mac) {
+        return nodeIid.augmentation(HwvtepGlobalAugmentation.class).child(RemoteMcastMacs.class, mac.key());
+    }
+
+    public void loadL2GwConnectionCache(TypedReadTransaction<Configuration> tx) {
+        InstanceIdentifier<L2gatewayConnections> parentIid = InstanceIdentifier
+                .create(Neutron.class)
+                .child(L2gatewayConnections.class);
+        Optional<L2gatewayConnections> optional = Optional.empty();
+        try {
+            optional = tx.read(parentIid).get();
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("Exception while reading l2gwconnecton for populating Cache");
+        }
+        if (optional.isPresent() && optional.get().getL2gatewayConnection() != null) {
+            LOG.trace("Found some connections to fill in l2gw connection cache");
+            optional.get().getL2gatewayConnection().values()
+                .forEach(connection -> {
+                    add(parentIid.child(L2gatewayConnection.class, connection.key()), connection);
+                });
+        }
+    }
+
+    private void loadL2GwDeviceCache(TypedReadTransaction tx) {
+        allNodes = (Map<InstanceIdentifier<Node>, Node>) readAllConfigNodes(tx)
                 .stream()
                 .collect(toMap(TO_NODE_PATH, Function.identity()));
 
@@ -222,12 +278,7 @@ public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeC
                 .collect(groupingBy(GET_DEVICE_NAME, toList()));
 
         //Process HA nodes
-        allNodes.values().stream()
-                .filter(IS_HA_PARENT_NODE)
-                .forEach(parentNode -> allIids.stream()
-                        .filter(IS_PS_NODE)
-                        .filter(psIid -> PS_NODE_OF_PARENT_NODE.test(psIid, parentNode))
-                        .forEach(psIid -> addL2DeviceToCache(psIid, parentNode, allNodes.get(psIid))));
+        createHANodes(allIids);
 
         //Process non HA nodes there will be only one ps node iid for each device for non ha nodes
         psNodesByDeviceName.values().stream()
@@ -242,42 +293,61 @@ public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeC
                 });
     }
 
-    public void loadL2GwConnectionCache() {
-        InstanceIdentifier<L2gatewayConnections> parentIid = InstanceIdentifier
-                .create(Neutron.class)
-                .child(L2gatewayConnections.class);
+    private void createHANodes(Set<InstanceIdentifier<Node>> allIids) {
+        allNodes.values().stream()
+                .filter(IS_HA_PARENT_NODE)
+                .forEach(parentNode -> {
+                    fillHACache(parentNode);
+                    allIids.stream()
+                            .filter(IS_PS_NODE)
+                            .filter(psIid -> PS_NODE_OF_PARENT_NODE.test(psIid, parentNode))
+                            .forEach(psIid -> {
+                                addL2DeviceToCache(psIid, parentNode, allNodes.get(psIid));
+                            });
+                });
+    }
 
-        Optional<L2gatewayConnections> optional = Optional.empty();
-        try {
-            optional = SingleTransactionDataBroker.syncReadOptional(broker, CONFIGURATION,
-                    parentIid);
-        } catch (ExecutionException | InterruptedException e) {
-            LOG.error("loadL2GwConnectionCache: Exception while reading L2gatewayConnections DS", e);
+    private static void fillHACache(Node parentNode) {
+        InstanceIdentifier<Node> parentIid
+                = HwvtepHAUtil.convertToInstanceIdentifier(parentNode.getNodeId().getValue());
+        List<NodeId> childIids
+                = HwvtepHAUtil.getChildNodeIdsFromManagerOtherConfig(Optional.of(parentNode));
+        if (childIids != null) {
+            for (NodeId childid : childIids) {
+                InstanceIdentifier<Node> childIid
+                        = HwvtepHAUtil.convertToInstanceIdentifier(childid.getValue());
+                HwvtepHACache.getInstance().addChild(parentIid, childIid);
+            }
         }
-        if (optional.isPresent() && optional.get().getL2gatewayConnection() != null) {
-            LOG.trace("Found some connections to fill in l2gw connection cache");
-            new ArrayList<>(optional.get().nonnullL2gatewayConnection().values())
-                    .forEach(connection -> {
-                        add(parentIid.child(L2gatewayConnection.class, connection.key()), connection);
-                    });
+    }
+
+    private Collection<TunnelIps> getTunnelIps(Node psNode) {
+        if (psNode.augmentation(PhysicalSwitchAugmentation.class) != null) {
+            return psNode.augmentation(PhysicalSwitchAugmentation.class).nonnullTunnelIps().values();
         }
+        return Collections.EMPTY_LIST;
     }
 
-    void addL2DeviceToCache(InstanceIdentifier<Node> psIid, Node globalNode, Node psNode) {
-        LOG.trace("Adding device to cache {}", psNode.getNodeId().getValue());
-        String deviceName = HwvtepHAUtil.getPsName(psIid);
-        L2GatewayDevice l2GwDevice = l2GatewayCache.addOrGet(deviceName);
-        l2GwDevice.setConnected(true);
-        l2GwDevice.setHwvtepNodeId(globalNode.getNodeId().getValue());
+    private List<Node> readAllConfigNodes(TypedReadTransaction<Configuration> tx) {
 
-        List<TunnelIps> tunnelIps = psNode.augmentation(PhysicalSwitchAugmentation.class) != null
-                ? new ArrayList<>(psNode.augmentation(PhysicalSwitchAugmentation.class)
-            .nonnullTunnelIps().values()) : null;
-        if (tunnelIps != null) {
-            for (TunnelIps tunnelIp : tunnelIps) {
-                IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();
-                l2GwDevice.addTunnelIp(tunnelIpAddr);
+
+        int trialNo = 1;
+        Optional<Topology> topologyOptional = Optional.empty();
+        do {
+            try {
+                topologyOptional = tx.read(HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier()).get();
+                break;
+            } catch (ExecutionException | InterruptedException e) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e1) {
+                    LOG.trace("Sleep interrupted");
+                }
             }
+        } while (trialNo++ < MAX_READ_TRIALS);
+        if (topologyOptional != null && topologyOptional.isPresent() && topologyOptional.get().getNode() != null) {
+            return  new ArrayList<>(topologyOptional.get().nonnullNode().values());
         }
+        return Collections.EMPTY_LIST;
     }
 }