Use JvmGlobalLocks in elanmanager
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / internal / ElanInterfaceManager.java
index 8b6ab601d89c8a1e665e8d155a11b50994e1d0e0..62255cec3e5cd121808b4a9d4b448cc4c78405f7 100644 (file)
@@ -7,10 +7,12 @@
  */
 package org.opendaylight.netvirt.elan.internal;
 
+import static java.util.Collections.emptyList;
 import static org.opendaylight.controller.md.sal.binding.api.WriteTransaction.CREATE_MISSING_PARENTS;
 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
 import static org.opendaylight.genius.infra.Datastore.OPERATIONAL;
 import static org.opendaylight.netvirt.elan.utils.ElanUtils.isVxlanNetworkOrVxlanSegment;
+import static org.opendaylight.netvirt.elan.utils.ElanUtils.requireNonNullElse;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -28,6 +30,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -65,6 +68,7 @@ import org.opendaylight.genius.mdsalutil.instructions.InstructionWriteMetadata;
 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
 import org.opendaylight.genius.mdsalutil.matches.MatchMetadata;
 import org.opendaylight.genius.mdsalutil.matches.MatchTunnelId;
+import org.opendaylight.genius.utils.JvmGlobalLocks;
 import org.opendaylight.genius.utils.ServiceIndex;
 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
@@ -264,7 +268,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                 return;
             }
             futures.add(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, flowTx -> {
-                List<String> elanInterfaces = elanState.getElanInterfaces();
+                List<String> elanInterfaces = requireNonNullElse(elanState.getElanInterfaces(), emptyList());
                 if (elanInterfaces.isEmpty()) {
                     holder.isLastElanInterface = true;
                 }
@@ -294,11 +298,11 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                                 elanUtils.removeTerminatingServiceAction(holder.dpId,
                                     ElanUtils.getVxlanSegmentationId(elanInfo).intValue());
                             }
-                            unsetExternalTunnelTable(holder.dpId, elanInfo);
+                            unsetExternalTunnelTable(holder.dpId, elanInfo, flowTx);
                         }
                         holder.isLastInterfaceOnDpn = true;
                     } else {
-                        setupLocalBroadcastGroups(elanInfo, dpnInterfaces, interfaceInfo);
+                        setupLocalBroadcastGroups(elanInfo, dpnInterfaces, interfaceInfo, flowTx);
                     }
                 }
             }));
@@ -306,7 +310,9 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         futures.forEach(ElanUtils::waitForTransactionToComplete);
 
         if (holder.isLastInterfaceOnDpn && holder.dpId != null && isVxlanNetworkOrVxlanSegment(elanInfo)) {
-            setElanAndEtreeBCGrouponOtherDpns(elanInfo, holder.dpId);
+            futures.add(
+                ElanUtils.waitForTransactionToComplete(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+                    confTx -> setElanAndEtreeBCGrouponOtherDpns(elanInfo, holder.dpId, confTx))));
         }
         InterfaceRemoveWorkerOnElanInterface removeInterfaceWorker = new InterfaceRemoveWorkerOnElanInterface(
                 interfaceName, elanInfo, interfaceInfo, this, holder.isLastElanInterface);
@@ -365,7 +371,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         if (elanState == null) {
             return elanState;
         }
