Fix deprecation warnings around addAugmentation()
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / HwvtepOperGlobalListener.java
index 405a519d0893adf3ff878d1f509c7217e4057e1a..67e7cdac7916cd8a3e25c7c6e176e93d927628f2 100644 (file)
@@ -5,44 +5,51 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.ovsdb.hwvtepsouthbound;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Timer;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.ovsdb.utils.mdsal.utils.Scheduler;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.ConnectionInfo;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HwvtepOperGlobalListener implements ClusteredDataTreeChangeListener<Node>, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(HwvtepOperGlobalListener.class);
+    private static final Map<InstanceIdentifier<Node>, ConnectionInfo> NODE_CONNECTION_INFO = new ConcurrentHashMap<>();
+    private static final Map<InstanceIdentifier<Node>, ScheduledFuture> TIMEOUT_FTS = new ConcurrentHashMap<>();
 
-    private final Timer timer = new Timer();
     private ListenerRegistration<HwvtepOperGlobalListener> registration;
     private final HwvtepConnectionManager hcm;
     private final DataBroker db;
-    private final Map<YangInstanceIdentifier, Node> connectedNodes = new ConcurrentHashMap<>();
+    private static final Map<InstanceIdentifier<Node>, List<Callable<Void>>> NODE_DELET_WAITING_JOBS
+            = new ConcurrentHashMap<>();
+    private static final Map<InstanceIdentifier<Node>, Node> CONNECTED_NODES = new ConcurrentHashMap<>();
+
 
-    HwvtepOperGlobalListener(DataBroker db, HwvtepConnectionManager hcm) {
+    HwvtepOperGlobalListener(final DataBroker db, HwvtepConnectionManager hcm) {
         LOG.info("Registering HwvtepOperGlobalListener");
         this.db = db;
         this.hcm = hcm;
@@ -51,7 +58,7 @@ public class HwvtepOperGlobalListener implements ClusteredDataTreeChangeListener
 
     private void registerListener() {
         final DataTreeIdentifier<Node> treeId =
-                        new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, getWildcardPath());
+                        DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, getWildcardPath());
 
         registration = db.registerDataTreeChangeListener(treeId, HwvtepOperGlobalListener.this);
     }
@@ -64,58 +71,187 @@ public class HwvtepOperGlobalListener implements ClusteredDataTreeChangeListener
     }
 
     @Override
