Bulk merge of l2gw changes
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / listeners / L2GatewayListener.java
index 917a759e4996a218b71f5896ee29e2f266ad01dc..6f49cc84e45dda2059e0febb5bf495c09c77efcd 100644 (file)
@@ -5,40 +5,46 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netvirt.elan.l2gw.listeners;
 
 import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
+import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
 
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.opendaylight.genius.mdsalutil.MDSALUtil;
-import org.opendaylight.genius.utils.SystemPropertyReader;
-import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
-import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
+import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
+import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
 import org.opendaylight.genius.utils.hwvtep.HwvtepUtils;
-import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
 import org.opendaylight.infrautils.utils.concurrent.Executors;
 import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.util.Datastore.Operational;
 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
 import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.mdsal.binding.util.TypedReadTransaction;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
+import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
+import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
+import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpNodeListener;
 import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayInstanceRecoveryHandler;
 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayUtils;
+import org.opendaylight.netvirt.elan.l2gw.utils.L2gwZeroDayConfigUtil;
+import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
 import org.opendaylight.netvirt.elanmanager.api.IL2gwService;
 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
@@ -54,7 +60,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev15071
 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateways.attributes.L2gateways;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateways.attributes.l2gateways.L2gateway;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.rev150712.Neutron;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,96 +73,112 @@ public class L2GatewayListener extends AbstractClusteredAsyncDataTreeChangeListe
         implements RecoverableListener {
     private static final Logger LOG = LoggerFactory.getLogger(L2GatewayListener.class);
     private final DataBroker dataBroker;
-    private final ManagedNewTransactionRunner txRunner;
-    private final ItmRpcService itmRpcService;
     private final IL2gwService l2gwService;
-    private final EntityOwnershipUtils entityOwnershipUtils;
-    private final JobCoordinator jobCoordinator;
     private final L2GatewayCache l2GatewayCache;
+    private final HAOpNodeListener haOpNodeListener;
+    private final HAOpClusteredListener haOpClusteredListener;
+    private final ElanClusterUtils elanClusterUtils;
+    private final L2gwZeroDayConfigUtil l2gwZeroDayConfigUtil;
+    private final L2GwTransportZoneListener transportZoneListener;
+    private final ManagedNewTransactionRunner txRunner;
+    private final ItmRpcService itmRpcService;
 
     @Inject
-    public L2GatewayListener(final DataBroker dataBroker, final EntityOwnershipService entityOwnershipService,
-                             final ItmRpcService itmRpcService, final IL2gwService l2gwService,
-                             final JobCoordinator jobCoordinator, final L2GatewayCache l2GatewayCache,
+    public L2GatewayListener(final DataBroker dataBroker,
+                             final IL2gwService l2gwService,
+                             final L2GatewayCache l2GatewayCache,
+                             HAOpNodeListener haOpNodeListener,
+                             HAOpClusteredListener haOpClusteredListener,
+                             final ItmRpcService itmRpcService,
                              L2GatewayInstanceRecoveryHandler l2GatewayInstanceRecoveryHandler,
-                             ServiceRecoveryRegistry serviceRecoveryRegistry) {
+                             ServiceRecoveryRegistry serviceRecoveryRegistry,
+                             L2gwZeroDayConfigUtil l2gwZeroDayConfigUtil,
+                             L2GwTransportZoneListener transportZoneListener,
+                             ElanClusterUtils elanClusterUtils) {
         super(dataBroker, LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(Neutron.class)
                 .child(L2gateways.class).child(L2gateway.class),
-                Executors.newListeningSingleThreadExecutor("L2GatewayListener", LOG));
+            Executors.newListeningSingleThreadExecutor("L2GatewayListener", LOG));
         this.dataBroker = dataBroker;
-        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
-        this.entityOwnershipUtils = new EntityOwnershipUtils(entityOwnershipService);
-        this.itmRpcService = itmRpcService;
         this.l2gwService = l2gwService;
-        this.jobCoordinator = jobCoordinator;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
+        this.l2gwZeroDayConfigUtil = l2gwZeroDayConfigUtil;
+        this.transportZoneListener = transportZoneListener;
         this.l2GatewayCache = l2GatewayCache;
+        this.haOpClusteredListener = haOpClusteredListener;
+        this.haOpNodeListener = haOpNodeListener;
+        this.elanClusterUtils = elanClusterUtils;
+        this.itmRpcService = itmRpcService;
         serviceRecoveryRegistry.addRecoverableListener(l2GatewayInstanceRecoveryHandler.buildServiceRegistryKey(),
                 this);
         init();
     }
 
     public void init() {
+        ResourceBatchingManager.getInstance().registerDefaultBatchHandlers(this.dataBroker);
         LOG.info("{} init", getClass().getSimpleName());
+        // registerListener(); called from L2GatewayConnection listener
     }
 
     @Override
-    @PreDestroy
-    public void close() {
-        super.close();
-        Executors.shutdownAndAwaitTermination(getExecutorService());
+    public void register() {
+        LOG.info("Registering L2Gateway Listener Override Method");
+        super.register();
     }
 
+    @Override
     public void registerListener() {
-        super.register();
         LOG.info("Registering L2Gateway Listener");
+        super.register();
     }
 
+    @Override
     public void deregisterListener() {
-        super.close();
         LOG.info("Deregistering L2GatewayListener");
+        super.close();
+    }
+
+    @Override
+    @PreDestroy
+    public void close() {
+        super.close();
+        Executors.shutdownAndAwaitTermination(getExecutorService());
     }
 
     @Override
     public void add(final InstanceIdentifier<L2gateway> identifier, final L2gateway input) {
-        LOG.info("Adding L2gateway with ID: {}", input.getUuid());
+        LOG.info("Adding L2gateway with ID: {}", input);
 
-        for (Devices l2Device : input.nonnullDevices().values()) {
-            LOG.trace("Adding L2gateway device: {}", l2Device);
+        List<Devices> l2Devices = new ArrayList<>(input.getDevices().values());
+        for (Devices l2Device : l2Devices) {
+            LOG.info("Adding L2gateway device: {}", l2Device);
             addL2Device(l2Device, input);
         }
     }
 
     @Override
     public void remove(final InstanceIdentifier<L2gateway> identifier, final L2gateway input) {
-        LOG.info("Removing L2gateway with ID: {}", input.getUuid());
+        LOG.info("Removing L2gateway with ID: {}", input);
         List<L2gatewayConnection> connections = l2gwService
                 .getL2GwConnectionsByL2GatewayId(input.getUuid());
-        Futures.addCallback(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
+
+        txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION, tx -> {
             for (L2gatewayConnection connection : connections) {
                 InstanceIdentifier<L2gatewayConnection> iid = InstanceIdentifier.create(Neutron.class)
-                        .child(L2gatewayConnections.class).child(L2gatewayConnection.class, connection.key());
+                    .child(L2gatewayConnections.class).child(L2gatewayConnection.class, connection.key());
                 tx.delete(iid);
             }
-        }), new FutureCallback<Object>() {
-            @Override
-            public void onSuccess(Object result) {
-                for (Devices l2Device : input.nonnullDevices().values()) {
-                    LOG.trace("Removing L2gateway device: {}", l2Device);
-                    removeL2Device(l2Device, input);
-                }
-            }
+        });
 
-            @Override
-            public void onFailure(Throwable throwable) {
-                LOG.error("Failed to delete associated l2gwconnection while deleting l2gw {} with id",
-                        input.getUuid(), throwable);
-            }
-        }, MoreExecutors.directExecutor());
+        Collection<Devices> l2Devices = input.getDevices().values();
+        for (Devices l2Device : l2Devices) {
+            LOG.info("Removing L2gateway device: {}", l2Device);
+            removeL2Device(l2Device, input);
+        }
     }
 
     @Override
     public void update(InstanceIdentifier<L2gateway> identifier, L2gateway original, L2gateway update) {
-        LOG.trace("Updating L2gateway : key: {}, original value={}, update value={}", identifier, original, update);
+        LOG.info("Updating L2gateway : key: {}, original value={}, update value={}", identifier, original, update);
         List<L2gatewayConnection> connections = l2gwService.getAssociatedL2GwConnections(
                 Sets.newHashSet(update.getUuid()));
         if (connections == null) {
@@ -162,47 +187,50 @@ public class L2GatewayListener extends AbstractClusteredAsyncDataTreeChangeListe
             return;
         }
         if (original.getDevices() == null) {
-            connections.forEach(l2gwService::addL2GatewayConnection);
+            connections.forEach(
+                (connection) -> l2gwService.addL2GatewayConnection(connection));
             return;
         }
-        jobCoordinator.enqueueJob("l2gw.update", () -> {
-            ListenableFuture<?> future = txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
-                DeviceInterfaces updatedDeviceInterfaces = new DeviceInterfaces(update);
-                original.nonnullDevices().values()
-                        .stream()
-                        .filter((originalDevice) -> originalDevice.getInterfaces() != null)
-                        .forEach((originalDevice) -> {
-                            String deviceName = originalDevice.getDeviceName();
-                            L2GatewayDevice l2GwDevice = l2GatewayCache.get(deviceName);
-                            NodeId physicalSwitchNodeId = HwvtepSouthboundUtils.createManagedNodeId(
-                                    new NodeId(l2GwDevice.getHwvtepNodeId()), deviceName);
-                            originalDevice.nonnullInterfaces().values()
-                                    .stream()
-                                    .filter((intf) -> !updatedDeviceInterfaces.containsInterface(
-                                            deviceName, intf.getInterfaceName()))
-                                    .forEach((intf) -> connections.forEach((connection) -> {
-                                        Integer vlanId = connection.getSegmentId();
-                                        if (intf.getSegmentationIds() != null
-                                                && !intf.getSegmentationIds().isEmpty()) {
-                                            for (Integer vlan : intf.getSegmentationIds()) {
-                                                HwvtepUtils.deleteVlanBinding(tx,
-                                                        physicalSwitchNodeId, intf.getInterfaceName(), vlan);
-                                            }
-                                        } else {
-                                            LOG.debug("Deleting vlan binding {} {} {}",
-                                                    physicalSwitchNodeId, intf.getInterfaceName(), vlanId);
-                                            HwvtepUtils.deleteVlanBinding(tx, physicalSwitchNodeId,
-                                                    intf.getInterfaceName(), vlanId);
+        elanClusterUtils.runOnlyInOwnerNode("l2gw.update", () -> {
+            DeviceInterfaces updatedDeviceInterfaces = new DeviceInterfaces(update);
+            FluentFuture<?> fts = txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
+                original.getDevices().values()
+                    .stream()
+                    .filter((originalDevice) -> originalDevice.getInterfaces() != null)
+                    .forEach((originalDevice) -> {
+                        String deviceName = originalDevice.getDeviceName();
+                        L2GatewayDevice l2GwDevice = l2GatewayCache.get(deviceName);
+                        NodeId physicalSwitchNodeId = HwvtepSouthboundUtils.createManagedNodeId(
+                            new NodeId(l2GwDevice.getHwvtepNodeId()), deviceName);
+                        originalDevice.getInterfaces().values()
+                            .stream()
+                            .filter((intf) -> !updatedDeviceInterfaces.containsInterface(
+                                deviceName, intf.getInterfaceName()))
+                            .forEach((intf) -> {
+                                connections.forEach((connection) -> {
+                                    Integer vlanId = connection.getSegmentId();
+                                    if (intf.getSegmentationIds() != null
+                                        && !intf.getSegmentationIds().isEmpty()) {
+                                        for (Integer vlan : intf.getSegmentationIds()) {
+                                            HwvtepUtils.deleteVlanBinding(tx,
+                                                physicalSwitchNodeId, intf.getInterfaceName(), vlan);
                                         }
-                                    }));
-                        });
+                                    } else {
+                                        LOG.info("Deleting vlan binding {} {} {}",
+                                            physicalSwitchNodeId, intf.getInterfaceName(), vlanId);
+                                        HwvtepUtils.deleteVlanBinding(tx, physicalSwitchNodeId,
+                                            intf.getInterfaceName(), vlanId);
+                                    }
+                                });
+                            });
+                    });
             });
-            Futures.addCallback(future, new FutureCallback<Object>() {
+            fts.addCallback(new FutureCallback<Object>() {
                 @Override
                 public void onSuccess(Object success) {
-                    LOG.debug("Successfully deleted vlan bindings for l2gw update {}", update);
-                    connections.forEach((l2GwConnection) ->
-                            l2gwService.addL2GatewayConnection(l2GwConnection, null, update));
+                    LOG.info("Successfully deleted vlan bindings for l2gw update {}", update);
+                        connections.forEach((l2GwConnection) ->
+                                l2gwService.addL2GatewayConnection(l2GwConnection, null, update));
                 }
 
                 @Override
@@ -210,30 +238,32 @@ public class L2GatewayListener extends AbstractClusteredAsyncDataTreeChangeListe
                     LOG.error("Failed to delete vlan bindings as part of l2gw udpate {}", update);
                 }
             }, MoreExecutors.directExecutor());
-            return Collections.singletonList(future);
-        }, SystemPropertyReader.getDataStoreJobCoordinatorMaxRetries());
+        });
     }
 
     private synchronized void addL2Device(Devices l2Device, L2gateway input) {
         String l2DeviceName = l2Device.getDeviceName();
 
         L2GatewayDevice l2GwDevice = l2GatewayCache.addOrGet(l2DeviceName);
+        String hwvtepNodeId = l2GwDevice.getHwvtepNodeId();
+        HwvtepHACache haCache = HwvtepHACache.getInstance();
+        if (hwvtepNodeId == null) {
+            scanNodesAndReplayDeviceGlobalNode(l2Device, input, l2DeviceName);
+        } else if (!haCache.isHAParentNode(HwvtepHAUtil.convertToInstanceIdentifier(hwvtepNodeId))) {
+            replayGlobalNode(l2Device, input, l2DeviceName, hwvtepNodeId);
+        }
         l2GwDevice.addL2GatewayId(input.getUuid());
+
         if (l2GwDevice.getHwvtepNodeId() == null) {
             LOG.info("L2GW provisioning skipped for device {}",l2DeviceName);
         } else {
+            transportZoneListener.createZeroDayForL2Device(l2GwDevice);
             LOG.info("Provisioning l2gw for device {}",l2DeviceName);
             l2gwService.provisionItmAndL2gwConnection(l2GwDevice, l2DeviceName, l2GwDevice.getHwvtepNodeId(),
                     l2GwDevice.getTunnelIp());
         }
     }
 
