elanmanager: drop nullToEmpty and reqNonNullOrElse
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / evpn / utils / EvpnUtils.java
1 /*
2  * Copyright © 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.evpn.utils;
9
10 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
11
12 import com.google.common.base.Optional;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.math.BigInteger;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Future;
22 import java.util.function.BiConsumer;
23 import java.util.function.BiPredicate;
24 import java.util.function.Function;
25 import java.util.function.Predicate;
26 import javax.annotation.Nullable;
27 import javax.inject.Inject;
28 import javax.inject.Singleton;
29 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
30 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
31 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
34 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
35 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
36 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
37 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
38 import org.opendaylight.genius.itm.globals.ITMConstants;
39 import org.opendaylight.genius.mdsalutil.FlowEntity;
40 import org.opendaylight.genius.mdsalutil.InstructionInfo;
41 import org.opendaylight.genius.mdsalutil.MDSALUtil;
42 import org.opendaylight.genius.mdsalutil.MatchInfo;
43 import org.opendaylight.genius.mdsalutil.NWUtil;
44 import org.opendaylight.genius.mdsalutil.NwConstants;
45 import org.opendaylight.genius.mdsalutil.instructions.InstructionGotoTable;
46 import org.opendaylight.genius.mdsalutil.instructions.InstructionWriteMetadata;
47 import org.opendaylight.genius.mdsalutil.matches.MatchTunnelId;
48 import org.opendaylight.genius.utils.ServiceIndex;
49 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
50 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
51 import org.opendaylight.netvirt.bgpmanager.api.IBgpManager;
52 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
53 import org.opendaylight.netvirt.elan.l2gw.utils.SettableFutureCallback;
54 import org.opendaylight.netvirt.elan.utils.ElanConstants;
55 import org.opendaylight.netvirt.elan.utils.ElanUtils;
56 import org.opendaylight.netvirt.elanmanager.api.ElanHelper;
57 import org.opendaylight.netvirt.vpnmanager.api.IVpnManager;
58 import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.vpn.instances.VpnInstance;
59 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.servicebinding.rev160406.service.bindings.services.info.BoundServices;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.op.rev160406.ExternalTunnelList;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.op.rev160406.external.tunnel.list.ExternalTunnel;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.DcGatewayIpList;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.dc.gateway.ip.list.DcGatewayIp;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetDpnEndpointIpsInputBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetDpnEndpointIpsOutput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.EvpnAugmentation;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.elan.instances.ElanInstance;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.forwarding.entries.MacEntry;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.VrfEntryBase;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.rev150602.neutron.vpn.portip.port.data.VpnPortipToPort;
74 import org.opendaylight.yangtools.yang.binding.DataObject;
75 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
76 import org.opendaylight.yangtools.yang.common.RpcResult;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79
80 @Singleton
81 public class EvpnUtils {
82
83     private static final Logger LOG = LoggerFactory.getLogger(EvpnUtils.class);
84
85     private final BiPredicate<String, String> isNetAttach = (var1, var2) -> (var1 == null && var2 != null);
86     private final BiPredicate<String, String> isNetDetach = (var1, var2) -> (var1 != null && var2 == null);
87     private final Predicate<MacEntry> isIpv4PrefixAvailable = (macEntry) -> (macEntry != null
88         && macEntry.getIpPrefix() != null && macEntry.getIpPrefix().getIpv4Address() != null);
89     private final DataBroker broker;
90     private final ManagedNewTransactionRunner txRunner;
91     private final IInterfaceManager interfaceManager;
92     private final ElanUtils elanUtils;
93     private final ItmRpcService itmRpcService;
94     private final JobCoordinator jobCoordinator;
95     private final IBgpManager bgpManager;
96     private final IVpnManager vpnManager;
97     private final ElanInstanceCache elanInstanceCache;
98
99     @Inject
100     public EvpnUtils(DataBroker broker, IInterfaceManager interfaceManager, ElanUtils elanUtils,
101             ItmRpcService itmRpcService, IVpnManager vpnManager, IBgpManager bgpManager,
102             JobCoordinator jobCoordinator, ElanInstanceCache elanInstanceCache) {
103         this.broker = broker;
104         this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
105         this.interfaceManager = interfaceManager;
106         this.elanUtils = elanUtils;
107         this.itmRpcService = itmRpcService;
108         this.vpnManager = vpnManager;
109         this.bgpManager = bgpManager;
110         this.jobCoordinator = jobCoordinator;
111         this.elanInstanceCache = elanInstanceCache;
112     }
113
114     public boolean isWithdrawEvpnRT2Routes(ElanInstance original, ElanInstance update) {
115         return isNetDetach.test(getEvpnNameFromElan(original), getEvpnNameFromElan(update));
116     }
117
118     public boolean isAdvertiseEvpnRT2Routes(ElanInstance original, ElanInstance update) {
119         return isNetAttach.test(getEvpnNameFromElan(original), getEvpnNameFromElan(update));
120     }
121
122     @SuppressWarnings("checkstyle:IllegalCatch")
123     public void advertiseEvpnRT2Routes(EvpnAugmentation evpnAugmentation, String elanName)  {
124         if (evpnAugmentation == null || evpnAugmentation.getEvpnName() == null) {
125             return;
126         }
127         String evpnName = evpnAugmentation.getEvpnName();
128         List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
129         if (macEntries == null || macEntries.isEmpty()) {
130             LOG.trace("advertiseEvpnRT2Routes no elan mac entries found for {}", elanName);
131             return;
132         }
133         String rd = vpnManager.getVpnRd(broker, evpnName);
134         ElanInstance elanInfo = elanInstanceCache.get(elanName).orNull();
135         macEntries.stream().filter(isIpv4PrefixAvailable).forEach(macEntry -> {
136             InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
137             if (interfaceInfo == null) {
138                 LOG.debug("advertiseEvpnRT2Routes, interfaceInfo is null for interface {}", macEntry.getInterface());
139                 return;
140             }
141             advertisePrefix(elanInfo, rd, macEntry.getMacAddress().getValue(),
142                     macEntry.getIpPrefix().getIpv4Address().getValue(),
143                     interfaceInfo.getInterfaceName(), interfaceInfo.getDpId());
144         });
145     }
146
147     @Nullable
148     public String getEndpointIpAddressForDPN(BigInteger dpnId) {
149
150         Future<RpcResult<GetDpnEndpointIpsOutput>> result = itmRpcService.getDpnEndpointIps(
151                 new GetDpnEndpointIpsInputBuilder()
152                         .setSourceDpid(dpnId)
153                         .build());
154         RpcResult<GetDpnEndpointIpsOutput> rpcResult = null;
155         try {
156             rpcResult = result.get();
157         } catch (InterruptedException e) {
158             LOG.error("getnextHopIpFromRpcOutput : InterruptedException for dpnid {}", dpnId, e);
159             return null;
160         } catch (ExecutionException e) {
161             LOG.error("getnextHopIpFromRpcOutput : ExecutionException for dpnid {}", dpnId, e);
162             return null;
163         }
164         if (!rpcResult.isSuccessful()) {
165             LOG.warn("RPC Call to getDpnEndpointIps returned with Errors {}", rpcResult.getErrors());
166             return null;
167         }
168
169         List<IpAddress> nexthopIpList = rpcResult.getResult().getNexthopipList();
170         return nexthopIpList.get(0).getIpv4Address().getValue();
171     }
172
173     public Optional<String> getGatewayMacAddressForInterface(String vpnName,
174                                                                                     String ifName, String ipAddress) {
175         VpnPortipToPort gwPort = vpnManager.getNeutronPortFromVpnPortFixedIp(broker, vpnName, ipAddress);
176         return Optional.of(gwPort != null && gwPort.isSubnetIp()
177                 ? gwPort.getMacAddress()
178                 : interfaceManager.getInterfaceInfoFromOperationalDataStore(ifName).getMacAddress());
179     }
180
181     @Nullable
182     public String getL3vpnNameFromElan(ElanInstance elanInfo) {
183         if (elanInfo == null) {
184             LOG.debug("getL3vpnNameFromElan :elanInfo is NULL");
185             return null;
186         }
187         EvpnAugmentation evpnAugmentation = elanInfo.augmentation(EvpnAugmentation.class);
188         return evpnAugmentation != null ? evpnAugmentation.getL3vpnName() : null;
189     }
190
191     @Nullable
192     public static String getEvpnNameFromElan(ElanInstance elanInfo) {
193         if (elanInfo == null) {
194             LOG.debug("getEvpnNameFromElan :elanInfo is NULL");
195             return null;
196         }
197         EvpnAugmentation evpnAugmentation = elanInfo.augmentation(EvpnAugmentation.class);
198         return evpnAugmentation != null ? evpnAugmentation.getEvpnName() : null;
199     }
200
201     @Nullable
202     public String getEvpnRd(ElanInstance elanInfo) {
203         String evpnName = getEvpnNameFromElan(elanInfo);
204         if (evpnName == null) {
205             LOG.debug("getEvpnRd : evpnName is NULL for elanInfo {}", elanInfo);
206             return null;
207         }
208         return vpnManager.getVpnRd(broker, evpnName);
209     }
210
211     public void advertisePrefix(ElanInstance elanInfo, String macAddress, String prefix,
212                                  String interfaceName, BigInteger dpnId) {
213         String rd = getEvpnRd(elanInfo);
214         advertisePrefix(elanInfo, rd, macAddress, prefix, interfaceName, dpnId);
215     }
216
217     @SuppressWarnings("checkstyle:IllegalCatch")
218     public void advertisePrefix(ElanInstance elanInfo, String rd,
219                                  String macAddress, String prefix, String interfaceName, BigInteger dpnId) {
220         if (rd == null) {
221             LOG.debug("advertisePrefix : rd is NULL for elanInfo {}, macAddress {}", elanInfo, macAddress);
222             return;
223         }
224         String nextHop = getEndpointIpAddressForDPN(dpnId);
225         if (nextHop == null) {
226             LOG.debug("Failed to get the dpn tep ip for dpn {}", dpnId);
227             return;
228         }
229         int vpnLabel = 0;
230         long l2vni = elanUtils.getVxlanSegmentationId(elanInfo);
231         long l3vni = 0;
232         String gatewayMacAddr = null;
233         String l3VpName = getL3vpnNameFromElan(elanInfo);
234         if (l3VpName != null) {
235             VpnInstance l3VpnInstance = vpnManager.getVpnInstance(broker, l3VpName);
236             l3vni = l3VpnInstance.getL3vni();
237             com.google.common.base.Optional<String> gatewayMac = getGatewayMacAddressForInterface(l3VpName,
238                     interfaceName, prefix);
239             gatewayMacAddr = gatewayMac.isPresent() ? gatewayMac.get() : null;
240
241         }
242         LOG.info("Advertising routes with rd {},  macAddress {}, prefix {}, nextHop {},"
243                         + " vpnLabel {}, l3vni {}, l2vni {}, gatewayMac {}", rd, macAddress, prefix, nextHop,
244                 vpnLabel, l3vni, l2vni, gatewayMacAddr);
245         try {
246             bgpManager.advertisePrefix(rd, macAddress, prefix, nextHop,
247                     VrfEntryBase.EncapType.Vxlan, vpnLabel, l3vni, l2vni, gatewayMacAddr);
248         } catch (Exception e) {
249             LOG.error("Failed to advertisePrefix", e);
250         }
251     }
252
253     public void advertisePrefix(ElanInstance elanInfo, MacEntry macEntry) {
254         InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
255         if (interfaceInfo == null) {
256             LOG.debug("advertisePrefix, interfaceInfo is null for interface {}", macEntry.getInterface());
257             return;
258         }
259
260         if (!isIpv4PrefixAvailable.test(macEntry)) {
261             LOG.debug("advertisePrefix macEntry does not have IPv4 prefix {}", macEntry);
262             return;
263         }
264         advertisePrefix(elanInfo, macEntry.getMacAddress().getValue(),
265                 macEntry.getIpPrefix().getIpv4Address().getValue(),
266                 interfaceInfo.getInterfaceName(), interfaceInfo.getDpId());
267     }
268
269     public void withdrawEvpnRT2Routes(EvpnAugmentation evpnAugmentation, String elanName) {
270         if (evpnAugmentation == null || evpnAugmentation.getEvpnName() == null) {
271             LOG.trace("withdrawEvpnRT2Routes, evpnAugmentation is null");
272             return;
273         }
274
275         String evpnName = evpnAugmentation.getEvpnName();
276         String rd = vpnManager.getVpnRd(broker, evpnName);
277         if (rd == null) {
278             LOG.debug("withdrawEvpnRT2Routes : rd is null for {}", elanName);
279             return;
280         }
281         List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
282         if (macEntries == null || macEntries.isEmpty()) {
283             LOG.debug("withdrawEvpnRT2Routes : macEntries  is empty for elan {} ", elanName);
284             return;
285         }
286         for (MacEntry macEntry : macEntries) {
287             if (!isIpv4PrefixAvailable.test(macEntry)) {
288                 LOG.debug("withdrawEvpnRT2Routes macEntry does not have IPv4 prefix {}", macEntry);
289                 continue;
290             }
291             String prefix = macEntry.getIpPrefix().getIpv4Address().getValue();
292             LOG.info("Withdrawing routes with rd {}, prefix {}", rd, prefix);
293             bgpManager.withdrawPrefix(rd, prefix);
294         }
295     }
296
297     public void withdrawPrefix(ElanInstance elanInfo, String prefix) {
298         String rd = getEvpnRd(elanInfo);
299         if (rd == null) {
300             return;
301         }
302         bgpManager.withdrawPrefix(rd, prefix);
303     }
304
305     public void withdrawPrefix(ElanInstance elanInfo, MacEntry macEntry) {
306         if (!isIpv4PrefixAvailable.test(macEntry)) {
307             LOG.debug("withdrawPrefix macEntry does not have IPv4 prefix {}", macEntry);
308             return;
309         }
310         withdrawPrefix(elanInfo, macEntry.getIpPrefix().getIpv4Address().getValue());
311     }
312
313     public static InstanceIdentifier<ExternalTunnelList> getExternaTunnelListIdentifier() {
314         return InstanceIdentifier
315                 .builder(ExternalTunnelList.class).build();
316     }
317
318     public Optional<ExternalTunnelList> getExternalTunnelList() {
319         InstanceIdentifier<ExternalTunnelList> externalTunnelListId = getExternaTunnelListIdentifier();
320         ExternalTunnelList externalTunnelList = null;
321         try {
322             externalTunnelList = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
323                     externalTunnelListId).orNull();
324         } catch (ReadFailedException e) {
325             LOG.error("getExternalTunnelList: unable to read ExternalTunnelList, exception ", e);
326         }
327         return Optional.fromNullable(externalTunnelList);
328     }
329
330     public static InstanceIdentifier<DcGatewayIpList> getDcGatewayIpListIdentifier() {
331         return InstanceIdentifier
332                 .builder(DcGatewayIpList.class).build();
333     }
334
335     public Optional<DcGatewayIpList> getDcGatewayIpList() {
336         InstanceIdentifier<DcGatewayIpList> dcGatewayIpListInstanceIdentifier = getDcGatewayIpListIdentifier();
337         DcGatewayIpList dcGatewayIpListConfig = null;
338         try {
339             dcGatewayIpListConfig = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
340                     dcGatewayIpListInstanceIdentifier).orNull();
341         } catch (ReadFailedException e) {
342             LOG.error("getDcGatewayTunnelInterfaceNameList: unable to read DcGatewayTunnelList, exception ", e);
343         }
344         return Optional.fromNullable(dcGatewayIpListConfig);
345     }
346
347     public List<String> getDcGatewayTunnelInterfaceNameList() {
348         final List<String> tunnelInterfaceNameList = new ArrayList<>();
349         Optional<DcGatewayIpList> dcGatewayIpListOptional = getDcGatewayIpList();
350         if (!dcGatewayIpListOptional.isPresent()) {
351             LOG.info("No DC gateways configured while programming the l2vni table.");
352             return tunnelInterfaceNameList;
353         }
354         List<DcGatewayIp> dcGatewayIps = dcGatewayIpListOptional.get().nonnullDcGatewayIp();
355
356         Optional<ExternalTunnelList> externalTunnelListOptional = getExternalTunnelList();
357         if (!externalTunnelListOptional.isPresent()) {
358             LOG.info("No External Tunnel Configured while programming the l2vni table.");
359             return tunnelInterfaceNameList;
360         }
361         List<ExternalTunnel> externalTunnels = externalTunnelListOptional.get().nonnullExternalTunnel();
362
363         dcGatewayIps.forEach(dcIp -> externalTunnels
364                 .stream()
365                 .filter(externalTunnel -> externalTunnel.getDestinationDevice()
366                         .contains(dcIp.getIpAddress().getIpv4Address().toString()))
367                 .forEach(externalTunnel -> tunnelInterfaceNameList.add(externalTunnel.getTunnelInterfaceName())));
368
369         return tunnelInterfaceNameList;
370     }
371
372     public void bindElanServiceToExternalTunnel(String elanName, String interfaceName) {
373         ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
374             int instructionKey = 0;
375             LOG.trace("Binding external interface {} elan {}", interfaceName, elanName);
376             List<Instruction> instructions = new ArrayList<>();
377             instructions.add(MDSALUtil.buildAndGetGotoTableInstruction(
378                     NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, ++instructionKey));
379             short elanServiceIndex =
380                     ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
381             BoundServices serviceInfo = ElanUtils.getBoundServices(
382                     ElanUtils.getElanServiceName(elanName, interfaceName), elanServiceIndex,
383                     NwConstants.ELAN_SERVICE_INDEX, NwConstants.COOKIE_ELAN_INGRESS_TABLE, instructions);
384             InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
385             if (!tx.read(bindServiceId).get().isPresent()) {
386                 tx.put(bindServiceId, serviceInfo, WriteTransaction.CREATE_MISSING_PARENTS);
387             }
388         }), LOG, "Error binding an ELAN service to an external tunnel");
389     }
390
391     public void unbindElanServiceFromExternalTunnel(String elanName, String interfaceName) {
392         ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
393             LOG.trace("UnBinding external interface {} elan {}", interfaceManager, elanName);
394             short elanServiceIndex =
395                     ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
396             InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
397             if (tx.read(bindServiceId).get().isPresent()) {
398                 tx.delete(bindServiceId);
399             }
400         }), LOG, "Error binding an ELAN service to an external tunnel");
401     }
402
403     private List<InstructionInfo> getInstructionsForExtTunnelTable(Long elanTag) {
404         List<InstructionInfo> mkInstructions = new ArrayList<>();
405         mkInstructions.add(new InstructionWriteMetadata(ElanUtils.getElanMetadataLabel(elanTag, false),
406                 ElanHelper.getElanMetadataMask()));
407         mkInstructions.add(new InstructionGotoTable(NwConstants.ELAN_DMAC_TABLE));
408         return mkInstructions;
409     }
410
411     private String getFlowRef(long tableId, long elanTag, BigInteger dpnId) {
412         return new StringBuilder().append(tableId).append(elanTag).append(dpnId).toString();
413     }
414
415     private void programEvpnL2vniFlow(ElanInstance elanInfo, BiConsumer<BigInteger, FlowEntity> flowHandler) {
416         long elanTag = elanInfo.getElanTag();
417         List<MatchInfo> mkMatches = new ArrayList<>();
418         mkMatches.add(new MatchTunnelId(BigInteger.valueOf(elanUtils.getVxlanSegmentationId(elanInfo))));
419         NWUtil.getOperativeDPNs(broker).forEach(dpnId -> {
420             LOG.debug("Updating tunnel flow to dpnid {}", dpnId);
421             List<InstructionInfo> instructions = getInstructionsForExtTunnelTable(elanTag);
422             String flowRef = getFlowRef(NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, elanTag, dpnId);
423             FlowEntity flowEntity = MDSALUtil.buildFlowEntity(
424                     dpnId,
425                     NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE,
426                     flowRef,
427                     5, // prio
428                     elanInfo.getElanInstanceName(), // flowName
429                     0, // idleTimeout
430                     0, // hardTimeout
431                     ITMConstants.COOKIE_ITM_EXTERNAL.add(BigInteger.valueOf(elanTag)),
432                     mkMatches,
433                     instructions);
434             flowHandler.accept(dpnId, flowEntity);
435         });
436     }
437
438     public void programEvpnL2vniDemuxTable(String elanName, final BiConsumer<String, String> serviceHandler,
439                                            BiConsumer<BigInteger, FlowEntity> flowHandler) {
440         ElanInstance elanInfo = elanInstanceCache.get(elanName).orNull();
441         List<String> tunnelInterfaceNameList = getDcGatewayTunnelInterfaceNameList();
442         if (tunnelInterfaceNameList.isEmpty()) {
443             LOG.info("No DC gateways tunnels while programming l2vni table for elan {}.", elanName);
444             return;
445         }
446
447         tunnelInterfaceNameList.forEach(tunnelInterfaceName -> serviceHandler.accept(elanName, tunnelInterfaceName));
448         programEvpnL2vniFlow(elanInfo, flowHandler);
449     }
450
451     @SuppressWarnings({ "unchecked", "rawtypes" })
452     public <T extends DataObject> void asyncReadAndExecute(final LogicalDatastoreType datastoreType,
453             final InstanceIdentifier<T> iid, final String jobKey, final Function<Optional<T>, Void> function) {
454         jobCoordinator.enqueueJob(jobKey, () -> {
455             SettableFuture<Optional<T>> settableFuture = SettableFuture.create();
456             List futures = Collections.singletonList(settableFuture);
457
458             try (ReadOnlyTransaction tx = broker.newReadOnlyTransaction()) {
459                 Futures.addCallback(tx.read(datastoreType, iid),
460                         new SettableFutureCallback<Optional<T>>(settableFuture) {
461                             @Override
462                             public void onSuccess(Optional<T> data) {
463                                 function.apply(data);
464                                 super.onSuccess(data);
465                             }
466                         }, MoreExecutors.directExecutor());
467
468                 return futures;
469             }
470         }, ElanConstants.JOB_MAX_RETRIES);
471     }
472 }