Bug 6589 adding support for hwvtep devices ha 42/48342/3
authorAkash <a.k.sahu@ericsson.com>
Tue, 8 Nov 2016 07:29:53 +0000 (12:59 +0530)
committerVishal Thapar <vishal.thapar@ericsson.com>
Fri, 25 Nov 2016 11:12:44 +0000 (11:12 +0000)
Added listners for corresponding mergers and commands added earlier.
Added Handlers to handle event triggred from listners.
Listners -
HAConfigNodeListener : Listenes to HA node on config data tree and
add/delete/update corresponding HA child config node when triggred.
HAOpClusteredListener : Listenes to HA node on operational data tree in
cluster and update the cache based on connect/reconnect of HA node or
update of non-node to HA node .
HAOpNodeLsitener : Listners to HA child node on operational data tree and
add/delete/update corresponding HA node Operational when triggred .
HwvtepNodeBaseListener : Basic listners which is extended by rest listners
specially for HA feature .
Handlers -
ConfigNodeUpdatedHandler : Handles event from HAConfigNodeListener such as
copying HA ps node data to HA child ps node and HA global Node data to HA
child global node in Config data tree .
OpNodeUpdatedHandler : Handles event from  HAOpNodeLsitener such as copying
HA child ps node data to HA ps node and HA child global Node data to HA
global node in Operational data tree .

Change-Id: I01789c7d783b104095c914ea95d14b52f720a051
Signed-off-by: Akash <a.k.sahu@ericsson.com>
vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/handlers/ConfigNodeUpdatedHandler.java [new file with mode: 0644]
vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/handlers/HAEventHandler.java [new file with mode: 0644]
vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/handlers/OpNodeUpdatedHandler.java [new file with mode: 0644]
vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HAConfigNodeListener.java [new file with mode: 0644]
vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HAOpClusteredListener.java [new file with mode: 0644]
vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HAOpNodeListener.java [new file with mode: 0644]
vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HwvtepNodeBaseListener.java [new file with mode: 0644]

