elanmanager: use transaction manager 13/68313/9
authorStephen Kitt <skitt@redhat.com>
Tue, 13 Feb 2018 11:44:35 +0000 (12:44 +0100)
committerSam Hague <shague@redhat.com>
Thu, 22 Feb 2018 18:11:53 +0000 (18:11 +0000)
This replaces all direct invocations of
DataBroker::newWriteOnlyTransaction() with calls to a
ManagedNewTransactionRunner, ensuring all such transactions are
closed. It also reworks a number of methods to use read/write
transactions, ensuring that changes are made based on still-relevant
data.

This doesn’t address transactions in ElanInterfaceManager.

Change-Id: Id38c7482cf2403a7751f0faa6134be7374e4a072
Signed-off-by: Stephen Kitt <skitt@redhat.com>
21 files changed:
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/cli/l2gw/L2GwValidateCli.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/evpn/utils/EvpnMacVrfUtils.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/evpn/utils/EvpnUtils.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanInstanceManager.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanInterfaceConfigListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanInterfaceManager.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanLearntVpnVipToPortListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanNodeListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanPacketInHandler.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanSmacFlowEventListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/listeners/HAOpClusteredListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/listeners/ElanInstanceListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/listeners/HwvtepPhysicalSwitchListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/listeners/HwvtepTerminationPointListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/listeners/LocalUcastMacListener.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/utils/ElanL2GatewayMulticastUtils.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/utils/ElanL2GatewayUtils.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/utils/StaleVlanBindingsCleaner.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/utils/ElanForwardingEntriesHandler.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/utils/ElanUtils.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/utils/TransportZoneNotificationUtil.java

index 298987a4bdd0e3b0c610c0707e5ef5faf4cdcdc7..da91352340c4c0d469421e2c0ba90b1eb6b86c36 100644 (file)
@@ -26,7 +26,7 @@ import java.util.Set;
 import org.apache.karaf.shell.commands.Command;
 import org.apache.karaf.shell.console.OsgiCommandSupport;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
@@ -122,40 +122,41 @@ public class L2GwValidateCli extends OsgiCommandSupport {
     }
 
     private void readNodes() throws ReadFailedException {
-        ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
-        InstanceIdentifier<Topology> topoId = HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier();
+        try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction()) {
+            InstanceIdentifier<Topology> topoId = HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier();
 
-        Optional<Topology> operationalTopoOptional = tx.read(LogicalDatastoreType.OPERATIONAL, topoId).checkedGet();
-        Optional<Topology> configTopoOptional = tx.read(LogicalDatastoreType.CONFIGURATION, topoId).checkedGet();
+            Optional<Topology> operationalTopoOptional = tx.read(LogicalDatastoreType.OPERATIONAL, topoId).checkedGet();
+            Optional<Topology> configTopoOptional = tx.read(LogicalDatastoreType.CONFIGURATION, topoId).checkedGet();
 
-        if (operationalTopoOptional.isPresent()) {
-            for (Node node : operationalTopoOptional.get().getNode()) {
-                InstanceIdentifier<Node> nodeIid = topoId.child(Node.class, node.getKey());
-                operationalNodes.put(nodeIid, node);
+            if (operationalTopoOptional.isPresent()) {
+                for (Node node : operationalTopoOptional.get().getNode()) {
+                    InstanceIdentifier<Node> nodeIid = topoId.child(Node.class, node.getKey());
+                    operationalNodes.put(nodeIid, node);
+                }
             }
-        }
-        if (configTopoOptional.isPresent()) {
-            for (Node node : configTopoOptional.get().getNode()) {
-                InstanceIdentifier<Node> nodeIid = topoId.child(Node.class, node.getKey());
-                configNodes.put(nodeIid, node);
+            if (configTopoOptional.isPresent()) {
+                for (Node node : configTopoOptional.get().getNode()) {
+                    InstanceIdentifier<Node> nodeIid = topoId.child(Node.class, node.getKey());
+                    configNodes.put(nodeIid, node);
+                }
             }
-        }
 
-        fillNodesData(operationalNodes, operationalNodesData);
-        fillNodesData(configNodes, configNodesData);
+            fillNodesData(operationalNodes, operationalNodesData);
+            fillNodesData(configNodes, configNodesData);
 
-        Optional<ElanInstances> elanInstancesOptional = tx.read(LogicalDatastoreType.CONFIGURATION,
-                InstanceIdentifier.builder(ElanInstances.class).build()).checkedGet();
+            Optional<ElanInstances> elanInstancesOptional = tx.read(LogicalDatastoreType.CONFIGURATION,
+                    InstanceIdentifier.builder(ElanInstances.class).build()).checkedGet();
 
-        if (elanInstancesOptional.isPresent() && elanInstancesOptional.get().getElanInstance() != null) {
-            for (ElanInstance elanInstance : elanInstancesOptional.get().getElanInstance()) {
-                elanInstanceMap.put(elanInstance.getElanInstanceName(), elanInstance);
+            if (elanInstancesOptional.isPresent() && elanInstancesOptional.get().getElanInstance() != null) {
+                for (ElanInstance elanInstance : elanInstancesOptional.get().getElanInstance()) {
+                    elanInstanceMap.put(elanInstance.getElanInstanceName(), elanInstance);
+                }
+            }
+            l2gatewayConnections = L2GatewayConnectionUtils.getAllL2gatewayConnections(dataBroker);
+            l2gateways = L2GatewayConnectionUtils.getL2gatewayList(dataBroker);
+            for (L2gateway l2gateway : l2gateways) {
+                uuidToL2Gateway.put(l2gateway.getUuid(), l2gateway);
             }
-        }
-        l2gatewayConnections = L2GatewayConnectionUtils.getAllL2gatewayConnections(dataBroker);
-        l2gateways = L2GatewayConnectionUtils.getL2gatewayList(dataBroker);
-        for (L2gateway l2gateway : l2gateways) {
-            uuidToL2Gateway.put(l2gateway.getUuid(), l2gateway);
         }
     }
 
index d7994d9605e4093145f798dfac734caf75ddec06..621192c23c7df4958771a9b97e90698b6df74c16 100644 (file)
@@ -16,7 +16,7 @@ import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
@@ -87,22 +87,23 @@ public class EvpnMacVrfUtils {
     }
 
     public String getElanNameByMacvrfiid(InstanceIdentifier<MacVrfEntry> instanceIdentifier) {
-        ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
-        String rd = instanceIdentifier.firstKeyOf(VrfTables.class).getRouteDistinguisher();
-        String elanName = null;
-        InstanceIdentifier<EvpnRdToNetwork> iidEvpnRdToNet =
-                InstanceIdentifier.builder(EvpnRdToNetworks.class).child(EvpnRdToNetwork.class,
-                        new EvpnRdToNetworkKey(rd)).build();
-        try {
-            Optional<EvpnRdToNetwork> evpnRdToNetwork =
-                    tx.read(LogicalDatastoreType.CONFIGURATION, iidEvpnRdToNet).checkedGet();
-            if (evpnRdToNetwork.isPresent()) {
-                elanName = evpnRdToNetwork.get().getNetworkId();
+        try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction()) {
+            String rd = instanceIdentifier.firstKeyOf(VrfTables.class).getRouteDistinguisher();
+            String elanName = null;
+            InstanceIdentifier<EvpnRdToNetwork> iidEvpnRdToNet =
+                    InstanceIdentifier.builder(EvpnRdToNetworks.class).child(EvpnRdToNetwork.class,
+                            new EvpnRdToNetworkKey(rd)).build();
+            try {
+                Optional<EvpnRdToNetwork> evpnRdToNetwork =
+                        tx.read(LogicalDatastoreType.CONFIGURATION, iidEvpnRdToNet).checkedGet();
+                if (evpnRdToNetwork.isPresent()) {
+                    elanName = evpnRdToNetwork.get().getNetworkId();
+                }
+            } catch (ReadFailedException e) {
+                LOG.error("getElanName: unable to read elanName, exception ", e);
             }
-        } catch (ReadFailedException e) {
-            LOG.error("getElanName: unable to read elanName, exception ", e);
+            return elanName;
         }
-        return elanName;
     }
 
     public InstanceIdentifier<MacVrfEntry> getMacVrfEntryIid(String rd, MacVrfEntry macVrfEntry) {
index f754359f9835ebde70b869cf6edc546f03a4abe3..cf864485793bdb780a4e976a259b6592b3d02d50 100644 (file)
@@ -24,10 +24,12 @@ import java.util.function.Predicate;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.genius.itm.globals.ITMConstants;
@@ -42,6 +44,7 @@ import org.opendaylight.genius.mdsalutil.instructions.InstructionWriteMetadata;
 import org.opendaylight.genius.mdsalutil.matches.MatchTunnelId;
 import org.opendaylight.genius.utils.ServiceIndex;
 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
+import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
 import org.opendaylight.netvirt.bgpmanager.api.IBgpManager;
 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
 import org.opendaylight.netvirt.elan.l2gw.utils.SettableFutureCallback;
@@ -81,6 +84,7 @@ public class EvpnUtils {
     private final Predicate<MacEntry> isIpv4PrefixAvailable = (macEntry) -> (macEntry != null
         && macEntry.getIpPrefix() != null && macEntry.getIpPrefix().getIpv4Address() != null);
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final IInterfaceManager interfaceManager;
     private final ElanUtils elanUtils;
     private final ItmRpcService itmRpcService;
@@ -94,6 +98,7 @@ public class EvpnUtils {
             ItmRpcService itmRpcService, IVpnManager vpnManager, IBgpManager bgpManager,
             JobCoordinator jobCoordinator, ElanInstanceCache elanInstanceCache) {
         this.broker = broker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
         this.interfaceManager = interfaceManager;
         this.elanUtils = elanUtils;
         this.itmRpcService = itmRpcService;
@@ -359,36 +364,35 @@ public class EvpnUtils {
     }
 
     public void bindElanServiceToExternalTunnel(String elanName, String interfaceName) {
-        WriteTransaction tx = broker.newWriteOnlyTransaction();
-        int instructionKey = 0;
-        LOG.trace("Binding external interface {} elan {}", interfaceName, elanName);
-        List<Instruction> instructions = new ArrayList<>();
-        instructions.add(MDSALUtil.buildAndGetGotoTableInstruction(
-                NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, ++instructionKey));
-        short elanServiceIndex = ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
-        BoundServices serviceInfo = ElanUtils.getBoundServices(
-                ElanUtils.getElanServiceName(elanName, interfaceName), elanServiceIndex,
-                NwConstants.ELAN_SERVICE_INDEX, NwConstants.COOKIE_ELAN_INGRESS_TABLE, instructions);
-        InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
-        Optional<BoundServices> existingElanService = elanUtils.read(broker, LogicalDatastoreType.CONFIGURATION,
-                bindServiceId);
-        if (!existingElanService.isPresent()) {
-            tx.put(LogicalDatastoreType.CONFIGURATION, bindServiceId, serviceInfo, true);
-            tx.submit();
-        }
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+            int instructionKey = 0;
+            LOG.trace("Binding external interface {} elan {}", interfaceName, elanName);
+            List<Instruction> instructions = new ArrayList<>();
+            instructions.add(MDSALUtil.buildAndGetGotoTableInstruction(
+                    NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, ++instructionKey));
+            short elanServiceIndex =
+                    ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
+            BoundServices serviceInfo = ElanUtils.getBoundServices(
+                    ElanUtils.getElanServiceName(elanName, interfaceName), elanServiceIndex,
+                    NwConstants.ELAN_SERVICE_INDEX, NwConstants.COOKIE_ELAN_INGRESS_TABLE, instructions);
+            InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
+            if (!tx.read(LogicalDatastoreType.CONFIGURATION, bindServiceId).checkedGet().isPresent()) {
+                tx.put(LogicalDatastoreType.CONFIGURATION, bindServiceId, serviceInfo,
+                        WriteTransaction.CREATE_MISSING_PARENTS);
+            }
+        }), LOG, "Error binding an ELAN service to an external tunnel");
     }
 
     public void unbindElanServiceFromExternalTunnel(String elanName, String interfaceName) {
-        WriteTransaction tx = broker.newWriteOnlyTransaction();
-        LOG.trace("UnBinding external interface {} elan {}", interfaceManager, elanName);
-        short elanServiceIndex = ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
-        InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
-        Optional<BoundServices> existingElanService = elanUtils.read(broker, LogicalDatastoreType.CONFIGURATION,
-                bindServiceId);
-        if (!existingElanService.isPresent()) {
-            tx.delete(LogicalDatastoreType.CONFIGURATION, bindServiceId);
-            tx.submit();
-        }
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+            LOG.trace("UnBinding external interface {} elan {}", interfaceManager, elanName);
+            short elanServiceIndex =
+                    ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
+            InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
+            if (tx.read(LogicalDatastoreType.CONFIGURATION, bindServiceId).checkedGet().isPresent()) {
+                tx.delete(LogicalDatastoreType.CONFIGURATION, bindServiceId);
+            }
+        }), LOG, "Error binding an ELAN service to an external tunnel");
     }
 
     private List<InstructionInfo> getInstructionsForExtTunnelTable(Long elanTag) {
@@ -448,17 +452,18 @@ public class EvpnUtils {
             SettableFuture<Optional<T>> settableFuture = SettableFuture.create();
             List futures = Collections.singletonList(settableFuture);
 
-            ReadWriteTransaction tx = broker.newReadWriteTransaction();
-
-            Futures.addCallback(tx.read(datastoreType, iid), new SettableFutureCallback<Optional<T>>(settableFuture) {
-                @Override
-                public void onSuccess(Optional<T> data) {
-                    function.apply(data);
-                    super.onSuccess(data);
-                }
-            }, MoreExecutors.directExecutor());
-
-            return futures;
+            try (ReadOnlyTransaction tx = broker.newReadOnlyTransaction()) {
+                Futures.addCallback(tx.read(datastoreType, iid),
+                        new SettableFutureCallback<Optional<T>>(settableFuture) {
+                            @Override
+                            public void onSuccess(Optional<T> data) {
+                                function.apply(data);
+                                super.onSuccess(data);
+                            }
+                        }, MoreExecutors.directExecutor());
+
+                return futures;
+            }
         }, ElanConstants.JOB_MAX_RETRIES);
     }
 }
index 4d7a0457845a68972d83979ef9c95ef68aed6e93..c68b0651fb4023dc1b48dc6eb3e6ee0f9e0d39c6 100644 (file)
@@ -18,13 +18,15 @@ import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.genius.mdsalutil.MDSALUtil;
 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
+import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
 import org.opendaylight.netvirt.elan.ElanException;
 import org.opendaylight.netvirt.elan.cache.ElanInterfaceCache;
 import org.opendaylight.netvirt.elan.utils.ElanConstants;
@@ -48,6 +50,7 @@ public class ElanInstanceManager extends AsyncDataTreeChangeListenerBase<ElanIns
     private static final Logger LOG = LoggerFactory.getLogger(ElanInstanceManager.class);
 
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final IdManagerService idManager;
     private final IInterfaceManager interfaceManager;
     private final ElanInterfaceManager elanInterfaceManager;
@@ -61,6 +64,7 @@ public class ElanInstanceManager extends AsyncDataTreeChangeListenerBase<ElanIns
                                final ElanInterfaceCache elanInterfaceCache) {
         super(ElanInstance.class, ElanInstanceManager.class);
         this.broker = dataBroker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         this.idManager = managerService;
         this.elanInterfaceManager = elanInterfaceManager;
         this.interfaceManager = interfaceManager;
@@ -106,17 +110,15 @@ public class ElanInstanceManager extends AsyncDataTreeChangeListenerBase<ElanIns
             ElanUtils.delete(broker, LogicalDatastoreType.OPERATIONAL,
                     ElanUtils.getElanInfoEntriesOperationalDataPath(elanTag));
         }
