Use managed transactions in elanmanager-impl 72/71772/5
authorStephen Kitt <skitt@redhat.com>
Thu, 3 May 2018 17:27:12 +0000 (19:27 +0200)
committerSam Hague <shague@redhat.com>
Fri, 25 May 2018 17:13:26 +0000 (17:13 +0000)
This also enforces restrictions on newReadWriteTransaction and
newWriteOnlyTransaction calls, to prevent new code introducing
unmanaged transactions.

Change-Id: I36935c5f6195bccba98a8e885e38a0041fe88499
Signed-off-by: Stephen Kitt <skitt@redhat.com>
elanmanager/impl/pom.xml
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanInstanceManager.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/internal/ElanInterfaceManager.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/ha/handlers/NodeConnectedHandler.java
elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/utils/ElanUtils.java

index d056ca09fd9f7238aeb0a38311587fad68937ab0..32196777007ee058363ae0de3f95bf568d754e77 100644 (file)
@@ -11,9 +11,9 @@ this distribution, and is available at http://www.eclipse.org/legal/epl-v10.html
 
     <parent>
         <groupId>org.opendaylight.netvirt</groupId>
-        <artifactId>binding-parent</artifactId>
+        <artifactId>managed-tx-parent</artifactId>
         <version>0.7.0-SNAPSHOT</version>
-        <relativePath>../../commons/binding-parent</relativePath>
+        <relativePath>../../commons/managed-tx-parent</relativePath>
     </parent>
 
     <artifactId>elanmanager-impl</artifactId>
index 2c1d9e64412a52c6c125418d17f4c4ca395895e3..9f74fb8ac914182a79089a702aae6af248c3497a 100644 (file)
@@ -99,29 +99,27 @@ public class ElanInstanceManager extends AsyncDataTreeChangeListenerBase<ElanIns
                         InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(elanInterfaceName);
                         futureList.addAll(elanInterfaceManager.removeElanInterface(deletedElan, elanInterfaceName,
                                 interfaceInfo));
-                        ElanUtils.delete(broker, LogicalDatastoreType.CONFIGURATION,
-                                elanInterfaceId);
+                        ElanUtils.delete(txRunner, LogicalDatastoreType.CONFIGURATION, elanInterfaceId);
                         return futureList;
                     },ElanConstants.JOB_MAX_RETRIES);
                 });
             }