-    protected static boolean isLastL2GatewayBeingDeleted(L2GatewayDevice l2GwDevice) {
-        return l2GwDevice.getL2GatewayIds().size() == 1;
-    }
-
-    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
-            justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void removeL2Device(Devices l2Device, L2gateway input) {
         final String l2DeviceName = l2Device.getDeviceName();
         L2GatewayDevice l2GwDevice = l2GatewayCache.get(l2DeviceName);
@@ -242,6 +272,7 @@ public class L2GatewayListener extends AbstractClusteredAsyncDataTreeChangeListe
             // Also, do not delete device from cache if it's connected
             if (L2GatewayUtils.isLastL2GatewayBeingDeleted(l2GwDevice)) {
                 if (l2GwDevice.isConnected()) {
+                    /*
                     l2GwDevice.removeL2GatewayId(input.getUuid());
                     // Delete ITM tunnels
                     final String hwvtepId = l2GwDevice.getHwvtepNodeId();
@@ -261,25 +292,48 @@ public class L2GatewayListener extends AbstractClusteredAsyncDataTreeChangeListe
 
                         return null;
                     });
+                    */
                 } else {
                     l2GatewayCache.remove(l2DeviceName);
-                    // Cleaning up the config DS
-                    NodeId nodeId = new NodeId(l2GwDevice.getHwvtepNodeId());
-                    NodeId psNodeId = HwvtepSouthboundUtils.createManagedNodeId(nodeId, l2DeviceName);
-                    //FIXME: These should be removed
-                    MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION,
-                            HwvtepSouthboundUtils.createInstanceIdentifier(nodeId));
-                    MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION,
-                            HwvtepSouthboundUtils.createInstanceIdentifier(psNodeId));
-
                 }