-        elanInterfaceCache.getInterfaceNames(elanName).forEach(elanInterfaceName -> {
-            jobCoordinator.enqueueJob(ElanUtils.getElanInterfaceJobKey(elanInterfaceName), () -> {
-                WriteTransaction writeConfigTxn = broker.newWriteOnlyTransaction();
-                LOG.info("Deleting the elanInterface present under ConfigDS:{}", elanInterfaceName);
-                ElanUtils.delete(broker, LogicalDatastoreType.CONFIGURATION,
-                        ElanUtils.getElanInterfaceConfigurationDataPathId(elanInterfaceName));
-                elanInterfaceManager.unbindService(elanInterfaceName, writeConfigTxn);
-                LOG.info("unbind the Interface:{} service bounded to Elan:{}", elanInterfaceName, elanName);
-                return Collections.singletonList(writeConfigTxn.submit());
-            }, ElanConstants.JOB_MAX_RETRIES);
-        });
+        elanInterfaceCache.getInterfaceNames(elanName).forEach(
+            elanInterfaceName -> jobCoordinator.enqueueJob(ElanUtils.getElanInterfaceJobKey(elanInterfaceName),
+                () -> Collections.singletonList(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+                    LOG.info("Deleting the elanInterface present under ConfigDS:{}", elanInterfaceName);
+                    ElanUtils.delete(broker, LogicalDatastoreType.CONFIGURATION,
+                            ElanUtils.getElanInterfaceConfigurationDataPathId(elanInterfaceName));
+                    elanInterfaceManager.unbindService(elanInterfaceName, tx);
+                    LOG.info("unbind the Interface:{} service bounded to Elan:{}", elanInterfaceName, elanName);
+                })), ElanConstants.JOB_MAX_RETRIES));
         // Release tag
         ElanUtils.releaseId(idManager, ElanConstants.ELAN_ID_POOL_NAME, elanName);
         if (deletedElan.getAugmentation(EtreeInstance.class) != null) {
@@ -138,38 +140,34 @@ public class ElanInstanceManager extends AsyncDataTreeChangeListenerBase<ElanIns
     protected void update(InstanceIdentifier<ElanInstance> identifier, ElanInstance original, ElanInstance update) {
         Long existingElanTag = original.getElanTag();
         String elanName = update.getElanInstanceName();
-        if (existingElanTag != null && existingElanTag.equals(update.getElanTag())) {
-            return;
-        } else if (update.getElanTag() == null) {
-            // update the elan-Instance with new properties
-            WriteTransaction tx = broker.newWriteOnlyTransaction();
-            ElanUtils.updateOperationalDataStore(broker, idManager,
-                    update, new ArrayList<>(), tx);
-            ElanUtils.waitForTransactionToComplete(tx);
-            return;
-        }
-
-        jobCoordinator.enqueueJob(elanName, () -> {
-            try {
-                return elanInterfaceManager.handleunprocessedElanInterfaces(update);
-            } catch (ElanException e) {
-                LOG.error("update() failed for ElanInstance: " + identifier.toString(), e);
-                return emptyList();
+        if (existingElanTag == null || !existingElanTag.equals(update.getElanTag())) {
+            if (update.getElanTag() == null) {
+                // update the elan-Instance with new properties
+                ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+                    tx -> ElanUtils.updateOperationalDataStore(idManager, update, new ArrayList<>(), tx)),
+                    LOG, "Error updating ELAN tag in ELAN instance");
+            } else {
+                jobCoordinator.enqueueJob(elanName, () -> {
+                    try {
+                        return elanInterfaceManager.handleunprocessedElanInterfaces(update);
+                    } catch (ElanException e) {
+                        LOG.error("update() failed for ElanInstance: " + identifier.toString(), e);
+                        return emptyList();
+                    }
+                }, ElanConstants.JOB_MAX_RETRIES);
             }
-        }, ElanConstants.JOB_MAX_RETRIES);
-
+        }
     }
 
     @Override
     protected void add(InstanceIdentifier<ElanInstance> identifier, ElanInstance elanInstanceAdded) {
-        String elanInstanceName  = elanInstanceAdded.getElanInstanceName();
-        Elan elanInfo = ElanUtils.getElanByName(broker, elanInstanceName);
-        if (elanInfo == null) {
-            WriteTransaction tx = broker.newWriteOnlyTransaction();
-            ElanUtils.updateOperationalDataStore(broker, idManager,
-                elanInstanceAdded, new ArrayList<>(), tx);
-            ElanUtils.waitForTransactionToComplete(tx);
-        }
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+            String elanInstanceName  = elanInstanceAdded.getElanInstanceName();
+            Elan elanInfo = ElanUtils.getElanByName(tx, elanInstanceName);
+            if (elanInfo == null) {
+                ElanUtils.updateOperationalDataStore(idManager, elanInstanceAdded, new ArrayList<>(), tx);
+            }
+        }), LOG, "Error adding an ELAN instance");
     }
 
     private static InstanceIdentifier<ElanDpnInterfacesList> getElanDpnOperationDataPath(String elanInstanceName) {
index 868617c967fedcb2c5173c92ff71408e537ae50e..2267764c3630649fd23afc0cca85c52a6d842922 100644 (file)
@@ -13,9 +13,10 @@ import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
 import org.opendaylight.netvirt.elan.cache.ElanInterfaceCache;
 import org.opendaylight.netvirt.elan.utils.ElanConstants;
@@ -35,6 +36,7 @@ public class ElanInterfaceConfigListener
     private static final Logger LOG = LoggerFactory.getLogger(ElanInterfaceConfigListener.class);
 
     private final DataBroker dataBroker;
+    private final ManagedNewTransactionRunner txRunner;
     private final ElanInterfaceManager elanInterfaceManager;
     private final JobCoordinator jobCoordinator;
     private final ElanInterfaceCache elanInterfaceCache;
@@ -44,6 +46,7 @@ public class ElanInterfaceConfigListener
             JobCoordinator jobCoordinator, ElanInterfaceCache elanInterfaceCache) {
         super(Interface.class, ElanInterfaceConfigListener.class);
         this.dataBroker = dataBroker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         this.elanInterfaceManager = elanInterfaceManager;
         this.jobCoordinator = jobCoordinator;
         this.elanInterfaceCache = elanInterfaceCache;
@@ -77,12 +80,11 @@ public class ElanInterfaceConfigListener
             LOG.debug("There is no ELAN service for interface {}. Ignoring it", interfaceName);
             return;
         }
-        jobCoordinator.enqueueJob(ElanUtils.getElanInterfaceJobKey(interfaceName), () -> {
-            WriteTransaction writeConfigTxn = dataBroker.newWriteOnlyTransaction();
-            LOG.debug("unbinding elan service on interface {} for its config removal", interfaceName);
-            elanInterfaceManager.unbindService(interfaceName, writeConfigTxn);
-            return Collections.singletonList(writeConfigTxn.submit());
-        }, ElanConstants.JOB_MAX_RETRIES);
+        jobCoordinator.enqueueJob(ElanUtils.getElanInterfaceJobKey(interfaceName),
+            () -> Collections.singletonList(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+                LOG.debug("unbinding elan service on interface {} for its config removal", interfaceName);
+                elanInterfaceManager.unbindService(interfaceName, tx);
+            })), ElanConstants.JOB_MAX_RETRIES);
     }
 
     @Override
index 65ad1ae979dccc67175aa64214ba6faca85c96ca..c26c8d93322c09eb73c37a63931e512e2cf20a8c 100644 (file)
@@ -25,15 +25,18 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.stream.Collectors;
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.apache.commons.lang3.StringUtils;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.genius.itm.globals.ITMConstants;
@@ -58,6 +61,7 @@ import org.opendaylight.genius.mdsalutil.matches.MatchMetadata;
 import org.opendaylight.genius.mdsalutil.matches.MatchTunnelId;
 import org.opendaylight.genius.utils.ServiceIndex;
 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
+import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
 import org.opendaylight.netvirt.elan.ElanException;
 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
 import org.opendaylight.netvirt.elan.cache.ElanInterfaceCache;
@@ -135,6 +139,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
     private static final boolean SH_FLAG_UNSET = false;
 
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final IMdsalApiManager mdsalManager;
     private final IInterfaceManager interfaceManager;
     private final IdManagerService idManager;
@@ -164,6 +169,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                                 final ElanInterfaceCache elanInterfaceCache) {
         super(ElanInterface.class, ElanInterfaceManager.class);
         this.broker = dataBroker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         this.idManager = managerService;
         this.mdsalManager = mdsalApiManager;
         this.interfaceManager = interfaceManager;
@@ -373,64 +379,63 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
     List<ListenableFuture<Void>> removeEntriesForElanInterface(ElanInstance elanInfo, InterfaceInfo
             interfaceInfo, String interfaceName, boolean isLastElanInterface) {
         String elanName = elanInfo.getElanInstanceName();
-        WriteTransaction interfaceTx = broker.newWriteOnlyTransaction();
-        WriteTransaction flowTx = broker.newWriteOnlyTransaction();
-        InstanceIdentifier<ElanInterfaceMac> elanInterfaceId = ElanUtils
-                .getElanInterfaceMacEntriesOperationalDataPath(interfaceName);
-        Optional<ElanInterfaceMac> existingElanInterfaceMac = ElanUtils.read(broker,
-                LogicalDatastoreType.OPERATIONAL, elanInterfaceId);
-        LOG.debug("Removing the Interface:{} from elan:{}", interfaceName, elanName);
-        if (interfaceInfo != null) {
-            if (existingElanInterfaceMac.isPresent()) {
-                List<MacEntry> existingMacEntries = existingElanInterfaceMac.get().getMacEntry();
-                List<MacEntry> macEntries = new ArrayList<>();
-                if (existingMacEntries != null && !existingMacEntries.isEmpty()) {
-                    macEntries.addAll(existingMacEntries);
-                }
-                List<PhysAddress> macAddresses = macEntries.stream().map(macEntry -> {
-                    PhysAddress macAddress = macEntry.getMacAddress();
-                    LOG.debug("removing the  mac-entry:{} present on elanInterface:{}",
-                            macAddress.getValue(), interfaceName);
-                    Optional<MacEntry> macEntryOptional = elanUtils.getMacEntryForElanInstance(elanName,
-                            macAddress);
-                    if (!isLastElanInterface && macEntryOptional.isPresent()) {
-                        interfaceTx.delete(LogicalDatastoreType.OPERATIONAL,
-                                ElanUtils.getMacEntryOperationalDataPath(elanName, macAddress));
+        List<ListenableFuture<Void>> futures = new ArrayList<>();
+        futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(flowTx -> {
+            futures.add(txRunner.callWithNewReadWriteTransactionAndSubmit(interfaceTx -> {
+                InstanceIdentifier<ElanInterfaceMac> elanInterfaceId = ElanUtils
+                        .getElanInterfaceMacEntriesOperationalDataPath(interfaceName);
+                Optional<ElanInterfaceMac> existingElanInterfaceMac =
+                        interfaceTx.read(LogicalDatastoreType.OPERATIONAL, elanInterfaceId).checkedGet();
+                LOG.debug("Removing the Interface:{} from elan:{}", interfaceName, elanName);
+                if (interfaceInfo != null) {
+                    if (existingElanInterfaceMac.isPresent()) {
+                        List<MacEntry> existingMacEntries = existingElanInterfaceMac.get().getMacEntry();
+                        if (existingMacEntries != null) {
+                            List<PhysAddress> macAddresses = new ArrayList<>();
+                            for (MacEntry macEntry : existingMacEntries) {
+                                PhysAddress macAddress = macEntry.getMacAddress();
+                                LOG.debug("removing the  mac-entry:{} present on elanInterface:{}",
+                                        macAddress.getValue(), interfaceName);
+                                Optional<MacEntry> macEntryOptional =
+                                        elanUtils.getMacEntryForElanInstance(interfaceTx, elanName, macAddress);
+                                if (!isLastElanInterface && macEntryOptional.isPresent()) {
+                                    interfaceTx.delete(LogicalDatastoreType.OPERATIONAL,
+                                            ElanUtils.getMacEntryOperationalDataPath(elanName, macAddress));
+                                }
+                                elanUtils.deleteMacFlows(elanInfo, interfaceInfo, macEntry, flowTx);
+                                macAddresses.add(macAddress);
+                            }
+
+                            // Removing all those MACs from External Devices belonging
+                            // to this ELAN
+                            if (isVxlanNetworkOrVxlanSegment(elanInfo) && ! macAddresses.isEmpty()) {
+                                elanL2GatewayUtils.removeMacsFromElanExternalDevices(elanInfo, macAddresses);
+                            }
+                        }
                     }
-                    elanUtils.deleteMacFlows(elanInfo, interfaceInfo, macEntry, flowTx);
-                    return macAddress;
-                }).collect(Collectors.toList());
-
-                // Removing all those MACs from External Devices belonging
-                // to this ELAN
-                if (isVxlanNetworkOrVxlanSegment(elanInfo) && ! macAddresses.isEmpty()) {
-                    elanL2GatewayUtils.removeMacsFromElanExternalDevices(elanInfo, macAddresses);
-                }
-            }
-            removeDefaultTermFlow(interfaceInfo.getDpId(), interfaceInfo.getInterfaceTag());
-            removeFilterEqualsTable(elanInfo, interfaceInfo, flowTx);
-        } else if (existingElanInterfaceMac.isPresent()) {
-            // Interface does not exist in ConfigDS, so lets remove everything
-            // about that interface related to Elan
-            List<MacEntry> macEntries = existingElanInterfaceMac.get().getMacEntry();
-            if (macEntries != null) {
-                for (MacEntry macEntry : macEntries) {
-                    PhysAddress macAddress = macEntry.getMacAddress();
-                    if (elanUtils.getMacEntryForElanInstance(elanName, macAddress).isPresent()) {
-                        interfaceTx.delete(LogicalDatastoreType.OPERATIONAL,
-                                ElanUtils.getMacEntryOperationalDataPath(elanName, macAddress));
+                    removeDefaultTermFlow(interfaceInfo.getDpId(), interfaceInfo.getInterfaceTag());
+                    removeFilterEqualsTable(elanInfo, interfaceInfo, flowTx);
+                } else if (existingElanInterfaceMac.isPresent()) {
+                    // Interface does not exist in ConfigDS, so lets remove everything
+                    // about that interface related to Elan
+                    List<MacEntry> macEntries = existingElanInterfaceMac.get().getMacEntry();
+                    if (macEntries != null) {
+                        for (MacEntry macEntry : macEntries) {
+                            PhysAddress macAddress = macEntry.getMacAddress();
+                            if (elanUtils.getMacEntryForElanInstance(elanName, macAddress).isPresent()) {
+                                interfaceTx.delete(LogicalDatastoreType.OPERATIONAL,
+                                        ElanUtils.getMacEntryOperationalDataPath(elanName, macAddress));
+                            }
+                        }
                     }
                 }
-            }
-        }
-        if (existingElanInterfaceMac.isPresent()) {
-            interfaceTx.delete(LogicalDatastoreType.OPERATIONAL, elanInterfaceId);
-        }
-        unbindService(interfaceName, interfaceTx);
-        deleteElanInterfaceFromConfigDS(interfaceName, interfaceTx);
-        List<ListenableFuture<Void>> futures = new ArrayList<>();
-        futures.add(ElanUtils.waitForTransactionToComplete(interfaceTx));
-        futures.add(ElanUtils.waitForTransactionToComplete(flowTx));
+                if (existingElanInterfaceMac.isPresent()) {
+                    interfaceTx.delete(LogicalDatastoreType.OPERATIONAL, elanInterfaceId);
+                }
+                unbindService(interfaceName, interfaceTx);
+                deleteElanInterfaceFromConfigDS(interfaceName, interfaceTx);
+            }));
+        }));
         return futures;
     }
 
@@ -541,43 +546,42 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
 
     @Override
     protected void add(InstanceIdentifier<ElanInterface> identifier, ElanInterface elanInterfaceAdded) {
-        String elanInstanceName = elanInterfaceAdded.getElanInstanceName();
-        String interfaceName = elanInterfaceAdded.getName();
-        InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(interfaceName);
-        if (interfaceInfo == null) {
-            LOG.info("Interface {} is removed from Interface Oper DS due to port down ", interfaceName);
-            return;
-        }
-        ElanInstance elanInstance = elanInstanceCache.get(elanInstanceName).orNull();
+        ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+            String elanInstanceName = elanInterfaceAdded.getElanInstanceName();
+            String interfaceName = elanInterfaceAdded.getName();
+            InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(interfaceName);
+            if (interfaceInfo == null) {
+                LOG.info("Interface {} is removed from Interface Oper DS due to port down ", interfaceName);
+                return;
+            }
+            ElanInstance elanInstance = elanInstanceCache.get(elanInstanceName).orNull();
 
-        if (elanInstance == null) {
-            elanInstance = new ElanInstanceBuilder().setElanInstanceName(elanInstanceName)
-                    .setDescription(elanInterfaceAdded.getDescription()).build();
-            // Add the ElanInstance in the Configuration data-store
-            WriteTransaction tx = broker.newWriteOnlyTransaction();
-            List<String> elanInterfaces = new ArrayList<>();
-            elanInterfaces.add(interfaceName);
-            elanInstance = ElanUtils.updateOperationalDataStore(broker, idManager,
-                    elanInstance, elanInterfaces, tx);
-            tx.submit();
-        }
-
-        Long elanTag = elanInstance.getElanTag();
-        // If elan tag is not updated, then put the elan interface into
-        // unprocessed entry map and entry. Let entries
-        // in this map get processed during ELAN update DCN.
-        if (elanTag == null) {
-            ConcurrentLinkedQueue<ElanInterface> elanInterfaces = unProcessedElanInterfaces.get(elanInstanceName);
-            if (elanInterfaces == null) {
-                elanInterfaces = new ConcurrentLinkedQueue<>();
+            if (elanInstance == null) {
+                elanInstance = new ElanInstanceBuilder().setElanInstanceName(elanInstanceName)
+                        .setDescription(elanInterfaceAdded.getDescription()).build();
+                // Add the ElanInstance in the Configuration data-store
+                List<String> elanInterfaces = new ArrayList<>();
+                elanInterfaces.add(interfaceName);
+                elanInstance = ElanUtils.updateOperationalDataStore(idManager, elanInstance, elanInterfaces, tx);
             }
-            elanInterfaces.add(elanInterfaceAdded);
-            unProcessedElanInterfaces.put(elanInstanceName, elanInterfaces);
-            return;
-        }
-        InterfaceAddWorkerOnElan addWorker = new InterfaceAddWorkerOnElan(elanInstanceName, elanInterfaceAdded,
-                interfaceInfo, elanInstance, this);
-        jobCoordinator.enqueueJob(elanInstanceName, addWorker, ElanConstants.JOB_MAX_RETRIES);
+
+            Long elanTag = elanInstance.getElanTag();
+            // If elan tag is not updated, then put the elan interface into
+            // unprocessed entry map and entry. Let entries
+            // in this map get processed during ELAN update DCN.
+            if (elanTag == null) {
+                ConcurrentLinkedQueue<ElanInterface> elanInterfaces = unProcessedElanInterfaces.get(elanInstanceName);
+                if (elanInterfaces == null) {
+                    elanInterfaces = new ConcurrentLinkedQueue<>();
+                }
+                elanInterfaces.add(elanInterfaceAdded);
+                unProcessedElanInterfaces.put(elanInstanceName, elanInterfaces);
+                return;
+            }
+            InterfaceAddWorkerOnElan addWorker = new InterfaceAddWorkerOnElan(elanInstanceName, elanInterfaceAdded,
+                    interfaceInfo, elanInstance, this);
+            jobCoordinator.enqueueJob(elanInstanceName, addWorker, ElanConstants.JOB_MAX_RETRIES);
+        }), LOG, "Error procedding added ELAN interface");
     }
 
     List<ListenableFuture<Void>> handleunprocessedElanInterfaces(ElanInstance elanInstance) throws ElanException {
@@ -646,8 +650,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         if (elanInfo == null) {
             List<String> elanInterfaces = new ArrayList<>();
             elanInterfaces.add(interfaceName);
-            ElanUtils.updateOperationalDataStore(broker, idManager,
-                    elanInstance, elanInterfaces, tx);
+            ElanUtils.updateOperationalDataStore(idManager, elanInstance, elanInterfaces, tx);
         } else {
             createElanStateList(elanInstanceName, interfaceName, tx);
         }
@@ -691,8 +694,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         createElanInterfaceTablesList(interfaceName, tx);
         List<ListenableFuture<Void>> futures = new ArrayList<>();
         futures.add(ElanUtils.waitForTransactionToComplete(tx));
-        installEntriesForFirstInterfaceonDpn(elanInstance, interfaceInfo, dpnInterfaces,
-                isFirstInterfaceInDpn, null);
+        installEntriesForFirstInterfaceonDpn(elanInstance, interfaceInfo, dpnInterfaces, isFirstInterfaceInDpn);
 
         // add the vlan provider interface to remote BC group for the elan
         // for internal vlan networks
@@ -797,10 +799,8 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
 
         MacEntry macEntry = new MacEntryBuilder().setMacAddress(physAddress).setInterface(interfaceName)
                 .setKey(new MacEntryKey(physAddress)).build();
-        WriteTransaction tx = broker.newWriteOnlyTransaction();
         elanForwardingEntriesHandler.deleteElanInterfaceForwardingEntries(
-                elanInstanceCache.get(elanInstanceName).orNull(), interfaceInfo, macEntry, tx);
-        ElanUtils.waitForTransactionToComplete(tx);
+                elanInstanceCache.get(elanInstanceName).orNull(), interfaceInfo, macEntry);
     }
 
     private InstanceIdentifier<MacEntry> getMacEntryOperationalDataPath(String elanName, PhysAddress physAddress) {
@@ -844,7 +844,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
     }
 
     public void installEntriesForFirstInterfaceonDpn(ElanInstance elanInfo, InterfaceInfo interfaceInfo,
-            DpnInterfaces dpnInterfaces, boolean isFirstInterfaceInDpn, WriteTransaction tx) {
+            DpnInterfaces dpnInterfaces, boolean isFirstInterfaceInDpn) {
         if (!isOperational(interfaceInfo)) {
             return;
         }
@@ -1394,12 +1394,10 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         return elanInterface.getAugmentation(EtreeInterface.class) == null;
     }
 
