Eliminate circular dependencies and convert to BP annotations in elanmanager
[netvirt.git] / vpnservice / elanmanager / elanmanager-impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / listeners / HwvtepTerminationPointListener.java
index 7b088a9dd1dff107e1c40d4a8771ae767ec931a2..a90aa3e84516e8517a2c5ccb18c97a067e65a024 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ * Copyright © 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -7,27 +7,31 @@
  */
 package org.opendaylight.netvirt.elan.l2gw.listeners;
 
-import com.google.common.collect.Lists;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
+import javax.annotation.Nonnull;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.genius.datastoreutils.AsyncClusteredDataChangeListenerBase;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.genius.datastoreutils.hwvtep.HwvtepClusteredDataTreeChangeListener;
 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
 import org.opendaylight.genius.utils.hwvtep.HwvtepUtils;
 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
+import org.opendaylight.netvirt.elan.l2gw.utils.SettableFutureCallback;
 import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
-import org.opendaylight.netvirt.elan.utils.ElanUtils;
 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
 import org.opendaylight.netvirt.neutronvpn.api.l2gw.utils.L2GatewayCacheUtils;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.attributes.Devices;
@@ -43,7 +47,6 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,25 +55,26 @@ import org.slf4j.LoggerFactory;
 /**
  * Listener for physical locator presence in operational datastore.
  */
+@Singleton
 public class HwvtepTerminationPointListener