-        List<String> elanInterfaces = elanState.getElanInterfaces();
+        List<String> elanInterfaces = requireNonNullElse(elanState.getElanInterfaces(), emptyList());
         boolean isRemoved = elanInterfaces.remove(interfaceName);
         if (!isRemoved) {
             return elanState;
@@ -458,8 +464,10 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                                                                          String interfaceName, long elanTag,
                                                                          TypedReadWriteTransaction<Operational> tx)
             throws ExecutionException, InterruptedException {
-        synchronized (elanName.intern()) {
-
+        // FIXME: pass in and use ElanInstanceKey instead?
+        final ReentrantLock lock = JvmGlobalLocks.getLockForString(elanName);
+        lock.lock();
+        try {
             DpnInterfaces dpnInterfaces = elanUtils.getElanInterfaceInfoByElanDpn(elanName, dpId);
             if (dpnInterfaces != null) {
                 List<String> interfaceLists = dpnInterfaces.getInterfaces();
@@ -475,38 +483,45 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                 }
             }
             return dpnInterfaces;
+        } finally {
+            lock.unlock();
         }
     }
 
     private void deleteAllRemoteMacsInADpn(String elanName, BigInteger dpId, long elanTag) {
         List<DpnInterfaces> dpnInterfaces = elanUtils.getInvolvedDpnsInElan(elanName);
-        for (DpnInterfaces dpnInterface : dpnInterfaces) {
-            BigInteger currentDpId = dpnInterface.getDpId();
-            if (!currentDpId.equals(dpId)) {
-                for (String elanInterface : dpnInterface.getInterfaces()) {
-                    ElanInterfaceMac macs = elanUtils.getElanInterfaceMacByInterfaceName(elanInterface);
-                    if (macs == null || macs.getMacEntry() == null) {
-                        continue;
-                    }
-                    for (MacEntry mac : macs.getMacEntry()) {
-                        removeTheMacFlowInTheDPN(dpId, elanTag, currentDpId, mac);
-                        removeEtreeMacFlowInTheDPN(dpId, elanTag, currentDpId, mac);
+        ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, confTx -> {
+            for (DpnInterfaces dpnInterface : dpnInterfaces) {
+                BigInteger currentDpId = dpnInterface.getDpId();
+                if (!currentDpId.equals(dpId)) {
+                    for (String elanInterface : requireNonNullElse(dpnInterface.getInterfaces(),
+                            Collections.<String>emptyList())) {
+                        ElanInterfaceMac macs = elanUtils.getElanInterfaceMacByInterfaceName(elanInterface);
+                        if (macs == null || macs.getMacEntry() == null) {
+                            continue;
+                        }
+                        for (MacEntry mac : macs.getMacEntry()) {
+                            removeTheMacFlowInTheDPN(dpId, elanTag, currentDpId, mac, confTx);
+                            removeEtreeMacFlowInTheDPN(dpId, elanTag, currentDpId, mac, confTx);
+                        }
                     }
                 }
             }
-        }
+        }), LOG, "Error deleting remote MACs in DPN {}", dpId);
     }
 
-    private void removeEtreeMacFlowInTheDPN(BigInteger dpId, long elanTag, BigInteger currentDpId, MacEntry mac) {
+    private void removeEtreeMacFlowInTheDPN(BigInteger dpId, long elanTag, BigInteger currentDpId, MacEntry mac,
+            TypedReadWriteTransaction<Configuration> confTx) throws ExecutionException, InterruptedException {
         EtreeLeafTagName etreeLeafTag = elanEtreeUtils.getEtreeLeafTagByElanTag(elanTag);
         if (etreeLeafTag != null) {
-            removeTheMacFlowInTheDPN(dpId, etreeLeafTag.getEtreeLeafTag().getValue(), currentDpId, mac);
+            removeTheMacFlowInTheDPN(dpId, etreeLeafTag.getEtreeLeafTag().getValue(), currentDpId, mac, confTx);
         }
     }
 
-    private void removeTheMacFlowInTheDPN(BigInteger dpId, long elanTag, BigInteger currentDpId, MacEntry mac) {
+    private void removeTheMacFlowInTheDPN(BigInteger dpId, long elanTag, BigInteger currentDpId, MacEntry mac,
+            TypedReadWriteTransaction<Configuration> confTx) throws ExecutionException, InterruptedException {
         mdsalManager
-                .removeFlow(dpId,
+                .removeFlow(confTx, dpId,
                         MDSALUtil.buildFlow(NwConstants.ELAN_DMAC_TABLE,
                                 ElanUtils.getKnownDynamicmacFlowRef(NwConstants.ELAN_DMAC_TABLE, dpId, currentDpId,
                                         mac.getMacAddress().getValue(), elanTag)));
@@ -635,7 +650,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         }
         for (DpnInterfaces dpnInterfaces : dpnInterfaceLists) {
             BigInteger dstDpId = interfaceInfo.getDpId();
-            if (dpnInterfaces.getDpId().equals(dstDpId)) {
+            if (Objects.equals(dpnInterfaces.getDpId(), dstDpId)) {
                 continue;
             }
             List<String> remoteElanInterfaces = dpnInterfaces.getInterfaces();
@@ -695,7 +710,10 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
             // DpnInterfaces in the operational DS.
             holder.dpId = interfaceInfo.getDpId();
             if (holder.dpId != null && !holder.dpId.equals(ElanConstants.INVALID_DPN)) {
-                synchronized (elanInstanceName.intern()) {
+                // FIXME: use elanInstaince.key() instead?
+                final ReentrantLock lock = JvmGlobalLocks.getLockForString(elanInstanceName);
+                lock.lock();
+                try {
                     InstanceIdentifier<DpnInterfaces> elanDpnInterfaces = ElanUtils
                         .getElanDpnInterfaceOperationalDataPath(elanInstanceName, holder.dpId);
                     Optional<DpnInterfaces> existingElanDpnInterfaces = operTx.read(elanDpnInterfaces).get();
@@ -711,7 +729,8 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                             holder.dpnInterfaces =
                                 createElanInterfacesList(elanInstanceName, interfaceName, holder.dpId, operTx);
                         } else {
-                            List<String> elanInterfaces = existingElanDpnInterfaces.get().getInterfaces();
+                            List<String> elanInterfaces =
+                                requireNonNullElse(existingElanDpnInterfaces.get().getInterfaces(), emptyList());
                             elanInterfaces.add(interfaceName);
                             holder.dpnInterfaces = updateElanDpnInterfacesList(elanInstanceName, holder.dpId,
                                 elanInterfaces, operTx);
@@ -719,12 +738,14 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                         // The 1st ElanInterface in a DPN must program the Ext Tunnel
                         // table, but only if Elan has VNI
                         if (isVxlanNetworkOrVxlanSegment(elanInstance)) {
-                            setExternalTunnelTable(holder.dpId, elanInstance);
+                            futures.add(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+                                confTx -> setExternalTunnelTable(holder.dpId, elanInstance, confTx)));
                         }
                         elanL2GatewayUtils.installElanL2gwDevicesLocalMacsInDpn(holder.dpId, elanInstance,
                             interfaceName);
                     } else {
-                        List<String> elanInterfaces = existingElanDpnInterfaces.get().getInterfaces();
+                        List<String> elanInterfaces =
+                            requireNonNullElse(existingElanDpnInterfaces.get().getInterfaces(), emptyList());
                         elanInterfaces.add(interfaceName);
                         if (elanInterfaces.size() == 1) { // 1st dpn interface
                             elanL2GatewayUtils.installElanL2gwDevicesLocalMacsInDpn(holder.dpId, elanInstance,
@@ -733,6 +754,8 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                         holder.dpnInterfaces =
                             updateElanDpnInterfacesList(elanInstanceName, holder.dpId, elanInterfaces, operTx);
                     }
+                } finally {
+                    lock.unlock();
                 }
             }
 
@@ -744,8 +767,10 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
             createElanInterfaceTablesList(interfaceName, operTx);
         }));
         futures.forEach(ElanUtils::waitForTransactionToComplete);
-        installEntriesForFirstInterfaceonDpn(elanInstance, interfaceInfo, holder.dpnInterfaces,
-            holder.isFirstInterfaceInDpn);
+        futures.add(
+            ElanUtils.waitForTransactionToComplete(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+                confTx -> installEntriesForFirstInterfaceonDpn(elanInstance, interfaceInfo, holder.dpnInterfaces,
+                    holder.isFirstInterfaceInDpn, confTx))));
 
         // add the vlan provider interface to remote BC group for the elan
         // for internal vlan networks
@@ -760,7 +785,9 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
             //update the remote-DPNs remoteBC group entry with Tunnels
             LOG.trace("update remote bc group for elan {} on other DPNs for newly added dpn {}", elanInstance,
                 holder.dpId);
-            setElanAndEtreeBCGrouponOtherDpns(elanInstance, holder.dpId);
+            futures.add(
+                ElanUtils.waitForTransactionToComplete(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+                    confTx -> setElanAndEtreeBCGrouponOtherDpns(elanInstance, holder.dpId, confTx))));
         }
 
         String jobKey = ElanUtils.getElanInterfaceJobKey(interfaceName);
@@ -869,13 +896,14 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         }
         DpnInterfaces dpnInterfaces = existingElanDpnInterfaces.get();
         int dummyInterfaceCount =  0;