-    protected void unbindService(String interfaceName, WriteTransaction tx) {
+    protected void unbindService(String interfaceName, ReadWriteTransaction tx) throws ReadFailedException {
         short elanServiceIndex = ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
         InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
-        Optional<BoundServices> existingElanService = ElanUtils.read(broker, LogicalDatastoreType.CONFIGURATION,
-                bindServiceId);
-        if (existingElanService.isPresent()) {
+        if (tx.read(LogicalDatastoreType.CONFIGURATION, bindServiceId).checkedGet().isPresent()) {
             tx.delete(LogicalDatastoreType.CONFIGURATION, bindServiceId);
         }
     }
index 60c36eb3bb9cce1c8967155e526af6140dcd74c3..b660dc99b80a7b6108d0416907fc7094c9837dab 100644 (file)
@@ -21,6 +21,8 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
@@ -46,6 +48,7 @@ public class ElanLearntVpnVipToPortListener extends
         AsyncDataTreeChangeListenerBase<LearntVpnVipToPort, ElanLearntVpnVipToPortListener> {
     private static final Logger LOG = LoggerFactory.getLogger(ElanLearntVpnVipToPortListener.class);
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final IInterfaceManager interfaceManager;
     private final ElanUtils elanUtils;
     private final JobCoordinator jobCoordinator;
@@ -57,6 +60,7 @@ public class ElanLearntVpnVipToPortListener extends
             JobCoordinator jobCoordinator, ElanInstanceCache elanInstanceCache, ElanInterfaceCache elanInterfaceCache) {
         super(LearntVpnVipToPort.class, ElanLearntVpnVipToPortListener.class);
         this.broker = broker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
         this.interfaceManager = interfaceManager;
         this.elanUtils = elanUtils;
         this.jobCoordinator = jobCoordinator;
@@ -119,13 +123,11 @@ public class ElanLearntVpnVipToPortListener extends
                 LOG.debug("ElanInterface Not present for interfaceName {} for add event", interfaceName);
                 return Collections.emptyList();
             }
-            WriteTransaction interfaceTx = broker.newWriteOnlyTransaction();
-            WriteTransaction flowTx = broker.newWriteOnlyTransaction();
-            addMacEntryToDsAndSetupFlows(elanInterface.get().getElanInstanceName(), interfaceTx, flowTx,
-                    ElanConstants.STATIC_MAC_TIMEOUT);
             List<ListenableFuture<Void>> futures = new ArrayList<>();
-            futures.add(interfaceTx.submit());
-            futures.add(flowTx.submit());
+            futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(interfaceTx -> futures.add(
+                    txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+                        flowTx -> addMacEntryToDsAndSetupFlows(elanInterface.get().getElanInstanceName(),
+                                interfaceTx, flowTx, ElanConstants.STATIC_MAC_TIMEOUT)))));
             return futures;
         }
 
@@ -160,18 +162,17 @@ public class ElanLearntVpnVipToPortListener extends
         }
 
         @Override
-        public List<ListenableFuture<Void>> call() throws Exception {
+        public List<ListenableFuture<Void>> call() {
             Optional<ElanInterface> elanInterface = elanInterfaceCache.get(interfaceName);
             if (!elanInterface.isPresent()) {
                 LOG.debug("ElanInterface Not present for interfaceName {} for delete event", interfaceName);
                 return Collections.emptyList();
             }
-            WriteTransaction interfaceTx = broker.newWriteOnlyTransaction();
-            WriteTransaction flowTx = broker.newWriteOnlyTransaction();
-            deleteMacEntryFromDsAndRemoveFlows(elanInterface.get().getElanInstanceName(), interfaceTx, flowTx);
             List<ListenableFuture<Void>> futures = new ArrayList<>();
-            futures.add(interfaceTx.submit());
-            futures.add(flowTx.submit());
+            futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(interfaceTx -> futures.add(
+                    txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+                        flowTx -> deleteMacEntryFromDsAndRemoveFlows(elanInterface.get().getElanInstanceName(),
+                                interfaceTx, flowTx)))));
             return futures;
         }
 
index db3444779c717b27678e9caf5446a740c56948a7..7b9db79103423f00bd64e4f0a58545a653d19452 100644 (file)
@@ -19,6 +19,8 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.mdsalutil.ActionInfo;
 import org.opendaylight.genius.mdsalutil.BucketInfo;
 import org.opendaylight.genius.mdsalutil.FlowEntity;
@@ -61,6 +63,7 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
     private static final int LEARN_MATCH_REG4_VALUE = 1;
 
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final IMdsalApiManager mdsalManager;
     private final IdManagerService idManagerService;
     private final int tempSmacLearnTimeout;
@@ -71,6 +74,7 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
     public ElanNodeListener(DataBroker dataBroker, IMdsalApiManager mdsalManager, ElanConfig elanConfig,
             IdManagerService idManagerService, JobCoordinator jobCoordinator) {
         this.broker = dataBroker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         this.mdsalManager = mdsalManager;
         this.tempSmacLearnTimeout = elanConfig.getTempSmacLearnTimeout();
         this.puntLldpToController = elanConfig.isPuntLldpToController();
@@ -112,13 +116,12 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
     }
 
     private void createArpDefaultFlowsForArpCheckTable(BigInteger dpId) {
-        jobCoordinator.enqueueJob("ARP_CHECK_TABLE-" + dpId.toString(), () -> {
-            WriteTransaction writeFlowTx = broker.newWriteOnlyTransaction();
-            LOG.debug("Received notification to install Arp Check Default entries for dpn {} ", dpId);
-            createArpRequestMatchFlows(dpId, writeFlowTx);
-            createArpResponseMatchFlows(dpId, writeFlowTx);
-            return Arrays.asList(writeFlowTx.submit());
-        });
+        jobCoordinator.enqueueJob("ARP_CHECK_TABLE-" + dpId.toString(),
+            () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+                LOG.debug("Received notification to install Arp Check Default entries for dpn {} ", dpId);
+                createArpRequestMatchFlows(dpId, tx);
+                createArpResponseMatchFlows(dpId, tx);
+            })));
     }
 
     public void createTableMissEntry(BigInteger dpnId) {
index 6db538831c2d5193dcc45fff458dd92697b751dd..6ad81a0fad0ff2123339e457d0bb0443d24ff094 100755 (executable)
@@ -15,6 +15,8 @@ import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.genius.mdsalutil.MetaDataUtil;
@@ -22,6 +24,7 @@ import org.opendaylight.genius.mdsalutil.NWUtil;
 import org.opendaylight.genius.mdsalutil.NwConstants;
 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
+import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
 import org.opendaylight.netvirt.elan.evpn.utils.EvpnUtils;
 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
@@ -50,7 +53,7 @@ public class ElanPacketInHandler implements PacketProcessingListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(ElanPacketInHandler.class);
 
-    private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final IInterfaceManager interfaceManager;
     private final ElanUtils elanUtils;
     private final ElanL2GatewayUtils elanL2GatewayUtils;
@@ -62,7 +65,7 @@ public class ElanPacketInHandler implements PacketProcessingListener {
     public ElanPacketInHandler(DataBroker dataBroker, final IInterfaceManager interfaceManager, ElanUtils elanUtils,
             EvpnUtils evpnUtils, ElanL2GatewayUtils elanL2GatewayUtils, JobCoordinator jobCoordinator,
             ElanInstanceCache elanInstanceCache) {
-        broker = dataBroker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         this.interfaceManager = interfaceManager;
         this.elanUtils = elanUtils;
         this.elanL2GatewayUtils = elanL2GatewayUtils;
@@ -156,39 +159,36 @@ public class ElanPacketInHandler implements PacketProcessingListener {
                                                String elanName, PhysAddress physAddress,
                                                MacEntry oldMacEntry, MacEntry newMacEntry,
                                                final boolean isVlanOrFlatProviderIface) {
-        jobCoordinator.enqueueJob(ElanUtils.getElanMacKey(elanTag, macAddress), () -> {
-            WriteTransaction writeTx = broker.newWriteOnlyTransaction();
-            if (oldMacEntry != null && oldMacEntry.getInterface().equals(interfaceName)) {
-                // This should never occur because of ovs temporary mac learning
-                ElanManagerCounters.unknown_smac_pktin_forwarding_entries_removed.inc();
-            } else if (oldMacEntry != null && !isVlanOrFlatProviderIface) {
-                long macTimeStamp = oldMacEntry.getControllerLearnedForwardingEntryTimestamp().longValue();
-                if (System.currentTimeMillis() > macTimeStamp + 1000) {
-                    InstanceIdentifier<MacEntry> macEntryId = ElanUtils
-                            .getInterfaceMacEntriesIdentifierOperationalDataPath(interfaceName,
-                                    physAddress);
-                    writeTx.delete(LogicalDatastoreType.OPERATIONAL, macEntryId);
-                } else {
-                    // New FEs flood their packets on all interfaces. This
-                    // can lead
-                    // to many contradicting packet_ins. Ignore all packets
-                    // received
-                    // within 1s after the first packet_in
-                    ElanManagerCounters.unknown_smac_pktin_mac_migration_ignored_due_to_protection.inc();
+        jobCoordinator.enqueueJob(ElanUtils.getElanMacKey(elanTag, macAddress),
+            () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+                if (oldMacEntry != null && oldMacEntry.getInterface().equals(interfaceName)) {
+                    // This should never occur because of ovs temporary mac learning
+                    ElanManagerCounters.unknown_smac_pktin_forwarding_entries_removed.inc();
+                } else if (oldMacEntry != null && !isVlanOrFlatProviderIface) {
+                    long macTimeStamp = oldMacEntry.getControllerLearnedForwardingEntryTimestamp().longValue();
+                    if (System.currentTimeMillis() > macTimeStamp + 1000) {
+                        InstanceIdentifier<MacEntry> macEntryId = ElanUtils
+                                .getInterfaceMacEntriesIdentifierOperationalDataPath(interfaceName,
+                                        physAddress);
+                        tx.delete(LogicalDatastoreType.OPERATIONAL, macEntryId);
+                    } else {
+                        // New FEs flood their packets on all interfaces. This can lead
+                        // to many contradicting packet_ins. Ignore all packets received
+                        // within 1s after the first packet_in
+                        ElanManagerCounters.unknown_smac_pktin_mac_migration_ignored_due_to_protection.inc();
+                    }
+                } else if (oldMacEntry != null) {
+                    ElanManagerCounters.unknown_smac_pktin_removed_for_relearned.inc();
                 }
-            } else if (oldMacEntry != null) {
-                ElanManagerCounters.unknown_smac_pktin_removed_for_relearned.inc();
-            }
-            // This check is required only to update elan-forwarding-tables when mac is learned
-            // in ports (example: VM interfaces) other than on vlan provider port.
-            if (!isVlanOrFlatProviderIface && oldMacEntry == null) {
-                InstanceIdentifier<MacEntry> elanMacEntryId =
-                        ElanUtils.getMacEntryOperationalDataPath(elanName, physAddress);
-                writeTx.put(LogicalDatastoreType.OPERATIONAL, elanMacEntryId, newMacEntry,
-                        WriteTransaction.CREATE_MISSING_PARENTS);
-            }
-            return Collections.singletonList(writeTx.submit());
-        });
+                // This check is required only to update elan-forwarding-tables when mac is learned
+                // in ports (example: VM interfaces) other than on vlan provider port.
+                if (!isVlanOrFlatProviderIface && oldMacEntry == null) {
+                    InstanceIdentifier<MacEntry> elanMacEntryId =
+                            ElanUtils.getMacEntryOperationalDataPath(elanName, physAddress);
+                    tx.put(LogicalDatastoreType.OPERATIONAL, elanMacEntryId, newMacEntry,
+                            WriteTransaction.CREATE_MISSING_PARENTS);
+                }
+            })));
     }
 
     private void enqueueJobForDPNSpecificTasks(final String macAddress, final long elanTag, String interfaceName,
@@ -201,14 +201,14 @@ public class ElanPacketInHandler implements PacketProcessingListener {
             elanL2GatewayUtils.scheduleAddDpnMacInExtDevices(elanInstance.getElanInstanceName(), dpId,
                     Collections.singletonList(physAddress));
             ElanManagerCounters.unknown_smac_pktin_learned.inc();
-            WriteTransaction flowWritetx = broker.newWriteOnlyTransaction();
-            elanUtils.setupMacFlows(elanInstance, interfaceInfo, elanInstance.getMacTimeout(),
-                    macAddress, !isVlanOrFlatProviderIface, flowWritetx);
-            InstanceIdentifier<MacEntry> macEntryId =
-                    ElanUtils.getInterfaceMacEntriesIdentifierOperationalDataPath(interfaceName, physAddress);
-            flowWritetx.put(LogicalDatastoreType.OPERATIONAL, macEntryId, newMacEntry,
-                    WriteTransaction.CREATE_MISSING_PARENTS);
-            return Collections.singletonList(flowWritetx.submit());
+            return Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+                elanUtils.setupMacFlows(elanInstance, interfaceInfo, elanInstance.getMacTimeout(),
+                        macAddress, !isVlanOrFlatProviderIface, tx);
+                InstanceIdentifier<MacEntry> macEntryId =
+                        ElanUtils.getInterfaceMacEntriesIdentifierOperationalDataPath(interfaceName, physAddress);
+                tx.put(LogicalDatastoreType.OPERATIONAL, macEntryId, newMacEntry,
+                        WriteTransaction.CREATE_MISSING_PARENTS);
+            }));
         });
     }
 
