NETVIRT-1630 migrate to md-sal APIs
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / internal / ElanNodeListener.java
index 2e068a23976865f51c4b966c37adc68890788f5d..de14506d7bef476f820273866054aa7e70e04f75 100644 (file)
@@ -7,23 +7,30 @@
  */
 package org.opendaylight.netvirt.elan.internal;
 
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+import static org.opendaylight.infrautils.utils.concurrent.LoggingFutures.addErrorLogging;
+
 import java.math.BigInteger;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import javax.annotation.PostConstruct;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.datastoreutils.listeners.DataTreeEventCallbackRegistrar;
+import org.opendaylight.genius.infra.Datastore;
+import org.opendaylight.genius.infra.Datastore.Configuration;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
+import org.opendaylight.genius.infra.TypedReadWriteTransaction;
+import org.opendaylight.genius.infra.TypedWriteTransaction;
 import org.opendaylight.genius.mdsalutil.ActionInfo;
 import org.opendaylight.genius.mdsalutil.BucketInfo;
 import org.opendaylight.genius.mdsalutil.FlowEntity;
+import org.opendaylight.genius.mdsalutil.GroupEntity;
 import org.opendaylight.genius.mdsalutil.InstructionInfo;
 import org.opendaylight.genius.mdsalutil.MDSALUtil;
 import org.opendaylight.genius.mdsalutil.MatchInfo;
@@ -48,22 +55,31 @@ import org.opendaylight.genius.mdsalutil.matches.MatchEthernetDestination;
 import org.opendaylight.genius.mdsalutil.matches.MatchEthernetType;
 import org.opendaylight.genius.mdsalutil.nxmatches.NxMatchRegister;
 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
+import org.opendaylight.infrautils.utils.concurrent.Executors;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.netvirt.elan.arp.responder.ArpResponderConstant;
 import org.opendaylight.netvirt.elan.arp.responder.ArpResponderUtil;
 import org.opendaylight.netvirt.elan.utils.ElanConstants;