+                l2GwDevice.removeL2GatewayId(input.getUuid());
+                //Delete itm tunnels
+                elanClusterUtils.runOnlyInOwnerNode(l2GwDevice.getDeviceName(),
+                    "handling delete of l2gwdevice delete itm tunnels ",
+                    () -> {
+                        if (l2GwDevice.getHwvtepNodeId() == null) {
+                            return Collections.emptyList();
+                        }
+                        // Cleaning up the config DS
+                        NodeId nodeId = new NodeId(l2GwDevice.getHwvtepNodeId());
+                        LOG.info("L2GatewayListener deleting the config nodes {} {}", nodeId, l2DeviceName);
+                        NodeId psNodeId = HwvtepSouthboundUtils.createManagedNodeId(nodeId, l2DeviceName);
+                        InstanceIdentifier<Node> psNodeIid = HwvtepSouthboundUtils.createInstanceIdentifier(psNodeId);
+                        InstanceIdentifier<Node> globalIid = HwvtepSouthboundUtils.createInstanceIdentifier(nodeId);
+
+                        List<ListenableFuture<?>> result = new ArrayList<>();
+                        result.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+                            tx -> {
+                                LOG.info("Deleting the zero day config for l2gw delete {}", psNodeIid);
+                                l2gwZeroDayConfigUtil.deleteZeroDayConfig(tx, globalIid, l2GwDevice);
+                            }));
+                        LOG.info("L2GatewayListener Deleting itm tunnels for {}", l2GwDevice.getDeviceName());
+                        for (final IpAddress tunnelIpAddr :  l2GwDevice.getTunnelIps()) {
+                            L2GatewayUtils.deleteItmTunnels(itmRpcService, l2GwDevice.getHwvtepNodeId(),
+                                l2DeviceName, tunnelIpAddr);
+                            //result.add(ElanL2GatewayUtils.deleteItmTunnels(tunnelIpAddr, dataBroker));
+                            LOG.info("L2GatewayListener Deleting itm tunnel {}", tunnelIpAddr);
+                        }
+                        return result;
+                    }
+                );
             } else {
                 l2GwDevice.removeL2GatewayId(input.getUuid());
-                LOG.trace("ITM tunnels are not deleted for {} as this device has other L2gateway associations",
+                LOG.info("ITM tunnels are not deleted for {} as this device has other L2gateway associations",
                         l2DeviceName);
             }
         } else {
-            LOG.error("Unable to find L2 Gateway details for {}", l2DeviceName);
+            LOG.error("L2GatewayListener Unable to find L2 Gateway details for {}", l2DeviceName);
         }
     }
 