-            ElanUtils.delete(broker, LogicalDatastoreType.OPERATIONAL,
+            ElanUtils.delete(txRunner, LogicalDatastoreType.OPERATIONAL,
                     ElanUtils.getElanInstanceOperationalDataPath(elanName));
             Optional<ElanDpnInterfacesList> elanDpnInterfaceList = MDSALUtil.read(broker,
                     LogicalDatastoreType.OPERATIONAL,
                     ElanUtils.getElanDpnOperationDataPath(elanName));
             if (elanDpnInterfaceList.isPresent()) {
-                ElanUtils.delete(broker, LogicalDatastoreType.OPERATIONAL,
-                        getElanDpnOperationDataPath(elanName));
+                ElanUtils.delete(txRunner, LogicalDatastoreType.OPERATIONAL, getElanDpnOperationDataPath(elanName));
             }
-            ElanUtils.delete(broker, LogicalDatastoreType.OPERATIONAL,
+            ElanUtils.delete(txRunner, LogicalDatastoreType.OPERATIONAL,
                     ElanUtils.getElanInfoEntriesOperationalDataPath(elanTag));
         }
         elanInterfaceCache.getInterfaceNames(elanName).forEach(
             elanInterfaceName -> jobCoordinator.enqueueJob(ElanUtils.getElanInterfaceJobKey(elanInterfaceName),
                 () -> Collections.singletonList(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
                     LOG.info("Deleting the elanInterface present under ConfigDS:{}", elanInterfaceName);
-                    ElanUtils.delete(broker, LogicalDatastoreType.CONFIGURATION,
+                    ElanUtils.delete(txRunner, LogicalDatastoreType.CONFIGURATION,
                             ElanUtils.getElanInterfaceConfigurationDataPathId(elanInterfaceName));
                     elanInterfaceManager.unbindService(elanInterfaceName, tx);
                     LOG.info("unbind the Interface:{} service bounded to Elan:{}", elanInterfaceName, elanName);
@@ -138,7 +136,7 @@ public class ElanInstanceManager extends AsyncDataTreeChangeListenerBase<ElanIns
         ElanUtils.releaseId(idManager, ElanConstants.ELAN_ID_POOL_NAME,
                 deletedElan.getElanInstanceName() + ElanConstants.LEAVES_POSTFIX);
 
-        ElanUtils.delete(broker, LogicalDatastoreType.OPERATIONAL,
+        ElanUtils.delete(txRunner, LogicalDatastoreType.OPERATIONAL,
                 ElanUtils.getElanInfoEntriesOperationalDataPath(
                         deletedElan.getAugmentation(EtreeInstance.class).getEtreeLeafTagVal().getValue()));
     }
index 9780285bafe9818ab4bacc79763330e3d910c8b8..0049e8f2d5fd1f217ab9d31993e0cc1ccfb8cddc 100644 (file)
@@ -24,6 +24,8 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -231,67 +233,75 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         jobCoordinator.enqueueJob(elanInstanceName, configWorker, ElanConstants.JOB_MAX_RETRIES);
     }
 
+    /**
+     * DPN interface information holder, for removal (used to extract information from a lambda).
+     */
+    private static class DpnInterfaceInfoHolderForRemoval {
+        private BigInteger dpId;
+        public boolean lastElanInterface;
+        public boolean lastInterfaceOnDpn;
+    }
+
     public List<ListenableFuture<Void>> removeElanInterface(ElanInstance elanInfo, String interfaceName,
             InterfaceInfo interfaceInfo) {
+        DpnInterfaceInfoHolderForRemoval dpnInterfaceInfoHolder = new DpnInterfaceInfoHolderForRemoval();
         String elanName = elanInfo.getElanInstanceName();
-        boolean isLastElanInterface = false;
-        boolean isLastInterfaceOnDpn = false;
-        BigInteger dpId = null;
         long elanTag = elanInfo.getElanTag();
         // We use two transaction so we don't suffer on multiple shards (interfaces and flows)
-        WriteTransaction interfaceTx = broker.newWriteOnlyTransaction();
-        Elan elanState = removeElanStateForInterface(elanInfo, interfaceName, interfaceTx);
-        if (elanState == null) {
-            interfaceTx.cancel();
-            return Collections.emptyList();
-        }
-        WriteTransaction flowTx = broker.newWriteOnlyTransaction();
-        List<String> elanInterfaces = elanState.getElanInterfaces();
-        if (elanInterfaces.isEmpty()) {
-            isLastElanInterface = true;
-        }
-        if (interfaceInfo != null) {
-            dpId = interfaceInfo.getDpId();
-            DpnInterfaces dpnInterfaces = removeElanDpnInterfaceFromOperationalDataStore(elanName, dpId,
-                    interfaceName, elanTag, interfaceTx);
-            /*
-             * If there are not elan ports, remove the unknown dmac, terminating
-             * service table flows, remote/local bc group
-             */
-            if (dpnInterfaces == null || dpnInterfaces.getInterfaces() == null
-                    || dpnInterfaces.getInterfaces().isEmpty()) {
-                // No more Elan Interfaces in this DPN
-                LOG.debug("deleting the elan: {} present on dpId: {}", elanInfo.getElanInstanceName(), dpId);
-                if (!elanUtils.isOpenstackVniSemanticsEnforced()) {
-                    removeDefaultTermFlow(dpId, elanInfo.getElanTag());
+        List<ListenableFuture<Void>> futures = new ArrayList<>();
+        futures.add(waitForCompletion(txRunner.callWithNewWriteOnlyTransactionAndSubmit(interfaceTx -> {
+            Elan elanState = removeElanStateForInterface(elanInfo, interfaceName, interfaceTx);
+            if (elanState == null) {
+                return;
+            }
+            futures.add(waitForCompletion(txRunner.callWithNewWriteOnlyTransactionAndSubmit(flowTx -> {
+                List<String> elanInterfaces = elanState.getElanInterfaces();
+                if (elanInterfaces.isEmpty()) {
+                    dpnInterfaceInfoHolder.lastElanInterface = true;
                 }
-                removeUnknownDmacFlow(dpId, elanInfo, flowTx, elanInfo.getElanTag());
-                removeEtreeUnknownDmacFlow(dpId, elanInfo, flowTx);
-                removeElanBroadcastGroup(elanInfo, interfaceInfo, flowTx);
-                removeLocalBroadcastGroup(elanInfo, interfaceInfo, flowTx);
-                removeEtreeBroadcastGrups(elanInfo, interfaceInfo, flowTx);
-                if (isVxlanNetworkOrVxlanSegment(elanInfo)) {
-                    if (elanUtils.isOpenstackVniSemanticsEnforced()) {
-                        elanUtils.removeTerminatingServiceAction(dpId,
-                                elanUtils.getVxlanSegmentationId(elanInfo).intValue());
+                if (interfaceInfo != null) {
+                    dpnInterfaceInfoHolder.dpId = interfaceInfo.getDpId();
+                    DpnInterfaces dpnInterfaces =
+                            removeElanDpnInterfaceFromOperationalDataStore(elanName, dpnInterfaceInfoHolder.dpId,
+                                    interfaceName, elanTag, interfaceTx);
+                    /*
+                     * If there are not elan ports, remove the unknown dmac, terminating
+                     * service table flows, remote/local bc group
+                     */
+                    if (dpnInterfaces == null || dpnInterfaces.getInterfaces() == null
+                            || dpnInterfaces.getInterfaces().isEmpty()) {
+                        // No more Elan Interfaces in this DPN
+                        LOG.debug("deleting the elan: {} present on dpId: {}", elanInfo.getElanInstanceName(),
+                                dpnInterfaceInfoHolder.dpId);
+                        if (!elanUtils.isOpenstackVniSemanticsEnforced()) {
+                            removeDefaultTermFlow(dpnInterfaceInfoHolder.dpId, elanInfo.getElanTag());
+                        }
+                        removeUnknownDmacFlow(dpnInterfaceInfoHolder.dpId, elanInfo, flowTx, elanInfo.getElanTag());
+                        removeEtreeUnknownDmacFlow(dpnInterfaceInfoHolder.dpId, elanInfo, flowTx);
+                        removeElanBroadcastGroup(elanInfo, interfaceInfo, flowTx);
+                        removeLocalBroadcastGroup(elanInfo, interfaceInfo, flowTx);
+                        removeEtreeBroadcastGrups(elanInfo, interfaceInfo, flowTx);
+                        if (isVxlanNetworkOrVxlanSegment(elanInfo)) {
+                            if (elanUtils.isOpenstackVniSemanticsEnforced()) {
+                                elanUtils.removeTerminatingServiceAction(dpnInterfaceInfoHolder.dpId,
+                                        ElanUtils.getVxlanSegmentationId(elanInfo).intValue());
+                            }
+                            unsetExternalTunnelTable(dpnInterfaceInfoHolder.dpId, elanInfo);
+                        }
+                        dpnInterfaceInfoHolder.lastInterfaceOnDpn = true;
+                    } else {
+                        setupLocalBroadcastGroups(elanInfo, dpnInterfaces, interfaceInfo);
                     }
-                    unsetExternalTunnelTable(dpId, elanInfo);
                 }
-                isLastInterfaceOnDpn = true;
-            } else {
-                setupLocalBroadcastGroups(elanInfo, dpnInterfaces, interfaceInfo);
-            }
-        }
-
-        List<ListenableFuture<Void>> futures = new ArrayList<>();
-        futures.add(ElanUtils.waitForTransactionToComplete(interfaceTx));
-        futures.add(ElanUtils.waitForTransactionToComplete(flowTx));
+            })));
+        })));
 
-        if (isLastInterfaceOnDpn && dpId != null && isVxlanNetworkOrVxlanSegment(elanInfo)) {
-            setElanAndEtreeBCGrouponOtherDpns(elanInfo, dpId);
+        if (dpnInterfaceInfoHolder.lastInterfaceOnDpn && dpnInterfaceInfoHolder.dpId != null
+                && isVxlanNetworkOrVxlanSegment(elanInfo)) {
+            setElanAndEtreeBCGrouponOtherDpns(elanInfo, dpnInterfaceInfoHolder.dpId);
         }
         InterfaceRemoveWorkerOnElanInterface removeInterfaceWorker = new InterfaceRemoveWorkerOnElanInterface(
-                interfaceName, elanInfo, interfaceInfo, this, isLastElanInterface);
+                interfaceName, elanInfo, interfaceInfo, this, dpnInterfaceInfoHolder.lastElanInterface);
         jobCoordinator.enqueueJob(ElanUtils.getElanInterfaceJobKey(interfaceName), removeInterfaceWorker,
                 ElanConstants.JOB_MAX_RETRIES);
 
@@ -543,17 +553,19 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                     staticMacEntry.getMacAddress());
             Optional<MacEntry> existingMacEntry = ElanUtils.read(broker,
                     LogicalDatastoreType.OPERATIONAL, macEntryIdentifier);
-            WriteTransaction tx = broker.newWriteOnlyTransaction();
-            if (existingMacEntry.isPresent()) {
-                elanForwardingEntriesHandler.updateElanInterfaceForwardingTablesList(
-                        elanName, interfaceName, existingMacEntry.get().getInterface(), existingMacEntry.get(),
-                        tx);
-            } else {
-                elanForwardingEntriesHandler.addElanInterfaceForwardingTableList(
-                        elanName, interfaceName, staticMacEntry, tx);
-            }
-            ListenableFutures.addErrorLogging(ElanUtils.waitForTransactionToComplete(tx), LOG,
-                    "Error in update: identifier={}, original={}, update={}", identifier, original, update);
+            ListenableFutures.addErrorLogging(
+                    waitForCompletion(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+                        if (existingMacEntry.isPresent()) {
+                            elanForwardingEntriesHandler.updateElanInterfaceForwardingTablesList(
+                                    elanName, interfaceName, existingMacEntry.get().getInterface(),
+                                    existingMacEntry.get(),
+                                    tx);
+                        } else {
+                            elanForwardingEntriesHandler.addElanInterfaceForwardingTableList(
+                                    elanName, interfaceName, staticMacEntry, tx);
+                        }
+                    })), LOG, "Error updating ELAN interface {} (original: {}; update: {})", identifier, original,
+                    update);
         }
     }
 
@@ -649,8 +661,17 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         }
     }
 
+    /**
+     * DPN interface information holder, for addition (used to extract information from a lambda).
+     */
+    private static class DpnInterfaceInfoHolderForAddition {
+        private boolean firstInterfaceInDpn;
+        private DpnInterfaces dpnInterfaces;
+        private BigInteger dpId;
+    }
+
     List<ListenableFuture<Void>> addElanInterface(ElanInterface elanInterface,
-            InterfaceInfo interfaceInfo, ElanInstance elanInstance) throws ElanException {
+            InterfaceInfo interfaceInfo, ElanInstance elanInstance) {
         Preconditions.checkNotNull(elanInstance, "elanInstance cannot be null");
         Preconditions.checkNotNull(interfaceInfo, "interfaceInfo cannot be null");
         Preconditions.checkNotNull(elanInterface, "elanInterface cannot be null");
@@ -659,145 +680,165 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         String elanInstanceName = elanInterface.getElanInstanceName();
 
         Elan elanInfo = ElanUtils.getElanByName(broker, elanInstanceName);
-        WriteTransaction tx = broker.newWriteOnlyTransaction();
-        if (elanInfo == null) {
-            List<String> elanInterfaces = new ArrayList<>();
-            elanInterfaces.add(interfaceName);
-            ElanUtils.updateOperationalDataStore(idManager, elanInstance, elanInterfaces, tx);
-        } else {
-            createElanStateList(elanInstanceName, interfaceName, tx);
-        }
-        boolean isFirstInterfaceInDpn = false;
-        // Specific actions to the DPN where the ElanInterface has been added,
-        // for example, programming the
-        // External tunnel table if needed or adding the ElanInterface to the
-        // DpnInterfaces in the operational DS.
-        BigInteger dpId = interfaceInfo.getDpId();
-        DpnInterfaces dpnInterfaces = null;
-        if (dpId != null && !dpId.equals(ElanConstants.INVALID_DPN)) {
-            InstanceIdentifier<DpnInterfaces> elanDpnInterfaces = ElanUtils
-                    .getElanDpnInterfaceOperationalDataPath(elanInstanceName, dpId);
-            Optional<DpnInterfaces> existingElanDpnInterfaces = ElanUtils.read(broker,
-                    LogicalDatastoreType.OPERATIONAL, elanDpnInterfaces);
-            if (ElanUtils.isVlan(elanInstance)) {
-                isFirstInterfaceInDpn =  checkIfFirstInterface(interfaceName,
-                        elanInstanceName, existingElanDpnInterfaces);
+        List<ListenableFuture<Void>> futures = new ArrayList<>();
+        DpnInterfaceInfoHolderForAddition dpnInterfaceInfoHolder = new DpnInterfaceInfoHolderForAddition();
+        futures.add(waitForCompletion(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+            if (elanInfo == null) {
+                List<String> elanInterfaces = new ArrayList<>();
+                elanInterfaces.add(interfaceName);
+                ElanUtils.updateOperationalDataStore(idManager, elanInstance, elanInterfaces, tx);
             } else {
-                isFirstInterfaceInDpn = !existingElanDpnInterfaces.isPresent();
+                createElanStateList(elanInstanceName, interfaceName, tx);
             }
-            if (isFirstInterfaceInDpn) {
-                // ELAN's 1st ElanInterface added to this DPN
-                if (!existingElanDpnInterfaces.isPresent()) {
-                    dpnInterfaces = createElanInterfacesList(elanInstanceName, interfaceName, dpId, tx);
+            // Specific actions to the DPN where the ElanInterface has been added,
+            // for example, programming the
+            // External tunnel table if needed or adding the ElanInterface to the
+            // DpnInterfaces in the operational DS.
+            dpnInterfaceInfoHolder.dpId = interfaceInfo.getDpId();
+            if (dpnInterfaceInfoHolder.dpId != null && !dpnInterfaceInfoHolder.dpId.equals(ElanConstants.INVALID_DPN)) {
+                InstanceIdentifier<DpnInterfaces> elanDpnInterfaces = ElanUtils
+                        .getElanDpnInterfaceOperationalDataPath(elanInstanceName, dpnInterfaceInfoHolder.dpId);
+                Optional<DpnInterfaces> existingElanDpnInterfaces = ElanUtils.read(broker,
+                        LogicalDatastoreType.OPERATIONAL, elanDpnInterfaces);
+                if (ElanUtils.isVlan(elanInstance)) {
+                    dpnInterfaceInfoHolder.firstInterfaceInDpn = checkIfFirstInterface(interfaceName,
+                            elanInstanceName, existingElanDpnInterfaces);
+                } else {
+                    dpnInterfaceInfoHolder.firstInterfaceInDpn = !existingElanDpnInterfaces.isPresent();
+                }
+                if (dpnInterfaceInfoHolder.firstInterfaceInDpn) {
+                    // ELAN's 1st ElanInterface added to this DPN
+                    if (!existingElanDpnInterfaces.isPresent()) {
+                        dpnInterfaceInfoHolder.dpnInterfaces =
+                                createElanInterfacesList(elanInstanceName, interfaceName, dpnInterfaceInfoHolder.dpId,
+                                        tx);
+                    } else {
+                        List<String> elanInterfaces = existingElanDpnInterfaces.get().getInterfaces();
+                        elanInterfaces.add(interfaceName);
+                        dpnInterfaceInfoHolder.dpnInterfaces =
+                                updateElanDpnInterfacesList(elanInstanceName, dpnInterfaceInfoHolder.dpId,
+                                        elanInterfaces, tx);
+                    }
+                    // The 1st ElanInterface in a DPN must program the Ext Tunnel
+                    // table, but only if Elan has VNI
+                    if (isVxlanNetworkOrVxlanSegment(elanInstance)) {
+                        setExternalTunnelTable(dpnInterfaceInfoHolder.dpId, elanInstance);
+                    }
+                    elanL2GatewayUtils.installElanL2gwDevicesLocalMacsInDpn(dpnInterfaceInfoHolder.dpId, elanInstance,
+                            interfaceName);
                 } else {
                     List<String> elanInterfaces = existingElanDpnInterfaces.get().getInterfaces();
                     elanInterfaces.add(interfaceName);
-                    dpnInterfaces = updateElanDpnInterfacesList(elanInstanceName, dpId,
-                            elanInterfaces, tx);
-                }
-                // The 1st ElanInterface in a DPN must program the Ext Tunnel
-                // table, but only if Elan has VNI
-                if (isVxlanNetworkOrVxlanSegment(elanInstance)) {
-                    setExternalTunnelTable(dpId, elanInstance);
-                }
-                elanL2GatewayUtils.installElanL2gwDevicesLocalMacsInDpn(dpId, elanInstance, interfaceName);
-            } else {
-                List<String> elanInterfaces = existingElanDpnInterfaces.get().getInterfaces();
-                elanInterfaces.add(interfaceName);
-                if (elanInterfaces.size() == 1) { // 1st dpn interface
-                    elanL2GatewayUtils.installElanL2gwDevicesLocalMacsInDpn(dpId, elanInstance, interfaceName);
+                    if (elanInterfaces.size() == 1) { // 1st dpn interface
+                        elanL2GatewayUtils.installElanL2gwDevicesLocalMacsInDpn(dpnInterfaceInfoHolder.dpId,
+                                elanInstance, interfaceName);
+                    }
+                    dpnInterfaceInfoHolder.dpnInterfaces =
+                            updateElanDpnInterfacesList(elanInstanceName, dpnInterfaceInfoHolder.dpId, elanInterfaces,
+                                    tx);
                 }
-                dpnInterfaces = updateElanDpnInterfacesList(elanInstanceName, dpId, elanInterfaces, tx);
             }
-        }
 
-        // add code to install Local/Remote BC group, unknow DMAC entry,
-        // terminating service table flow entry
-        // call bindservice of interfacemanager to create ingress table flow
-        // enty.
-        // Add interface to the ElanInterfaceForwardingEntires Container
-        createElanInterfaceTablesList(interfaceName, tx);
-        List<ListenableFuture<Void>> futures = new ArrayList<>();
-        futures.add(ElanUtils.waitForTransactionToComplete(tx));
-        installEntriesForFirstInterfaceonDpn(elanInstance, interfaceInfo, dpnInterfaces, isFirstInterfaceInDpn);
+            // add code to install Local/Remote BC group, unknow DMAC entry,
+            // terminating service table flow entry
+            // call bindservice of interfacemanager to create ingress table flow
+            // enty.
+            // Add interface to the ElanInterfaceForwardingEntires Container
+            createElanInterfaceTablesList(interfaceName, tx);
+        })));
+        installEntriesForFirstInterfaceonDpn(elanInstance, interfaceInfo, dpnInterfaceInfoHolder.dpnInterfaces,
+                dpnInterfaceInfoHolder.firstInterfaceInDpn);
 
         // add the vlan provider interface to remote BC group for the elan
         // for internal vlan networks
         if (ElanUtils.isVlan(elanInstance) && !elanInstance.isExternal()) {
             if (interfaceManager.isExternalInterface(interfaceName)) {
                 LOG.debug("adding vlan prv intf {} to elan {} BC group", interfaceName, elanInstanceName);
-                handleExternalInterfaceEvent(elanInstance, dpnInterfaces, dpId);
+                handleExternalInterfaceEvent(elanInstance, dpnInterfaceInfoHolder.dpnInterfaces,
+                        dpnInterfaceInfoHolder.dpId);
             }
         }
 
-        if (isFirstInterfaceInDpn && isVxlanNetworkOrVxlanSegment(elanInstance)) {
+        if (dpnInterfaceInfoHolder.firstInterfaceInDpn && isVxlanNetworkOrVxlanSegment(elanInstance)) {
             //update the remote-DPNs remoteBC group entry with Tunnels
-            LOG.trace("update remote bc group for elan {} on other DPNs for newly added dpn {}", elanInstance, dpId);
-            setElanAndEtreeBCGrouponOtherDpns(elanInstance, dpId);
+            LOG.trace("update remote bc group for elan {} on other DPNs for newly added dpn {}", elanInstance,
+                    dpnInterfaceInfoHolder.dpId);
+            setElanAndEtreeBCGrouponOtherDpns(elanInstance, dpnInterfaceInfoHolder.dpId);
         }
 
         String jobKey = ElanUtils.getElanInterfaceJobKey(interfaceName);
         InterfaceAddWorkerOnElanInterface addWorker = new InterfaceAddWorkerOnElanInterface(jobKey,
-                elanInterface, interfaceInfo, elanInstance, isFirstInterfaceInDpn, this);
+                elanInterface, interfaceInfo, elanInstance, dpnInterfaceInfoHolder.firstInterfaceInDpn, this);
         jobCoordinator.enqueueJob(jobKey, addWorker, ElanConstants.JOB_MAX_RETRIES);
         return futures;
     }
 
+    private static ListenableFuture<Void> waitForCompletion(ListenableFuture<Void> future) {
+        try {
+            future.get();
+        } catch (InterruptedException | ExecutionException e) {
+            // NETVIRT-1215: Do not log.error() here, only debug()
+            LOG.debug("Error writing to datastore", e);
+        }
+        return future;
+    }
+
     List<ListenableFuture<Void>> setupEntriesForElanInterface(ElanInstance elanInstance,
             ElanInterface elanInterface, InterfaceInfo interfaceInfo, boolean isFirstInterfaceInDpn)
             throws ElanException {
         String elanInstanceName = elanInstance.getElanInstanceName();
         String interfaceName = elanInterface.getName();
-        WriteTransaction tx = broker.newWriteOnlyTransaction();
-        BigInteger dpId = interfaceInfo.getDpId();
-        WriteTransaction writeFlowGroupTx = broker.newWriteOnlyTransaction();
-        installEntriesForElanInterface(elanInstance, elanInterface, interfaceInfo,
-                isFirstInterfaceInDpn, tx, writeFlowGroupTx);
-
-        List<StaticMacEntries> staticMacEntriesList = elanInterface.getStaticMacEntries();
-        List<PhysAddress> staticMacAddresses = Lists.newArrayList();
-
-        boolean isInterfaceOperational = isOperational(interfaceInfo);
-        if (ElanUtils.isNotEmpty(staticMacEntriesList)) {
-            for (StaticMacEntries staticMacEntry : staticMacEntriesList) {
-                InstanceIdentifier<MacEntry> macId = getMacEntryOperationalDataPath(elanInstanceName,
-                        staticMacEntry.getMacAddress());
-                Optional<MacEntry> existingMacEntry = ElanUtils.read(broker,
-                        LogicalDatastoreType.OPERATIONAL, macId);
-                if (existingMacEntry.isPresent()) {
-                    elanForwardingEntriesHandler.updateElanInterfaceForwardingTablesList(
-                            elanInstanceName, interfaceName, existingMacEntry.get().getInterface(),
-                            existingMacEntry.get(), tx);
-                } else {
-                    elanForwardingEntriesHandler
-                            .addElanInterfaceForwardingTableList(elanInstanceName, interfaceName, staticMacEntry, tx);
-                }
+        List<ListenableFuture<Void>> futures = new ArrayList<>();
+        AtomicBoolean interfaceOperational = new AtomicBoolean();
+        futures.add(waitForCompletion(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+            BigInteger dpId = interfaceInfo.getDpId();
+            futures.add(waitForCompletion(txRunner.callWithNewWriteOnlyTransactionAndSubmit(writeFlowGroupTx -> {
+                installEntriesForElanInterface(elanInstance, elanInterface, interfaceInfo,
+                        isFirstInterfaceInDpn, tx, writeFlowGroupTx);
+
+                List<StaticMacEntries> staticMacEntriesList = elanInterface.getStaticMacEntries();
+                List<PhysAddress> staticMacAddresses = Lists.newArrayList();
+
+                interfaceOperational.set(isOperational(interfaceInfo));
+                if (ElanUtils.isNotEmpty(staticMacEntriesList)) {
+                    for (StaticMacEntries staticMacEntry : staticMacEntriesList) {
+                        InstanceIdentifier<MacEntry> macId = getMacEntryOperationalDataPath(elanInstanceName,
+                                staticMacEntry.getMacAddress());
+                        Optional<MacEntry> existingMacEntry = ElanUtils.read(broker,
+                                LogicalDatastoreType.OPERATIONAL, macId);
+                        if (existingMacEntry.isPresent()) {
+                            elanForwardingEntriesHandler.updateElanInterfaceForwardingTablesList(
+                                    elanInstanceName, interfaceName, existingMacEntry.get().getInterface(),
+                                    existingMacEntry.get(), tx);
+                        } else {
+                            elanForwardingEntriesHandler.addElanInterfaceForwardingTableList(elanInstanceName,
+                                    interfaceName, staticMacEntry, tx);
+                        }
 
-                if (isInterfaceOperational) {
-                    // Setting SMAC, DMAC, UDMAC in this DPN and also in other
-                    // DPNs
-                    String macAddress = staticMacEntry.getMacAddress().getValue();
-                    LOG.info("programming smac and dmacs for {} on source and other DPNs for elan {} and interface {}",
-                            macAddress, elanInstanceName, interfaceName);
-                    elanUtils.setupMacFlows(elanInstance, interfaceInfo, ElanConstants.STATIC_MAC_TIMEOUT,
-                            staticMacEntry.getMacAddress().getValue(), true, writeFlowGroupTx);
-                }
-            }
+                        if (interfaceOperational.get()) {
+                            // Setting SMAC, DMAC, UDMAC in this DPN and also in other
+                            // DPNs
+                            String macAddress = staticMacEntry.getMacAddress().getValue();
+                            LOG.info("programming smac and dmacs for {} on source and other DPNs for elan {} and "
+                                    + "interface {}", macAddress, elanInstanceName, interfaceName);
+                            elanUtils.setupMacFlows(elanInstance, interfaceInfo, ElanConstants.STATIC_MAC_TIMEOUT,
+                                    staticMacEntry.getMacAddress().getValue(), true, writeFlowGroupTx);
+                        }
+                    }
 
-            if (isInterfaceOperational) {
-                // Add MAC in TOR's remote MACs via OVSDB. Outside of the loop
-                // on purpose.
-                for (StaticMacEntries staticMacEntry : staticMacEntriesList) {
-                    staticMacAddresses.add(staticMacEntry.getMacAddress());
+                    if (interfaceOperational.get()) {
+                        // Add MAC in TOR's remote MACs via OVSDB. Outside of the loop
+                        // on purpose.
+                        for (StaticMacEntries staticMacEntry : staticMacEntriesList) {
+                            staticMacAddresses.add(staticMacEntry.getMacAddress());
+                        }
+                        elanL2GatewayUtils.scheduleAddDpnMacInExtDevices(elanInstance.getElanInstanceName(), dpId,
+                                staticMacAddresses);
+                    }
                 }
-                elanL2GatewayUtils.scheduleAddDpnMacInExtDevices(elanInstance.getElanInstanceName(), dpId,
-                        staticMacAddresses);
-            }
-        }
-        List<ListenableFuture<Void>> futures = new ArrayList<>();
-        futures.add(ElanUtils.waitForTransactionToComplete(tx));
-        futures.add(ElanUtils.waitForTransactionToComplete(writeFlowGroupTx));
-        if (isInterfaceOperational && !interfaceManager.isExternalInterface(interfaceName)) {
+            })));
+        })));
+        if (interfaceOperational.get() && !interfaceManager.isExternalInterface(interfaceName)) {
             //At this point, the interface is operational and D/SMAC flows have been configured, mark the port active
             try {
                 Port neutronPort = neutronVpnManager.getNeutronPort(interfaceName);
@@ -836,11 +877,8 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         }
         DpnInterfaces dpnInterfaces = existingElanDpnInterfaces.get();
 
-        if (dpnInterfaces.getInterfaces().size() ==  0 || (dpnInterfaces.getInterfaces().size() == 1
-                && dpnInterfaces.getInterfaces().contains(routerPortUuid))) {
-            return true;
-        }
-        return false;
+        return dpnInterfaces.getInterfaces().size() == 0 || (dpnInterfaces.getInterfaces().size() == 1
+                && dpnInterfaces.getInterfaces().contains(routerPortUuid));
     }
 
     private InstanceIdentifier<MacEntry> getMacEntryOperationalDataPath(String elanName, PhysAddress physAddress) {
@@ -988,7 +1026,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                                 List<Action> remoteListActionInfo = elanItmUtils.getInternalTunnelItmEgressAction(
                                         dpnInterface.getDpId(), otherFes.getDpId(),
                                         elanUtils.isOpenstackVniSemanticsEnforced()
-                                                ? elanUtils.getVxlanSegmentationId(elanInfo) : elanTag);
+                                                ? ElanUtils.getVxlanSegmentationId(elanInfo) : elanTag);
                                 if (!remoteListActionInfo.isEmpty()) {
                                     remoteListBucketInfo.add(MDSALUtil.buildBucket(remoteListActionInfo, MDSALUtil
                                             .GROUP_WEIGHT, bucketId, MDSALUtil.WATCH_PORT, MDSALUtil.WATCH_GROUP));
@@ -1074,23 +1112,23 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
 
     // Install DMAC entry on dst DPN
     public List<ListenableFuture<Void>> installDMacAddressTables(ElanInstance elanInfo, InterfaceInfo interfaceInfo,
-            BigInteger dstDpId) throws ElanException {
+            BigInteger dstDpId) {
         String interfaceName = interfaceInfo.getInterfaceName();
         ElanInterfaceMac elanInterfaceMac = elanUtils.getElanInterfaceMacByInterfaceName(interfaceName);
         if (elanInterfaceMac != null && elanInterfaceMac.getMacEntry() != null) {
             List<MacEntry> macEntries = elanInterfaceMac.getMacEntry();
-            WriteTransaction writeFlowTx = broker.newWriteOnlyTransaction();
-            for (MacEntry macEntry : macEntries) {
-                String macAddress = macEntry.getMacAddress().getValue();
-                LOG.info("Installing remote dmac for mac address {} and interface {}", macAddress, interfaceName);
-                synchronized (ElanUtils.getElanMacDPNKey(elanInfo.getElanTag(), macAddress,
-                        interfaceInfo.getDpId())) {
-                    LOG.info("Acquired lock for mac : {}, proceeding with remote dmac install operation", macAddress);
-                    elanUtils.setupDMacFlowOnRemoteDpn(elanInfo, interfaceInfo, dstDpId, macAddress,
-                            writeFlowTx);
+            return Collections.singletonList(waitForCompletion(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+                for (MacEntry macEntry : macEntries) {
+                    String macAddress = macEntry.getMacAddress().getValue();
+                    LOG.info("Installing remote dmac for mac address {} and interface {}", macAddress, interfaceName);
+                    synchronized (ElanUtils.getElanMacDPNKey(elanInfo.getElanTag(), macAddress,
+                            interfaceInfo.getDpId())) {
+                        LOG.info("Acquired lock for mac : {}, proceeding with remote dmac install operation",
+                                macAddress);
+                        elanUtils.setupDMacFlowOnRemoteDpn(elanInfo, interfaceInfo, dstDpId, macAddress, tx);
+                    }
                 }
-            }
-            return Collections.singletonList(ElanUtils.waitForTransactionToComplete(writeFlowTx));
+            })));
         }
         return Collections.emptyList();
     }
@@ -1291,7 +1329,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
             listMatchInfoBase = ElanUtils.getTunnelMatchesForServiceId((int) elanTag);
             instructionInfos = getInstructionsForOutGroup(ElanUtils.getElanLocalBCGId(elanTag));
         } else {
-            serviceId = elanUtils.getVxlanSegmentationId(elanInfo);
+            serviceId = ElanUtils.getVxlanSegmentationId(elanInfo);
             listMatchInfoBase = buildMatchesForVni(serviceId);
             instructionInfos = getInstructionsIntOrExtTunnelTable(elanTag);
         }
index ad1d24995ba739d3036b94f626dad2aeca141220..3d8e61a595d1f90f3b2cbb083bbcd1ef2a418a72 100644 (file)
@@ -12,11 +12,13 @@ import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastor
 
 import com.google.common.base.Optional;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
 import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAJobScheduler;
@@ -44,11 +46,11 @@ public class NodeConnectedHandler {
     private final PSAugmentationMerger psAugmentationMerger = PSAugmentationMerger.getInstance();
     private final GlobalNodeMerger globalNodeMerger = GlobalNodeMerger.getInstance();
     private final PSNodeMerger psNodeMerger = PSNodeMerger.getInstance();
-    private final DataBroker db;
+    private final ManagedNewTransactionRunner txRunner;
     private final HwvtepNodeHACache hwvtepNodeHACache;
 
     public NodeConnectedHandler(final DataBroker db, final HwvtepNodeHACache hwvtepNodeHACache) {
-        this.db = db;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(db);
         this.hwvtepNodeHACache = hwvtepNodeHACache;
     }
 
@@ -93,10 +95,9 @@ public class NodeConnectedHandler {
                         hwvtepNodeHACache.updateConnectedNodeStatus(childNodePath);
                         LOG.info("HA child reconnected handleNodeReConnected {}",
                                 childNode.getNodeId().getValue());
-                        ReadWriteTransaction tx1 = db.newReadWriteTransaction();
-                        copyHAPSConfigToChildPS(haPSCfg.get(), childNodePath, tx1);
-                        tx1.submit().checkedGet();
-                    } catch (TransactionCommitFailedException e) {
+                        txRunner.callWithNewReadWriteTransactionAndSubmit(
+                            tx1 -> copyHAPSConfigToChildPS(haPSCfg.get(), childNodePath, tx1)).get();
+                    } catch (InterruptedException | ExecutionException e) {
                         LOG.error("Failed to process ", e);
                     }
                 });
index 2eaffa8d07c8a4789ff3fa7f55d2129f068d7706..99bfad39bbf4fbcf0fffbb86ee0af8210262d83a 100755 (executable)
@@ -288,18 +288,22 @@ public class ElanUtils {
         }
     }
 