-    public void onDataTreeChanged(Collection<DataTreeModification<Node>> changes) {
-        changes.forEach(change -> {
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
+        LOG.trace("onDataTreeChanged: ");
+        try {
+            connect(changes);
+            updated(changes);
+            disconnect(changes);
+        } catch (Exception e) {
+            LOG.error("Failed to handle dcn event ", e);
+        }
+    }
+
+    public static void runAfterTimeoutIfNodeNotCreated(InstanceIdentifier<Node> iid, Runnable job) {
+        ScheduledFuture<?> ft = TIMEOUT_FTS.get(iid);
+        if (ft != null) {
+            ft.cancel(false);
+        }
+        ft = Scheduler.getScheduledExecutorService().schedule(() -> {
+            TIMEOUT_FTS.remove(iid);
+            if (!NODE_CONNECTION_INFO.containsKey(iid)) {
+                job.run();
+            }
+        }, HwvtepSouthboundConstants.EOS_TIMEOUT, TimeUnit.SECONDS);
+        TIMEOUT_FTS.put(iid, ft);
+    }
+
+    public void runAfterNodeDeleted(InstanceIdentifier<Node> iid, Callable<Void> job) throws Exception {
+        synchronized (HwvtepOperGlobalListener.class) {
+            if (NODE_DELET_WAITING_JOBS.containsKey(iid)) {
+                LOG.error("Node present in the cache {} adding to delete queue", iid);
+                NODE_DELET_WAITING_JOBS.get(iid).add(job);
+                //Also delete the node so that reconciliation kicks in
+                deleteTheNodeOfOldConnection(iid, getNodeConnectionInfo(iid));
+                HwvtepSouthboundUtil.getScheduledExecutorService().schedule(() -> {
+                    runPendingJobs(iid);
+                }, HwvtepSouthboundConstants.HWVTEP_REGISTER_CALLBACKS_WAIT_TIMEOUT, TimeUnit.SECONDS);
+            } else {
+                LOG.info("Node not present in the cache {} running the job now", iid);
+                job.call();
+            }
+        }
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private synchronized void runPendingJobs(InstanceIdentifier<Node> iid) {
+        List<Callable<Void>> jobs = NODE_DELET_WAITING_JOBS.remove(iid);
+        if (jobs != null && !jobs.isEmpty()) {
+            jobs.forEach((job) -> {
+                try {
+                    LOG.error("Node disconnected job found {} running it now ", iid);
+                    job.call();
+                } catch (Exception e) {
+                    LOG.error("Failed to run callable ", e);
+                }
+            });
+            jobs.clear();
+        }
+    }
+
+    private void connect(Collection<DataTreeModification<Node>> changes) {
+        changes.forEach((change) -> {
             InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
             DataObjectModification<Node> mod = change.getRootNode();
-            InstanceIdentifier<Node> nodeIid = change.getRootPath().getRootIdentifier();
-            YangInstanceIdentifier entityId =
-                    HwvtepSouthboundUtil.getInstanceIdentifierCodec().getYangInstanceIdentifier(nodeIid);
             Node node = getCreated(mod);
-            if (node != null) {
-                connectedNodes.put(entityId, node);
+            if (node == null) {
+                return;
+            }
+            CONNECTED_NODES.put(key, node);
+            ScheduledFuture ft = TIMEOUT_FTS.remove(key);
+            if (ft != null) {
+                ft.cancel(false);
+            }
+            HwvtepGlobalAugmentation globalAugmentation = node.augmentation(HwvtepGlobalAugmentation.class);
+            if (globalAugmentation != null) {
+                ConnectionInfo connectionInfo = globalAugmentation.getConnectionInfo();
+                if (connectionInfo != null) {
+                    NODE_CONNECTION_INFO.put(key, connectionInfo);
+                }
             }
-            node = getRemoved(mod);
             if (node != null) {
-                connectedNodes.remove(entityId);
-                HwvtepConnectionInstance connectionInstance = hcm.getConnectionInstanceFromNodeIid(nodeIid);
-                if (Objects.equals(connectionInstance.getConnectionInfo().getRemotePort(),
-                        HwvtepSouthboundUtil.getRemotePort(node))) {
-                    //Oops some one deleted the node held by me This should never happen
-                    try {
-                        connectionInstance.refreshOperNode();
-                    } catch (ExecutionException | InterruptedException e) {
-                        LOG.error("Failed to refresh operational nodes ", e);
-                    }
+                synchronized (HwvtepOperGlobalListener.class) {
+                    NODE_DELET_WAITING_JOBS.putIfAbsent(key, new ArrayList<>());
                 }
+            }
+        });
+    }
 
+    private void updated(Collection<DataTreeModification<Node>> changes) {
+        changes.forEach((change) -> {
+            InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
+            DataObjectModification<Node> mod = change.getRootNode();
+            Node node = getUpdated(mod);
+            if (node != null) {
+                CONNECTED_NODES.put(key, node);
             }
         });
     }
 
-    private Node getCreated(DataObjectModification<Node> mod) {
+    public static Node getNode(final InstanceIdentifier<Node> key) {
+        return CONNECTED_NODES.get(key);
+    }
+
+    private void disconnect(Collection<DataTreeModification<Node>> changes) {
+        changes.forEach((change) -> {
+            InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
+            DataObjectModification<Node> mod = change.getRootNode();
+            Node node = getRemoved(mod);
+            if (node != null) {
+                CONNECTED_NODES.remove(key);
+                NODE_CONNECTION_INFO.remove(key);
+                synchronized (HwvtepOperGlobalListener.class) {
+                    runPendingJobs(key);
+                }
+            }
+        });
+    }
+
+    private static String getNodeId(InstanceIdentifier<Node> iid) {
+        return iid.firstKeyOf(Node.class).getNodeId().getValue();
+    }
+
+    public void scheduleOldConnectionNodeDelete(InstanceIdentifier<Node> iid) {
+        ConnectionInfo oldConnectionInfo = getNodeConnectionInfo(iid);
+        HwvtepSouthboundUtil.getScheduledExecutorService().schedule(() -> {
+            deleteTheNodeOfOldConnection(iid, oldConnectionInfo);
+        }, HwvtepSouthboundConstants.STALE_HWVTEP_CLEANUP_DELAY_SECS, TimeUnit.SECONDS);
+    }
+
+    private void deleteTheNodeOfOldConnection(InstanceIdentifier<Node> iid,
+                                                    ConnectionInfo oldConnectionInfo) {
+        if (oldConnectionInfo == null) {
+            return;
+        }
+        ConnectionInfo latestConnectionInfo = getNodeConnectionInfo(iid);
+        if (Objects.equals(latestConnectionInfo, oldConnectionInfo)) {
+            //Still old connection node is not deleted
+            LOG.debug("Delete Node {} from oper ", getNodeId(iid));
+            hcm.cleanupOperationalNode(iid);
+        }
+    }
+
+    private static ConnectionInfo getNodeConnectionInfo(InstanceIdentifier<Node> iid) {
+        return NODE_CONNECTION_INFO.get(iid);
+    }
+
+    private static Node getCreated(final DataObjectModification<Node> mod) {
         if (mod.getModificationType() == ModificationType.WRITE && mod.getDataBefore() == null) {
             return mod.getDataAfter();
         }
         return null;
     }
 
-    private Node getRemoved(DataObjectModification<Node> mod) {
+    private static Node getRemoved(final DataObjectModification<Node> mod) {
         if (mod.getModificationType() == ModificationType.DELETE) {
             return mod.getDataBefore();
         }
         return null;
     }
 
-    public Map<YangInstanceIdentifier, Node> getConnectedNodes() {
-        return Collections.unmodifiableMap(connectedNodes);
+    private static InstanceIdentifier<Node> getWildcardPath() {
+        return InstanceIdentifier.create(NetworkTopology.class)
+                .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID))
+                .child(Node.class);
     }
 
-    private InstanceIdentifier<Node> getWildcardPath() {
-        InstanceIdentifier<Node> path = InstanceIdentifier
-                        .create(NetworkTopology.class)
-                        .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID))
-                        .child(Node.class);
-        return path;
+    private Node getUpdated(DataObjectModification<Node> mod) {
+        Node node = null;
+        switch (mod.getModificationType()) {
+            case SUBTREE_MODIFIED:
+                node = mod.getDataAfter();
+                break;
+            case WRITE:
+                if (mod.getDataBefore() !=  null) {
+                    node = mod.getDataAfter();
+                }
+                break;
+            default:
+                break;
+        }
+        return node;
     }
+
+
 }