01970cbcbd8bb896e7cd652f6865f917cf2badbf
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / evpn / utils / EvpnUtils.java
1 /*
2  * Copyright © 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.evpn.utils;
9
10 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
11
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.List;
18 import java.util.Optional;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Future;
21 import java.util.function.BiConsumer;
22 import java.util.function.BiPredicate;
23 import java.util.function.Function;
24 import java.util.function.Predicate;
25 import javax.inject.Inject;
26 import javax.inject.Singleton;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
29 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
30 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
31 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
32 import org.opendaylight.genius.itm.globals.ITMConstants;
33 import org.opendaylight.genius.mdsalutil.FlowEntity;
34 import org.opendaylight.genius.mdsalutil.InstructionInfo;
35 import org.opendaylight.genius.mdsalutil.MDSALUtil;
36 import org.opendaylight.genius.mdsalutil.MatchInfo;
37 import org.opendaylight.genius.mdsalutil.NWUtil;
38 import org.opendaylight.genius.mdsalutil.NwConstants;
39 import org.opendaylight.genius.mdsalutil.instructions.InstructionGotoTable;
40 import org.opendaylight.genius.mdsalutil.instructions.InstructionWriteMetadata;
41 import org.opendaylight.genius.mdsalutil.matches.MatchTunnelId;
42 import org.opendaylight.genius.utils.ServiceIndex;
43 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
44 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
45 import org.opendaylight.mdsal.binding.api.DataBroker;
46 import org.opendaylight.mdsal.binding.api.ReadTransaction;
47 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
48 import org.opendaylight.netvirt.bgpmanager.api.IBgpManager;
49 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
50 import org.opendaylight.netvirt.elan.l2gw.utils.SettableFutureCallback;
51 import org.opendaylight.netvirt.elan.utils.ElanConstants;
52 import org.opendaylight.netvirt.elan.utils.ElanUtils;
53 import org.opendaylight.netvirt.elanmanager.api.ElanHelper;
54 import org.opendaylight.netvirt.vpnmanager.api.IVpnManager;
55 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.servicebinding.rev160406.service.bindings.services.info.BoundServices;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.op.rev160406.ExternalTunnelList;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.op.rev160406.external.tunnel.list.ExternalTunnel;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.DcGatewayIpList;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.dc.gateway.ip.list.DcGatewayIp;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetDpnEndpointIpsInputBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetDpnEndpointIpsOutput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.EvpnAugmentation;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.elan.instances.ElanInstance;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.forwarding.entries.MacEntry;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.VrfEntryBase;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.l3vpn.rev200204.vpn.instances.VpnInstance;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.rev150602.neutron.vpn.portip.port.data.VpnPortipToPort;
71 import org.opendaylight.yangtools.yang.binding.DataObject;
72 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
73 import org.opendaylight.yangtools.yang.common.RpcResult;
74 import org.opendaylight.yangtools.yang.common.Uint32;
75 import org.opendaylight.yangtools.yang.common.Uint64;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79 @Singleton
80 public class EvpnUtils {
81
82     private static final Logger LOG = LoggerFactory.getLogger(EvpnUtils.class);
83
84     private final BiPredicate<String, String> isNetAttach = (var1, var2) -> (var1 == null && var2 != null);
85     private final BiPredicate<String, String> isNetDetach = (var1, var2) -> (var1 != null && var2 == null);
86     private final Predicate<MacEntry> isIpv4PrefixAvailable = (macEntry) -> (macEntry != null
87         && macEntry.getIpPrefix() != null && macEntry.getIpPrefix().getIpv4Address() != null);
88     private final DataBroker broker;
89     private final ManagedNewTransactionRunner txRunner;
90     private final IInterfaceManager interfaceManager;
91     private final ElanUtils elanUtils;
92     private final ItmRpcService itmRpcService;
93     private final JobCoordinator jobCoordinator;
94     private final IBgpManager bgpManager;
95     private final IVpnManager vpnManager;
96     private final ElanInstanceCache elanInstanceCache;
97
98     @Inject
99     public EvpnUtils(DataBroker broker, IInterfaceManager interfaceManager, ElanUtils elanUtils,
100             ItmRpcService itmRpcService, IVpnManager vpnManager, IBgpManager bgpManager,
101             JobCoordinator jobCoordinator, ElanInstanceCache elanInstanceCache) {
102         this.broker = broker;
103         this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
104         this.interfaceManager = interfaceManager;
105         this.elanUtils = elanUtils;
106         this.itmRpcService = itmRpcService;
107         this.vpnManager = vpnManager;
108         this.bgpManager = bgpManager;
109         this.jobCoordinator = jobCoordinator;
110         this.elanInstanceCache = elanInstanceCache;
111     }
112
113     public boolean isWithdrawEvpnRT2Routes(ElanInstance original, ElanInstance update) {
114         return isNetDetach.test(getEvpnNameFromElan(original), getEvpnNameFromElan(update));
115     }
116
117     public boolean isAdvertiseEvpnRT2Routes(ElanInstance original, ElanInstance update) {
118         return isNetAttach.test(getEvpnNameFromElan(original), getEvpnNameFromElan(update));
119     }
120
121     @SuppressWarnings("checkstyle:IllegalCatch")
122     public void advertiseEvpnRT2Routes(EvpnAugmentation evpnAugmentation, String elanName)  {
123         if (evpnAugmentation == null || evpnAugmentation.getEvpnName() == null) {
124             return;
125         }
126         String evpnName = evpnAugmentation.getEvpnName();
127         List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
128         if (macEntries == null || macEntries.isEmpty()) {
129             LOG.trace("advertiseEvpnRT2Routes no elan mac entries found for {}", elanName);
130             return;
131         }
132         String rd = vpnManager.getVpnRd(broker, evpnName);
133         ElanInstance elanInfo = elanInstanceCache.get(elanName).orElse(null);
134         macEntries.stream().filter(isIpv4PrefixAvailable).forEach(macEntry -> {
135             InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
136             if (interfaceInfo == null) {
137                 LOG.debug("advertiseEvpnRT2Routes, interfaceInfo is null for interface {}", macEntry.getInterface());
138                 return;
139             }
140             advertisePrefix(elanInfo, rd, macEntry.getMacAddress().getValue(),
141                     macEntry.getIpPrefix().getIpv4Address().getValue(),
142                     interfaceInfo.getInterfaceName(), interfaceInfo.getDpId());
143         });
144     }
145
146     @Nullable
147     public String getEndpointIpAddressForDPN(Uint64 dpnId) {
148
149         Future<RpcResult<GetDpnEndpointIpsOutput>> result = itmRpcService.getDpnEndpointIps(
150                 new GetDpnEndpointIpsInputBuilder()
151                         .setSourceDpid(dpnId)
152                         .build());
153         RpcResult<GetDpnEndpointIpsOutput> rpcResult = null;
154         try {
155             rpcResult = result.get();
156         } catch (InterruptedException e) {
157             LOG.error("getnextHopIpFromRpcOutput : InterruptedException for dpnid {}", dpnId, e);
158             return null;
159         } catch (ExecutionException e) {
160             LOG.error("getnextHopIpFromRpcOutput : ExecutionException for dpnid {}", dpnId, e);
161             return null;
162         }
163         if (!rpcResult.isSuccessful()) {
164             LOG.warn("RPC Call to getDpnEndpointIps returned with Errors {}", rpcResult.getErrors());
165             return null;
166         }
167
168         List<IpAddress> nexthopIpList = rpcResult.getResult().getNexthopipList();
169         return nexthopIpList.get(0).getIpv4Address().getValue();
170     }
171
172     public Optional<String> getGatewayMacAddressForInterface(String vpnName,
173                                                                                     String ifName, String ipAddress) {
174         VpnPortipToPort gwPort = vpnManager.getNeutronPortFromVpnPortFixedIp(broker, vpnName, ipAddress);
175         return Optional.of(gwPort != null && gwPort.isSubnetIp()
176                 ? gwPort.getMacAddress()
177                 : interfaceManager.getInterfaceInfoFromOperationalDataStore(ifName).getMacAddress());
178     }
179
180     @Nullable
181     public String getL3vpnNameFromElan(ElanInstance elanInfo) {
182         if (elanInfo == null) {
183             LOG.debug("getL3vpnNameFromElan :elanInfo is NULL");
184             return null;
185         }
186         EvpnAugmentation evpnAugmentation = elanInfo.augmentation(EvpnAugmentation.class);
187         return evpnAugmentation != null ? evpnAugmentation.getL3vpnName() : null;
188     }
189
190     @Nullable
191     public static String getEvpnNameFromElan(ElanInstance elanInfo) {
192         if (elanInfo == null) {
193             LOG.debug("getEvpnNameFromElan :elanInfo is NULL");
194             return null;
195         }
196         EvpnAugmentation evpnAugmentation = elanInfo.augmentation(EvpnAugmentation.class);
197         return evpnAugmentation != null ? evpnAugmentation.getEvpnName() : null;
198     }
199
200     @Nullable
201     public String getEvpnRd(ElanInstance elanInfo) {
202         String evpnName = getEvpnNameFromElan(elanInfo);
203         if (evpnName == null) {
204             LOG.debug("getEvpnRd : evpnName is NULL for elanInfo {}", elanInfo);
205             return null;
206         }
207         return vpnManager.getVpnRd(broker, evpnName);
208     }
209
210     public void advertisePrefix(ElanInstance elanInfo, String macAddress, String prefix,
211                                  String interfaceName, Uint64 dpnId) {
212         String rd = getEvpnRd(elanInfo);
213         advertisePrefix(elanInfo, rd, macAddress, prefix, interfaceName, dpnId);
214     }
215
216     @SuppressWarnings("checkstyle:IllegalCatch")
217     public void advertisePrefix(ElanInstance elanInfo, String rd,
218                                  String macAddress, String prefix, String interfaceName, Uint64 dpnId) {
219         if (rd == null) {
220             LOG.debug("advertisePrefix : rd is NULL for elanInfo {}, macAddress {}", elanInfo, macAddress);
221             return;
222         }
223         String nextHop = getEndpointIpAddressForDPN(dpnId);
224         if (nextHop == null) {
225             LOG.debug("Failed to get the dpn tep ip for dpn {}", dpnId);
226             return;
227         }
228         Uint32 vpnLabel = Uint32.ZERO;
229         Uint32 l2vni = ElanUtils.getVxlanSegmentationId(elanInfo);
230         Uint32 l3vni = Uint32.ZERO;
231         String gatewayMacAddr = null;
232         String l3VpName = getL3vpnNameFromElan(elanInfo);
233         if (l3VpName != null) {
234             VpnInstance l3VpnInstance = vpnManager.getVpnInstance(broker, l3VpName);
235             l3vni = l3VpnInstance.getL3vni();
236             Optional<String> gatewayMac = getGatewayMacAddressForInterface(l3VpName,
237                     interfaceName, prefix);
238             gatewayMacAddr = gatewayMac.isPresent() ? gatewayMac.get() : null;
239
240         }
241         LOG.info("Advertising routes with rd {},  macAddress {}, prefix {}, nextHop {},"
242                         + " vpnLabel {}, l3vni {}, l2vni {}, gatewayMac {}", rd, macAddress, prefix, nextHop,
243                 vpnLabel, l3vni, l2vni, gatewayMacAddr);
244         try {
245             bgpManager.advertisePrefix(rd, macAddress, prefix, nextHop,
246                     VrfEntryBase.EncapType.Vxlan, vpnLabel, l3vni, l2vni, gatewayMacAddr);
247         } catch (Exception e) {
248             LOG.error("Failed to advertisePrefix", e);
249         }
250     }
251
252     public void advertisePrefix(ElanInstance elanInfo, MacEntry macEntry) {
253         InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
254         if (interfaceInfo == null) {
255             LOG.debug("advertisePrefix, interfaceInfo is null for interface {}", macEntry.getInterface());
256             return;
257         }
258
259         if (!isIpv4PrefixAvailable.test(macEntry)) {
260             LOG.debug("advertisePrefix macEntry does not have IPv4 prefix {}", macEntry);
261             return;
262         }
263         advertisePrefix(elanInfo, macEntry.getMacAddress().getValue(),
264                 macEntry.getIpPrefix().getIpv4Address().getValue(),
265                 interfaceInfo.getInterfaceName(), interfaceInfo.getDpId());
266     }
267
268     public void withdrawEvpnRT2Routes(EvpnAugmentation evpnAugmentation, String elanName) {
269         if (evpnAugmentation == null || evpnAugmentation.getEvpnName() == null) {
270             LOG.trace("withdrawEvpnRT2Routes, evpnAugmentation is null");
271             return;
272         }
273
274         String evpnName = evpnAugmentation.getEvpnName();
275         String rd = vpnManager.getVpnRd(broker, evpnName);
276         if (rd == null) {
277             LOG.debug("withdrawEvpnRT2Routes : rd is null for {}", elanName);
278             return;
279         }
280         List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
281         if (macEntries == null || macEntries.isEmpty()) {
282             LOG.debug("withdrawEvpnRT2Routes : macEntries  is empty for elan {} ", elanName);
283             return;
284         }
285         for (MacEntry macEntry : macEntries) {
286             if (!isIpv4PrefixAvailable.test(macEntry)) {
287                 LOG.debug("withdrawEvpnRT2Routes macEntry does not have IPv4 prefix {}", macEntry);
288                 continue;
289             }
290             String prefix = macEntry.getIpPrefix().getIpv4Address().getValue();
291             LOG.info("Withdrawing routes with rd {}, prefix {}", rd, prefix);
292             bgpManager.withdrawPrefix(rd, prefix);
293         }
294     }
295
296     public void withdrawPrefix(ElanInstance elanInfo, String prefix) {
297         String rd = getEvpnRd(elanInfo);
298         if (rd == null) {
299             return;
300         }
301         bgpManager.withdrawPrefix(rd, prefix);
302     }
303
304     public void withdrawPrefix(ElanInstance elanInfo, MacEntry macEntry) {
305         if (!isIpv4PrefixAvailable.test(macEntry)) {
306             LOG.debug("withdrawPrefix macEntry does not have IPv4 prefix {}", macEntry);
307             return;
308         }
309         withdrawPrefix(elanInfo, macEntry.getIpPrefix().getIpv4Address().getValue());
310     }
311
312     public static InstanceIdentifier<ExternalTunnelList> getExternaTunnelListIdentifier() {
313         return InstanceIdentifier
314                 .builder(ExternalTunnelList.class).build();
315     }
316
317     public Optional<ExternalTunnelList> getExternalTunnelList() {
318         InstanceIdentifier<ExternalTunnelList> externalTunnelListId = getExternaTunnelListIdentifier();
319         ExternalTunnelList externalTunnelList = null;
320         try {
321             externalTunnelList = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
322                     externalTunnelListId).orElse(null);
323         } catch (InterruptedException | ExecutionException e) {
324             LOG.error("getExternalTunnelList: unable to read ExternalTunnelList, exception ", e);
325         }
326         return Optional.ofNullable(externalTunnelList);
327     }
328
329     public static InstanceIdentifier<DcGatewayIpList> getDcGatewayIpListIdentifier() {
330         return InstanceIdentifier
331                 .builder(DcGatewayIpList.class).build();
332     }
333
334     public Optional<DcGatewayIpList> getDcGatewayIpList() {
335         InstanceIdentifier<DcGatewayIpList> dcGatewayIpListInstanceIdentifier = getDcGatewayIpListIdentifier();
336         DcGatewayIpList dcGatewayIpListConfig = null;
337         try {
338             dcGatewayIpListConfig = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
339                     dcGatewayIpListInstanceIdentifier).orElse(null);
340         } catch (InterruptedException | ExecutionException e) {
341             LOG.error("getDcGatewayTunnelInterfaceNameList: unable to read DcGatewayTunnelList, exception ", e);
342         }
343         return Optional.ofNullable(dcGatewayIpListConfig);
344     }
345
346     public List<String> getDcGatewayTunnelInterfaceNameList() {
347         final List<String> tunnelInterfaceNameList = new ArrayList<>();
348         Optional<DcGatewayIpList> dcGatewayIpListOptional = getDcGatewayIpList();
349         if (!dcGatewayIpListOptional.isPresent()) {
350             LOG.info("No DC gateways configured while programming the l2vni table.");
351             return tunnelInterfaceNameList;
352         }
353         List<DcGatewayIp> dcGatewayIps
354                 = new ArrayList<DcGatewayIp>(dcGatewayIpListOptional.get().nonnullDcGatewayIp().values());
355
356         Optional<ExternalTunnelList> externalTunnelListOptional = getExternalTunnelList();
357         if (!externalTunnelListOptional.isPresent()) {
358             LOG.info("No External Tunnel Configured while programming the l2vni table.");
359             return tunnelInterfaceNameList;
360         }
361         List<ExternalTunnel> externalTunnels
362                 = new ArrayList<ExternalTunnel>(externalTunnelListOptional.get().nonnullExternalTunnel().values());
363
364         dcGatewayIps.forEach(dcIp -> externalTunnels
365                 .stream()
366                 .filter(externalTunnel -> externalTunnel.getDestinationDevice()
367                         .contains(dcIp.getIpAddress().getIpv4Address().toString()))
368                 .forEach(externalTunnel -> tunnelInterfaceNameList.add(externalTunnel.getTunnelInterfaceName())));
369
370         return tunnelInterfaceNameList;
371     }
372
373     public void bindElanServiceToExternalTunnel(String elanName, String interfaceName) {
374         LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
375             int instructionKey = 0;
376             LOG.trace("Binding external interface {} elan {}", interfaceName, elanName);
377             List<Instruction> instructions = new ArrayList<>();
378             instructions.add(MDSALUtil.buildAndGetGotoTableInstruction(
379                     NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, ++instructionKey));
380             short elanServiceIndex =
381                     ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
382             BoundServices serviceInfo = ElanUtils.getBoundServices(
383                     ElanUtils.getElanServiceName(elanName, interfaceName), elanServiceIndex,
384                     NwConstants.ELAN_SERVICE_INDEX, NwConstants.COOKIE_ELAN_INGRESS_TABLE, instructions);
385             InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
386             if (!tx.read(bindServiceId).get().isPresent()) {
387                 tx.mergeParentStructurePut(bindServiceId, serviceInfo);
388             }
389         }), LOG, "Error binding an ELAN service to an external tunnel");
390     }
391
392     public void unbindElanServiceFromExternalTunnel(String elanName, String interfaceName) {
393         LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
394             LOG.trace("UnBinding external interface {} elan {}", interfaceManager, elanName);
395             short elanServiceIndex =
396                     ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
397             InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
398             if (tx.read(bindServiceId).get().isPresent()) {
399                 tx.delete(bindServiceId);
400             }
401         }), LOG, "Error binding an ELAN service to an external tunnel");
402     }
403
404     private static List<InstructionInfo> getInstructionsForExtTunnelTable(Uint32 elanTag) {
405         List<InstructionInfo> mkInstructions = new ArrayList<>();
406         mkInstructions.add(new InstructionWriteMetadata(ElanUtils.getElanMetadataLabel(elanTag.longValue(), false),
407                 ElanHelper.getElanMetadataMask()));
408         mkInstructions.add(new InstructionGotoTable(NwConstants.ELAN_DMAC_TABLE));
409         return mkInstructions;
410     }
411
412     private static String getFlowRef(long tableId, long elanTag, Uint64 dpnId) {
413         return new StringBuilder().append(tableId).append(elanTag).append(dpnId.toString()).toString();
414     }
415
416     private void programEvpnL2vniFlow(ElanInstance elanInfo, BiConsumer<Uint64, FlowEntity> flowHandler) {
417         Uint32 elanTag = elanInfo.getElanTag();
418         List<MatchInfo> mkMatches = new ArrayList<>();
419         mkMatches.add(new MatchTunnelId(Uint64.valueOf(ElanUtils.getVxlanSegmentationId(elanInfo).longValue())));
420         try {
421             NWUtil.getOperativeDPNs(broker).forEach(dpnId -> {
422                 LOG.debug("Updating tunnel flow to dpnid {}", dpnId);
423                 List<InstructionInfo> instructions = getInstructionsForExtTunnelTable(elanTag);
424                 String flowRef = getFlowRef(NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, elanTag.longValue(), dpnId);
425                 FlowEntity flowEntity = MDSALUtil.buildFlowEntity(
426                         dpnId,
427                         NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE,
428                         flowRef,
429                         5, // prio
430                         elanInfo.getElanInstanceName(), // flowName
431                         0, // idleTimeout
432                         0, // hardTimeout
433                         Uint64.valueOf(ITMConstants.COOKIE_ITM_EXTERNAL.longValue() + elanTag.longValue()),
434                         mkMatches,
435                         instructions);
436                 flowHandler.accept(dpnId, flowEntity);
437             });
438         } catch (ExecutionException | InterruptedException e) {
439             LOG.error("programEvpnL2vniFlow: Exception while programming Evpn L2vni flow for elanInstance {}",
440                     elanInfo, e);
441         }
442     }
443
444     public void programEvpnL2vniDemuxTable(String elanName, final BiConsumer<String, String> serviceHandler,
445                                            BiConsumer<Uint64, FlowEntity> flowHandler) {
446         ElanInstance elanInfo = elanInstanceCache.get(elanName).orElse(null);
447         List<String> tunnelInterfaceNameList = getDcGatewayTunnelInterfaceNameList();
448         if (tunnelInterfaceNameList.isEmpty()) {
449             LOG.info("No DC gateways tunnels while programming l2vni table for elan {}.", elanName);
450             return;
451         }
452
453         tunnelInterfaceNameList.forEach(tunnelInterfaceName -> serviceHandler.accept(elanName, tunnelInterfaceName));
454         programEvpnL2vniFlow(elanInfo, flowHandler);
455     }
456
457     @SuppressWarnings({ "unchecked", "rawtypes" })
458     public <T extends DataObject> void asyncReadAndExecute(final LogicalDatastoreType datastoreType,
459             final InstanceIdentifier<T> iid, final String jobKey, final Function<Optional<T>, Void> function) {
460         jobCoordinator.enqueueJob(jobKey, () -> {
461             SettableFuture<Optional<T>> settableFuture = SettableFuture.create();
462             List futures = Collections.singletonList(settableFuture);
463
464             try (ReadTransaction tx = broker.newReadOnlyTransaction()) {
465                 Futures.addCallback(tx.read(datastoreType, iid),
466                         new SettableFutureCallback<Optional<T>>(settableFuture) {
467                             @Override
468                             public void onSuccess(Optional<T> data) {
469                                 function.apply(data);
470                                 super.onSuccess(data);
471                             }
472                         }, MoreExecutors.directExecutor());
473
474                 return futures;
475             }
476         }, ElanConstants.JOB_MAX_RETRIES);
477     }
478 }