@@ -241,9 +241,9 @@ public class ElanPacketInHandler implements PacketProcessingListener {
                     macEntry.getMacAddress(), macEntry.getInterface());
             return;
         }
-        WriteTransaction flowDeletetx = broker.newWriteOnlyTransaction();
-        elanUtils.deleteMacFlows(elanInfo, oldInterfaceLport, macEntry, flowDeletetx);
-        flowDeletetx.submit();
+        ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+            tx -> elanUtils.deleteMacFlows(elanInfo, oldInterfaceLport, macEntry, tx)), LOG,
+            "Error deleting invalid MAC entry");
         elanL2GatewayUtils.removeMacsFromElanExternalDevices(elanInfo,
                 Collections.singletonList(macEntry.getMacAddress()));
     }
index 1ece47d77c296392b25c625069eaca072ab24161..3bc22ae1b1986fd904721d3a0cb111d9c5758378 100644 (file)
@@ -18,8 +18,9 @@ import java.util.Locale;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.genius.mdsalutil.MetaDataUtil;
@@ -49,6 +50,7 @@ public class ElanSmacFlowEventListener implements SalFlowListener {
     private static final Logger LOG = LoggerFactory.getLogger(ElanSmacFlowEventListener.class);
 
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final IInterfaceManager interfaceManager;
     private final ElanUtils elanUtils;
     private final JobCoordinator jobCoordinator;
@@ -58,6 +60,7 @@ public class ElanSmacFlowEventListener implements SalFlowListener {
     public ElanSmacFlowEventListener(DataBroker broker, IInterfaceManager interfaceManager, ElanUtils elanUtils,
             JobCoordinator jobCoordinator, ElanInstanceCache elanInstanceCache) {
         this.broker = broker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
         this.interfaceManager = interfaceManager;
         this.elanUtils = elanUtils;
         this.jobCoordinator = jobCoordinator;
@@ -84,7 +87,7 @@ public class ElanSmacFlowEventListener implements SalFlowListener {
                     .getEthernetSource().getAddress().getValue().toUpperCase(Locale.getDefault());
             int portTag = MetaDataUtil.getLportFromMetadata(metadata).intValue();
             if (portTag == 0) {
-                LOG.debug(String.format("Flow removed event on SMAC flow entry. But having port Tag as 0 "));
+                LOG.debug("Flow removed event on SMAC flow entry. But having port Tag as 0 ");
                 return;
             }
             Optional<IfIndexInterface> existingInterfaceInfo = elanUtils.getInterfaceInfoByInterfaceTag(portTag);
@@ -105,31 +108,30 @@ public class ElanSmacFlowEventListener implements SalFlowListener {
                 String elanInstanceName = elanTagInfo.getName();
                 LOG.info("Deleting the Mac-Entry:{} present on ElanInstance:{}", macEntry, elanInstanceName);
                 if (macEntry != null && interfaceInfo != null) {
-                    WriteTransaction deleteFlowTx = broker.newWriteOnlyTransaction();
-                    elanUtils.deleteMacFlows(elanInstanceCache.get(elanInstanceName).orNull(), interfaceInfo,
-                            macEntry, deleteFlowTx);
-                    ListenableFuture<Void> result = deleteFlowTx.submit();
+                    ListenableFuture<Void> result = txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+                        tx -> elanUtils.deleteMacFlows(elanInstanceCache.get(elanInstanceName).orNull(),
+                                interfaceInfo, macEntry, tx));
                     elanFutures.add(result);
                     addCallBack(result, srcMacAddress);
                 }
                 InstanceIdentifier<MacEntry> macEntryIdForElanInterface = ElanUtils
                         .getInterfaceMacEntriesIdentifierOperationalDataPath(interfaceName, physAddress);
-                WriteTransaction tx = broker.newWriteOnlyTransaction();
                 Optional<MacEntry> existingInterfaceMacEntry = ElanUtils.read(broker,
                         LogicalDatastoreType.OPERATIONAL, macEntryIdForElanInterface);
                 if (existingInterfaceMacEntry.isPresent()) {
-                    tx.delete(LogicalDatastoreType.OPERATIONAL, macEntryIdForElanInterface);
-                    MacEntry macEntryInElanInstance = elanUtils.getMacEntryForElanInstance(elanInstanceName,
-                            physAddress).orNull();
-                    if (macEntryInElanInstance != null
-                            && macEntryInElanInstance.getInterface().equals(interfaceName)) {
-                        InstanceIdentifier<MacEntry> macEntryIdForElanInstance = ElanUtils
-                                .getMacEntryOperationalDataPath(elanInstanceName, physAddress);
-                        tx.delete(LogicalDatastoreType.OPERATIONAL, macEntryIdForElanInstance);
-                    }
-                    ListenableFuture<Void> writeResult = tx.submit();
-                    elanFutures.add(writeResult);
-                    addCallBack(writeResult, srcMacAddress);
+                    ListenableFuture<Void> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+                        tx.delete(LogicalDatastoreType.OPERATIONAL, macEntryIdForElanInterface);
+                        MacEntry macEntryInElanInstance = elanUtils.getMacEntryForElanInstance(elanInstanceName,
+                                physAddress).orNull();
+                        if (macEntryInElanInstance != null
+                                && macEntryInElanInstance.getInterface().equals(interfaceName)) {
+                            InstanceIdentifier<MacEntry> macEntryIdForElanInstance = ElanUtils
+                                    .getMacEntryOperationalDataPath(elanInstanceName, physAddress);
+                            tx.delete(LogicalDatastoreType.OPERATIONAL, macEntryIdForElanInstance);
+                        }
+                    });
+                    elanFutures.add(future);
+                    addCallBack(future, srcMacAddress);
                 }
                 return elanFutures;
             }, ElanConstants.JOB_MAX_RETRIES);
index 8175b7c86e932d07fe55757031dc47690b33f49f..0eb974628266b3d1a7583b022c2ec6eb9952b1b1 100644 (file)
@@ -22,6 +22,7 @@ import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -173,24 +174,22 @@ public class HAOpClusteredListener extends HwvtepNodeBaseListener implements Clu
             return Collections.emptySet();
         }
         return candidateds.stream()
-                .filter((iid) -> connectedNodes.contains(iid))
+                .filter(connectedNodes::contains)
                 .collect(Collectors.toSet());
     }
 
     public synchronized void runAfterNodeIsConnected(InstanceIdentifier<Node> iid, Consumer<Optional<Node>> consumer) {
         if (connectedNodes.contains(iid)) {
-            ReadWriteTransaction tx = getDataBroker().newReadWriteTransaction();
             HAJobScheduler.getInstance().submitJob(() -> {
-                try {
+                try (ReadOnlyTransaction tx = getDataBroker().newReadOnlyTransaction()) {
                     consumer.accept(tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet());
                 } catch (ReadFailedException e) {
                     LOG.error("Failed to read oper ds {}", iid);
                 }
             });
-            return;
+        } else {
+            waitingJobs.computeIfAbsent(iid, key -> Sets.newConcurrentHashSet()).add(consumer);
         }
-        waitingJobs.putIfAbsent(iid, Sets.newConcurrentHashSet());
-        waitingJobs.get(iid).add(consumer);
     }
 }
 
index af777508256d0b303c8c32ac5c75de7a5e78b32e..5fd0d3ff91dab89eaac68f2dbeb16779786fe75f 100644 (file)
@@ -7,15 +7,18 @@
  */
 package org.opendaylight.netvirt.elan.l2gw.listeners;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
 import java.util.List;
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
 import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.ElanInstances;
@@ -33,12 +36,14 @@ public class ElanInstanceListener extends AsyncClusteredDataTreeChangeListenerBa
     private static final Logger LOG = LoggerFactory.getLogger(ElanInstanceListener.class);
 
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final ElanClusterUtils elanClusterUtils;
 
     @Inject
     public ElanInstanceListener(final DataBroker db, final ElanClusterUtils elanClusterUtils) {
         super(ElanInstance.class, ElanInstanceListener.class);
         broker = db;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(db);
         this.elanClusterUtils = elanClusterUtils;
     }
 
@@ -56,21 +61,21 @@ public class ElanInstanceListener extends AsyncClusteredDataTreeChangeListenerBa
                 List<L2gatewayConnection> connections =
                         L2GatewayConnectionUtils.getL2GwConnectionsByElanName(
                                 this.broker, del.getElanInstanceName());
-                if (connections == null || connections.isEmpty()) {
-                    return null;
+                if (connections.isEmpty()) {
+                    return Collections.emptyList();
                 }
-                try {
-                    ReadWriteTransaction tx = this.broker.newReadWriteTransaction();
+                ListenableFuture<Void> future = txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
                     for (L2gatewayConnection connection : connections) {
-                        InstanceIdentifier<L2gatewayConnection> iid = InstanceIdentifier.create(Neutron.class)
-                            .child(L2gatewayConnections.class).child(L2gatewayConnection.class, connection.getKey());
+                        InstanceIdentifier<L2gatewayConnection> iid =
+                                InstanceIdentifier.create(Neutron.class).child(
+                                        L2gatewayConnections.class).child(
+                                        L2gatewayConnection.class, connection.getKey());
                         tx.delete(LogicalDatastoreType.CONFIGURATION, iid);
                     }
-                    tx.submit().checkedGet();
-                } catch (TransactionCommitFailedException e) {
-                    LOG.error("Failed to delete associated l2gwconnection while deleting network", e);
-                }
-                return null;
+                });
+                ListenableFutures.addErrorLogging(future, LOG,
+                        "Failed to delete associate L2 gateway connection while deleting network");
+                return Collections.singletonList(future);
             });
     }
 
index bfb17b03129c0de62c9f93999b9fe8edfcded1aa..de467bbc4d1bf4de8156af62bca38e77495916bd 100644 (file)
@@ -22,9 +22,11 @@ import javax.inject.Singleton;
 
 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.genius.datastoreutils.hwvtep.HwvtepAbstractDataTreeChangeListener;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.mdsalutil.MDSALUtil;
 import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
@@ -88,6 +90,7 @@ public class HwvtepPhysicalSwitchListener
 
     /** The data broker. */
     private final DataBroker dataBroker;
+    private final ManagedNewTransactionRunner txRunner;
 
     /** The itm rpc service. */
     private final ItmRpcService itmRpcService;
@@ -150,6 +153,7 @@ public class HwvtepPhysicalSwitchListener
             StaleVlanBindingsCleaner staleVlanBindingsCleaner) {
         super(PhysicalSwitchAugmentation.class, HwvtepPhysicalSwitchListener.class);
         this.dataBroker = dataBroker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         this.itmRpcService = itmRpcService;
         this.elanClusterUtils = elanClusterUtils;
         this.l2gwServiceProvider = l2gwServiceProvider;
@@ -323,11 +327,12 @@ public class HwvtepPhysicalSwitchListener
         });
     }
 
-    boolean updateHACacheIfHANode(DataBroker broker, InstanceIdentifier<Node> globalNodeId)
+    boolean updateHACacheIfHANode(InstanceIdentifier<Node> globalNodeId)
             throws ExecutionException, InterruptedException {
-        ReadWriteTransaction transaction = broker.newReadWriteTransaction();
-        Node node = transaction.read(LogicalDatastoreType.OPERATIONAL, globalNodeId).get().get();
-        HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeId, node);
+        try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction()) {
+            tx.read(LogicalDatastoreType.OPERATIONAL, globalNodeId).get().toJavaUtil().ifPresent(
+                node -> HAOpClusteredListener.addToCacheIfHAChildNode(globalNodeId, node));
+        }
         return hwvtepHACache.isHAEnabledDevice(globalNodeId);
     }
 
@@ -400,13 +405,14 @@ public class HwvtepPhysicalSwitchListener
     private void updateConfigTunnelIp(InstanceIdentifier<PhysicalSwitchAugmentation> identifier,
                                       PhysicalSwitchAugmentation phySwitchAdded) {
         if (phySwitchAdded.getTunnelIps() != null) {
-            ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
-            PhysicalSwitchAugmentationBuilder psBuilder = new PhysicalSwitchAugmentationBuilder();
-            psBuilder.setTunnelIps(phySwitchAdded.getTunnelIps());
-            tx.merge(LogicalDatastoreType.CONFIGURATION, identifier, psBuilder.build());
-            LOG.trace("Updating config tunnel ips {}", identifier);
-            ListenableFutures.addErrorLogging(tx.submit(), LOG,
-                    "Failed to update config tunnel ip for iid {}", identifier);
+            ListenableFutures.addErrorLogging(
+                txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+                    PhysicalSwitchAugmentationBuilder psBuilder = new PhysicalSwitchAugmentationBuilder();
+                    psBuilder.setTunnelIps(phySwitchAdded.getTunnelIps());
+                    tx.merge(LogicalDatastoreType.CONFIGURATION, identifier, psBuilder.build());
+                    LOG.trace("Updating config tunnel ips {}", identifier);
+                }),
+                LOG, "Failed to update config tunnel ip for iid {}", identifier);
         }
     }
 }