+import org.opendaylight.netvirt.elan.utils.ElanUtils;
+import org.opendaylight.serviceutils.tools.listener.AbstractAsyncDataTreeChangeListener;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.MacAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.config.rev150710.ElanConfig;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowjava.nx.match.rev140421.NxmNxReg4;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Uint64;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Singleton
-public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, ElanNodeListener> {
+public class ElanNodeListener extends AbstractAsyncDataTreeChangeListener<Node> {
 
     private static final Logger LOG = LoggerFactory.getLogger(ElanNodeListener.class);
     private static final int LEARN_MATCH_REG4_VALUE = 1;
@@ -80,91 +96,104 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
     private final int arpPuntTimeout;
     private final boolean puntLldpToController;
     private final JobCoordinator jobCoordinator;
+    private final DataTreeEventCallbackRegistrar eventCallbacks;
 
     @Inject
     public ElanNodeListener(DataBroker dataBroker, IMdsalApiManager mdsalManager, ElanConfig elanConfig,
-            IdManagerService idManagerService, JobCoordinator jobCoordinator) {
+            IdManagerService idManagerService, JobCoordinator jobCoordinator,
+            DataTreeEventCallbackRegistrar eventCallbacks) {
+        super(dataBroker, LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class).child(Node.class),
+                Executors.newListeningSingleThreadExecutor("ElanNodeListener", LOG));
         this.broker = dataBroker;
         this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         this.mdsalManager = mdsalManager;
-        this.tempSmacLearnTimeout = elanConfig.getTempSmacLearnTimeout();
+        this.tempSmacLearnTimeout = elanConfig.getTempSmacLearnTimeout().toJava();
         this.arpPuntTimeout = elanConfig.getArpPuntTimeout().intValue();
         this.puntLldpToController = elanConfig.isPuntLldpToController();
         this.idManagerService = idManagerService;
         this.jobCoordinator = jobCoordinator;
+        this.eventCallbacks = eventCallbacks;
     }
 
-    @Override
-    @PostConstruct
     public void init() {
-        registerListener(LogicalDatastoreType.OPERATIONAL, broker);
+        LOG.info("{} registered", getClass().getSimpleName());
     }
 
     @Override
-    protected InstanceIdentifier<Node> getWildCardPath() {
-        return InstanceIdentifier.create(Nodes.class).child(Node.class);
+    @PreDestroy
+    public void close() {
+        super.close();
+        Executors.shutdownAndAwaitTermination(getExecutorService());
     }
 
     @Override
-    protected void remove(InstanceIdentifier<Node> identifier, Node del) {
+    public void remove(InstanceIdentifier<Node> identifier, Node del) {
     }
 
     @Override
-    protected void update(InstanceIdentifier<Node> identifier, Node original, Node update) {
+    public void update(InstanceIdentifier<Node> identifier, Node original, Node update) {
     }
 
     @Override
-    protected void add(InstanceIdentifier<Node> identifier, Node add) {
+    public void add(InstanceIdentifier<Node> identifier, Node add) {
         NodeId nodeId = add.getId();
         String[] node = nodeId.getValue().split(":");
         if (node.length < 2) {
             LOG.warn("Unexpected nodeId {}", nodeId.getValue());
             return;
         }
-        BigInteger dpId = new BigInteger(node[1]);
-        createTableMissEntry(dpId);
-        createMulticastFlows(dpId);
-        createArpDefaultFlowsForArpCheckTable(dpId);
+        addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION, tx -> {
+            Uint64 dpId = Uint64.valueOf(node[1]);
+            createTableMissEntry(tx, dpId);
+            createMulticastFlows(tx, dpId);
+            createArpDefaultFlowsForArpCheckTable(dpId);
+        }), LOG, "Error handling ELAN node addition for {}", add);
     }
 
-    private void createArpDefaultFlowsForArpCheckTable(BigInteger dpId) {
+    private void createArpDefaultFlowsForArpCheckTable(Uint64 dpId) {
+
         jobCoordinator.enqueueJob("ARP_CHECK_TABLE-" + dpId.toString(),
-            () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
-                LOG.debug("Received notification to install Arp Check Default entries for dpn {} ", dpId);
-                createArpRequestMatchFlows(dpId, tx);
-                createArpResponseMatchFlows(dpId, tx);
-                createArpPuntAndLearnFlow(dpId, tx);
-                addGarpLearnMatchFlow(dpId, tx);
-                addArpLearnMatchFlow(dpId, tx);
+            () -> Collections.singletonList(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
+                try {
+                    LOG.debug("Received notification to install Arp Check Default entries for dpn {} ", dpId);
+                    createArpRequestMatchFlows(dpId, tx);
+                    createArpResponseMatchFlows(dpId, tx);
+                    createArpPuntAndLearnFlow(dpId, tx);
+                    addGarpLearnMatchFlow(dpId, tx);
+                    addArpLearnMatchFlow(dpId, tx);
+                } catch (InterruptedException | ExecutionException e) {
+                    LOG.error("Error programming ARP rules for dpn {}", dpId, e);
+                }
             })));
     }
 
-    public void createTableMissEntry(BigInteger dpnId) {
-        setupTableMissSmacFlow(dpnId);
-        setupTableMissDmacFlow(dpnId);
-        setupTableMissArpCheckFlow(dpnId);
-        setupTableMissApResponderFlow(dpnId);
-        setupExternalL2vniTableMissFlow(dpnId);
+    public void createTableMissEntry(TypedWriteTransaction<Configuration> tx, Uint64 dpnId) {
+        setupTableMissSmacFlow(tx, dpnId);
+        setupTableMissDmacFlow(tx, dpnId);
+        setupTableMissArpCheckFlow(tx, dpnId);
+        setupTableMissApResponderFlow(tx, dpnId);
+        setupExternalL2vniTableMissFlow(tx, dpnId);
     }
 
-    private void createMulticastFlows(BigInteger dpId) {
-        createL2ControlProtocolDropFlows(dpId);
-        createMulticastPuntFlows(dpId);
+    private void createMulticastFlows(TypedWriteTransaction<Configuration> tx, Uint64 dpId) {
+        createL2ControlProtocolDropFlows(tx, dpId);
+        createMulticastPuntFlows(tx, dpId);
     }
 
-    private void createMulticastPuntFlows(BigInteger dpId) {
+    private void createMulticastPuntFlows(TypedWriteTransaction<Configuration> tx, Uint64 dpId) {
         if (puntLldpToController) {
-            createLldpFlows(dpId);
+            createLldpFlows(tx, dpId);
         }
     }
 
-    private void createLldpFlows(BigInteger dpId) {
-        createLldpFlow(dpId, ElanConstants.LLDP_DST_1, "LLDP dMac Table Flow 1");
-        createLldpFlow(dpId, ElanConstants.LLDP_DST_2, "LLDP dMac Table Flow 2");
-        createLldpFlow(dpId, ElanConstants.LLDP_DST_3, "LLDP dMac Table Flow 3");
+    private void createLldpFlows(TypedWriteTransaction<Configuration> tx, Uint64 dpId) {
+        createLldpFlow(tx, dpId, ElanConstants.LLDP_DST_1, "LLDP dMac Table Flow 1");
+        createLldpFlow(tx, dpId, ElanConstants.LLDP_DST_2, "LLDP dMac Table Flow 2");
+        createLldpFlow(tx, dpId, ElanConstants.LLDP_DST_3, "LLDP dMac Table Flow 3");
     }
 
-    private void createLldpFlow(BigInteger dpId, String dstMac, String flowName) {
+    private void createLldpFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId, String dstMac,
+            String flowName) {
         List<MatchInfo> mkMatches = new ArrayList<>();
         mkMatches.add(new MatchEthernetType(ElanConstants.LLDP_ETH_TYPE));
         mkMatches.add(new MatchEthernetDestination(new MacAddress(dstMac)));
@@ -179,10 +208,10 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
         FlowEntity lldpFlow = MDSALUtil.buildFlowEntity(dpId, NwConstants.ELAN_DMAC_TABLE, flowId, 16, flowName, 0, 0,
                 ElanConstants.COOKIE_ELAN_KNOWN_DMAC, mkMatches, mkInstructions);
 
-        mdsalManager.installFlow(lldpFlow);
+        mdsalManager.addFlow(tx, lldpFlow);
     }
 
-    private void setupTableMissSmacFlow(BigInteger dpId) {
+    private void setupTableMissSmacFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId) {
         List<ActionInfo> actionsInfos = new ArrayList<>();
         actionsInfos.add(new ActionPuntToController());
         actionsInfos.add(new ActionLearn(0, tempSmacLearnTimeout, 0, ElanConstants.COOKIE_ELAN_LEARNED_SMAC, 0,
@@ -204,13 +233,13 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
         FlowEntity flowEntity = MDSALUtil.buildFlowEntity(dpId, NwConstants.ELAN_SMAC_TABLE,
                 getTableMissFlowRef(NwConstants.ELAN_SMAC_TABLE), 0, "ELAN sMac Table Miss Flow", 0, 0,
                 ElanConstants.COOKIE_ELAN_KNOWN_SMAC, mkMatches, mkInstructions);
-        mdsalManager.installFlow(flowEntity);
+        mdsalManager.addFlow(tx, flowEntity);
 
-        addSmacBaseTableFlow(dpId);
-        addSmacLearnedTableFlow(dpId);
+        addSmacBaseTableFlow(tx, dpId);
+        addSmacLearnedTableFlow(tx, dpId);
     }
 
-    private void addSmacBaseTableFlow(BigInteger dpId) {
+    private void addSmacBaseTableFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId) {
         // T48 - resubmit to T49 & T50
         List<ActionInfo> actionsInfo = new ArrayList<>();
         actionsInfo.add(new ActionNxResubmit(NwConstants.ELAN_SMAC_LEARNED_TABLE));
@@ -221,25 +250,26 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
         FlowEntity doubleResubmitTable = MDSALUtil.buildFlowEntity(dpId, NwConstants.ELAN_BASE_TABLE,
                 getTableMissFlowRef(NwConstants.ELAN_BASE_TABLE), 0, "Elan sMac resubmit table", 0, 0,
                 ElanConstants.COOKIE_ELAN_BASE_SMAC, mkMatch, mkInstruct);
-        mdsalManager.installFlow(doubleResubmitTable);
+        mdsalManager.addFlow(tx, doubleResubmitTable);
     }
 
-    private void addSmacLearnedTableFlow(BigInteger dpId) {
+    private void addSmacLearnedTableFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId) {
         // T50 - match on Reg4 and goto T51
         List<MatchInfoBase> mkMatches = new ArrayList<>();
         mkMatches.add(new NxMatchRegister(NxmNxReg4.class, LEARN_MATCH_REG4_VALUE));
         List<InstructionInfo> mkInstructions = new ArrayList<>();
         mkInstructions.add(new InstructionGotoTable(NwConstants.ELAN_DMAC_TABLE));
-        String flowRef = new StringBuffer().append(NwConstants.ELAN_SMAC_TABLE).append(NwConstants.FLOWID_SEPARATOR)
+        String flowRef = new StringBuilder().append(NwConstants.ELAN_SMAC_TABLE).append(NwConstants.FLOWID_SEPARATOR)
                 .append(LEARN_MATCH_REG4_VALUE).toString();
         FlowEntity flowEntity =
                 MDSALUtil.buildFlowEntity(dpId, NwConstants.ELAN_SMAC_TABLE, flowRef, 10, "ELAN sMac Table Reg4 Flow",
-                        0, 0, ElanConstants.COOKIE_ELAN_KNOWN_SMAC.add(BigInteger.valueOf(LEARN_MATCH_REG4_VALUE)),
+                        0, 0, Uint64.valueOf(ElanConstants.COOKIE_ELAN_KNOWN_SMAC.toJava()
+                            .add(BigInteger.valueOf(LEARN_MATCH_REG4_VALUE))),
                         mkMatches, mkInstructions);
-        mdsalManager.installFlow(flowEntity);
+        mdsalManager.addFlow(tx, flowEntity);
     }
 
-    private void setupTableMissDmacFlow(BigInteger dpId) {
+    private void setupTableMissDmacFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId) {
         List<MatchInfo> mkMatches = new ArrayList<>();
 
         List<InstructionInfo> mkInstructions = new ArrayList<>();
@@ -249,10 +279,10 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
                 getTableMissFlowRef(NwConstants.ELAN_DMAC_TABLE), 0, "ELAN dMac Table Miss Flow", 0, 0,
                 ElanConstants.COOKIE_ELAN_KNOWN_DMAC, mkMatches, mkInstructions);
 
-        mdsalManager.installFlow(flowEntity);
+        mdsalManager.addFlow(tx, flowEntity);
     }
 
-    private void setupExternalL2vniTableMissFlow(BigInteger dpnId) {
+    private void setupExternalL2vniTableMissFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpnId) {
         List<MatchInfo> matches = new ArrayList<>();
         List<ActionInfo> actionsInfos = Collections.singletonList(new ActionNxResubmit(NwConstants
                         .LPORT_DISPATCHER_TABLE));
@@ -261,11 +291,11 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
                         getTableMissFlowRef(NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE), 0,
                         "External L2VNI Table Miss Flow", 0, 0,
                          ElanConstants.COOKIE_L2VNI_DEMUX, matches, instructions);
-        mdsalManager.installFlow(flowEntity);
+        mdsalManager.addFlow(tx, flowEntity);
     }
 
 
-    private void createL2ControlProtocolDropFlows(BigInteger dpId) {
+    private void createL2ControlProtocolDropFlows(TypedWriteTransaction<Configuration> tx, Uint64 dpId) {
         List<MatchInfo> mkMatches = new ArrayList<>();
         MatchEthernetDestination matchEthDst =
                 new MatchEthernetDestination(new MacAddress(ElanConstants.L2_CONTROL_PACKETS_DMAC),
@@ -285,24 +315,19 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
                 "L2 control packets dMac Table Flow", 0, 0, ElanConstants.COOKIE_ELAN_KNOWN_DMAC, mkMatches,
                 mkInstructions);
 
-        mdsalManager.installFlow(flow);
+        mdsalManager.addFlow(tx, flow);
     }
 
-    private String getTableMissFlowRef(long tableId) {
-        return new StringBuffer().append(tableId).toString();
+    private static String getTableMissFlowRef(long tableId) {
+        return String.valueOf(tableId);
     }
 
-    @Override
-    protected ElanNodeListener getDataTreeChangeListener() {
-        return ElanNodeListener.this;
+    private void setupTableMissApResponderFlow(TypedWriteTransaction<Configuration> tx, final Uint64 dpnId) {
+        mdsalManager.addFlow(tx, ArpResponderUtil.getArpResponderTableMissFlow(dpnId));
     }
 
-    private void setupTableMissApResponderFlow(final BigInteger dpnId) {
-        mdsalManager.installFlow(dpnId, ArpResponderUtil.getArpResponderTableMissFlow(dpnId));
-    }
-
-    private void setupTableMissArpCheckFlow(BigInteger dpnId) {
-        mdsalManager.installFlow(dpnId,
+    private void setupTableMissArpCheckFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpnId) {
+        mdsalManager.addFlow(tx,
                 MDSALUtil.buildFlowEntity(dpnId, NwConstants.ARP_CHECK_TABLE,
                         String.valueOf("L2.ELAN." + NwConstants.ARP_CHECK_TABLE), NwConstants.TABLE_MISS_PRIORITY,
                         ArpResponderConstant.DROP_FLOW_NAME.value(), 0, 0, NwConstants.COOKIE_ARP_RESPONDER,
@@ -310,13 +335,37 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
                         Collections.singletonList(new InstructionGotoTable(NwConstants.ELAN_BASE_TABLE))));
     }
 
-    private void createArpRequestMatchFlows(BigInteger dpId, WriteTransaction writeFlowTx) {
-
+    private void createArpRequestMatchFlows(Uint64 dpId, TypedReadWriteTransaction<Configuration> tx)
+            throws ExecutionException, InterruptedException {
         long arpRequestGroupId = ArpResponderUtil.retrieveStandardArpResponderGroupId(idManagerService);
         List<BucketInfo> buckets = ArpResponderUtil.getDefaultBucketInfos(NwConstants.ARP_RESPONDER_TABLE);
-        ArpResponderUtil.installGroup(mdsalManager, dpId, arpRequestGroupId,
-                ArpResponderConstant.GROUP_FLOW_NAME.value(), buckets);
+        LOG.trace("Installing group flow on dpn {}", dpId);
+        GroupEntity groupEntity =
+            MDSALUtil.buildGroupEntity(dpId, arpRequestGroupId, ArpResponderConstant.GROUP_FLOW_NAME.value(),
+                GroupTypes.GroupAll, buckets);
+        mdsalManager.addGroup(tx, groupEntity);
+        InstanceIdentifier<Group> groupIid = ElanUtils.getGroupInstanceid(dpId, arpRequestGroupId);
+        if (tx.read(groupIid).get().isPresent()) {
+            LOG.info("group {} is present in the config hence adding the flow", arpRequestGroupId);
+            createArpRequestMatchFlowsForGroup(dpId, arpRequestGroupId,tx);
+            return;
+        }
+        eventCallbacks.onAddOrUpdate(LogicalDatastoreType.CONFIGURATION,
+                ElanUtils.getGroupInstanceid(dpId, arpRequestGroupId), (unused, newGroupId) -> {
+                LOG.info("group {} added in the config", arpRequestGroupId);
+                LoggingFutures.addErrorLogging(
+                        txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION,
+                            innerConfTx -> createArpRequestMatchFlowsForGroup(dpId, arpRequestGroupId,
+                                    innerConfTx)),
+                        LOG, "Error adding flow for the group {}",arpRequestGroupId);
+                return DataTreeEventCallbackRegistrar.NextAction.UNREGISTER;
+            }, Duration.ofSeconds(5), iid -> LOG.error("arpRequestGroupId {} not found in Config datastore",
+                        arpRequestGroupId));
+
+    }
 
+    private void createArpRequestMatchFlowsForGroup(Uint64 dpId, long arpRequestGroupId,
+            TypedReadWriteTransaction<Configuration> tx) {
         FlowEntity arpReqArpCheckTbl = ArpResponderUtil.createArpDefaultFlow(dpId, NwConstants.ARP_CHECK_TABLE,
                 NwConstants.ARP_REQUEST, () -> Arrays.asList(MatchEthernetType.ARP, MatchArpOp.REQUEST), () ->
                         Arrays.asList(new ActionGroup(arpRequestGroupId),
@@ -324,25 +373,25 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
                                 new ActionNxResubmit(NwConstants.ARP_LEARN_TABLE_2),
                                 new ActionNxResubmit(NwConstants.ELAN_BASE_TABLE)));
         LOG.trace("Invoking MDSAL to install Arp Rquest Match Flow for table {}", NwConstants.ARP_CHECK_TABLE);
-        mdsalManager.addFlowToTx(arpReqArpCheckTbl, writeFlowTx);
+        mdsalManager.addFlow(tx, arpReqArpCheckTbl);
     }
 
-    private void createArpResponseMatchFlows(BigInteger dpId, WriteTransaction writeFlowTx) {
+    private void createArpResponseMatchFlows(Uint64 dpId, TypedReadWriteTransaction<Configuration> tx) {
         FlowEntity arpRepArpCheckTbl = ArpResponderUtil.createArpDefaultFlow(dpId, NwConstants.ARP_CHECK_TABLE,
                 NwConstants.ARP_REPLY, () -> Arrays.asList(MatchEthernetType.ARP, MatchArpOp.REPLY), () ->
                         Arrays.asList(new ActionNxResubmit(NwConstants.ARP_LEARN_TABLE_1),
                                 new ActionNxResubmit(NwConstants.ARP_LEARN_TABLE_2),
                                 new ActionNxResubmit(NwConstants.ELAN_BASE_TABLE)));
         LOG.trace("Invoking MDSAL to install  Arp Reply Match Flow for Table {} ", NwConstants.ARP_CHECK_TABLE);
-        mdsalManager.addFlowToTx(arpRepArpCheckTbl, writeFlowTx);
+        mdsalManager.addFlow(tx, arpRepArpCheckTbl);
     }
 
-    private void createArpPuntAndLearnFlow(BigInteger dpId, WriteTransaction writeFlowTx) {
+    private void createArpPuntAndLearnFlow(Uint64 dpId, TypedReadWriteTransaction<Configuration> tx) {
         LOG.debug("adding arp punt and learn entry in table {}", NwConstants.ARP_LEARN_TABLE_1);
 
         List<MatchInfo> matches = new ArrayList<>();
         List<ActionInfo> actions = new ArrayList<>();
-        BigInteger cookie = new BigInteger("88880000", 16);
+        Uint64 cookie = Uint64.valueOf("88880000", 16).intern();
 
         matches.add(MatchEthernetType.ARP);
         actions.add(new ActionPuntToController());
@@ -352,9 +401,9 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
                     Arrays.asList(
                             new MatchFromValue(NwConstants.ETHTYPE_ARP, NxmOfFieldType.NXM_OF_ETH_TYPE.getType(),
                                     NxmOfFieldType.NXM_OF_ETH_TYPE.getFlowModHeaderLenInt()),
-                            new MatchFromField(NxmOfFieldType.NXM_OF_ARP_SPA.getType(),
-                                    NxmOfFieldType.NXM_OF_ARP_SPA.getType(),
-                                    NxmOfFieldType.NXM_OF_ARP_SPA.getFlowModHeaderLenInt()),
+                            new MatchFromField(NxmOfFieldType.NXM_OF_ARP_OP.getType(),
+                                    NxmOfFieldType.NXM_OF_ARP_OP.getType(),
+                                    NxmOfFieldType.NXM_OF_ARP_OP.getFlowModHeaderLenInt()),
                             new MatchFromField(NxmOfFieldType.NXM_OF_ARP_TPA.getType(),
                                     NxmOfFieldType.NXM_OF_ARP_TPA.getType(),
                                     NxmOfFieldType.NXM_OF_ARP_TPA.getFlowModHeaderLenInt()),
@@ -369,9 +418,9 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
                     Arrays.asList(
                             new MatchFromValue(NwConstants.ETHTYPE_ARP, NxmOfFieldType.NXM_OF_ETH_TYPE.getType(),
                                     NxmOfFieldType.NXM_OF_ETH_TYPE.getFlowModHeaderLenInt()),
-                            new MatchFromField(NxmOfFieldType.NXM_OF_ARP_SPA.getType(),
-                                    NxmOfFieldType.NXM_OF_ARP_TPA.getType(),
-                                    NxmOfFieldType.NXM_OF_ARP_SPA.getFlowModHeaderLenInt()),
+                            new MatchFromField(NxmOfFieldType.NXM_OF_ARP_OP.getType(),
+                                    NxmOfFieldType.NXM_OF_ARP_OP.getType(),
+                                    NxmOfFieldType.NXM_OF_ARP_OP.getFlowModHeaderLenInt()),
                             new MatchFromField(NxmOfFieldType.NXM_OF_ARP_TPA.getType(),
                                     NxmOfFieldType.NXM_OF_ARP_SPA.getType(),
                                     NxmOfFieldType.NXM_OF_ARP_TPA.getFlowModHeaderLenInt()),
@@ -387,10 +436,10 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
         FlowEntity flow = MDSALUtil.buildFlowEntity(dpId, NwConstants.ARP_LEARN_TABLE_1, flowid,
                 NwConstants.TABLE_MISS_PRIORITY, "arp punt/learn flow", 0,
                 0, cookie, matches, instructions);
-        mdsalManager.addFlowToTx(flow, writeFlowTx);
+        mdsalManager.addFlow(tx, flow);
     }
 
-    private void addGarpLearnMatchFlow(BigInteger dpId, WriteTransaction writeFlowTx) {
+    private void addGarpLearnMatchFlow(Uint64 dpId, TypedReadWriteTransaction<Configuration> tx) {
         List<ActionInfo> actions = new ArrayList<>();
         List<MatchInfoBase> matches = new ArrayList<>();
 
@@ -408,10 +457,10 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
         FlowEntity garpFlow = MDSALUtil.buildFlowEntity(dpId, NwConstants.ELAN_BASE_TABLE, flowid,
                 NwConstants.DEFAULT_ARP_FLOW_PRIORITY, "GARP learn match flow", 0, 0,
                 ElanConstants.COOKIE_ELAN_BASE_SMAC, matches, instructions);
-        mdsalManager.addFlowToTx(garpFlow, writeFlowTx);
+        mdsalManager.addFlow(tx, garpFlow);
     }
 
-    private void addArpLearnMatchFlow(BigInteger dpId, WriteTransaction writeFlowTx) {
+    private void addArpLearnMatchFlow(Uint64 dpId, TypedReadWriteTransaction<Configuration> tx) {
         List<ActionInfo> actions = new ArrayList<>();
         List<MatchInfoBase> matches = new ArrayList<>();
 
@@ -428,7 +477,7 @@ public class ElanNodeListener extends AsyncDataTreeChangeListenerBase<Node, Elan
         FlowEntity arpFlow = MDSALUtil.buildFlowEntity(dpId, NwConstants.ELAN_BASE_TABLE, flowid,
                 NwConstants.DEFAULT_ARP_FLOW_PRIORITY, "ARP learn match flow", 0, 0,
                 ElanConstants.COOKIE_ELAN_BASE_SMAC, matches, instructions);
-        mdsalManager.addFlowToTx(arpFlow, writeFlowTx);
+        mdsalManager.addFlow(tx, arpFlow);
     }
 
 }