-        if (dpnInterfaces.getInterfaces().contains(routerPortUuid)) {
+        List<String> interfaces = requireNonNullElse(dpnInterfaces.getInterfaces(), emptyList());
+        if (interfaces.contains(routerPortUuid)) {
             dummyInterfaceCount++;
         }
-        if (dpnInterfaces.getInterfaces().contains(elanInstanceName)) {
+        if (interfaces.contains(elanInstanceName)) {
             dummyInterfaceCount++;
         }
-        return dpnInterfaces.getInterfaces().size() - dummyInterfaceCount == 0;
+        return interfaces.size() - dummyInterfaceCount == 0;
     }
 
     private InstanceIdentifier<MacEntry> getMacEntryOperationalDataPath(String elanName, PhysAddress physAddress) {
@@ -918,13 +946,13 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         bindService(elanInstance, elanInterface, interfaceInfo.getInterfaceTag(), confTx);
     }
 
-    public void installEntriesForFirstInterfaceonDpn(ElanInstance elanInfo, InterfaceInfo interfaceInfo,
-            DpnInterfaces dpnInterfaces, boolean isFirstInterfaceInDpn) {
+    private void installEntriesForFirstInterfaceonDpn(ElanInstance elanInfo, InterfaceInfo interfaceInfo,
+            DpnInterfaces dpnInterfaces, boolean isFirstInterfaceInDpn, TypedWriteTransaction<Configuration> confTx) {
         if (!isOperational(interfaceInfo)) {
             return;
         }
         // LocalBroadcast Group creation with elan-Interfaces
-        setupLocalBroadcastGroups(elanInfo, dpnInterfaces, interfaceInfo);
+        setupLocalBroadcastGroups(elanInfo, dpnInterfaces, interfaceInfo, confTx);
         if (isFirstInterfaceInDpn) {
             LOG.trace("waitTimeForSyncInstall is {}", WAIT_TIME_FOR_SYNC_INSTALL);
             BigInteger dpId = interfaceInfo.getDpId();
@@ -934,7 +962,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
             } catch (InterruptedException e1) {
                 LOG.warn("Error while waiting for local BC group for ELAN {} to install", elanInfo);
             }
-            elanL2GatewayMulticastUtils.setupElanBroadcastGroups(elanInfo, dpnInterfaces, dpId);
+            elanL2GatewayMulticastUtils.setupElanBroadcastGroups(elanInfo, dpnInterfaces, dpId, confTx);
             try {
                 Thread.sleep(WAIT_TIME_FOR_SYNC_INSTALL);
             } catch (InterruptedException e1) {
@@ -978,24 +1006,26 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         mdsalManager.removeFlow(flowTx, interfaceInfo.getDpId(), flowEntity);
     }
 
-    private void setElanAndEtreeBCGrouponOtherDpns(ElanInstance elanInfo, BigInteger dpId) {
+    private void setElanAndEtreeBCGrouponOtherDpns(ElanInstance elanInfo, BigInteger dpId,
+            TypedWriteTransaction<Configuration> confTx) {
         int elanTag = elanInfo.getElanTag().intValue();
         long groupId = ElanUtils.getElanRemoteBCGId(elanTag);
-        setBCGrouponOtherDpns(elanInfo, dpId, elanTag, groupId);
+        setBCGrouponOtherDpns(elanInfo, dpId, elanTag, groupId, confTx);
         EtreeInstance etreeInstance = elanInfo.augmentation(EtreeInstance.class);
         if (etreeInstance != null) {
             int etreeLeafTag = etreeInstance.getEtreeLeafTagVal().getValue().intValue();
             long etreeLeafGroupId = ElanUtils.getEtreeLeafRemoteBCGId(etreeLeafTag);
-            setBCGrouponOtherDpns(elanInfo, dpId, etreeLeafTag, etreeLeafGroupId);
+            setBCGrouponOtherDpns(elanInfo, dpId, etreeLeafTag, etreeLeafGroupId, confTx);
         }
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void setBCGrouponOtherDpns(ElanInstance elanInfo, BigInteger dpId, int elanTag, long groupId) {
+    private void setBCGrouponOtherDpns(ElanInstance elanInfo, BigInteger dpId, int elanTag, long groupId,
+            TypedWriteTransaction<Configuration> confTx) {
         int bucketId = 0;
         ElanDpnInterfacesList elanDpns = elanUtils.getElanDpnInterfacesList(elanInfo.getElanInstanceName());
         if (elanDpns != null) {
-            List<DpnInterfaces> dpnInterfaces = elanDpns.getDpnInterfaces();
+            List<DpnInterfaces> dpnInterfaces = requireNonNullElse(elanDpns.getDpnInterfaces(), emptyList());
             for (DpnInterfaces dpnInterface : dpnInterfaces) {
                 List<Bucket> remoteListBucketInfo = new ArrayList<>();
                 if (elanUtils.isDpnPresent(dpnInterface.getDpId()) && !Objects.equals(dpnInterface.getDpId(), dpId)
@@ -1039,7 +1069,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                     Group group = MDSALUtil.buildGroup(groupId, elanInfo.getElanInstanceName(), GroupTypes.GroupAll,
                             MDSALUtil.buildBucketLists(remoteListBucketInfo));
                     LOG.trace("Installing remote bc group {} on dpnId {}", group, dpnInterface.getDpId());
-                    mdsalManager.syncInstallGroup(dpnInterface.getDpId(), group);
+                    mdsalManager.addGroup(confTx, dpnInterface.getDpId(), group);
                 }
             }
             try {
@@ -1091,7 +1121,6 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
     }
 
     // Install DMAC entry on dst DPN
-    @SuppressWarnings("checkstyle:ForbidCertainMethod")
     public List<ListenableFuture<Void>> installDMacAddressTables(ElanInstance elanInfo, InterfaceInfo interfaceInfo,
             BigInteger dstDpId) {
         String interfaceName = interfaceInfo.getInterfaceName();
@@ -1113,7 +1142,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                     }
                 })));
         }
-        return Collections.emptyList();
+        return emptyList();
     }
 
     private void createDropBucket(List<Bucket> listBucket) {
@@ -1124,20 +1153,20 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         listBucket.add(dropBucket);
     }
 
-    public void setupLocalBroadcastGroups(ElanInstance elanInfo, DpnInterfaces newDpnInterface,
-            InterfaceInfo interfaceInfo) {
-        setupStandardLocalBroadcastGroups(elanInfo, newDpnInterface, interfaceInfo);
-        setupLeavesLocalBroadcastGroups(elanInfo, newDpnInterface, interfaceInfo);
+    private void setupLocalBroadcastGroups(ElanInstance elanInfo, DpnInterfaces newDpnInterface,
+            InterfaceInfo interfaceInfo, TypedWriteTransaction<Configuration> confTx) {
+        setupStandardLocalBroadcastGroups(elanInfo, newDpnInterface, interfaceInfo, confTx);
+        setupLeavesLocalBroadcastGroups(elanInfo, newDpnInterface, interfaceInfo, confTx);
     }
 
-    public void setupStandardLocalBroadcastGroups(ElanInstance elanInfo, DpnInterfaces newDpnInterface,
-            InterfaceInfo interfaceInfo) {
+    private void setupStandardLocalBroadcastGroups(ElanInstance elanInfo, DpnInterfaces newDpnInterface,
+            InterfaceInfo interfaceInfo, TypedWriteTransaction<Configuration> confTx) {
         List<Bucket> listBucket = new ArrayList<>();
         int bucketId = 0;
         long groupId = ElanUtils.getElanLocalBCGId(elanInfo.getElanTag());
 
         List<String> interfaces = new ArrayList<>();
-        if (newDpnInterface != null) {
+        if (newDpnInterface != null && newDpnInterface.getInterfaces() != null) {
             interfaces = newDpnInterface.getInterfaces();
         }
         for (String ifName : interfaces) {
@@ -1159,18 +1188,18 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         Group group = MDSALUtil.buildGroup(groupId, elanInfo.getElanInstanceName(), GroupTypes.GroupAll,
                 MDSALUtil.buildBucketLists(listBucket));
         LOG.trace("installing the localBroadCast Group:{}", group);
-        mdsalManager.syncInstallGroup(interfaceInfo.getDpId(), group);
+        mdsalManager.addGroup(confTx, interfaceInfo.getDpId(), group);
     }
 
     private void setupLeavesLocalBroadcastGroups(ElanInstance elanInfo, DpnInterfaces newDpnInterface,
-            InterfaceInfo interfaceInfo) {
+            InterfaceInfo interfaceInfo, TypedWriteTransaction<Configuration> confTx) {
         EtreeInstance etreeInstance = elanInfo.augmentation(EtreeInstance.class);
         if (etreeInstance != null) {
             List<Bucket> listBucket = new ArrayList<>();
             int bucketId = 0;
 
             List<String> interfaces = new ArrayList<>();
-            if (newDpnInterface != null) {
+            if (newDpnInterface != null && newDpnInterface.getInterfaces() != null) {
                 interfaces = newDpnInterface.getInterfaces();
             }
             for (String ifName : interfaces) {
@@ -1198,7 +1227,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
             Group group = MDSALUtil.buildGroup(groupId, elanInfo.getElanInstanceName(), GroupTypes.GroupAll,
                     MDSALUtil.buildBucketLists(listBucket));
             LOG.trace("installing the localBroadCast Group:{}", group);
-            mdsalManager.syncInstallGroup(interfaceInfo.getDpId(), group);
+            mdsalManager.addGroup(confTx, interfaceInfo.getDpId(), group);
         }
     }
 
@@ -1235,13 +1264,10 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
      * Installs a flow in the External Tunnel table consisting in translating
      * the VNI retrieved from the packet that came over a tunnel with a TOR into
      * elanTag that will be used later in the ELANs pipeline.
-     *
-     * @param dpnId
-     *            the dpn id
-     * @param elanInfo
-     *            the elan info
+     * @param dpnId the dpn id
      */
-    public void setExternalTunnelTable(BigInteger dpnId, ElanInstance elanInfo) {
+    private void setExternalTunnelTable(BigInteger dpnId, ElanInstance elanInfo,
+            TypedWriteTransaction<Configuration> confTx) {
         long elanTag = elanInfo.getElanTag();
         FlowEntity flowEntity = MDSALUtil.buildFlowEntity(dpnId, NwConstants.EXTERNAL_TUNNEL_TABLE,
                 getFlowRef(NwConstants.EXTERNAL_TUNNEL_TABLE, elanTag), 5, // prio
@@ -1252,7 +1278,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                 buildMatchesForVni(ElanUtils.getVxlanSegmentationId(elanInfo)),
                 getInstructionsIntOrExtTunnelTable(elanTag));
 
-        mdsalManager.installFlow(flowEntity);
+        mdsalManager.addFlow(confTx, flowEntity);
     }
 
     /**
@@ -1260,12 +1286,10 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
      * elanTag. Important: ensure this method is only called whenever there is
      * no other ElanInterface in the specified DPN
      *
-     * @param dpnId
-     *            DPN whose Ext Tunnel table is going to be modified
-     * @param elanInfo
-     *            holds the elanTag needed for selecting the flow to be removed
+     * @param dpnId DPN whose Ext Tunnel table is going to be modified
      */
-    public void unsetExternalTunnelTable(BigInteger dpnId, ElanInstance elanInfo) {
+    private void unsetExternalTunnelTable(BigInteger dpnId, ElanInstance elanInfo,
+            TypedReadWriteTransaction<Configuration> confTx) throws ExecutionException, InterruptedException {
         // TODO: Use DataStoreJobCoordinator in order to avoid that removing the
         // last ElanInstance plus
         // adding a new one does (almost at the same time) are executed in that
@@ -1277,7 +1301,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
             .setTableId(NwConstants.EXTERNAL_TUNNEL_TABLE)
             .setFlowId(flowId)
             .build();
-        mdsalManager.removeFlow(flowEntity);
+        mdsalManager.removeFlow(confTx, flowEntity);
     }
 
     public void setupTerminateServiceTable(ElanInstance elanInfo, BigInteger dpId,
@@ -1558,7 +1582,8 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         if (dpnInterfaceLists == null) {
             return;
         }
-        List<ElanDpnInterfacesList> elanDpnIf = dpnInterfaceLists.getElanDpnInterfacesList();
+        List<ElanDpnInterfacesList> elanDpnIf =
+            requireNonNullElse(dpnInterfaceLists.getElanDpnInterfacesList(), emptyList());
         for (ElanDpnInterfacesList elanDpns : elanDpnIf) {
             int cnt = 0;
             String elanName = elanDpns.getElanInstanceName();
@@ -1579,9 +1604,9 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
             DpnInterfaces dstDpnIf = null;
             for (DpnInterfaces dpnIf : dpnInterfaces) {
                 BigInteger dpnIfDpId = dpnIf.getDpId();
-                if (dpnIfDpId.equals(srcDpId)) {
+                if (Objects.equals(dpnIfDpId, srcDpId)) {
                     cnt++;
-                } else if (dpnIfDpId.equals(dstDpId)) {
+                } else if (Objects.equals(dpnIfDpId, dstDpId)) {
                     cnt++;
                     dstDpnIf = dpnIf;
                 }
@@ -1593,7 +1618,9 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                     // update Remote BC Group
                     LOG.trace("procesing elan remote bc group for tunnel event {}", elanInfo);
                     try {
-                        elanL2GatewayMulticastUtils.setupElanBroadcastGroups(elanInfo, srcDpId);
+                        txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+                            confTx -> elanL2GatewayMulticastUtils.setupElanBroadcastGroups(elanInfo, srcDpId,
+                                confTx)).get();
                     } catch (RuntimeException e) {
                         LOG.error("Error while adding remote bc group for {} on dpId {} ", elanName, srcDpId);
                     }
@@ -1606,10 +1633,10 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
                             if (isOperational(interfaceInfo)) {
                                 return installDMacAddressTables(elanInfo, interfaceInfo, srcDpId);
                             }
-                            return Collections.emptyList();
+                            return emptyList();
                         }, ElanConstants.JOB_MAX_RETRIES);
                     }
-                    return Collections.emptyList();
+                    return emptyList();
                 }, ElanConstants.JOB_MAX_RETRIES);
             }
 
@@ -1624,7 +1651,7 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
      * @param intrf
      *            the interface
      */
-    public void handleExternalTunnelStateEvent(ExternalTunnel externalTunnel, Interface intrf) {
+    void handleExternalTunnelStateEvent(ExternalTunnel externalTunnel, Interface intrf) {
         if (!validateExternalTunnelStateEvent(externalTunnel, intrf)) {
             return;
         }
@@ -1648,7 +1675,8 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
         if (dpnInterfaceLists == null) {
             return;
         }
-        List<ElanDpnInterfacesList> elanDpnIf = dpnInterfaceLists.getElanDpnInterfacesList();
+        List<ElanDpnInterfacesList> elanDpnIf =
+            requireNonNullElse(dpnInterfaceLists.getElanDpnInterfacesList(), emptyList());
         for (ElanDpnInterfacesList elanDpns : elanDpnIf) {
             String elanName = elanDpns.getElanInstanceName();
             ElanInstance elanInfo = elanInstanceCache.get(elanName).orNull();
@@ -1660,7 +1688,10 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
             }
             LOG.debug("Elan instance:{} is present in Dpn:{} ", elanName, dpId);
 
-            elanL2GatewayMulticastUtils.setupElanBroadcastGroups(elanInfo, dpId);
+            final BigInteger finalDpId = dpId;
+            ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+                confTx -> elanL2GatewayMulticastUtils.setupElanBroadcastGroups(elanInfo, finalDpId, confTx)), LOG,
+                "Error setting up ELAN BGs");
             // install L2gwDevices local macs in dpn.
             elanL2GatewayUtils.installL2gwDeviceMacsInDpn(dpId, externalNodeId, elanInfo, intrf.getName());
             // Install dpn macs on external device
@@ -1767,7 +1798,9 @@ public class ElanInterfaceManager extends AsyncDataTreeChangeListenerBase<ElanIn
     public void handleExternalInterfaceEvent(ElanInstance elanInstance, DpnInterfaces dpnInterfaces,
                                              BigInteger dpId) {
         LOG.debug("setting up remote BC group for elan {}", elanInstance.getPhysicalNetworkName());
-        elanL2GatewayMulticastUtils.setupStandardElanBroadcastGroups(elanInstance, dpnInterfaces, dpId);
+        ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+            confTx -> elanL2GatewayMulticastUtils.setupStandardElanBroadcastGroups(elanInstance, dpnInterfaces, dpId,
+                confTx)), LOG, "Error setting up remote BC group for ELAN {}", elanInstance.getPhysicalNetworkName());
         try {
             Thread.sleep(WAIT_TIME_FOR_SYNC_INSTALL);
         } catch (InterruptedException e) {