index 12f8d1762671314d96840147d750771ce323ccac..a62dd54def9a37d58935db6b3b2ab95c0bdffd9e 100644 (file)
@@ -7,29 +7,22 @@
  */
 package org.opendaylight.netvirt.elan.l2gw.listeners;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import javax.annotation.Nonnull;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.genius.datastoreutils.hwvtep.HwvtepClusteredDataTreeChangeListener;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
 import org.opendaylight.genius.utils.hwvtep.HwvtepUtils;
 import org.opendaylight.netvirt.elan.l2gw.utils.ElanL2GatewayUtils;
 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
-import org.opendaylight.netvirt.elan.l2gw.utils.SettableFutureCallback;
 import org.opendaylight.netvirt.elan.utils.ElanClusterUtils;
 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
@@ -61,6 +54,7 @@ public class HwvtepTerminationPointListener
     private static final Logger LOG = LoggerFactory.getLogger(HwvtepTerminationPointListener.class);
 
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final ElanL2GatewayUtils elanL2GatewayUtils;
     private final ElanClusterUtils elanClusterUtils;
     private final L2GatewayCache l2GatewayCache;
@@ -71,6 +65,7 @@ public class HwvtepTerminationPointListener
         super(TerminationPoint.class, HwvtepTerminationPointListener.class);
 
         this.broker = broker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
         this.elanL2GatewayUtils = elanL2GatewayUtils;
         this.elanClusterUtils = elanClusterUtils;
         this.l2GatewayCache = l2GatewayCache;
@@ -85,11 +80,9 @@ public class HwvtepTerminationPointListener
         final HwvtepPhysicalPortAugmentation portAugmentation =
                 del.getAugmentation(HwvtepPhysicalPortAugmentation.class);
         if (portAugmentation != null) {
-            final NodeId nodeId = identifier.firstIdentifierOf(Node.class).firstKeyOf(Node.class).getNodeId();
             elanClusterUtils.runOnlyInOwnerNode(HwvtepSouthboundConstants.ELAN_ENTITY_NAME,
                 "Handling Physical port delete",
-                () -> handlePortDeleted(identifier, portAugmentation, del, nodeId));
-            return;
+                () -> handlePortDeleted(identifier));
         }
     }
 
@@ -151,33 +144,11 @@ public class HwvtepTerminationPointListener
         return Collections.emptyList();
     }
 
-    private List<ListenableFuture<Void>> handlePortDeleted(InstanceIdentifier<TerminationPoint> identifier,
-                                                           HwvtepPhysicalPortAugmentation portAugmentation,
-                                                           TerminationPoint portDeleted,
-                                                           NodeId psNodeId) throws ReadFailedException {
+    private List<ListenableFuture<Void>> handlePortDeleted(InstanceIdentifier<TerminationPoint> identifier) {
         InstanceIdentifier<Node> psNodeIid = identifier.firstIdentifierOf(Node.class);
-        final ReadWriteTransaction tx = broker.newReadWriteTransaction();
-        final SettableFuture<Void> settableFuture = SettableFuture.create();
-        List<ListenableFuture<Void>> futures = Collections.singletonList(settableFuture);
-        Futures.addCallback(tx.read(LogicalDatastoreType.CONFIGURATION, psNodeIid),
-            new FutureCallback<Optional<Node>>() {
-                @Override
-                public void onSuccess(@Nonnull Optional<Node> nodeOptional) {
-                    if (nodeOptional.isPresent()) {
-                        //case of port deleted
-                        tx.delete(LogicalDatastoreType.CONFIGURATION, identifier);
-                        Futures.addCallback(tx.submit(), new SettableFutureCallback<>(settableFuture),
-                                MoreExecutors.directExecutor());
-                    }
-                }
-
-                @Override
-                public void onFailure(Throwable failure) {
-                    LOG.error("Read of {} failed", psNodeIid, failure);
-                    tx.cancel();
-                }
-            }, MoreExecutors.directExecutor());
-        return futures;
+        return Collections.singletonList(txRunner.callWithNewReadWriteTransactionAndSubmit(
+            tx -> tx.read(LogicalDatastoreType.CONFIGURATION, psNodeIid).checkedGet().toJavaUtil().ifPresent(
+                node -> tx.delete(LogicalDatastoreType.CONFIGURATION, identifier))));
     }
 
     private List<VlanBindings> getVlanBindings(List<L2gatewayConnection> l2GwConns, NodeId hwvtepNodeId, String psName,
index 21e6496fc52e295bcf1b1c5d4ae305998ff157da..273c257e92e971a08ea03e71ca96307f53225754 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.netvirt.elan.l2gw.listeners;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 import java.util.Collection;
 import java.util.Collections;
@@ -17,6 +16,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Predicate;
+import javax.annotation.Nullable;
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -24,13 +24,14 @@ import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeLis
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.genius.mdsalutil.MDSALUtil;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.utils.batching.ResourceBatchingManager;
 import org.opendaylight.genius.utils.hwvtep.HwvtepHACache;
 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
+import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
@@ -52,19 +53,17 @@ public class LocalUcastMacListener extends ChildListener<Node, LocalUcastMacs, S
 
     public static final String NODE_CHECK = "physical";
 
-    private static final Predicate<InstanceIdentifier<Node>> IS_PS_NODE_IID = (iid) -> {
-        return iid.firstKeyOf(Node.class).getNodeId().getValue().contains(NODE_CHECK);
-    };
+    private static final Predicate<InstanceIdentifier<Node>> IS_PS_NODE_IID =
+        (iid) -> iid.firstKeyOf(Node.class).getNodeId().getValue().contains(NODE_CHECK);
 
-    private static final Predicate<InstanceIdentifier<Node>> IS_NOT_HA_CHILD = (iid) -> {
-        return !HwvtepHACache.getInstance().isHAEnabledDevice(iid)
+    private static final Predicate<InstanceIdentifier<Node>> IS_NOT_HA_CHILD =
+        (iid) -> !HwvtepHACache.getInstance().isHAEnabledDevice(iid)
                 && !iid.firstKeyOf(Node.class).getNodeId().getValue().contains(HwvtepHAUtil.PHYSICALSWITCH);
-    };
 
-    private static final Predicate<InstanceIdentifier<Node>> IS_HA_CHILD = (iid) -> {
-        return HwvtepHACache.getInstance().isHAEnabledDevice(iid);
-    };
+    private static final Predicate<InstanceIdentifier<Node>> IS_HA_CHILD =
+        (iid) -> HwvtepHACache.getInstance().isHAEnabledDevice(iid);
 
+    private final ManagedNewTransactionRunner txRunner;
     private final ElanL2GatewayUtils elanL2GatewayUtils;
     private final HAOpClusteredListener haOpClusteredListener;
     private final JobCoordinator jobCoordinator;
@@ -77,6 +76,7 @@ public class LocalUcastMacListener extends ChildListener<Node, LocalUcastMacs, S
                                  final JobCoordinator jobCoordinator,
                                  final ElanInstanceCache elanInstanceCache) {
         super(dataBroker, false);
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         this.elanL2GatewayUtils = elanL2GatewayUtils;
         this.haOpClusteredListener = haOpClusteredListener;
         this.jobCoordinator = jobCoordinator;
@@ -96,8 +96,7 @@ public class LocalUcastMacListener extends ChildListener<Node, LocalUcastMacs, S
     }
 
     protected String getElanName(final LocalUcastMacs mac) {
-        return ((InstanceIdentifier<LogicalSwitches>) mac.getLogicalSwitchRef().getValue())
-                .firstKeyOf(LogicalSwitches.class).getHwvtepNodeName().getValue();
+        return mac.getLogicalSwitchRef().getValue().firstKeyOf(LogicalSwitches.class).getHwvtepNodeName().getValue();
     }
 
     @Override
@@ -109,16 +108,8 @@ public class LocalUcastMacListener extends ChildListener<Node, LocalUcastMacs, S
     @Override
     protected void onUpdate(final Map<String, Map<InstanceIdentifier, LocalUcastMacs>> updatedMacsGrouped,
                             final Map<String, Map<InstanceIdentifier, LocalUcastMacs>> deletedMacsGrouped) {
-        updatedMacsGrouped.entrySet().forEach((entry) -> {
-            entry.getValue().entrySet().forEach((entry2) -> {
-                added(entry2.getKey(), entry2.getValue());
-            });
-        });
-        deletedMacsGrouped.entrySet().forEach((entry) -> {
-            entry.getValue().entrySet().forEach((entry2) -> {
-                removed(entry2.getKey(), entry2.getValue());
-            });
-        });
+        updatedMacsGrouped.forEach((key, value) -> value.forEach(this::added));
+        deletedMacsGrouped.forEach((key, value) -> value.forEach(this::removed));
     }
 
     public void removed(final InstanceIdentifier<LocalUcastMacs> identifier, final LocalUcastMacs macRemoved) {
@@ -215,20 +206,18 @@ public class LocalUcastMacListener extends ChildListener<Node, LocalUcastMacs, S
         if (IS_PS_NODE_IID.test(nodeIid)) {
             return;
         }
-        ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
-        haOpClusteredListener.onGlobalNodeAdd(nodeIid, modification.getRootNode().getDataAfter(), tx);
-        tx.submit();
-        if (IS_HA_CHILD.test(nodeIid)) {
-            return;
-        }
-
-        LOG.trace("On parent add {}", nodeIid);
-        Node operNode = modification.getRootNode().getDataAfter();
-        Optional<Node> configNode = MDSALUtil.read(dataBroker, LogicalDatastoreType.CONFIGURATION, nodeIid);
-        Set<LocalUcastMacs> configMacs = getMacs(configNode);
-        Set<LocalUcastMacs> operMacs = getMacs(Optional.of(operNode));
-        Set<LocalUcastMacs> staleMacs = Sets.difference(configMacs, operMacs);
-        staleMacs.forEach(staleMac -> removed(getMacIid(nodeIid, staleMac), staleMac));
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+            haOpClusteredListener.onGlobalNodeAdd(nodeIid, modification.getRootNode().getDataAfter(), tx);
+            if (!IS_HA_CHILD.test(nodeIid)) {
+                LOG.trace("On parent add {}", nodeIid);
+                Node operNode = modification.getRootNode().getDataAfter();
+                Set<LocalUcastMacs> configMacs =
+                        getMacs(tx.read(LogicalDatastoreType.CONFIGURATION, nodeIid).checkedGet().orNull());
+                Set<LocalUcastMacs> operMacs = getMacs(operNode);
+                Set<LocalUcastMacs> staleMacs = Sets.difference(configMacs, operMacs);
+                staleMacs.forEach(staleMac -> removed(getMacIid(nodeIid, staleMac), staleMac));
+            }
+        }), LOG, "Error processing added parent");
     }
 
     InstanceIdentifier<LocalUcastMacs> getMacIid(InstanceIdentifier<Node> nodeIid, LocalUcastMacs mac) {
@@ -236,9 +225,9 @@ public class LocalUcastMacListener extends ChildListener<Node, LocalUcastMacs, S
                 .child(LocalUcastMacs.class, mac.getKey());
     }
 