+    public static <T extends DataObject> void delete(ManagedNewTransactionRunner txRunner,
+            LogicalDatastoreType datastoreType, InstanceIdentifier<T> path) {
+        Futures.addCallback(txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+            tx -> tx.delete(datastoreType, path)), DEFAULT_CALLBACK, MoreExecutors.directExecutor());
+    }
+
     public static <T extends DataObject> void delete(DataBroker broker, LogicalDatastoreType datastoreType,
             InstanceIdentifier<T> path) {
-        WriteTransaction tx = broker.newWriteOnlyTransaction();
-        tx.delete(datastoreType, path);
-        Futures.addCallback(tx.submit(), DEFAULT_CALLBACK, MoreExecutors.directExecutor());
+        Futures.addCallback(new ManagedNewTransactionRunnerImpl(broker).callWithNewWriteOnlyTransactionAndSubmit(
+            tx -> tx.delete(datastoreType, path)), DEFAULT_CALLBACK, MoreExecutors.directExecutor());
     }
 
     public static <T extends DataObject> void delete(DataBroker broker, LogicalDatastoreType datastoreType,
             InstanceIdentifier<T> path, FutureCallback<Void> callback) {
-        WriteTransaction tx = broker.newWriteOnlyTransaction();
-        tx.delete(datastoreType, path);
-        Futures.addCallback(tx.submit(), callback, MoreExecutors.directExecutor());
+        Futures.addCallback(new ManagedNewTransactionRunnerImpl(broker).callWithNewWriteOnlyTransactionAndSubmit(
+            tx -> tx.delete(datastoreType, path)), callback, MoreExecutors.directExecutor());
     }
 
     public static InstanceIdentifier<ElanInstance> getElanInstanceIdentifier() {
@@ -1378,18 +1382,6 @@ public class ElanUtils {
         return idBuilder.build();
     }
 
-    public static CheckedFuture<Void, TransactionCommitFailedException> waitForTransactionToComplete(
-            WriteTransaction tx) {
-        CheckedFuture<Void, TransactionCommitFailedException> futures = tx.submit();
-        try {
-            futures.get();
-        } catch (InterruptedException | ExecutionException e) {
-            // NETVIRT-1215: Do not log.error() here, only debug()
-            LOG.debug("Error writing to datastore", e);
-        }
-        return futures;
-    }
-
     public static boolean isVxlan(@Nullable ElanInstance elanInstance) {
         return elanInstance != null && elanInstance.getSegmentType() != null
                 && elanInstance.getSegmentType().isAssignableFrom(SegmentTypeVxlan.class)