Bulk merge of l2gw changes
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / ha / listeners / HAOpClusteredListener.java
index a49ee98ea05237c45c8d761514afc3dd77d14bf1..dc46d5e72084b4c24418cad19b457cd1fab2a945 100644 (file)
@@ -9,9 +9,12 @@ package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
 
 import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -20,40 +23,47 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
-import org.opendaylight.infrautils.metrics.MetricProvider;
+import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
 import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.mdsal.binding.api.DataObjectModification;
-import org.opendaylight.mdsal.binding.api.ReadTransaction;
 import org.opendaylight.mdsal.binding.util.Datastore.Operational;
 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.netvirt.elan.l2gw.MdsalEvent;
 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
 import org.opendaylight.serviceutils.srm.RecoverableListener;
 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.Managers;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Singleton
-public class HAOpClusteredListener extends HwvtepNodeBaseListener<Operational>
-        implements ClusteredDataTreeChangeListener<Node>, RecoverableListener {
-    private static final Logger LOG = LoggerFactory.getLogger(HAOpClusteredListener.class);
+public class HAOpClusteredListener extends HwvtepNodeBaseListener<Operational> implements
+        ClusteredDataTreeChangeListener<Node>, RecoverableListener {
 
+    private static final Logger LOG = LoggerFactory.getLogger(HAOpClusteredListener.class);
     private final Set<InstanceIdentifier<Node>> connectedNodes = ConcurrentHashMap.newKeySet();
     private final Map<InstanceIdentifier<Node>, Set<Consumer<Optional<Node>>>> waitingJobs = new ConcurrentHashMap<>();
+    private final IdManagerService idManager;
 
     @Inject
-    public HAOpClusteredListener(DataBroker db, HwvtepNodeHACache hwvtepNodeHACache,
-                                 MetricProvider metricProvider,
+    public HAOpClusteredListener(DataBroker db,
                                  final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
-                                 final ServiceRecoveryRegistry serviceRecoveryRegistry) throws Exception {
-        super(OPERATIONAL, db, hwvtepNodeHACache, metricProvider, false);
-        LOG.info("Registering HAOpClusteredListener");
+                                 final ServiceRecoveryRegistry serviceRecoveryRegistry,
+                                 final IdManagerService idManager) throws Exception {
+        super(OPERATIONAL, db);
+        this.idManager = idManager;
         serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(), this);
+        ResourceBatchingManager.getInstance().registerDefaultBatchHandlers(db);
+    }
+
+    public Set<InstanceIdentifier<Node>> getConnectedNodes() {
+        return connectedNodes;
     }
 
     @Override
@@ -63,7 +73,7 @@ public class HAOpClusteredListener extends HwvtepNodeBaseListener<Operational>
             LOG.info("Registering HAOpClusteredListener");
             registerListener(OPERATIONAL, getDataBroker());
         } catch (Exception e) {
-            LOG.error("HA OP Clustered register listener error.");
+            LOG.error("HA OP Clustered register listener error.", e);
         }
 
     }
@@ -73,35 +83,31 @@ public class HAOpClusteredListener extends HwvtepNodeBaseListener<Operational>
         super.close();
     }
 
-    public Set<InstanceIdentifier<Node>> getConnectedNodes() {
-        return connectedNodes;
-    }
-
     @Override
     synchronized  void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added,
             TypedReadWriteTransaction<Operational> tx)  {
         connectedNodes.remove(key);
-        getHwvtepNodeHACache().updateDisconnectedNodeStatus(key);
+        hwvtepHACache.updateDisconnectedNodeStatus(key);
     }
 
     @Override
     void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<Operational> tx)  {
         connectedNodes.remove(key);
-        getHwvtepNodeHACache().updateDisconnectedNodeStatus(key);
+        hwvtepHACache.updateDisconnectedNodeStatus(key);
     }
 
     @Override
     void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, TypedReadWriteTransaction<Operational> tx)    {
         connectedNodes.add(key);
-        getHwvtepNodeHACache().updateConnectedNodeStatus(key);
+        hwvtepHACache.updateConnectedNodeStatus(key);
     }
 
     @Override