-    Set<LocalUcastMacs> getMacs(Optional<Node> node) {
-        if (node.isPresent()) {
-            HwvtepGlobalAugmentation augmentation = node.get().getAugmentation(HwvtepGlobalAugmentation.class);
+    private Set<LocalUcastMacs> getMacs(@Nullable Node node) {
+        if (node != null) {
+            HwvtepGlobalAugmentation augmentation = node.getAugmentation(HwvtepGlobalAugmentation.class);
             if (augmentation != null && augmentation.getLocalUcastMacs() != null) {
                 return new HashSet<>(augmentation.getLocalUcastMacs());
             }
index 488ee4846893d73323b1b23a0a6658312d1d1019..6563f72bc455ed1f92a5b76d0b250c9a31beb3a2 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.netvirt.elan.l2gw.utils;
 
 import static org.opendaylight.netvirt.elan.utils.ElanUtils.isVxlan;
 
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.math.BigInteger;
@@ -25,6 +24,8 @@ import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.genius.mdsalutil.MDSALUtil;
 import org.opendaylight.genius.mdsalutil.actions.ActionGroup;
@@ -81,6 +82,7 @@ public class ElanL2GatewayMulticastUtils {
 
     /** The broker. */
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
 
     private final ElanItmUtils elanItmUtils;
     private final JobCoordinator jobCoordinator;
@@ -92,6 +94,7 @@ public class ElanL2GatewayMulticastUtils {
     public ElanL2GatewayMulticastUtils(DataBroker broker, ElanItmUtils elanItmUtils, JobCoordinator jobCoordinator,
             ElanUtils elanUtils, IMdsalApiManager mdsalManager, IInterfaceManager interfaceManager) {
         this.broker = broker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
         this.elanItmUtils = elanItmUtils;
         this.jobCoordinator = jobCoordinator;
         this.elanUtils = elanUtils;
@@ -122,16 +125,11 @@ public class ElanL2GatewayMulticastUtils {
      */
     @SuppressWarnings("checkstyle:IllegalCatch")
     public ListenableFuture<Void> updateRemoteMcastMacOnElanL2GwDevices(String elanName) {
-        try {
-            WriteTransaction transaction = broker.newWriteOnlyTransaction();
+        return txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
             for (L2GatewayDevice device : ElanL2GwCacheUtils.getInvolvedL2GwDevices(elanName).values()) {
-                prepareRemoteMcastMacUpdateOnDevice(transaction, elanName, device);
+                prepareRemoteMcastMacUpdateOnDevice(tx, elanName, device);
             }
-            return transaction.submit();
-        } catch (RuntimeException e) {
-            LOG.error("Failed to configure mcast mac on elan " + elanName, e);
-            return Futures.immediateFailedCheckedFuture(e);
-        }
+        });
     }
 
     public void scheduleMcastMacUpdateJob(String elanName, L2GatewayDevice device) {
@@ -149,9 +147,8 @@ public class ElanL2GatewayMulticastUtils {
      * @return the listenable future
      */
     public ListenableFuture<Void> updateRemoteMcastMacOnElanL2GwDevice(String elanName, L2GatewayDevice device) {
-        WriteTransaction transaction = broker.newWriteOnlyTransaction();
-        prepareRemoteMcastMacUpdateOnDevice(transaction, elanName, device);
-        return transaction.submit();
+        return txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+            tx -> prepareRemoteMcastMacUpdateOnDevice(tx, elanName, device));
     }
 
     public void prepareRemoteMcastMacUpdateOnDevice(WriteTransaction transaction,String elanName,
@@ -199,19 +196,19 @@ public class ElanL2GatewayMulticastUtils {
         // return ft;
         // }
 
-        WriteTransaction transaction = broker.newWriteOnlyTransaction();
-        if (updateThisDevice) {
-            preapareRemoteMcastMacEntry(transaction, elanName, device, dpnsTepIps, l2GwDevicesTepIps);
-        }
+        return txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+            if (updateThisDevice) {
+                preapareRemoteMcastMacEntry(tx, elanName, device, dpnsTepIps, l2GwDevicesTepIps);
+            }
 
-        // TODO: Need to revisit below logic as logical switches might not be
-        // present to configure RemoteMcastMac entry
-        for (L2GatewayDevice otherDevice : devices.values()) {
-            if (!otherDevice.getDeviceName().equals(device.getDeviceName())) {
-                preapareRemoteMcastMacEntry(transaction, elanName, otherDevice, dpnsTepIps, l2GwDevicesTepIps);
+            // TODO: Need to revisit below logic as logical switches might not be
+            // present to configure RemoteMcastMac entry
+            for (L2GatewayDevice otherDevice : devices.values()) {
+                if (!otherDevice.getDeviceName().equals(device.getDeviceName())) {
+                    preapareRemoteMcastMacEntry(tx, elanName, otherDevice, dpnsTepIps, l2GwDevicesTepIps);
+                }
             }
-        }
-        return transaction.submit();
+        });
 
     }
 
index 2ebd0e5b2732c8b1409892106e315c23542f6d0e..ed64743430cebe529fb610951e99b28a86a1425c 100644 (file)
@@ -37,10 +37,11 @@ import javax.inject.Singleton;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.mdsalutil.MDSALUtil;
 import org.opendaylight.genius.utils.SystemPropertyReader;
 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
@@ -113,6 +114,7 @@ public class ElanL2GatewayUtils {
     private static final int DEFAULT_LOGICAL_SWITCH_DELETE_DELAY_SECS = 20;
 
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final ElanDmacUtils elanDmacUtils;
     private final ElanItmUtils elanItmUtils;
     private final ElanClusterUtils elanClusterUtils;
@@ -135,6 +137,7 @@ public class ElanL2GatewayUtils {
             Scheduler scheduler, ElanConfig elanConfig, ElanInstanceCache elanInstanceCache,
             ElanInstanceDpnsCache elanInstanceDpnsCache) {
         this.broker = broker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
         this.elanDmacUtils = elanDmacUtils;
         this.elanItmUtils = elanItmUtils;
         this.elanClusterUtils = elanClusterUtils;
@@ -223,9 +226,7 @@ public class ElanL2GatewayUtils {
                 .createLogicalSwitchesInstanceIdentifier(nodeId, new HwvtepNodeName(logicalSwitchName));
         RemoteMcastMacsKey remoteMcastMacsKey = new RemoteMcastMacsKey(new HwvtepLogicalSwitchRef(logicalSwitch),
                 new MacAddress(ElanConstants.UNKNOWN_DMAC));
-        RemoteMcastMacs remoteMcastMac = HwvtepUtils.getRemoteMcastMac(broker, datastoreType, nodeId,
-                remoteMcastMacsKey);
-        return remoteMcastMac;
+        return HwvtepUtils.getRemoteMcastMac(broker, datastoreType, nodeId, remoteMcastMacsKey);
     }
 
     /**
@@ -270,14 +271,9 @@ public class ElanL2GatewayUtils {
                 (InstanceIdentifier<LogicalSwitches>) localUcastMac.getLogicalSwitchRef().getValue());
         if (lsOpc.isPresent()) {
             LogicalSwitches ls = lsOpc.get();
-            if (ls != null) {
-                // Logical switch name is Elan name
-                String elanName = getElanFromLogicalSwitch(ls.getHwvtepNodeName().getValue());
-                return elanInstanceCache.get(elanName).orNull();
-            } else {
-                String macAddress = localUcastMac.getMacEntryKey().getValue();
-                LOG.error("Could not find logical_switch for {} being added/deleted", macAddress);
-            }
+            // Logical switch name is Elan name
+            String elanName = getElanFromLogicalSwitch(ls.getHwvtepNodeName().getValue());
+            return elanInstanceCache.get(elanName).orNull();
         }
         return null;
     }
@@ -811,30 +807,29 @@ public class ElanL2GatewayUtils {
             return Futures.immediateFailedFuture(new RuntimeException(errMsg));
         }
 
-        WriteTransaction transaction = broker.newWriteOnlyTransaction();
-        for (org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712
-                 .l2gateway.attributes.devices.Interfaces deviceInterface : hwVtepDevice.getInterfaces()) {
-            //Removed the check for checking terminationPoint present in OP or not
-            //for coniguring vlan bindings
-            //As we are not any more dependent on it , plugin takes care of this
-            // with port reconcilation.
-            List<VlanBindings> vlanBindings = new ArrayList<>();
-            if (deviceInterface.getSegmentationIds() != null && !deviceInterface.getSegmentationIds().isEmpty()) {
-                for (Integer vlanId : deviceInterface.getSegmentationIds()) {
-                    vlanBindings.add(HwvtepSouthboundUtils.createVlanBinding(nodeId, vlanId, logicalSwitchName));
+        return txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+            for (org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712
+                    .l2gateway.attributes.devices.Interfaces deviceInterface : hwVtepDevice.getInterfaces()) {
+                //Removed the check for checking terminationPoint present in OP or not
+                //for coniguring vlan bindings
+                //As we are not any more dependent on it , plugin takes care of this
+                // with port reconcilation.
+                List<VlanBindings> vlanBindings = new ArrayList<>();
+                if (deviceInterface.getSegmentationIds() != null && !deviceInterface.getSegmentationIds().isEmpty()) {
+                    for (Integer vlanId : deviceInterface.getSegmentationIds()) {
+                        vlanBindings.add(HwvtepSouthboundUtils.createVlanBinding(nodeId, vlanId, logicalSwitchName));
+                    }
+                } else {
+                    // Use defaultVlanId (specified in L2GatewayConnection) if Vlan
+                    // ID not specified at interface level.
+                    vlanBindings.add(HwvtepSouthboundUtils.createVlanBinding(nodeId, defaultVlanId, logicalSwitchName));
                 }
-            } else {
-                // Use defaultVlanId (specified in L2GatewayConnection) if Vlan
-                // ID not specified at interface level.
-                vlanBindings.add(HwvtepSouthboundUtils.createVlanBinding(nodeId, defaultVlanId, logicalSwitchName));
+                HwvtepUtils.mergeVlanBindings(tx, nodeId, hwVtepDevice.getDeviceName(),
+                        deviceInterface.getInterfaceName(), vlanBindings);
             }
-            HwvtepUtils.mergeVlanBindings(transaction, nodeId, hwVtepDevice.getDeviceName(),
-                    deviceInterface.getInterfaceName(), vlanBindings);
-        }
-        ListenableFuture<Void> future = transaction.submit();
-        LOG.info("Updated Hwvtep VlanBindings in config DS. NodeID: {}, LogicalSwitch: {}", nodeId.getValue(),
-                logicalSwitchName);
-        return future;
+            LOG.info("Updated Hwvtep VlanBindings in config DS. NodeID: {}, LogicalSwitch: {}", nodeId.getValue(),
+                    logicalSwitchName);
+        });
     }
 
     /**
@@ -852,11 +847,10 @@ public class ElanL2GatewayUtils {
      */
     public ListenableFuture<Void> updateVlanBindingsInL2GatewayDevice(NodeId nodeId, String psName,
             String interfaceName, List<VlanBindings> vlanBindings) {
-        WriteTransaction transaction = broker.newWriteOnlyTransaction();
-        HwvtepUtils.mergeVlanBindings(transaction, nodeId, psName, interfaceName, vlanBindings);
-        ListenableFuture<Void> future = transaction.submit();
-        LOG.info("Updated Hwvtep VlanBindings in config DS. NodeID: {}", nodeId.getValue());
-        return future;
+        return txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+            HwvtepUtils.mergeVlanBindings(tx, nodeId, psName, interfaceName, vlanBindings);
+            LOG.info("Updated Hwvtep VlanBindings in config DS. NodeID: {}", nodeId.getValue());
+        });
     }
 
     /**
@@ -879,25 +873,23 @@ public class ElanL2GatewayUtils {
         }
         NodeId physicalSwitchNodeId = HwvtepSouthboundUtils.createManagedNodeId(nodeId, hwVtepDevice.getDeviceName());
 
-        WriteTransaction transaction = broker.newWriteOnlyTransaction();
-        for (org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712
-                 .l2gateway.attributes.devices.Interfaces deviceInterface : hwVtepDevice.getInterfaces()) {
-            String phyPortName = deviceInterface.getInterfaceName();
-            if (deviceInterface.getSegmentationIds() != null && !deviceInterface.getSegmentationIds().isEmpty()) {
-                for (Integer vlanId : deviceInterface.getSegmentationIds()) {
-                    HwvtepUtils.deleteVlanBinding(transaction, physicalSwitchNodeId, phyPortName, vlanId);
+        return txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+            for (org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712
+                    .l2gateway.attributes.devices.Interfaces deviceInterface : hwVtepDevice.getInterfaces()) {
+                String phyPortName = deviceInterface.getInterfaceName();
+                if (deviceInterface.getSegmentationIds() != null && !deviceInterface.getSegmentationIds().isEmpty()) {
+                    for (Integer vlanId : deviceInterface.getSegmentationIds()) {
+                        HwvtepUtils.deleteVlanBinding(tx, physicalSwitchNodeId, phyPortName, vlanId);
+                    }
+                } else {
+                    // Use defaultVlanId (specified in L2GatewayConnection) if Vlan
+                    // ID not specified at interface level.
+                    HwvtepUtils.deleteVlanBinding(tx, physicalSwitchNodeId, phyPortName, defaultVlanId);
                 }
-            } else {
-                // Use defaultVlanId (specified in L2GatewayConnection) if Vlan
-                // ID not specified at interface level.
-                HwvtepUtils.deleteVlanBinding(transaction, physicalSwitchNodeId, phyPortName, defaultVlanId);
             }
-        }
-        ListenableFuture<Void> future = transaction.submit();
-
-        LOG.info("Deleted Hwvtep VlanBindings from config DS. NodeID: {}, hwVtepDevice: {}, defaultVlanId: {} ",
-                nodeId.getValue(), hwVtepDevice, defaultVlanId);
-        return future;
+            LOG.info("Deleted Hwvtep VlanBindings from config DS. NodeID: {}, hwVtepDevice: {}, defaultVlanId: {} ",
+                    nodeId.getValue(), hwVtepDevice, defaultVlanId);
+        });
     }
 
     /**
index 56be989feeeb833bf6a86ff7cd26119e28cdc7bc..4bd8aebbebd37d6070b9bc29b40f2d2d9402e581 100644 (file)
@@ -24,13 +24,13 @@ import java.util.stream.Collectors;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.mdsalutil.MDSALUtil;
 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
-import org.opendaylight.netvirt.elan.utils.ElanUtils;
 import org.opendaylight.netvirt.elan.utils.Scheduler;
 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
@@ -54,24 +54,20 @@ public class StaleVlanBindingsCleaner {
     private static final Logger LOG = LoggerFactory.getLogger(StaleVlanBindingsCleaner.class);
     private static final int DEFAULT_STALE_CLEANUP_DELAY_SECS = 900;
 
-    private static Function<VlanBindings, String> LOGICAL_SWITCH_FROM_BINDING = (binding) -> {
-        InstanceIdentifier<LogicalSwitches> lsRef = (InstanceIdentifier<LogicalSwitches>)
-                binding.getLogicalSwitchRef().getValue();
-        return lsRef.firstKeyOf(LogicalSwitches.class).getHwvtepNodeName().getValue();
-    };
+    private static Function<VlanBindings, String> LOGICAL_SWITCH_FROM_BINDING =
+        (binding) -> binding.getLogicalSwitchRef().getValue().firstKeyOf(
+                LogicalSwitches.class).getHwvtepNodeName().getValue();
 
-    private static BiPredicate<List<String>, String> IS_STALE_LOGICAL_SWITCH = (validNetworks, logicalSwitch) -> {
-        return !validNetworks.contains(logicalSwitch);
-    };
+    private static BiPredicate<List<String>, String> IS_STALE_LOGICAL_SWITCH =
+        (validNetworks, logicalSwitch) -> !validNetworks.contains(logicalSwitch);
 
-    private static Predicate<TerminationPoint> CONTAINS_VLANBINDINGS = (port) -> {
-        return port.getAugmentation(HwvtepPhysicalPortAugmentation.class) != null
-                && port.getAugmentation(HwvtepPhysicalPortAugmentation.class).getVlanBindings() != null;
-    };
+    private static Predicate<TerminationPoint> CONTAINS_VLANBINDINGS = (port) ->
+            port.getAugmentation(HwvtepPhysicalPortAugmentation.class) != null
+                    && port.getAugmentation(HwvtepPhysicalPortAugmentation.class).getVlanBindings() != null;
 
 
     private final DataBroker broker;
-    private final ElanUtils elanUtils;
+    private final ManagedNewTransactionRunner txRunner;
     private final ElanL2GatewayUtils elanL2GatewayUtils;
     private final Scheduler scheduler;
     private final ElanConfig elanConfig;
@@ -81,14 +77,13 @@ public class StaleVlanBindingsCleaner {
 
     @Inject
     public StaleVlanBindingsCleaner(final DataBroker broker,
-                                    final ElanUtils elanUtils,
                                     final ElanL2GatewayUtils elanL2GatewayUtils,
                                     final Scheduler scheduler,
                                     final ElanConfig elanConfig,
                                     final L2GatewayCache l2GatewayCache,
                                     final ElanInstanceCache elanInstanceCache) {
         this.broker = broker;
-        this.elanUtils = elanUtils;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
         this.elanL2GatewayUtils = elanL2GatewayUtils;
         this.scheduler = scheduler;
         this.elanConfig = elanConfig;
@@ -193,13 +188,14 @@ public class StaleVlanBindingsCleaner {
                                       final String staleLogicalSwitch) {
 
         LOG.trace("CleanupStaleBindings for logical switch {}", staleLogicalSwitch);
-        ReadWriteTransaction tx = broker.newReadWriteTransaction();
-        if (vlans.containsKey(staleLogicalSwitch)) {
-            vlans.get(staleLogicalSwitch)
-                    .forEach((vlanIid) -> tx.delete(LogicalDatastoreType.CONFIGURATION, vlanIid));
-        }
-        ListenableFutures.addErrorLogging(tx.submit(), LOG,
-                "Failed to delete stale vlan bindings from node {}", globalNodeId);
+        ListenableFutures.addErrorLogging(
+            txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+                if (vlans.containsKey(staleLogicalSwitch)) {
+                    vlans.get(staleLogicalSwitch)
+                            .forEach((vlanIid) -> tx.delete(LogicalDatastoreType.CONFIGURATION, vlanIid));
+                }
+            }),
+            LOG, "Failed to delete stale vlan bindings from node {}", globalNodeId);
         elanL2GatewayUtils.scheduleDeleteLogicalSwitch(new NodeId(globalNodeId), staleLogicalSwitch, true);
     }
 
index 150dfbaa1c6fc52c57b0af59867ad3e8db12f086..05ee64fb3d5bb746b3d8b6ca957bb490b3a021b4 100644 (file)
@@ -8,12 +8,18 @@
 package org.opendaylight.netvirt.elan.utils;
 
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.List;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
+import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.PhysAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.elan.instances.ElanInstance;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.elan.interfaces.elan._interface.StaticMacEntries;
@@ -30,11 +36,13 @@ public class ElanForwardingEntriesHandler {
     private static final Logger LOG = LoggerFactory.getLogger(ElanForwardingEntriesHandler.class);
 
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final ElanUtils elanUtils;
 
     @Inject
     public ElanForwardingEntriesHandler(DataBroker dataBroker, ElanUtils elanUtils) {
         this.broker = dataBroker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         this.elanUtils = elanUtils;
     }
 
@@ -119,14 +127,20 @@ public class ElanForwardingEntriesHandler {
     }
 
     public void deleteElanInterfaceForwardingEntries(ElanInstance elanInfo, InterfaceInfo interfaceInfo,
-            MacEntry macEntry, WriteTransaction tx) {
-        InstanceIdentifier<MacEntry> macEntryId = ElanUtils
-                .getMacEntryOperationalDataPath(elanInfo.getElanInstanceName(), macEntry.getMacAddress());
-        tx.delete(LogicalDatastoreType.OPERATIONAL, macEntryId);
-        deleteElanInterfaceForwardingTablesList(interfaceInfo.getInterfaceName(), macEntry, tx);
-        WriteTransaction deleteFlowtx = broker.newWriteOnlyTransaction();
-        elanUtils.deleteMacFlows(elanInfo, interfaceInfo, macEntry, deleteFlowtx);
-        deleteFlowtx.submit();
+            MacEntry macEntry) {
+        List<ListenableFuture<Void>> futures = new ArrayList<>();
+        futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(interfaceTx -> {
+            InstanceIdentifier<MacEntry> macEntryId = ElanUtils
+                    .getMacEntryOperationalDataPath(elanInfo.getElanInstanceName(), macEntry.getMacAddress());
+            interfaceTx.delete(LogicalDatastoreType.OPERATIONAL, macEntryId);
+            deleteElanInterfaceForwardingTablesList(interfaceInfo.getInterfaceName(), macEntry, interfaceTx);
+            futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(flowTx -> {
+                elanUtils.deleteMacFlows(elanInfo, interfaceInfo, macEntry, flowTx);
+            }));
+        }));
+        for (ListenableFuture<Void> future : futures) {
+            ListenableFutures.addErrorLogging(future, LOG, "Error deleting ELAN interface forwarding entries");
+        }
     }
 
     public void deleteElanInterfaceMacForwardingEntries(String interfaceName, PhysAddress physAddress,
index 3c1f3d579253b8498aa19b1d923ca730baf17549..7d6cabadfdc0f5d76254cc277924e3fd3f0d4b78 100755 (executable)
@@ -31,10 +31,13 @@ import javax.inject.Singleton;
 import org.apache.commons.lang3.StringUtils;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceServiceUtil;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
@@ -60,6 +63,7 @@ import org.opendaylight.genius.mdsalutil.packet.ARP;
 import org.opendaylight.genius.mdsalutil.packet.Ethernet;
 import org.opendaylight.genius.mdsalutil.packet.IPv4;
 import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
+import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
 import org.opendaylight.netvirt.elan.ElanException;
 import org.opendaylight.netvirt.elan.arp.responder.ArpResponderUtil;
 import org.opendaylight.netvirt.elan.cache.ElanInterfaceCache;
@@ -174,6 +178,7 @@ public class ElanUtils {
     private static final Logger LOG = LoggerFactory.getLogger(ElanUtils.class);
 
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final IMdsalApiManager mdsalManager;
     private final OdlInterfaceRpcService interfaceManagerRpcService;
     private final ItmRpcService itmRpcService;
@@ -201,6 +206,7 @@ public class ElanUtils {
             IInterfaceManager interfaceManager, ElanEtreeUtils elanEtreeUtils, ElanItmUtils elanItmUtils,
             ElanInterfaceCache elanInterfaceCache) {
         this.broker = dataBroker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         this.mdsalManager = mdsalManager;
         this.interfaceManagerRpcService = interfaceManagerRpcService;
         this.itmRpcService = itmRpcService;
@@ -308,6 +314,12 @@ public class ElanUtils {
         return MDSALUtil.read(broker, LogicalDatastoreType.OPERATIONAL, elanIdentifier).orNull();
     }
 
+    @Nullable
+    public static Elan getElanByName(ReadTransaction tx, String elanInstanceName) throws ReadFailedException {
+        return tx.read(LogicalDatastoreType.OPERATIONAL,
+                getElanInstanceOperationalDataPath(elanInstanceName)).checkedGet().orNull();
+    }
+
     public static InstanceIdentifier<Elan> getElanInstanceOperationalDataPath(String elanInstanceName) {
         return InstanceIdentifier.builder(ElanState.class).child(Elan.class, new ElanKey(elanInstanceName)).build();
     }
@@ -339,6 +351,12 @@ public class ElanUtils {
         return read(broker, LogicalDatastoreType.OPERATIONAL, macId);
     }
 
+    public Optional<MacEntry> getMacEntryForElanInstance(ReadTransaction tx, String elanName, PhysAddress physAddress)
+            throws ReadFailedException {
+        InstanceIdentifier<MacEntry> macId = getMacEntryOperationalDataPath(elanName, physAddress);
+        return tx.read(LogicalDatastoreType.OPERATIONAL, macId).checkedGet();
+    }
+
     public MacEntry getMacEntryFromElanMacId(InstanceIdentifier identifier) {
         Optional<MacEntry> existingInterfaceMacEntry = read(broker,
                 LogicalDatastoreType.OPERATIONAL, identifier);
@@ -857,24 +875,21 @@ public class ElanUtils {
 
     public static String getKnownDynamicmacFlowRef(short tableId, BigInteger dpId, long lporTag, String macAddress,
             long elanTag) {
-        return new StringBuffer().append(tableId).append(elanTag).append(dpId).append(lporTag).append(macAddress)
-                .toString();
+        return String.valueOf(tableId) + elanTag + dpId + lporTag + macAddress;
     }
 
     public static String getKnownDynamicmacFlowRef(short tableId, BigInteger dpId, BigInteger remoteDpId,
             String macAddress, long elanTag) {
-        return new StringBuffer().append(tableId).append(elanTag).append(dpId).append(remoteDpId).append(macAddress)
-                .toString();
+        return String.valueOf(tableId) + elanTag + dpId + remoteDpId + macAddress;
     }
 
     public static String getKnownDynamicmacFlowRef(short tableId, BigInteger dpId, String macAddress, long elanTag) {
-        return new StringBuffer().append(tableId).append(elanTag).append(dpId).append(macAddress).toString();
+        return String.valueOf(tableId) + elanTag + dpId + macAddress;
     }
 
     public static String getKnownDynamicmacFlowRef(short elanDmacTable, BigInteger dpId, String extDeviceNodeId,
             String dstMacAddress, long elanTag, boolean shFlag) {
-        return new StringBuffer().append(elanDmacTable).append(elanTag).append(dpId).append(extDeviceNodeId)
-                .append(dstMacAddress).append(shFlag).toString();
+        return String.valueOf(elanDmacTable) + elanTag + dpId + extDeviceNodeId + dstMacAddress + shFlag;
     }
 
     /**
@@ -1018,8 +1033,8 @@ public class ElanUtils {
 
     }
 
-    public void deleteMacFlows(ElanInstance elanInfo, InterfaceInfo interfaceInfo, MacEntry macEntry,
-            WriteTransaction deleteFlowGroupTx) {
+    public void deleteMacFlows(@Nullable ElanInstance elanInfo, @Nullable InterfaceInfo interfaceInfo,
+            MacEntry macEntry, WriteTransaction deleteFlowGroupTx) {
         if (elanInfo == null || interfaceInfo == null) {
             return;
         }
@@ -1104,8 +1119,6 @@ public class ElanUtils {
      * Updates the Elan information in the Operational DS. It also updates the
      * ElanInstance in the Config DS by setting the adquired elanTag.
      *
-     * @param broker
-     *            the broker
      * @param idManager
      *            the id manager
      * @param elanInstanceAdded
@@ -1117,7 +1130,7 @@ public class ElanUtils {
      *
      * @return the updated ELAN instance.
      */
-    public static ElanInstance updateOperationalDataStore(DataBroker broker, IdManagerService idManager,
+    public static ElanInstance updateOperationalDataStore(IdManagerService idManager,
             ElanInstance elanInstanceAdded, List<String> elanInterfaces, WriteTransaction tx) {
         String elanInstanceName = elanInstanceAdded.getElanInstanceName();
         Long elanTag = elanInstanceAdded.getElanTag();
@@ -1145,7 +1158,7 @@ public class ElanUtils {
             EtreeLeafTagName etreeLeafTagName = new EtreeLeafTagNameBuilder()
                     .setEtreeLeafTag(new EtreeLeafTag(etreeLeafTag)).build();
             elanTagNameBuilder.addAugmentation(EtreeLeafTagName.class, etreeLeafTagName);
-            addTheLeafTagAsElanTag(broker, elanInstanceName, etreeLeafTag, tx);
+            addTheLeafTagAsElanTag(elanInstanceName, etreeLeafTag, tx);
         }
         ElanTagName elanTagName = elanTagNameBuilder.build();
 
@@ -1172,8 +1185,7 @@ public class ElanUtils {
         return elanInstanceWithTag;
     }
 
-    private static void addTheLeafTagAsElanTag(DataBroker broker, String elanInstanceName, long etreeLeafTag,
-            WriteTransaction tx) {
+    private static void addTheLeafTagAsElanTag(String elanInstanceName, long etreeLeafTag, WriteTransaction tx) {
         ElanTagName etreeTagAsElanTag = new ElanTagNameBuilder().setElanTag(etreeLeafTag)
                 .setKey(new ElanTagNameKey(etreeLeafTag)).setName(elanInstanceName).build();
         tx.put(LogicalDatastoreType.OPERATIONAL,
@@ -1375,7 +1387,7 @@ public class ElanUtils {
         return futures;
     }
 
-    public static boolean isVxlan(ElanInstance elanInstance) {
+    public static boolean isVxlan(@Nullable ElanInstance elanInstance) {
         return elanInstance != null && elanInstance.getSegmentType() != null
                 && elanInstance.getSegmentType().isAssignableFrom(SegmentTypeVxlan.class)
                 && elanInstance.getSegmentationId() != null && elanInstance.getSegmentationId() != 0;
@@ -1437,10 +1449,10 @@ public class ElanUtils {
     public void addDmacRedirectToDispatcherFlows(Long elanTag, String displayName,
             String macAddress, List<BigInteger> dpnIds) {
         for (BigInteger dpId : dpnIds) {
-            WriteTransaction writeTx = broker.newWriteOnlyTransaction();
-            mdsalManager.addFlowToTx(buildDmacRedirectToDispatcherFlow(dpId, macAddress, displayName, elanTag),
-                    writeTx);
-            writeTx.submit();
+            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+                tx -> mdsalManager.addFlowToTx(
+                        buildDmacRedirectToDispatcherFlow(dpId, macAddress, displayName, elanTag), tx)), LOG,
+                "Error adding DMAC redirect to dispatcher flows");
         }
     }
 
@@ -1462,10 +1474,9 @@ public class ElanUtils {
 
         instructions.add(new InstructionApplyActions(actions));
         String flowId = getKnownDynamicmacFlowRef(NwConstants.ELAN_DMAC_TABLE, dpId, dstMacAddress, elanTag);
-        FlowEntity flow  = MDSALUtil.buildFlowEntity(dpId, NwConstants.ELAN_DMAC_TABLE, flowId, 20, displayName, 0, 0,
+        return MDSALUtil.buildFlowEntity(dpId, NwConstants.ELAN_DMAC_TABLE, flowId, 20, displayName, 0, 0,
                 ElanConstants.COOKIE_ELAN_KNOWN_DMAC.add(BigInteger.valueOf(elanTag)),
                 matches, instructions);
-        return flow;
     }
 
     public String getExternalElanInterface(String elanInstanceName, BigInteger dpnId) {
index 117d012b361e155052794b2c06ec5668d81b2aaa..aa3049372b2c9a64a5d29192e81f5b8621ad3dc7 100644 (file)
@@ -21,15 +21,18 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
-import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
 import org.opendaylight.netvirt.elan.internal.ElanBridgeManager;
 import org.opendaylight.netvirt.elanmanager.api.IElanService;
@@ -59,7 +62,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.transp
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.config.rev150710.ElanConfig;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.elan.interfaces.ElanInterface;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn.instance.op.data.vpn.instance.op.data.entry.vpn.to.dpn.list.VpnInterfaces;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
@@ -74,8 +76,7 @@ public class TransportZoneNotificationUtil {
     private static final char IP_NETWORK_ZONE_NAME_DELIMITER = '-';
     private static final String ALL_SUBNETS_GW = "0.0.0.0";
     private static final String ALL_SUBNETS = "0.0.0.0/0";
-    private final DataBroker dataBroker;
-    private final MdsalUtils mdsalUtils;
+    private final ManagedNewTransactionRunner txRunner;
     private final SouthboundUtils southBoundUtils;
     private final IElanService elanService;
     private final ElanConfig elanConfig;
@@ -83,16 +84,15 @@ public class TransportZoneNotificationUtil {
     private final ElanInstanceCache elanInstanceCache;
 
     @Inject
-    public TransportZoneNotificationUtil(final DataBroker dbx, final IInterfaceManager interfaceManager,
+    public TransportZoneNotificationUtil(final DataBroker dbx,
             final IElanService elanService, final ElanConfig elanConfig, final ElanBridgeManager elanBridgeManager,
             final ElanInstanceCache elanInstanceCache) {
-        this.dataBroker = dbx;
-        this.mdsalUtils = new MdsalUtils(dbx);
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dbx);
         this.elanService = elanService;
         this.elanConfig = elanConfig;
         this.elanBridgeManager = elanBridgeManager;
         this.elanInstanceCache = elanInstanceCache;
-        southBoundUtils = new SouthboundUtils(mdsalUtils);
+        southBoundUtils = new SouthboundUtils(new MdsalUtils(dbx));
     }
 
     public boolean shouldCreateVtep(List<VpnInterfaces> vpnInterfaces) {
@@ -126,46 +126,40 @@ public class TransportZoneNotificationUtil {
         return tzb.build();
     }
 
-    private void updateTransportZone(TransportZone zone, BigInteger dpnId, WriteTransaction tx)
-            throws TransactionCommitFailedException {
+    private void updateTransportZone(TransportZone zone, BigInteger dpnId, @Nonnull WriteTransaction tx) {
         InstanceIdentifier<TransportZone> path = InstanceIdentifier.builder(TransportZones.class)
                 .child(TransportZone.class, new TransportZoneKey(zone.getZoneName())).build();
 
-        if (tx == null) {
-            SingleTransactionDataBroker.syncUpdate(dataBroker, LogicalDatastoreType.CONFIGURATION, path, zone);
-        } else {
-            tx.merge(LogicalDatastoreType.CONFIGURATION, path, zone);
-        }
+        tx.merge(LogicalDatastoreType.CONFIGURATION, path, zone);
         LOG.info("Transport zone {} updated due to dpn {} handling.", zone.getZoneName(), dpnId);
     }
 
     public void updateTransportZone(String zoneNamePrefix, BigInteger dpnId) {
-        Map<String, String> localIps = getDpnLocalIps(dpnId);
-        if (localIps != null && !localIps.isEmpty()) {
-            LOG.debug("Will use local_ips for transport zone update for dpn {} and zone name prefix {}", dpnId,
-                    zoneNamePrefix);
-            for (Entry<String, String> entry : localIps.entrySet()) {
-                String localIp = entry.getKey();
-                String underlayNetworkName = entry.getValue();
-                String zoneName = getTzNameForUnderlayNetwork(zoneNamePrefix, underlayNetworkName);
-                updateTransportZone(zoneName, dpnId, localIp);
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+            Map<String, String> localIps = getDpnLocalIps(dpnId, tx);
+            if (!localIps.isEmpty()) {
+                LOG.debug("Will use local_ips for transport zone update for dpn {} and zone name prefix {}", dpnId,
+                        zoneNamePrefix);
+                for (Entry<String, String> entry : localIps.entrySet()) {
+                    String localIp = entry.getKey();
+                    String underlayNetworkName = entry.getValue();
+                    String zoneName = getTzNameForUnderlayNetwork(zoneNamePrefix, underlayNetworkName);
+                    updateTransportZone(zoneName, dpnId, localIp, tx);
+                }
+            } else {
+                updateTransportZone(zoneNamePrefix, dpnId, getDpnLocalIp(dpnId, tx), tx);
             }
-        } else {
-            updateTransportZone(zoneNamePrefix, dpnId, getDpnLocalIp(dpnId));
-        }
-    }
-
-    private void updateTransportZone(String zoneName, BigInteger dpnId, String localIp) {
-        updateTransportZone(zoneName, dpnId, localIp, null);
+        }), LOG, "Error updating transport zone");
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void updateTransportZone(String zoneName, BigInteger dpnId, String localIp, WriteTransaction tx) {
+    private void updateTransportZone(String zoneName, BigInteger dpnId, @Nullable String localIp,
+            @Nonnull ReadWriteTransaction tx) throws ReadFailedException {
         InstanceIdentifier<TransportZone> inst = InstanceIdentifier.create(TransportZones.class)
                 .child(TransportZone.class, new TransportZoneKey(zoneName));
 
         // FIXME: Read this through a cache
-        TransportZone zone = mdsalUtils.read(LogicalDatastoreType.CONFIGURATION, inst);
+        TransportZone zone = tx.read(LogicalDatastoreType.CONFIGURATION, inst).checkedGet().orNull();
 
         if (zone == null) {
             zone = createZone(ALL_SUBNETS, zoneName);
@@ -180,46 +174,37 @@ public class TransportZoneNotificationUtil {
         }
     }
 
-    private void deleteTransportZone(TransportZone zone, BigInteger dpnId, WriteTransaction tx)
-            throws TransactionCommitFailedException {
+    private void deleteTransportZone(TransportZone zone, BigInteger dpnId, @Nonnull WriteTransaction tx) {
         InstanceIdentifier<TransportZone> path = InstanceIdentifier.builder(TransportZones.class)
                 .child(TransportZone.class, new TransportZoneKey(zone.getZoneName())).build();
-
-        if (tx == null) {
-            SingleTransactionDataBroker.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, path);
-        } else {
-            tx.delete(LogicalDatastoreType.CONFIGURATION, path);
-        }
+        tx.delete(LogicalDatastoreType.CONFIGURATION, path);
         LOG.info("Transport zone {} deleted due to dpn {} handling.", zone.getZoneName(), dpnId);
     }
 
     public void deleteTransportZone(String zoneNamePrefix, BigInteger dpnId) {
-        Map<String, String> localIps = getDpnLocalIps(dpnId);
-        if (localIps != null && !localIps.isEmpty()) {
-            LOG.debug("Will use local_ips for transport zone delete for dpn {} and zone name prefix {}", dpnId,
-                    zoneNamePrefix);
-            for (Entry<String, String> entry : localIps.entrySet()) {
-                String localIp = entry.getKey();
-                String underlayNetworkName = entry.getValue();
-                String zoneName = getTzNameForUnderlayNetwork(zoneNamePrefix, underlayNetworkName);
-                deleteTransportZone(zoneName, dpnId, localIp);
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+            Map<String, String> localIps = getDpnLocalIps(dpnId, tx);
+            if (!localIps.isEmpty()) {
+                LOG.debug("Will use local_ips for transport zone delete for dpn {} and zone name prefix {}", dpnId,
+                        zoneNamePrefix);
+                for (String underlayNetworkName : localIps.values()) {
+                    String zoneName = getTzNameForUnderlayNetwork(zoneNamePrefix, underlayNetworkName);
+                    deleteTransportZone(zoneName, dpnId, tx);
+                }
+            } else {
+                deleteTransportZone(zoneNamePrefix, dpnId, tx);
             }
-        } else {
-            deleteTransportZone(zoneNamePrefix, dpnId, getDpnLocalIp(dpnId));
-        }
-    }
-
-    private void deleteTransportZone(String zoneName, BigInteger dpnId, String localIp) {
-        deleteTransportZone(zoneName, dpnId, localIp, null);
+        }), LOG, "Error deleting transport zone");
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void deleteTransportZone(String zoneName, BigInteger dpnId, String localIp, WriteTransaction tx) {
+    private void deleteTransportZone(String zoneName, BigInteger dpnId, @Nonnull ReadWriteTransaction tx)
+            throws ReadFailedException {
         InstanceIdentifier<TransportZone> inst = InstanceIdentifier.create(TransportZones.class)
                 .child(TransportZone.class, new TransportZoneKey(zoneName));
 
         // FIXME: Read this through a cache
-        TransportZone zone = mdsalUtils.read(LogicalDatastoreType.CONFIGURATION, inst);
+        TransportZone zone = tx.read(LogicalDatastoreType.CONFIGURATION, inst).checkedGet().orNull();
         if (zone != null) {
             try {
                 deleteTransportZone(zone, dpnId, tx);
@@ -267,54 +252,61 @@ public class TransportZoneNotificationUtil {
             return;
         }
 
-        BigInteger dpId = dpIdOpt.get();
-        Optional<DPNTEPsInfo> dpnTepsInfoOpt = getDpnTepsInfo(dpId);
-        if (!dpnTepsInfoOpt.isPresent()) {
-            LOG.debug("No DPNTEPsInfo found for DPN id {}", dpId);
-            return;
-        }
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+            BigInteger dpId = dpIdOpt.get();
+            Optional<DPNTEPsInfo> dpnTepsInfoOpt = getDpnTepsInfo(dpId, tx);
+            if (!dpnTepsInfoOpt.isPresent()) {
+                LOG.debug("No DPNTEPsInfo found for DPN id {}", dpId);
+                return;
+            }
 
-        List<TunnelEndPoints> tunnelEndPoints = dpnTepsInfoOpt.get().getTunnelEndPoints();
-        if (tunnelEndPoints == null || tunnelEndPoints.isEmpty()) {
-            LOG.debug("No tunnel endpoints defined for DPN id {}", dpId);
-            return;
-        }
+            List<TunnelEndPoints> tunnelEndPoints = dpnTepsInfoOpt.get().getTunnelEndPoints();
+            if (tunnelEndPoints == null || tunnelEndPoints.isEmpty()) {
+                LOG.debug("No tunnel endpoints defined for DPN id {}", dpId);
+                return;
+            }
 
-        Set<String> zonePrefixes = new HashSet<>();
-        Map<String, List<String>> tepTzMap = tunnelEndPoints.stream().collect(Collectors
-                .toMap(tep -> String.valueOf(tep.getIpAddress().getValue()), this::getTepTransportZoneNames));
-        LOG.trace("Transport zone prefixes {}", tepTzMap);
+            Set<String> zonePrefixes = new HashSet<>();
+            Map<String, List<String>> tepTzMap = tunnelEndPoints.stream().collect(Collectors
+                    .toMap(tep -> String.valueOf(tep.getIpAddress().getValue()), this::getTepTransportZoneNames));
+            LOG.trace("Transport zone prefixes {}", tepTzMap);
 
-        WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
-        handleRemovedLocalIps(mapDiff.entriesOnlyOnLeft(), dpId, zonePrefixes, tepTzMap, tx);
-        handleChangedLocalIps(mapDiff.entriesDiffering(), dpId, zonePrefixes, tepTzMap, tx);
-        handleAddedLocalIps(mapDiff.entriesOnlyOnRight(), dpId, zonePrefixes, tx);
-        tx.submit();
+            handleRemovedLocalIps(mapDiff.entriesOnlyOnLeft(), dpId, zonePrefixes, tepTzMap, tx);
+            handleChangedLocalIps(mapDiff.entriesDiffering(), dpId, zonePrefixes, tepTzMap, tx);
+            handleAddedLocalIps(mapDiff.entriesOnlyOnRight(), dpId, zonePrefixes, tx);
+        }), LOG, "Error handling OVSDB node update");
     }
 
     private void handleAddedLocalIps(Map<String, String> addedEntries, BigInteger dpId, Set<String> zonePrefixes,
-            WriteTransaction tx) {
+            ReadWriteTransaction tx) throws ReadFailedException {
         if (addedEntries == null || addedEntries.isEmpty()) {
             LOG.trace("No added local_ips found for DPN {}", dpId);
             return;
         }
 
         LOG.debug("Added local_ips {} on DPN {}", addedEntries.keySet(), dpId);
-        addedEntries.forEach((ipAddress, underlayNetworkName) -> zonePrefixes.forEach(zonePrefix -> {
-            String zoneName = getTzNameForUnderlayNetwork(zonePrefix, underlayNetworkName);
-            updateTransportZone(zoneName, dpId, ipAddress, tx);
-        }));
+        for (Map.Entry<String, String> addedEntry : addedEntries.entrySet()) {
+            String ipAddress = addedEntry.getKey();
+            String underlayNetworkName = addedEntry.getValue();
+            for (String zonePrefix : zonePrefixes) {
+                String zoneName = getTzNameForUnderlayNetwork(zonePrefix, underlayNetworkName);
+                updateTransportZone(zoneName, dpId, ipAddress, tx);
+            }
+        }
     }
 
     private void handleChangedLocalIps(Map<String, ValueDifference<String>> changedEntries, BigInteger dpId,
-            Set<String> zonePrefixes, Map<String, List<String>> tepTzMap, WriteTransaction tx) {
+            Set<String> zonePrefixes, Map<String, List<String>> tepTzMap, @Nonnull ReadWriteTransaction tx)
+            throws ReadFailedException {
         if (changedEntries == null || changedEntries.isEmpty()) {
             LOG.trace("No changed local_ips found for DPN {}", dpId);
             return;
         }
 
         LOG.debug("Changing underlay network mapping for local_ips {} on DPN {}", changedEntries.keySet(), dpId);
-        changedEntries.forEach((ipAddress, underlayNetworkDiff) -> {
+        for (Map.Entry<String, ValueDifference<String>> changedEntry : changedEntries.entrySet()) {
+            String ipAddress = changedEntry.getKey();
+            ValueDifference<String> underlayNetworkDiff = changedEntry.getValue();
             List<String> zoneNames = tepTzMap.get(ipAddress);
             if (zoneNames != null) {
                 for (String zoneName : zoneNames) {
@@ -323,18 +315,18 @@ public class TransportZoneNotificationUtil {
                     Optional<String> zonePrefixOpt = getZonePrefixForUnderlayNetwork(zoneName, removedUnderlayNetwork);
                     if (zonePrefixOpt.isPresent()) {
                         String zonePrefix = zonePrefixOpt.get();
-                        removeVtep(zoneName, dpId, ipAddress, tx);
+                        removeVtep(zoneName, dpId, tx);
                         zonePrefixes.add(zonePrefix);
                         String newZoneName = getTzNameForUnderlayNetwork(zonePrefix, addedUnderlayNetwork);
                         updateTransportZone(newZoneName, dpId, ipAddress, tx);
                     }
                 }
             }
-        });
+        }
     }
 
     private void handleRemovedLocalIps(Map<String, String> removedEntries, BigInteger dpId, Set<String> zonePrefixes,
-            Map<String, List<String>> tepTzMap, WriteTransaction tx) {
+            Map<String, List<String>> tepTzMap, @Nonnull WriteTransaction tx) {
         if (removedEntries == null || removedEntries.isEmpty()) {
             LOG.trace("No removed local_ips found on DPN {}", dpId);
             return;
@@ -347,7 +339,7 @@ public class TransportZoneNotificationUtil {
                 for (String zoneName : zoneNames) {
                     Optional<String> zonePrefix = getZonePrefixForUnderlayNetwork(zoneName, underlayNetworkName);
                     if (zonePrefix.isPresent()) {
-                        removeVtep(zoneName, dpId, ipAddress, tx);
+                        removeVtep(zoneName, dpId, tx);
                         zonePrefixes.add(zonePrefix.get());
                     }
                 }
@@ -366,12 +358,11 @@ public class TransportZoneNotificationUtil {
                 .collect(Collectors.toList());
     }
 
-    private Optional<DPNTEPsInfo> getDpnTepsInfo(BigInteger dpId) {
+    private Optional<DPNTEPsInfo> getDpnTepsInfo(BigInteger dpId, ReadTransaction tx) {
         InstanceIdentifier<DPNTEPsInfo> identifier = InstanceIdentifier.builder(DpnEndpoints.class)
                 .child(DPNTEPsInfo.class, new DPNTEPsInfoKey(dpId)).build();
         try {
-            return SingleTransactionDataBroker.syncReadOptional(dataBroker, LogicalDatastoreType.CONFIGURATION,
-                    identifier);
+            return tx.read(LogicalDatastoreType.CONFIGURATION, identifier).checkedGet();
         } catch (ReadFailedException e) {
             LOG.warn("Failed to read DPNTEPsInfo for DPN id {}", dpId);
             return Optional.absent();
@@ -383,7 +374,7 @@ public class TransportZoneNotificationUtil {
      *
      * @return Whether a vtep was added or not.
      */
-    private boolean addVtep(TransportZone zone, String subnetIp, BigInteger dpnId, String localIp) {
+    private boolean addVtep(TransportZone zone, String subnetIp, BigInteger dpnId, @Nullable String localIp) {
         List<Subnets> zoneSubnets = zone.getSubnets();
         if (zoneSubnets == null) {
             return false;
@@ -407,20 +398,12 @@ public class TransportZoneNotificationUtil {
         return false;
     }
 
-    private void removeVtep(String zoneName, BigInteger dpId, String localIp, WriteTransaction tx) {
+    private void removeVtep(String zoneName, BigInteger dpId, @Nonnull WriteTransaction tx) {
         InstanceIdentifier<Vteps> path = InstanceIdentifier.builder(TransportZones.class)
                 .child(TransportZone.class, new TransportZoneKey(zoneName))
                 .child(Subnets.class, new SubnetsKey(new IpPrefix(ALL_SUBNETS.toCharArray())))
                 .child(Vteps.class, new VtepsKey(dpId, TUNNEL_PORT)).build();
-        if (tx == null) {
-            try {
-                SingleTransactionDataBroker.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, path);
-            } catch (TransactionCommitFailedException e) {
-                LOG.error("Failed to remove VTEP {} from transport-zone {} for DPN {}", localIp, zoneName, dpId);
-            }
-        } else {
-            tx.delete(LogicalDatastoreType.CONFIGURATION, path);
-        }
+        tx.delete(LogicalDatastoreType.CONFIGURATION, path);
     }
 
     // search for relevant subnets for the given subnetIP, add one if it is
@@ -444,12 +427,12 @@ public class TransportZoneNotificationUtil {
         SubnetsBuilder subnetsBuilder = new SubnetsBuilder().setDeviceVteps(new ArrayList<>())
                 .setGatewayIp(new IpAddress(ALL_SUBNETS_GW.toCharArray()))
                 .setKey(new SubnetsKey(new IpPrefix(subnetIp.toCharArray()))).setVlanId(0)
-                .setVteps(new ArrayList<Vteps>());
+                .setVteps(new ArrayList<>());
         return subnetsBuilder.build();
     }
 
-    private String getDpnLocalIp(BigInteger dpId) {
-        Optional<Node> node = getPortsNode(dpId);
+    private String getDpnLocalIp(BigInteger dpId, ReadTransaction tx) throws ReadFailedException {
+        Optional<Node> node = getPortsNode(dpId, tx);
 
         if (node.isPresent()) {
             String localIp = southBoundUtils.getOpenvswitchOtherConfig(node.get(), LOCAL_IP);
@@ -464,42 +447,36 @@ public class TransportZoneNotificationUtil {
         return null;
     }
 
-    private Map<String, String> getDpnLocalIps(BigInteger dpId) {
+    @Nonnull
+    private Map<String, String> getDpnLocalIps(BigInteger dpId, ReadTransaction tx) throws ReadFailedException {
         // Example of local IPs from other_config:
         // local_ips="10.0.43.159:MPLS,11.11.11.11:DSL,ip:underlay-network"
-        Optional<Node> node;
-        node = getPortsNode(dpId);
-        if (node.isPresent()) {
-            return elanBridgeManager.getOpenvswitchOtherConfigMap(node.get(), LOCAL_IPS);
-        }
-
-        return null;
+        return getPortsNode(dpId, tx).toJavaUtil().map(
+            node -> elanBridgeManager.getOpenvswitchOtherConfigMap(node, LOCAL_IPS)).orElse(Collections.emptyMap());
     }
 
     @SuppressWarnings("unchecked")
-    private Optional<Node> getPortsNode(BigInteger dpnId) {
+    private Optional<Node> getPortsNode(BigInteger dpnId, ReadTransaction tx) throws ReadFailedException {
         InstanceIdentifier<BridgeRefEntry> bridgeRefInfoPath = InstanceIdentifier.create(BridgeRefInfo.class)
                 .child(BridgeRefEntry.class, new BridgeRefEntryKey(dpnId));
 
         // FIXME: Read this through a cache
-        BridgeRefEntry bridgeRefEntry = mdsalUtils.read(LogicalDatastoreType.OPERATIONAL, bridgeRefInfoPath);
-        if (bridgeRefEntry == null) {
+        Optional<BridgeRefEntry> optionalBridgeRefEntry =
+                tx.read(LogicalDatastoreType.OPERATIONAL, bridgeRefInfoPath).checkedGet();
+        if (!optionalBridgeRefEntry.isPresent()) {
             LOG.error("no bridge ref entry found for dpnId: " + dpnId);
             return Optional.absent();
         }
 
-        InstanceIdentifier<Node> nodeId = ((InstanceIdentifier<OvsdbBridgeAugmentation>) bridgeRefEntry
-                .getBridgeReference().getValue()).firstIdentifierOf(Node.class);
+        InstanceIdentifier<Node> nodeId =
+                optionalBridgeRefEntry.get().getBridgeReference().getValue().firstIdentifierOf(Node.class);
 
         // FIXME: Read this through a cache
-        Node node = mdsalUtils.read(LogicalDatastoreType.OPERATIONAL, nodeId);
-
-        if (node == null) {
+        Optional<Node> optionalNode = tx.read(LogicalDatastoreType.OPERATIONAL, nodeId).checkedGet();
+        if (!optionalNode.isPresent()) {
             LOG.error("missing node for dpnId: " + dpnId);
-            return Optional.absent();
         }
-
-        return Optional.of(node);
+        return optionalNode;
     }
 
     private String getTzNameForUnderlayNetwork(String zoneNamePrefix, String underlayNetworkName) {