-        extends AsyncClusteredDataChangeListenerBase<TerminationPoint, HwvtepTerminationPointListener>
-        implements AutoCloseable {
+        extends HwvtepClusteredDataTreeChangeListener<TerminationPoint, HwvtepTerminationPointListener> {
 
     private static final Logger LOG = LoggerFactory.getLogger(HwvtepTerminationPointListener.class);
 
-    private DataBroker broker;
-    private ListenerRegistration<DataChangeListener> lstnerRegistration;
+    private final DataBroker broker;
     private final ElanL2GatewayUtils elanL2GatewayUtils;
-    private final EntityOwnershipService entityOwnershipService;
+    private final ElanClusterUtils elanClusterUtils;
 
-    public HwvtepTerminationPointListener(DataBroker broker, ElanUtils elanUtils,
-                                          EntityOwnershipService entityOwnershipService) {
+    @Inject
+    public HwvtepTerminationPointListener(DataBroker broker, ElanL2GatewayUtils elanL2GatewayUtils,
+            ElanClusterUtils elanClusterUtils) {
         super(TerminationPoint.class, HwvtepTerminationPointListener.class);
 
         this.broker = broker;
-        this.elanL2GatewayUtils = elanUtils.getElanL2GatewayUtils();
-        this.entityOwnershipService = entityOwnershipService;
-        registerListener();
+        this.elanL2GatewayUtils = elanL2GatewayUtils;
+        this.elanClusterUtils = elanClusterUtils;
+        //No longer needed as port reconciliation is added in plugin
+        //registerListener(LogicalDatastoreType.OPERATIONAL, broker);
         LOG.debug("created HwvtepTerminationPointListener");
     }
 
@@ -78,82 +82,60 @@ public class HwvtepTerminationPointListener
     static Map<InstanceIdentifier<TerminationPoint>, Boolean> teps = new ConcurrentHashMap<>();
 
     public static void runJobAfterPhysicalLocatorIsAvialable(InstanceIdentifier<TerminationPoint> key,
-            Runnable runnable) {
+                                                             Runnable runnable) {
         if (teps.get(key) != null) {
             LOG.debug("physical locator already available {} running job ", key);
             runnable.run();
             return;
         }
         synchronized (HwvtepTerminationPointListener.class) {
-            List<Runnable> list = waitingJobsList.get(key);
-            if (list == null) {
-                waitingJobsList.put(key, Lists.newArrayList(runnable));
-            } else {
-                list.add(runnable);
-            }
+            waitingJobsList.computeIfAbsent(key, k -> new ArrayList<>()).add(runnable);
             LOG.debug("added the job to wait list of physical locator {}", key);
         }
     }
 
-    protected void registerListener() {
-        try {
-            lstnerRegistration = this.broker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
-                    InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
-                            new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node.class)
-                    .child(TerminationPoint.class), this, DataChangeScope.BASE);
-        } catch (final Exception e) {
-            LOG.error("Hwvtep LocalUcasMacs DataChange listener registration failed !", e);
-            throw new IllegalStateException("Hwvtep LocalUcasMacs DataChange listener registration failed .", e);
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (lstnerRegistration != null) {
-            try {
-                lstnerRegistration.close();
-            } catch (final Exception e) {
-                LOG.error("Error when cleaning up DataChangeListener.", e);
-            }
-            lstnerRegistration = null;
-        }
-    }
-
     @Override
-    protected void remove(InstanceIdentifier<TerminationPoint> identifier, TerminationPoint del) {
+    protected void removed(InstanceIdentifier<TerminationPoint> identifier, TerminationPoint del) {
         LOG.trace("physical locator removed {}", identifier);
         teps.remove(identifier);
+        final HwvtepPhysicalPortAugmentation portAugmentation =
+                del.getAugmentation(HwvtepPhysicalPortAugmentation.class);
+        if (portAugmentation != null) {
+            final NodeId nodeId = identifier.firstIdentifierOf(Node.class).firstKeyOf(Node.class).getNodeId();
+            elanClusterUtils.runOnlyInOwnerNode(HwvtepSouthboundConstants.ELAN_ENTITY_NAME,
+                "Handling Physical port delete",
+                () -> handlePortDeleted(identifier, portAugmentation, del, nodeId));
+            return;
+        }
     }
 
     @Override
-    protected void update(InstanceIdentifier<TerminationPoint> identifier, TerminationPoint original,
-            TerminationPoint update) {
+    protected void updated(InstanceIdentifier<TerminationPoint> identifier, TerminationPoint original,
+                           TerminationPoint update) {
         LOG.trace("physical locator available {}", identifier);
     }
 
     @Override
-    protected void add(InstanceIdentifier<TerminationPoint> identifier, final TerminationPoint add) {
+    protected void added(InstanceIdentifier<TerminationPoint> identifier, final TerminationPoint add) {
         final HwvtepPhysicalPortAugmentation portAugmentation =
                 add.getAugmentation(HwvtepPhysicalPortAugmentation.class);
         if (portAugmentation != null) {
             final NodeId nodeId = identifier.firstIdentifierOf(Node.class).firstKeyOf(Node.class).getNodeId();
-            ElanClusterUtils.runOnlyInLeaderNode(entityOwnershipService, HwvtepSouthboundConstants.ELAN_ENTITY_NAME,
-                    "handling Physical Switch add", () -> handlePortAdded(portAugmentation, add, nodeId));
+            elanClusterUtils.runOnlyInOwnerNode(HwvtepSouthboundConstants.ELAN_ENTITY_NAME,
+                () -> handlePortAdded(portAugmentation, add, nodeId));
             return;
         }
 
         LOG.trace("physical locator available {}", identifier);
         teps.put(identifier, true);
-        List<Runnable> runnableList = null;
+        List<Runnable> runnableList;
         synchronized (HwvtepTerminationPointListener.class) {
             runnableList = waitingJobsList.get(identifier);
             waitingJobsList.remove(identifier);
         }
         if (runnableList != null) {
             LOG.debug("physical locator available {} running jobs ", identifier);
-            for (Runnable r : runnableList) {
-                r.run();
-            }
+            runnableList.forEach(Runnable::run);
         } else {
             LOG.debug("no jobs are waiting for physical locator {}", identifier);
         }
@@ -161,22 +143,18 @@ public class HwvtepTerminationPointListener
 
     @Override
     protected InstanceIdentifier<TerminationPoint> getWildCardPath() {
-        return InstanceIdentifier.create(NetworkTopology.class).child(Topology.class).child(Node.class)
+        return InstanceIdentifier.create(NetworkTopology.class)
+                .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID)).child(Node.class)
                 .child(TerminationPoint.class);
     }
 
     @Override
-    protected ClusteredDataChangeListener getDataChangeListener() {
-        return HwvtepTerminationPointListener.this;
-    }
-
-    @Override
-    protected DataChangeScope getDataChangeScope() {
-        return DataChangeScope.BASE;
+    protected HwvtepTerminationPointListener getDataTreeChangeListener() {
+        return this;
     }
 
     private List<ListenableFuture<Void>> handlePortAdded(HwvtepPhysicalPortAugmentation portAugmentation,
-            TerminationPoint portAdded, NodeId psNodeId) {
+                                                         TerminationPoint portAdded, NodeId psNodeId) {
         Node psNode = HwvtepUtils.getHwVtepNode(broker, LogicalDatastoreType.OPERATIONAL, psNodeId);
         if (psNode != null) {
             String psName = psNode.getAugmentation(PhysicalSwitchAugmentation.class).getHwvtepNodeName().getValue();
@@ -185,15 +163,12 @@ public class HwvtepTerminationPointListener
                 if (isL2GatewayConfigured(l2GwDevice)) {
                     List<L2gatewayConnection> l2GwConns = L2GatewayConnectionUtils.getAssociatedL2GwConnections(broker,
                             l2GwDevice.getL2GatewayIds());
-                    if (l2GwConns != null) {
-                        String newPortId = portAdded.getTpId().getValue();
-                        NodeId hwvtepNodeId = new NodeId(l2GwDevice.getHwvtepNodeId());
-                        List<VlanBindings> vlanBindings = getVlanBindings(l2GwConns, hwvtepNodeId, psName, newPortId);
-                        List<ListenableFuture<Void>> futures = new ArrayList<>();
-                        futures.add(elanL2GatewayUtils.updateVlanBindingsInL2GatewayDevice(hwvtepNodeId, psName,
-                                newPortId, vlanBindings));
-                        return futures;
-                    }
+                    String newPortId = portAdded.getTpId().getValue();
+                    NodeId hwvtepNodeId = new NodeId(l2GwDevice.getHwvtepNodeId());
+                    List<VlanBindings> vlanBindings = getVlanBindings(l2GwConns, hwvtepNodeId, psName, newPortId);
+                    return Collections.singletonList(
+                            elanL2GatewayUtils.updateVlanBindingsInL2GatewayDevice(hwvtepNodeId, psName, newPortId,
+                                    vlanBindings));
                 }
             } else {
                 LOG.error("{} details are not present in L2Gateway Cache", psName);
@@ -204,8 +179,32 @@ public class HwvtepTerminationPointListener
         return Collections.emptyList();
     }
 
+    private List<ListenableFuture<Void>> handlePortDeleted(InstanceIdentifier<TerminationPoint> identifier,
+                                                           HwvtepPhysicalPortAugmentation portAugmentation,
+                                                           TerminationPoint portDeleted,
+                                                           NodeId psNodeId) throws ReadFailedException {
+        InstanceIdentifier<Node> psNodeIid = identifier.firstIdentifierOf(Node.class);
+        final ReadWriteTransaction tx = broker.newReadWriteTransaction();
+        final SettableFuture settableFuture = SettableFuture.create();
+        List<ListenableFuture<Void>> futures = Collections.singletonList(settableFuture);
+        Futures.addCallback(tx.read(LogicalDatastoreType.CONFIGURATION, psNodeIid),
+                new SettableFutureCallback(settableFuture) {
+                    @Override
+                    public void onSuccess(@Nonnull Object resultNode) {
+                        Optional<Node> nodeOptional = (Optional<Node>) resultNode;
+                        if (nodeOptional.isPresent()) {
+                            //case of port deleted
+                            tx.delete(LogicalDatastoreType.CONFIGURATION, identifier);
+                            Futures.addCallback(tx.submit(), new SettableFutureCallback(settableFuture),
+                                                MoreExecutors.directExecutor());
+                        }
+                    }
+                }, MoreExecutors.directExecutor());
+        return futures;
+    }
+
     private List<VlanBindings> getVlanBindings(List<L2gatewayConnection> l2GwConns, NodeId hwvtepNodeId, String psName,
-            String newPortId) {
+                                               String newPortId) {
         List<VlanBindings> vlanBindings = new ArrayList<>();
         for (L2gatewayConnection l2GwConn : l2GwConns) {
             L2gateway l2Gateway = L2GatewayConnectionUtils.getNeutronL2gateway(broker, l2GwConn.getL2gatewayId());
@@ -244,7 +243,7 @@ public class HwvtepTerminationPointListener
     }
 
     private boolean isL2GatewayConfigured(L2GatewayDevice l2GwDevice) {
-        return l2GwDevice.getHwvtepNodeId() != null && l2GwDevice.isConnected()
-                && l2GwDevice.getL2GatewayIds().size() > 0 && l2GwDevice.getTunnelIp() != null;
+        return l2GwDevice.getHwvtepNodeId() != null
+                && !l2GwDevice.getL2GatewayIds().isEmpty() && l2GwDevice.getTunnelIp() != null;
     }
 }