-    public synchronized void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node updated,
-            TypedReadWriteTransaction<Operational> tx) {
-        connectedNodes. add(key);
-        HwvtepHAUtil.addToCacheIfHAChildNode(key, updated, getHwvtepNodeHACache());
-        getHwvtepNodeHACache().updateConnectedNodeStatus(key);
+    public synchronized void onGlobalNodeAdd(InstanceIdentifier<Node> key,
+        Node updated, TypedReadWriteTransaction<Operational> tx) {
+        connectedNodes.add(key);
+        addToCacheIfHAChildNode(key, updated);
+        hwvtepHACache.updateConnectedNodeStatus(key);
         if (waitingJobs.containsKey(key) && !waitingJobs.get(key).isEmpty()) {
             try {
                 HAJobScheduler jobScheduler = HAJobScheduler.getInstance();
@@ -110,6 +116,7 @@ public class HAOpClusteredListener extends HwvtepNodeBaseListener<Operational>
                     waitingJobs.get(key).forEach(
                         (waitingJob) -> jobScheduler.submitJob(() -> waitingJob.accept(nodeOptional)));
                     waitingJobs.get(key).clear();
+                    hwvtepHACache.addDebugEvent(new MdsalEvent("Waiting jobs of node are run ", getNodeId(key)));
                 } else {
                     LOG.error("Failed to read oper node {}", key);
                 }
@@ -119,18 +126,28 @@ public class HAOpClusteredListener extends HwvtepNodeBaseListener<Operational>
         }
     }
 
+    public static void addToCacheIfHAChildNode(InstanceIdentifier<Node> childPath, Node childNode) {
+        String haId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(childNode);
+        if (!Strings.isNullOrEmpty(haId)) {
+            InstanceIdentifier<Node> parentId = HwvtepHAUtil.createInstanceIdentifierFromHAId(haId);
+            //HwvtepHAUtil.updateL2GwCacheNodeId(childNode, parentId);
+            hwvtepHACache.addChild(parentId, childPath/*child*/);
+        }
+    }
+
     @Override
     void onGlobalNodeUpdate(InstanceIdentifier<Node> childPath,
                             Node updatedChildNode,
                             Node beforeChildNode,
                             DataObjectModification<Node> mod,
                             TypedReadWriteTransaction<Operational> tx) {
-        boolean wasHAChild = getHwvtepNodeHACache().isHAEnabledDevice(childPath);
+        boolean wasHAChild = hwvtepHACache.isHAEnabledDevice(childPath);
         addToHACacheIfBecameHAChild(childPath, updatedChildNode, beforeChildNode);
-        boolean isHAChild = getHwvtepNodeHACache().isHAEnabledDevice(childPath);
+        boolean isHAChild = hwvtepHACache.isHAEnabledDevice(childPath);
 
 
         if (!wasHAChild && isHAChild) {
+            hwvtepHACache.addDebugEvent(new MdsalEvent(getNodeId(childPath), "became ha child"));
             LOG.debug("{} became ha_child", getNodeId(childPath));
         } else if (wasHAChild && !isHAChild) {
             LOG.debug("{} unbecome ha_child", getNodeId(childPath));
@@ -146,6 +163,38 @@ public class HAOpClusteredListener extends HwvtepNodeBaseListener<Operational>
         return nodeId;
     }
 
+    /**
+     * If Normal non-ha node changes to HA node , its added to HA cache.
+     *
+     * @param childPath HA child path which got converted to HA node
+     * @param updatedChildNode updated Child node
+     * @param beforeChildNode non-ha node before updated to HA node
+     */
+    public static void addToHACacheIfBecameHAChild(InstanceIdentifier<Node> childPath,
+                                                   Node updatedChildNode,
+                                                   Node beforeChildNode) {
+        HwvtepGlobalAugmentation updatedAugmentaion = updatedChildNode.augmentation(HwvtepGlobalAugmentation.class);
+        HwvtepGlobalAugmentation beforeAugmentaion = null;
+        if (beforeChildNode != null) {
+            beforeAugmentaion = beforeChildNode.augmentation(HwvtepGlobalAugmentation.class);
+        }
+        Collection<Managers> up = null;
+        Collection<Managers> be = null;
+        if (updatedAugmentaion != null) {
+            up = updatedAugmentaion.nonnullManagers().values();
+        }
+        if (beforeAugmentaion != null) {
+            be = beforeAugmentaion.nonnullManagers().values();
+        }
+        if (up != null) {
+            if (!Objects.equals(up, be)) {
+                LOG.info("Manager entry updated for node {} ", updatedChildNode.getNodeId().getValue());
+                addToCacheIfHAChildNode(childPath, updatedChildNode);
+            }
+            //TODO handle unhaed case
+        }
+    }
+
     public Set<InstanceIdentifier<Node>> getConnected(Set<InstanceIdentifier<Node>> candidateds) {
         if (candidateds == null) {
             return Collections.emptySet();
@@ -158,15 +207,18 @@ public class HAOpClusteredListener extends HwvtepNodeBaseListener<Operational>
     public synchronized void runAfterNodeIsConnected(InstanceIdentifier<Node> iid, Consumer<Optional<Node>> consumer) {
         if (connectedNodes.contains(iid)) {
             HAJobScheduler.getInstance().submitJob(() -> {
-                try (ReadTransaction tx = getDataBroker().newReadOnlyTransaction()) {
-                    consumer.accept(tx.read(LogicalDatastoreType.OPERATIONAL, iid).get());
-                } catch (InterruptedException | ExecutionException e) {
-                    LOG.error("Failed to read oper ds {}", iid);
-                }
+                txRunner.callWithNewReadOnlyTransactionAndClose(OPERATIONAL, tx -> {
+                    try {
+                        consumer.accept(tx.read(iid).get());
+                    } catch (ExecutionException | InterruptedException e) {
+                        LOG.error("Failed job run after node {}", iid);
+                    }
+                });
             });
         } else {
             waitingJobs.computeIfAbsent(iid, key -> Sets.newConcurrentHashSet()).add(consumer);
         }
+        hwvtepHACache.addDebugEvent(new MdsalEvent("Job waiting for ", getNodeId(iid)));
     }
 }