diff --git a/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/handlers/ConfigNodeUpdatedHandler.java b/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/handlers/ConfigNodeUpdatedHandler.java
new file mode 100644 (file)
index 0000000..1e14636
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.elan.l2gw.ha.handlers;
+
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
+import org.opendaylight.netvirt.elan.l2gw.ha.merge.GlobalAugmentationMerger;
+import org.opendaylight.netvirt.elan.l2gw.ha.merge.GlobalNodeMerger;
+import org.opendaylight.netvirt.elan.l2gw.ha.merge.PSAugmentationMerger;
+import org.opendaylight.netvirt.elan.l2gw.ha.merge.PSNodeMerger;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public class ConfigNodeUpdatedHandler {
+    GlobalAugmentationMerger globalAugmentationMerger = GlobalAugmentationMerger.getInstance();
+    PSAugmentationMerger psAugmentationMerger = PSAugmentationMerger.getInstance();
+    GlobalNodeMerger globalNodeMerger = GlobalNodeMerger.getInstance();
+    PSNodeMerger psNodeMerger = PSNodeMerger.getInstance();
+
+    /**
+     * Copy updated data from HA node to child node of config data tree.
+     *
+     * @param haUpdated HA node updated
+     * @param haOriginal HA node original
+     * @param haChildNodeId HA child node which needs to be updated
+     * @param tx Transaction
+     * @throws ReadFailedException  Exception thrown if read fails
+     * @throws ExecutionException  Exception thrown if Execution fail
+     * @throws InterruptedException Thread interrupted Exception
+     */
+    public void copyHAGlobalUpdateToChild(Node haUpdated,
+                                          Node haOriginal,
+                                          InstanceIdentifier<Node> haChildNodeId,
+                                          ReadWriteTransaction tx)
+            throws InterruptedException, ExecutionException, ReadFailedException {
+
+        Node existingNode = HwvtepHAUtil.readNode(tx, LogicalDatastoreType.CONFIGURATION, haChildNodeId);
+        HwvtepGlobalAugmentation updatedGlobal = HwvtepHAUtil.getGlobalAugmentationOfNode(haUpdated);
+        HwvtepGlobalAugmentation origGlobal = HwvtepHAUtil.getGlobalAugmentationOfNode(haOriginal);
+        HwvtepGlobalAugmentation existingData = HwvtepHAUtil.getGlobalAugmentationOfNode(existingNode);
+
+        globalAugmentationMerger.mergeConfigUpdate(existingData, updatedGlobal, origGlobal, haChildNodeId, tx);
+        globalNodeMerger.mergeConfigUpdate(existingNode, haUpdated, haOriginal, haChildNodeId, tx);
+    }
+
+    /**
+     * Copy HA ps node update to HA child ps node of config data tree.
+     *
+     * @param haUpdated HA node updated
+     * @param haOriginal HA node original
+     * @param haChildNodeId HA child node which needs to be updated
+     * @param tx Transaction
+     * @throws ReadFailedException  Exception thrown if read fails
+     * @throws ExecutionException  Exception thrown if Execution fail
+     * @throws InterruptedException Thread interrupted Exception
+     */
+    public void copyHAPSUpdateToChild(Node haUpdated,
+                                      Node haOriginal,
+                                      InstanceIdentifier<Node> haChildNodeId,
+                                      ReadWriteTransaction tx)
+            throws InterruptedException, ExecutionException, ReadFailedException {
+
+        Node existingNode = HwvtepHAUtil.readNode(tx, LogicalDatastoreType.CONFIGURATION, haChildNodeId);
+
+        PhysicalSwitchAugmentation updated = HwvtepHAUtil.getPhysicalSwitchAugmentationOfNode(haUpdated);
+        PhysicalSwitchAugmentation orig = HwvtepHAUtil.getPhysicalSwitchAugmentationOfNode(haOriginal);
+        PhysicalSwitchAugmentation existingData = HwvtepHAUtil.getPhysicalSwitchAugmentationOfNode(existingNode);
+
+        psAugmentationMerger.mergeConfigUpdate(existingData, updated, orig, haChildNodeId, tx);
+        psNodeMerger.mergeConfigUpdate(existingNode, haUpdated, haOriginal, haChildNodeId, tx);
+    }
+
+}
diff --git a/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/handlers/HAEventHandler.java b/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/handlers/HAEventHandler.java
new file mode 100644 (file)
index 0000000..fbaea99
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.elan.l2gw.ha.handlers;
+
+import com.google.common.base.Optional;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public class HAEventHandler implements IHAEventHandler {
+
+    NodeConnectedHandler nodeConnectedHandler;
+    ConfigNodeUpdatedHandler configNodeUpdatedHandler = new ConfigNodeUpdatedHandler();
+    OpNodeUpdatedHandler opNodeUpdatedHandler = new OpNodeUpdatedHandler();
+    DataBroker db;
+
+    public HAEventHandler(DataBroker db) {
+        this.db = db;
+        nodeConnectedHandler = new NodeConnectedHandler(db);
+    }
+
+    public void close() throws Exception {
+    }
+
+    @Override
+    public void handleChildNodeConnected(Node connectedNode,
+                                         InstanceIdentifier<Node> connectedNodePath,
+                                         InstanceIdentifier<Node> haNodePath,
+                                         ReadWriteTransaction tx)
+            throws ReadFailedException, ExecutionException, InterruptedException {
+        if (haNodePath == null) {
+            return;
+        }
+        nodeConnectedHandler.handleNodeConnected(connectedNode, connectedNodePath, haNodePath,
+                Optional.<Node>absent(), Optional.<Node>absent(), tx);
+    }
+
+    @Override
+    public void handleChildNodeReConnected(Node connectedNode,
+                                           InstanceIdentifier<Node> connectedNodePath,
+                                           InstanceIdentifier<Node> haNodePath,
+                                           Optional<Node> haGlobalCfg,
+                                           Optional<Node> haPSCfg,
+                                           ReadWriteTransaction tx)
+            throws ReadFailedException, ExecutionException, InterruptedException {
+        if (haNodePath == null) {
+            return;
+        }
+        nodeConnectedHandler.handleNodeConnected(connectedNode, connectedNodePath, haNodePath,
+                haGlobalCfg, haPSCfg, tx);
+    }
+
+    @Override
+    public void copyChildGlobalOpUpdateToHAParent(Node updatedSrcNode,
+                                                  Node origSrcNode,
+                                                  InstanceIdentifier<Node> haPath,
+                                                  ReadWriteTransaction tx) throws ReadFailedException {
+        if (haPath == null) {
+            return;
+        }
+        opNodeUpdatedHandler.copyChildGlobalOpUpdateToHAParent(updatedSrcNode, origSrcNode, haPath, tx);
+    }
+
+    @Override
+    public void copyChildPsOpUpdateToHAParent(Node updatedSrcPSNode,
+                                              Node origSrcPSNode,
+                                              InstanceIdentifier<Node> haPath,
+                                              ReadWriteTransaction tx) throws ReadFailedException {
+        if (haPath == null) {
+            return;
+        }
+        opNodeUpdatedHandler.copyChildPsOpUpdateToHAParent(updatedSrcPSNode, origSrcPSNode, haPath, tx);
+    }
+
+    @Override
+    public void copyHAPSUpdateToChild(Node haUpdated,
+                                      Node haOriginal,
+                                      InstanceIdentifier<Node> haChildNodeId,
+                                      ReadWriteTransaction tx)
+            throws InterruptedException, ExecutionException, ReadFailedException {
+        if (haChildNodeId == null) {
+            return;
+        }
+        configNodeUpdatedHandler.copyHAPSUpdateToChild(haUpdated, haOriginal, haChildNodeId, tx);
+    }
+
+    @Override
+    public void copyHAGlobalUpdateToChild(Node haUpdated,
+                                          Node haOriginal,
+                                          InstanceIdentifier<Node> haChildNodeId,
+                                          ReadWriteTransaction tx)
+            throws InterruptedException, ExecutionException, ReadFailedException {
+        if (haChildNodeId == null) {
+            return;
+        }
+        configNodeUpdatedHandler.copyHAGlobalUpdateToChild(haUpdated, haOriginal, haChildNodeId, tx);
+    }
+
+}
diff --git a/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/handlers/OpNodeUpdatedHandler.java b/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/handlers/OpNodeUpdatedHandler.java
new file mode 100644 (file)
index 0000000..468874c
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.elan.l2gw.ha.handlers;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
+import org.opendaylight.netvirt.elan.l2gw.ha.merge.GlobalAugmentationMerger;
+import org.opendaylight.netvirt.elan.l2gw.ha.merge.GlobalNodeMerger;
+import org.opendaylight.netvirt.elan.l2gw.ha.merge.PSAugmentationMerger;
+import org.opendaylight.netvirt.elan.l2gw.ha.merge.PSNodeMerger;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public class OpNodeUpdatedHandler {
+
+    GlobalAugmentationMerger globalAugmentationMerger = GlobalAugmentationMerger.getInstance();
+    PSAugmentationMerger psAugmentationMerger     = PSAugmentationMerger.getInstance();
+    GlobalNodeMerger globalNodeMerger         = GlobalNodeMerger.getInstance();
+    PSNodeMerger psNodeMerger             = PSNodeMerger.getInstance();
+
+    public void handle(Node updatedSrcNode, Node origSrcNode, InstanceIdentifier<Node> haPath, ReadWriteTransaction tx)
+            throws ReadFailedException {
+        if (updatedSrcNode.getAugmentation(HwvtepGlobalAugmentation.class) != null) {
+            copyChildGlobalOpUpdateToHAParent(updatedSrcNode, origSrcNode, haPath, tx);
+        } else {
+            copyChildPsOpUpdateToHAParent(updatedSrcNode, origSrcNode, haPath, tx);
+        }
+    }
+
+    /**
+     * Copy HA ps node update to HA child ps node of operational data tree.
+     *
+     * @param updatedSrcPSNode Updated HA child ps node
+     * @param origSrcPSNode Original HA ps node
+     * @param haPath HA node path
+     * @param tx Transaction
+     * @throws ReadFailedException  Exception thrown if read fails
+     */
+    public void copyChildPsOpUpdateToHAParent(Node updatedSrcPSNode,
+                                              Node origSrcPSNode,
+                                              InstanceIdentifier<Node> haPath,
+                                              ReadWriteTransaction tx) throws ReadFailedException {
+
+        InstanceIdentifier<Node> haPSPath = HwvtepHAUtil.convertPsPath(updatedSrcPSNode, haPath);
+        Node existingHAPSNode = HwvtepHAUtil.readNode(tx, LogicalDatastoreType.OPERATIONAL, haPSPath);
+
+        PhysicalSwitchAugmentation updatedSrc   = HwvtepHAUtil.getPhysicalSwitchAugmentationOfNode(updatedSrcPSNode);
+        PhysicalSwitchAugmentation origSrc      = HwvtepHAUtil.getPhysicalSwitchAugmentationOfNode(origSrcPSNode);
+        PhysicalSwitchAugmentation existingData = HwvtepHAUtil.getPhysicalSwitchAugmentationOfNode(existingHAPSNode);
+
+        psAugmentationMerger.mergeOpUpdate(existingData, updatedSrc, origSrc, haPSPath, tx);
+        psNodeMerger.mergeOpUpdate(existingHAPSNode, updatedSrcPSNode, origSrcPSNode, haPSPath, tx);
+    }
+
+    /**
+     * Copy updated data from HA node to child node of operational data tree.
+     *
+     * @param updatedSrcNode Updated HA child node
+     * @param origSrcNode Original HA node
+     * @param haPath HA node path
+     * @param tx Transaction
+     * @throws ReadFailedException  Exception thrown if read fails
+     */
+    public void copyChildGlobalOpUpdateToHAParent(Node updatedSrcNode,
+                                                  Node origSrcNode,
+                                                  InstanceIdentifier<Node> haPath,
+                                                  ReadWriteTransaction tx) throws ReadFailedException {
+
+        Node existingDstNode = HwvtepHAUtil.readNode(tx, LogicalDatastoreType.OPERATIONAL, haPath);
+        if (existingDstNode == null) {
+            //No dst present nothing to copy
+            return;
+        }
+        HwvtepGlobalAugmentation existingData    = HwvtepHAUtil.getGlobalAugmentationOfNode(existingDstNode);
+        HwvtepGlobalAugmentation updatedSrc = HwvtepHAUtil.getGlobalAugmentationOfNode(updatedSrcNode);
+        HwvtepGlobalAugmentation origSrc    = HwvtepHAUtil.getGlobalAugmentationOfNode(origSrcNode);
+
+        globalAugmentationMerger.mergeOpUpdate(existingData, updatedSrc, origSrc, haPath, tx);
+        globalNodeMerger.mergeOpUpdate(existingDstNode, updatedSrcNode, origSrcNode, haPath, tx);
+    }
+
+}
diff --git a/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HAConfigNodeListener.java b/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HAConfigNodeListener.java
new file mode 100644 (file)
index 0000000..24b3674
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
+
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
+
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
+import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
+import org.opendaylight.netvirt.elan.l2gw.ha.handlers.ConfigNodeUpdatedHandler;
+import org.opendaylight.netvirt.elan.l2gw.ha.handlers.HAEventHandler;
+import org.opendaylight.netvirt.elan.l2gw.ha.handlers.IHAEventHandler;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HAConfigNodeListener extends HwvtepNodeBaseListener {
+    private static final Logger LOG = LoggerFactory.getLogger(HAConfigNodeListener.class);
+
+    static HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
+
+    IHAEventHandler haEventHandler;
+    ConfigNodeUpdatedHandler configNodeUpdatedHandler = new ConfigNodeUpdatedHandler();
+
+    public HAConfigNodeListener(DataBroker db, HAEventHandler haEventHandler) {
+        super(LogicalDatastoreType.CONFIGURATION, db);
+        this.haEventHandler = haEventHandler;
+    }
+
+    @Override
+    void onPsNodeAdd(InstanceIdentifier<Node> key,
+                     Node haPSNode,
+                     ReadWriteTransaction tx) throws InterruptedException, ExecutionException, ReadFailedException {
+        //copy the ps node data to children
+        String psId = haPSNode.getNodeId().getValue();
+        Set<InstanceIdentifier<Node>> childSwitchIds = HwvtepHAUtil.getPSChildrenIdsForHAPSNode(psId);
+        for (InstanceIdentifier<Node> childSwitchId : childSwitchIds) {
+            haEventHandler.copyHAPSUpdateToChild(haPSNode, null/*haOriginal*/, childSwitchId, tx);
+        }
+        LOG.info("Handle config ps node add {}", psId);
+    }
+
+    @Override
+    void onPsNodeUpdate(InstanceIdentifier<Node> key,
+                        Node haPSUpdated,
+                        Node haPSOriginal,
+                        ReadWriteTransaction tx) throws InterruptedException, ExecutionException, ReadFailedException {
+        //copy the ps node data to children
+        String psId = haPSUpdated.getNodeId().getValue();
+        Set<InstanceIdentifier<Node>> childSwitchIds = HwvtepHAUtil.getPSChildrenIdsForHAPSNode(psId);
+        for (InstanceIdentifier<Node> childSwitchId : childSwitchIds) {
+            haEventHandler.copyHAPSUpdateToChild(haPSUpdated, haPSOriginal, childSwitchId, tx);
+        }
+    }
+
+    @Override
+    void onGlobalNodeUpdate(InstanceIdentifier<Node> key,
+                            Node haUpdated,
+                            Node haOriginal,
+                            ReadWriteTransaction tx)
+            throws InterruptedException, ExecutionException, ReadFailedException {
+        //copy the ha node data to children
+        Set<InstanceIdentifier<Node>> childNodeIds = hwvtepHACache.getChildrenForHANode(key);
+        for (InstanceIdentifier<Node> haChildNodeId : childNodeIds) {
+            haEventHandler.copyHAGlobalUpdateToChild(haUpdated, haOriginal, haChildNodeId, tx);
+        }
+    }
+
+    @Override
+    void onPsNodeDelete(InstanceIdentifier<Node> key,
+                        Node deletedPsNode,
+                        ReadWriteTransaction tx) throws ReadFailedException {
+        //delete ps children nodes
+        String psId = deletedPsNode.getNodeId().getValue();
+        Set<InstanceIdentifier<Node>> childPsIds = HwvtepHAUtil.getPSChildrenIdsForHAPSNode(psId);
+        for (InstanceIdentifier<Node> childPsId : childPsIds) {
+            HwvtepHAUtil.deleteNodeIfPresent(tx, CONFIGURATION, childPsId);
+        }
+    }
+
+    @Override
+    void onGlobalNodeDelete(InstanceIdentifier<Node> key,
+                            Node haNode,
+                            ReadWriteTransaction tx)
+            throws ReadFailedException, ExecutionException, InterruptedException {
+        //delete child nodes
+        String deletedNodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
+        Set<InstanceIdentifier<Node>> children = hwvtepHACache.getChildrenForHANode(key);
+        for (InstanceIdentifier<Node> childId : children) {
+            HwvtepHAUtil.deleteNodeIfPresent(tx, CONFIGURATION, childId);
+        }
+        HwvtepHAUtil.deletePSNodesOfNode(key, haNode, tx);
+    }
+}
diff --git a/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HAOpClusteredListener.java b/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HAOpClusteredListener.java
new file mode 100644 (file)
index 0000000..fcc6e0f
--- /dev/null
@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
+
+import com.google.common.base.Strings;
+import java.util.List;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
+import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.Managers;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HAOpClusteredListener extends HwvtepNodeBaseListener implements ClusteredDataTreeChangeListener<Node> {
+    private static final Logger LOG = LoggerFactory.getLogger(HAOpClusteredListener.class);
+
+    static HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
+    private static DataBroker dataBroker;
+    private ListenerRegistration<HAOpClusteredListener> registration;
+
+    public HAOpClusteredListener(DataBroker db) {
+        super(LogicalDatastoreType.OPERATIONAL, db);
+        LOG.info("Registering HAOpClusteredListener");
+    }
+
+    @Override
+    void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added, ReadWriteTransaction tx)  {
+        hwvtepHACache.updateDisconnectedNodeStatus(key);
+    }
+
+    @Override
+    void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx)  {
+        hwvtepHACache.updateDisconnectedNodeStatus(key);
+    }
+
+    @Override
+    void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx)    {
+        hwvtepHACache.updateConnectedNodeStatus(key);
+    }
+
+    @Override
+    void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node updated, ReadWriteTransaction tx)  {
+        addToCacheIfHAChildNode(key, updated);
+        hwvtepHACache.updateConnectedNodeStatus(key);
+    }
+
+    public static void addToCacheIfHAChildNode(InstanceIdentifier<Node> childPath, Node childNode) {
+        String haId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(childNode);
+        if (!Strings.isNullOrEmpty(haId)) {
+            InstanceIdentifier<Node> parentId = HwvtepHAUtil.createInstanceIdentifierFromHAId(haId);
+            HwvtepHAUtil.updateL2GwCacheNodeId(childNode, parentId);
+            hwvtepHACache.addChild(parentId, childPath/*child*/);
+        }
+    }
+
+    @Override
+    void onGlobalNodeUpdate(InstanceIdentifier<Node> childPath,
+                            Node updatedChildNode,
+                            Node beforeChildNode,
+                            ReadWriteTransaction tx) {
+        boolean wasHAChild = hwvtepHACache.isHAEnabledDevice(childPath);
+        addToHACacheIfBecameHAChild(childPath, updatedChildNode, beforeChildNode, tx);
+        boolean isHAChild = hwvtepHACache.isHAEnabledDevice(childPath);
+
+
+        if (!wasHAChild && isHAChild) {
+            LOG.debug(getPrintableNodeId(childPath) + " " + "became ha_child");
+        } else if (wasHAChild && !isHAChild) {
+            LOG.debug(getPrintableNodeId(childPath) + " " + "unbecome ha_child");
+        }
+    }
+
+    static String getPrintableNodeId(InstanceIdentifier<Node> key) {
+        String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
+        int idx = nodeId.indexOf("uuid/");
+        if (idx > 0) {
+            nodeId = nodeId.substring(idx + "uuid/".length());
+        }
+        return nodeId;
+    }
+
+    /**
+     * If Normal non-ha node changes to HA node , its added to HA cache.
+     *
+     * @param childPath HA child path which got converted to HA node
+     * @param updatedChildNode updated Child node
+     * @param beforeChildNode non-ha node before updated to HA node
+     * @param tx Transaction
+     */
+    public static void addToHACacheIfBecameHAChild(InstanceIdentifier<Node> childPath,
+                                                   Node updatedChildNode,
+                                                   Node beforeChildNode,
+                                                   ReadWriteTransaction tx) {
+        HwvtepGlobalAugmentation updatedAugmentaion = updatedChildNode.getAugmentation(HwvtepGlobalAugmentation.class);
+        HwvtepGlobalAugmentation beforeAugmentaion = null;
+        if (beforeChildNode != null) {
+            beforeAugmentaion = beforeChildNode.getAugmentation(HwvtepGlobalAugmentation.class);
+        }
+        List<Managers> up = null;
+        List<Managers> be = null;
+        if (updatedAugmentaion != null) {
+            up = updatedAugmentaion.getManagers();
+        }
+        if (beforeAugmentaion != null) {
+            be = beforeAugmentaion.getManagers();
+        }
+        if (up != null) {
+            if (be != null) {
+                if (up.size() > 0) {
+                    if (be.size() > 0) {
+                        Managers m1 = up.get(0);
+                        Managers m2 = be.get(0);
+                        if (!m1.equals(m2)) {
+                            LOG.info("Manager entry updated for node {} ", updatedChildNode.getNodeId().getValue());
+                            addToCacheIfHAChildNode(childPath, updatedChildNode);
+                        }
+                    }
+                }
+            }
+            //TODO handle unhaed case
+        }
+    }
+}
+
diff --git a/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HAOpNodeListener.java b/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HAOpNodeListener.java
new file mode 100644 (file)
index 0000000..c6c5b05
--- /dev/null
@@ -0,0 +1,287 @@
+/*
+ * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
+
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
+import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
+import org.opendaylight.netvirt.elan.l2gw.ha.commands.SwitchesCmd;
+import org.opendaylight.netvirt.elan.l2gw.ha.handlers.HAEventHandler;
+import org.opendaylight.netvirt.elan.l2gw.ha.handlers.IHAEventHandler;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.Switches;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HAOpNodeListener extends HwvtepNodeBaseListener implements DataTreeChangeListener<Node>, AutoCloseable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(HAOpNodeListener.class);
+
+    static HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
+
+    IHAEventHandler haEventHandler;
+
+    Map<String, Boolean> availableGlobalNodes = new HashMap<>();
+    Map<String, Boolean> availablePsNodes = new HashMap<>();
+
+    void clearNodeAvailability(InstanceIdentifier<Node> key) {
+        String id = key.firstKeyOf(Node.class).getNodeId().getValue();
+        String psId = null;
+        String globalId = null;
+
+        if (id.indexOf(HwvtepHAUtil.PHYSICALSWITCH) > 0) {
+            psId = id;
+            globalId = id.substring(0, id.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
+            availablePsNodes.remove(globalId);
+        } else {
+            globalId = id;
+            availableGlobalNodes.remove(globalId);
+        }
+    }
+
+    void updateNodeAvailability(InstanceIdentifier<Node> key) {
+        String id = key.firstKeyOf(Node.class).getNodeId().getValue();
+        String psId = null;
+        String globalId = null;
+
+        if (id.indexOf(HwvtepHAUtil.PHYSICALSWITCH) > 0) {
+            psId = id;
+            globalId = id.substring(0, id.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
+            availablePsNodes.put(globalId, Boolean.TRUE);
+        } else {
+            globalId = id;
+            availableGlobalNodes.put(globalId, Boolean.TRUE);
+        }
+    }
+
+    boolean areBothGlobalAndPsNodeAvailable(InstanceIdentifier<Node> key) {
+        String id = key.firstKeyOf(Node.class).getNodeId().getValue();
+        String psId = null;
+        String globalId = null;
+
+        if (id.indexOf(HwvtepHAUtil.PHYSICALSWITCH) > 0) {
+            psId = id;
+            globalId = id.substring(0, id.indexOf(HwvtepHAUtil.PHYSICALSWITCH));
+        } else {
+            globalId = id;
+        }
+        if (availableGlobalNodes.containsKey(globalId) && availablePsNodes.containsKey(globalId)) {
+            return true;
+        }
+        return false;
+    }
+
+    public HAOpNodeListener(DataBroker db, HAEventHandler haEventHandler) {
+        super(OPERATIONAL, db);
+        this.haEventHandler = haEventHandler;
+        LOG.info("Registering HwvtepDataChangeListener for operational nodes");
+    }
+
+    @Override
+    void onGlobalNodeAdd(InstanceIdentifier<Node> childPath,
+                         Node childNode,
+                         ReadWriteTransaction tx) {
+        LOG.info("Node connected " + childNode.getNodeId().getValue() + " - Checking if Ha or Non-Ha enabled");
+        //update cache
+        HAOpClusteredListener.addToCacheIfHAChildNode(childPath, childNode);
+        if (!hwvtepHACache.isHAEnabledDevice(childPath)) {
+            LOG.debug(" Non ha node connected " + childNode.getNodeId().getValue());
+            return;
+        }
+        hwvtepHACache.updateConnectedNodeStatus(childPath);
+        LOG.info("Ha enabled child node connected {}", childNode.getNodeId().getValue());
+        InstanceIdentifier<Node> haNodePath = hwvtepHACache.getParent(childPath);
+        updateNodeAvailability(childPath);
+        if (areBothGlobalAndPsNodeAvailable(childPath)) {
+            readHAConfigNodeAndMergeData(childPath, childNode, haNodePath, tx);
+        }
+    }
+
+    @Override
+    void onPsNodeDelete(InstanceIdentifier<Node> childPath,
+                        Node childNode,
+                        ReadWriteTransaction tx) {
+        clearNodeAvailability(childPath);
+    }
+
+    @Override
+    void onGlobalNodeDelete(InstanceIdentifier<Node> childPath,
+                            Node childNode,
+                            ReadWriteTransaction tx) throws InterruptedException, ExecutionException,
+            ReadFailedException {
+        clearNodeAvailability(childPath);
+        if (!hwvtepHACache.isHAEnabledDevice(childPath)) {
+            return;
+        }
+        //If all child nodes disconnect remove parent from operational datastore
+        InstanceIdentifier<Node> haNodePath = hwvtepHACache.getParent(childPath);
+        if (haNodePath != null) {
+            Set<InstanceIdentifier<Node>> children = hwvtepHACache.getChildrenForHANode(haNodePath);
+            children.remove(childPath);
+            hwvtepHACache.updateDisconnectedNodeStatus(childPath);
+            if (HwvtepHAUtil.areAllChildDeleted(children, tx)) {
+                LOG.info("All child deleted for ha node {} ", HwvtepHAUtil.getNodeIdVal(haNodePath));
+                HwvtepHAUtil.deleteSwitchesManagedByNode(haNodePath, tx);
+                HwvtepHAUtil.deleteNodeIfPresent(tx, OPERATIONAL, haNodePath);
+            }
+        }
+    }
+
+    @Override
+    void onGlobalNodeUpdate(InstanceIdentifier<Node> childPath,
+                            Node updatedChildNode,
+                            Node originalChildNode,
+                            ReadWriteTransaction tx) throws ReadFailedException {
+
+        String oldHAId = HwvtepHAUtil.getHAIdFromManagerOtherConfig(originalChildNode);
+        if (!Strings.isNullOrEmpty(oldHAId)) { //was already ha child
+            InstanceIdentifier<Node> haPath = hwvtepHACache.getParent(childPath);
+            haEventHandler.copyChildGlobalOpUpdateToHAParent(updatedChildNode, originalChildNode, haPath, tx);
+            return;//TODO handle unha case
+        }
+
+        HAOpClusteredListener.addToHACacheIfBecameHAChild(childPath, updatedChildNode, originalChildNode, tx);
+        boolean becameHAChild = hwvtepHACache.isHAEnabledDevice(childPath);
+        if (becameHAChild) {
+            hwvtepHACache.updateConnectedNodeStatus(childPath);
+            LOG.info("{} became ha child ", updatedChildNode.getNodeId().getValue());
+            onGlobalNodeAdd(childPath, updatedChildNode, tx);
+        }
+    }
+
+    @Override
+    void onPsNodeAdd(InstanceIdentifier<Node> childPath,
+                     Node childPsNode,
+                     ReadWriteTransaction tx) throws ReadFailedException {
+        updateNodeAvailability(childPath);
+        if (areBothGlobalAndPsNodeAvailable(childPath)) {
+            InstanceIdentifier<Node> globalPath = HwvtepHAUtil.getGlobalNodePathFromPSNode(childPsNode);
+            Node globalNode = HwvtepHAUtil.readNode(tx, OPERATIONAL, globalPath);
+            onGlobalNodeAdd(globalPath, globalNode, tx);
+        }
+    }
+
+    @Override
+    void onPsNodeUpdate(InstanceIdentifier<Node> childPath,
+                        Node updatedChildPSNode,
+                        Node originalChildPSNode,
+                        ReadWriteTransaction tx) throws ReadFailedException {
+
+        InstanceIdentifier<Node> globalNodePath = HwvtepHAUtil.getGlobalNodePathFromPSNode(updatedChildPSNode);
+        if (hwvtepHACache.isHAEnabledDevice(globalNodePath) && areBothGlobalAndPsNodeAvailable(childPath)) {
+            InstanceIdentifier<Node> haPath = hwvtepHACache.getParent(globalNodePath);
+            haEventHandler.copyChildPsOpUpdateToHAParent(updatedChildPSNode, originalChildPSNode, haPath, tx);
+        }
+    }
+
+    public void readHAConfigNodeAndMergeData(final InstanceIdentifier<Node> childPath,
+                                             final Node childNode,
+                                             final InstanceIdentifier<Node> haNodePath,
+                                             final ReadWriteTransaction tx) {
+        if (haNodePath == null) {
+            return;
+        }
+        ListenableFuture<Optional<Node>> ft = tx.read(CONFIGURATION, haNodePath);
+        Futures.addCallback(ft, new FutureCallback<Optional<Node>>() {
+            @Override
+            public void onSuccess(final Optional<Node> haGlobalCfg) {
+                if (haGlobalCfg.isPresent()) {
+                    Node haConfigNode = haGlobalCfg.get();
+                    if (childNode.getAugmentation(HwvtepGlobalAugmentation.class) != null) {
+                        List<Switches> switches =
+                                childNode.getAugmentation(HwvtepGlobalAugmentation.class).getSwitches();
+                        if (switches != null) {
+                            SwitchesCmd cmd = new SwitchesCmd();
+                            for (Switches ps : switches) {
+                                ReadWriteTransaction tx = getTx();
+                                Switches dst = cmd.transform(haNodePath, ps);
+                                ListenableFuture<Optional<Node>> ft = tx.read(CONFIGURATION,
+                                        (InstanceIdentifier<Node>) dst.getSwitchRef().getValue());
+                                Futures.addCallback(ft, new FutureCallback<Optional<Node>>() {
+                                    @Override
+                                    public void onSuccess(Optional<Node> haPSCfg) {
+                                        handleNodeReConnected(childPath, childNode, haNodePath, haGlobalCfg, haPSCfg);
+                                    }
+
+                                    @Override
+                                    public void onFailure(Throwable throwable) {
+                                    }
+                                });
+                                break;//TODO handle all switches instead of just one switch
+                            }
+                        } else {
+                            Optional<Node> psNodeOptional = Optional.absent();
+                            handleNodeReConnected(childPath, childNode, haNodePath, haGlobalCfg, psNodeOptional);
+                        }
+                    }
+                } else {
+                    handleNodeConnected(childPath, childNode, haNodePath);
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+            }
+        });
+    }
+
+    void handleNodeConnected(final InstanceIdentifier<Node> childPath,
+                             final Node childNode,
+                             final InstanceIdentifier<Node> haNodePath) {
+        HAJobScheduler.getInstance().submitJob(new Callable<Void>() {
+            @Override
+            public Void call() throws InterruptedException, ExecutionException, ReadFailedException,
+                    TransactionCommitFailedException {
+                LOG.info("Ha child connected handleNodeConnected {}", childNode.getNodeId().getValue());
+                ReadWriteTransaction tx = getTx();
+                haEventHandler.handleChildNodeConnected(childNode, childPath, haNodePath, tx);
+                tx.submit().checkedGet();
+                return null;
+            }
+        });
+    }
+
+    void handleNodeReConnected(final InstanceIdentifier<Node> childPath,
+                               final Node childNode,
+                               final InstanceIdentifier<Node> haNodePath,
+                               final Optional<Node> haGlobalCfg,
+                               final Optional<Node> haPSCfg) {
+        HAJobScheduler.getInstance().submitJob(new Callable<Void>() {
+            @Override
+            public Void call() throws InterruptedException, ExecutionException, ReadFailedException,
+                    TransactionCommitFailedException {
+                LOG.info("Ha child reconnected handleNodeReConnected {}", childNode.getNodeId().getValue());
+                ReadWriteTransaction tx = getTx();
+                haEventHandler.handleChildNodeReConnected(childNode, childPath,
+                        haNodePath, haGlobalCfg, haPSCfg, tx);
+                tx.submit().checkedGet();
+                return null;
+            }
+        });
+    }
+}
diff --git a/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HwvtepNodeBaseListener.java b/vpnservice/elanmanager/elanmanager-impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HwvtepNodeBaseListener.java
new file mode 100644 (file)
index 0000000..e089f0b
--- /dev/null
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netvirt.elan.l2gw.ha.listeners;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
+import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
+import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class HwvtepNodeBaseListener implements DataTreeChangeListener<Node>, AutoCloseable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(HwvtepNodeBaseListener.class);
+
+    static HwvtepHACache hwvtepHACache = HwvtepHACache.getInstance();
+
+    private ListenerRegistration<HwvtepNodeBaseListener> registration;
+    DataBroker db;
+
+    public HwvtepNodeBaseListener(LogicalDatastoreType datastoreType, DataBroker dataBroker) {
+        db = dataBroker;
+        registerListener(datastoreType, db);
+    }
+
+    private void registerListener(LogicalDatastoreType datastoreType, final DataBroker db) {
+        final DataTreeIdentifier<Node> treeId =
+                new DataTreeIdentifier<Node>(datastoreType, getWildcardPath());
+        LOG.trace("Registering on path: {}", treeId);
+        registration = db.registerDataTreeChangeListener(treeId, HwvtepNodeBaseListener.this);
+    }
+
+    @Override
+    public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
+        HAJobScheduler.getInstance().submitJob(new Runnable() {
+            @Override
+            public void run() {
+                ReadWriteTransaction tx = getTx();
+                try {
+                    processConnectedNodes(changes, tx);
+                    processUpdatedNodes(changes, tx);
+                    processDisconnectedNodes(changes, tx);
+                    tx.submit().get();
+                } catch (InterruptedException e) {
+                    LOG.error("InterruptedException " + e.getMessage());
+                } catch (ExecutionException e) {
+                    LOG.error("ExecutionException" + e.getMessage());
+                } catch (ReadFailedException e) {
+                    LOG.error("ReadFailedException" + e.getMessage());
+                }
+            }
+        });
+    }
+
+    private void processUpdatedNodes(Collection<DataTreeModification<Node>> changes,
+                                     ReadWriteTransaction tx)
+            throws ReadFailedException, ExecutionException, InterruptedException {
+        for (DataTreeModification<Node> change : changes) {
+            final InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
+            final DataObjectModification<Node> mod = change.getRootNode();
+            String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
+            Node updated = HwvtepHAUtil.getUpdated(mod);
+            Node original = HwvtepHAUtil.getOriginal(mod);
+            if (updated != null && original != null) {
+                if (updated != null && original != null) {
+                    if (nodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH) < 0) {
+                        onGlobalNodeUpdate(key, updated, original, tx);
+                    } else {
+                        onPsNodeUpdate(key, updated, original, tx);
+                    }
+                }
+            }
+        }
+    }
+
+    private void processDisconnectedNodes(Collection<DataTreeModification<Node>> changes,
+                                          ReadWriteTransaction tx)
+            throws InterruptedException, ExecutionException, ReadFailedException {
+
+        for (DataTreeModification<Node> change : changes) {
+            final InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
+            final DataObjectModification<Node> mod = change.getRootNode();
+            Node deleted = HwvtepHAUtil.getRemoved(mod);
+            String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
+            if (deleted != null) {
+                if (nodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH) < 0) {
+                    LOG.info("Handle global node delete {}", deleted.getNodeId().getValue());
+                    onGlobalNodeDelete(key, deleted, tx);
+                } else {
+                    LOG.error("Handle ps node node delete {}", deleted.getNodeId().getValue());
+                    onPsNodeDelete(key, deleted, tx);
+                }
+            }
+        }
+    }
+
+    void processConnectedNodes(Collection<DataTreeModification<Node>> changes,
+                               ReadWriteTransaction tx)
+            throws ReadFailedException, ExecutionException,
+    InterruptedException {
+        Map<String, Boolean> processedNodes = new HashMap<>();
+        for (DataTreeModification<Node> change : changes) {
+            InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
+            DataObjectModification<Node> mod = change.getRootNode();
+            Node node = HwvtepHAUtil.getCreated(mod);
+            String nodeId = key.firstKeyOf(Node.class).getNodeId().getValue();
+            if (node != null) {
+                if (nodeId.indexOf(HwvtepHAUtil.PHYSICALSWITCH) < 0) {
+                    LOG.info("Handle global node add {}", node.getNodeId().getValue());
+                    onGlobalNodeAdd(key, node, tx);
+                } else {
+                    LOG.error("Handle ps node add {}", node.getNodeId().getValue());
+                    onPsNodeAdd(key, node, tx);
+                }
+            }
+        }
+    }
+
+    private InstanceIdentifier<Node> getWildcardPath() {
+        InstanceIdentifier<Node> path = InstanceIdentifier
+                .create(NetworkTopology.class)
+                .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID))
+                .child(Node.class);
+        return path;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (registration != null) {
+            registration.close();
+        }
+    }
+
+    ReadWriteTransaction getTx() {
+        return db.newReadWriteTransaction();
+    }
+
+    //default methods
+    void onGlobalNodeDelete(InstanceIdentifier<Node> key, Node added, ReadWriteTransaction tx)
+            throws InterruptedException, ExecutionException, ReadFailedException {
+    }
+
+    void onPsNodeDelete(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx)
+            throws ReadFailedException {
+
+    }
+
+    void onGlobalNodeAdd(InstanceIdentifier<Node> key, Node added, ReadWriteTransaction tx) {
+
+    }
+
+    void onPsNodeAdd(InstanceIdentifier<Node> key, Node addedPSNode, ReadWriteTransaction tx)
+            throws ReadFailedException, InterruptedException, ExecutionException {
+
+    }
+
+    void onGlobalNodeUpdate(InstanceIdentifier<Node> key, Node updated, Node original, ReadWriteTransaction tx)
+            throws ReadFailedException, InterruptedException, ExecutionException {
+
+    }
+
+    void onPsNodeUpdate(InstanceIdentifier<Node> key, Node updated, Node original, ReadWriteTransaction tx)
+            throws ReadFailedException, InterruptedException, ExecutionException {
+
+    }
+
+}