@@ -287,11 +341,11 @@ public class L2GatewayListener extends AbstractClusteredAsyncDataTreeChangeListe
         Map<String, Map<String, Interfaces>> deviceInterfacesMap = new HashMap<>();
 
         DeviceInterfaces(L2gateway l2gateway) {
-            if (l2gateway.nonnullDevices() != null) {
-                l2gateway.nonnullDevices().values().forEach((device) -> {
+            if (l2gateway.getDevices() != null) {
+                l2gateway.getDevices().values().forEach((device) -> {
                     deviceInterfacesMap.putIfAbsent(device.getDeviceName(), new HashMap<>());
-                    if (device.nonnullInterfaces() != null) {
-                        device.nonnullInterfaces().values().forEach((intf) ->
+                    if (device.getInterfaces() != null) {
+                        device.getInterfaces().values().forEach((intf) ->
                                 deviceInterfacesMap.get(device.getDeviceName()).put(intf.getInterfaceName(), intf));
                     }
                 });
@@ -305,4 +359,85 @@ public class L2GatewayListener extends AbstractClusteredAsyncDataTreeChangeListe
             return false;
         }
     }
+
+    private void scanNodesAndReplayDeviceGlobalNode(Devices l2Device, L2gateway input, String l2DeviceName) {
+        txRunner.callWithNewReadOnlyTransactionAndClose(OPERATIONAL, tx -> {
+            List<Node> allNodes = readAllOperNodes(tx);
+            for (Node psNode : allNodes) {
+                if (Objects.equals(HwvtepHAUtil.getPsName(psNode), l2DeviceName)) {
+                    String globalNodeId = HwvtepHAUtil.getGlobalNodePathFromPSNode(psNode)
+                        .firstKeyOf(Node.class).getNodeId().getValue();
+                    replayGlobalNode(l2Device, input, l2DeviceName, globalNodeId);
+                }
+            }
+        });
+
+    }
+
+    private List<Node> readAllOperNodes(TypedReadTransaction<Operational> tx) {
+        Optional<Topology> topologyOptional = null;
+        try {
+
+            topologyOptional = tx.read(HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Failed to read oper nodes", e);
+        }
+        if (topologyOptional != null && topologyOptional.isPresent() && topologyOptional.get().getNode() != null) {
+            return new ArrayList<>(topologyOptional.get().getNode().values());
+        }
+        return Collections.emptyList();
+    }
+
+
+    private void replayGlobalNode(Devices l2Device, L2gateway input,
+                                  String l2DeviceName, String hwvtepNodeId) {
+        HwvtepHACache haCache = HwvtepHACache.getInstance();
+        if (haCache.isHAParentNode(HwvtepHAUtil.convertToInstanceIdentifier(hwvtepNodeId))) {
+            return;
+        }
+        InstanceIdentifier<Node> globalIid = HwvtepHAUtil.convertToInstanceIdentifier(hwvtepNodeId);
+        InstanceIdentifier<Node> psIid = HwvtepHAUtil.convertToInstanceIdentifier(
+                hwvtepNodeId + "/physicalswitch/" + l2DeviceName);
+        replayGlobalNode(globalIid, psIid, l2Device, input, haCache, hwvtepNodeId, l2DeviceName);
+    }
+
+    private void replayGlobalNode(InstanceIdentifier<Node> globalIid,
+                                  final InstanceIdentifier<Node> psIid,
+                                  final Devices l2Device, final L2gateway input,
+                                  HwvtepHACache haCache,
+                                  String hwvtepNodeId,
+                                  String l2DeviceName) {
+        txRunner.callWithNewReadWriteTransactionAndSubmit(OPERATIONAL, tx -> {
+            String globalId = hwvtepNodeId;
+            final Optional<Node> globalNode = tx.read(globalIid).get();
+            if (!globalNode.isPresent()) {
+                LOG.error("replayGlobalNode Global Node not present in oper store {}", globalId);
+                return;
+            }
+            final Optional<Node> psNode = tx.read(psIid).get();
+
+            haOpClusteredListener.onGlobalNodeAdd(globalIid, globalNode.get(), tx);
+            if (!haCache.isHAEnabledDevice(globalIid)) {
+                LOG.error("replayGlobalNode Non ha node connected {}", globalId);
+                return;
+            }
+            globalId = haCache.getParent(globalIid).firstKeyOf(Node.class).getNodeId().getValue();
+            haOpNodeListener.onGlobalNodeAdd(globalIid, globalNode.get(), tx);
+            if (!psNode.isPresent()) {
+                LOG.error("replayGlobalNode ps node not present in oper store {}", psIid);
+                return;
+            }
+            haOpNodeListener.onPsNodeAdd(psIid, psNode.get(), tx);
+            PhysicalSwitchAugmentation psAugmentation = psNode.get().augmentation(
+                PhysicalSwitchAugmentation.class);
+            if (psAugmentation != null
+                && psAugmentation.getTunnelIps() != null && !psAugmentation.getTunnelIps().isEmpty()) {
+                l2GatewayCache.updateL2GatewayCache(
+                    l2DeviceName, globalId, new ArrayList<>(psAugmentation.nonnullTunnelIps().values()));
+            } else {
+                LOG.error("replayGlobalNode Failed to find tunnel ips for {}", psIid);
+            }
+        });
+
+    }
 }