1c0f2ca23a057391d6d9a97f4f10663113636b78
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / evpn / utils / EvpnUtils.java
1 /*
2  * Copyright © 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.evpn.utils;
9
10 import static java.util.Collections.emptyList;
11 import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
12 import static org.opendaylight.netvirt.elan.utils.ElanUtils.requireNonNullElse;
13
14 import com.google.common.base.Optional;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.math.BigInteger;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.function.BiConsumer;
25 import java.util.function.BiPredicate;
26 import java.util.function.Function;
27 import java.util.function.Predicate;
28 import javax.annotation.Nullable;
29 import javax.inject.Inject;
30 import javax.inject.Singleton;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
36 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
37 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
38 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
39 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
40 import org.opendaylight.genius.itm.globals.ITMConstants;
41 import org.opendaylight.genius.mdsalutil.FlowEntity;
42 import org.opendaylight.genius.mdsalutil.InstructionInfo;
43 import org.opendaylight.genius.mdsalutil.MDSALUtil;
44 import org.opendaylight.genius.mdsalutil.MatchInfo;
45 import org.opendaylight.genius.mdsalutil.NWUtil;
46 import org.opendaylight.genius.mdsalutil.NwConstants;
47 import org.opendaylight.genius.mdsalutil.instructions.InstructionGotoTable;
48 import org.opendaylight.genius.mdsalutil.instructions.InstructionWriteMetadata;
49 import org.opendaylight.genius.mdsalutil.matches.MatchTunnelId;
50 import org.opendaylight.genius.utils.ServiceIndex;
51 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
52 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
53 import org.opendaylight.netvirt.bgpmanager.api.IBgpManager;
54 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
55 import org.opendaylight.netvirt.elan.l2gw.utils.SettableFutureCallback;
56 import org.opendaylight.netvirt.elan.utils.ElanConstants;
57 import org.opendaylight.netvirt.elan.utils.ElanUtils;
58 import org.opendaylight.netvirt.elanmanager.api.ElanHelper;
59 import org.opendaylight.netvirt.vpnmanager.api.IVpnManager;
60 import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.vpn.instances.VpnInstance;
61 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.servicebinding.rev160406.service.bindings.services.info.BoundServices;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.op.rev160406.ExternalTunnelList;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.op.rev160406.external.tunnel.list.ExternalTunnel;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.DcGatewayIpList;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.dc.gateway.ip.list.DcGatewayIp;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetDpnEndpointIpsInputBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetDpnEndpointIpsOutput;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.EvpnAugmentation;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.elan.instances.ElanInstance;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.forwarding.entries.MacEntry;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.VrfEntryBase;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.rev150602.neutron.vpn.portip.port.data.VpnPortipToPort;
76 import org.opendaylight.yangtools.yang.binding.DataObject;
77 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
78 import org.opendaylight.yangtools.yang.common.RpcResult;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 @Singleton
83 public class EvpnUtils {
84
85     private static final Logger LOG = LoggerFactory.getLogger(EvpnUtils.class);
86
87     private final BiPredicate<String, String> isNetAttach = (var1, var2) -> (var1 == null && var2 != null);
88     private final BiPredicate<String, String> isNetDetach = (var1, var2) -> (var1 != null && var2 == null);
89     private final Predicate<MacEntry> isIpv4PrefixAvailable = (macEntry) -> (macEntry != null
90         && macEntry.getIpPrefix() != null && macEntry.getIpPrefix().getIpv4Address() != null);
91     private final DataBroker broker;
92     private final ManagedNewTransactionRunner txRunner;
93     private final IInterfaceManager interfaceManager;
94     private final ElanUtils elanUtils;
95     private final ItmRpcService itmRpcService;
96     private final JobCoordinator jobCoordinator;
97     private final IBgpManager bgpManager;
98     private final IVpnManager vpnManager;
99     private final ElanInstanceCache elanInstanceCache;
100
101     @Inject
102     public EvpnUtils(DataBroker broker, IInterfaceManager interfaceManager, ElanUtils elanUtils,
103             ItmRpcService itmRpcService, IVpnManager vpnManager, IBgpManager bgpManager,
104             JobCoordinator jobCoordinator, ElanInstanceCache elanInstanceCache) {
105         this.broker = broker;
106         this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
107         this.interfaceManager = interfaceManager;
108         this.elanUtils = elanUtils;
109         this.itmRpcService = itmRpcService;
110         this.vpnManager = vpnManager;
111         this.bgpManager = bgpManager;
112         this.jobCoordinator = jobCoordinator;
113         this.elanInstanceCache = elanInstanceCache;
114     }
115
116     public boolean isWithdrawEvpnRT2Routes(ElanInstance original, ElanInstance update) {
117         return isNetDetach.test(getEvpnNameFromElan(original), getEvpnNameFromElan(update));
118     }
119
120     public boolean isAdvertiseEvpnRT2Routes(ElanInstance original, ElanInstance update) {
121         return isNetAttach.test(getEvpnNameFromElan(original), getEvpnNameFromElan(update));
122     }
123
124     @SuppressWarnings("checkstyle:IllegalCatch")
125     public void advertiseEvpnRT2Routes(EvpnAugmentation evpnAugmentation, String elanName)  {
126         if (evpnAugmentation == null || evpnAugmentation.getEvpnName() == null) {
127             return;
128         }
129         String evpnName = evpnAugmentation.getEvpnName();
130         List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
131         if (macEntries == null || macEntries.isEmpty()) {
132             LOG.trace("advertiseEvpnRT2Routes no elan mac entries found for {}", elanName);
133             return;
134         }
135         String rd = vpnManager.getVpnRd(broker, evpnName);
136         ElanInstance elanInfo = elanInstanceCache.get(elanName).orNull();
137         macEntries.stream().filter(isIpv4PrefixAvailable).forEach(macEntry -> {
138             InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
139             if (interfaceInfo == null) {
140                 LOG.debug("advertiseEvpnRT2Routes, interfaceInfo is null for interface {}", macEntry.getInterface());
141                 return;
142             }
143             advertisePrefix(elanInfo, rd, macEntry.getMacAddress().getValue(),
144                     macEntry.getIpPrefix().getIpv4Address().getValue(),
145                     interfaceInfo.getInterfaceName(), interfaceInfo.getDpId());
146         });
147     }
148
149     @Nullable
150     public String getEndpointIpAddressForDPN(BigInteger dpnId) {
151
152         Future<RpcResult<GetDpnEndpointIpsOutput>> result = itmRpcService.getDpnEndpointIps(
153                 new GetDpnEndpointIpsInputBuilder()
154                         .setSourceDpid(dpnId)
155                         .build());
156         RpcResult<GetDpnEndpointIpsOutput> rpcResult = null;
157         try {
158             rpcResult = result.get();
159         } catch (InterruptedException e) {
160             LOG.error("getnextHopIpFromRpcOutput : InterruptedException for dpnid {}", dpnId, e);
161             return null;
162         } catch (ExecutionException e) {
163             LOG.error("getnextHopIpFromRpcOutput : ExecutionException for dpnid {}", dpnId, e);
164             return null;
165         }
166         if (!rpcResult.isSuccessful()) {
167             LOG.warn("RPC Call to getDpnEndpointIps returned with Errors {}", rpcResult.getErrors());
168             return null;
169         }
170
171         List<IpAddress> nexthopIpList = rpcResult.getResult().getNexthopipList();
172         return nexthopIpList.get(0).getIpv4Address().getValue();
173     }
174
175     public Optional<String> getGatewayMacAddressForInterface(String vpnName,
176                                                                                     String ifName, String ipAddress) {
177         VpnPortipToPort gwPort = vpnManager.getNeutronPortFromVpnPortFixedIp(broker, vpnName, ipAddress);
178         return Optional.of(gwPort != null && gwPort.isSubnetIp()
179                 ? gwPort.getMacAddress()
180                 : interfaceManager.getInterfaceInfoFromOperationalDataStore(ifName).getMacAddress());
181     }
182
183     @Nullable
184     public String getL3vpnNameFromElan(ElanInstance elanInfo) {
185         if (elanInfo == null) {
186             LOG.debug("getL3vpnNameFromElan :elanInfo is NULL");
187             return null;
188         }
189         EvpnAugmentation evpnAugmentation = elanInfo.augmentation(EvpnAugmentation.class);
190         return evpnAugmentation != null ? evpnAugmentation.getL3vpnName() : null;
191     }
192
193     @Nullable
194     public static String getEvpnNameFromElan(ElanInstance elanInfo) {
195         if (elanInfo == null) {
196             LOG.debug("getEvpnNameFromElan :elanInfo is NULL");
197             return null;
198         }
199         EvpnAugmentation evpnAugmentation = elanInfo.augmentation(EvpnAugmentation.class);
200         return evpnAugmentation != null ? evpnAugmentation.getEvpnName() : null;
201     }
202
203     @Nullable
204     public String getEvpnRd(ElanInstance elanInfo) {
205         String evpnName = getEvpnNameFromElan(elanInfo);
206         if (evpnName == null) {
207             LOG.debug("getEvpnRd : evpnName is NULL for elanInfo {}", elanInfo);
208             return null;
209         }
210         return vpnManager.getVpnRd(broker, evpnName);
211     }
212
213     public void advertisePrefix(ElanInstance elanInfo, String macAddress, String prefix,
214                                  String interfaceName, BigInteger dpnId) {
215         String rd = getEvpnRd(elanInfo);
216         advertisePrefix(elanInfo, rd, macAddress, prefix, interfaceName, dpnId);
217     }
218
219     @SuppressWarnings("checkstyle:IllegalCatch")
220     public void advertisePrefix(ElanInstance elanInfo, String rd,
221                                  String macAddress, String prefix, String interfaceName, BigInteger dpnId) {
222         if (rd == null) {
223             LOG.debug("advertisePrefix : rd is NULL for elanInfo {}, macAddress {}", elanInfo, macAddress);
224             return;
225         }
226         String nextHop = getEndpointIpAddressForDPN(dpnId);
227         if (nextHop == null) {
228             LOG.debug("Failed to get the dpn tep ip for dpn {}", dpnId);
229             return;
230         }
231         int vpnLabel = 0;
232         long l2vni = elanUtils.getVxlanSegmentationId(elanInfo);
233         long l3vni = 0;
234         String gatewayMacAddr = null;
235         String l3VpName = getL3vpnNameFromElan(elanInfo);
236         if (l3VpName != null) {
237             VpnInstance l3VpnInstance = vpnManager.getVpnInstance(broker, l3VpName);
238             l3vni = l3VpnInstance.getL3vni();
239             com.google.common.base.Optional<String> gatewayMac = getGatewayMacAddressForInterface(l3VpName,
240                     interfaceName, prefix);
241             gatewayMacAddr = gatewayMac.isPresent() ? gatewayMac.get() : null;
242
243         }
244         LOG.info("Advertising routes with rd {},  macAddress {}, prefix {}, nextHop {},"
245                         + " vpnLabel {}, l3vni {}, l2vni {}, gatewayMac {}", rd, macAddress, prefix, nextHop,
246                 vpnLabel, l3vni, l2vni, gatewayMacAddr);
247         try {
248             bgpManager.advertisePrefix(rd, macAddress, prefix, nextHop,
249                     VrfEntryBase.EncapType.Vxlan, vpnLabel, l3vni, l2vni, gatewayMacAddr);
250         } catch (Exception e) {
251             LOG.error("Failed to advertisePrefix", e);
252         }
253     }
254
255     public void advertisePrefix(ElanInstance elanInfo, MacEntry macEntry) {
256         InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
257         if (interfaceInfo == null) {
258             LOG.debug("advertisePrefix, interfaceInfo is null for interface {}", macEntry.getInterface());
259             return;
260         }
261
262         if (!isIpv4PrefixAvailable.test(macEntry)) {
263             LOG.debug("advertisePrefix macEntry does not have IPv4 prefix {}", macEntry);
264             return;
265         }
266         advertisePrefix(elanInfo, macEntry.getMacAddress().getValue(),
267                 macEntry.getIpPrefix().getIpv4Address().getValue(),
268                 interfaceInfo.getInterfaceName(), interfaceInfo.getDpId());
269     }
270
271     public void withdrawEvpnRT2Routes(EvpnAugmentation evpnAugmentation, String elanName) {
272         if (evpnAugmentation == null || evpnAugmentation.getEvpnName() == null) {
273             LOG.trace("withdrawEvpnRT2Routes, evpnAugmentation is null");
274             return;
275         }
276
277         String evpnName = evpnAugmentation.getEvpnName();
278         String rd = vpnManager.getVpnRd(broker, evpnName);
279         if (rd == null) {
280             LOG.debug("withdrawEvpnRT2Routes : rd is null for {}", elanName);
281             return;
282         }
283         List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
284         if (macEntries == null || macEntries.isEmpty()) {
285             LOG.debug("withdrawEvpnRT2Routes : macEntries  is empty for elan {} ", elanName);
286             return;
287         }
288         for (MacEntry macEntry : macEntries) {
289             if (!isIpv4PrefixAvailable.test(macEntry)) {
290                 LOG.debug("withdrawEvpnRT2Routes macEntry does not have IPv4 prefix {}", macEntry);
291                 continue;
292             }
293             String prefix = macEntry.getIpPrefix().getIpv4Address().getValue();
294             LOG.info("Withdrawing routes with rd {}, prefix {}", rd, prefix);
295             bgpManager.withdrawPrefix(rd, prefix);
296         }
297     }
298
299     public void withdrawPrefix(ElanInstance elanInfo, String prefix) {
300         String rd = getEvpnRd(elanInfo);
301         if (rd == null) {
302             return;
303         }
304         bgpManager.withdrawPrefix(rd, prefix);
305     }
306
307     public void withdrawPrefix(ElanInstance elanInfo, MacEntry macEntry) {
308         if (!isIpv4PrefixAvailable.test(macEntry)) {
309             LOG.debug("withdrawPrefix macEntry does not have IPv4 prefix {}", macEntry);
310             return;
311         }
312         withdrawPrefix(elanInfo, macEntry.getIpPrefix().getIpv4Address().getValue());
313     }
314
315     public static InstanceIdentifier<ExternalTunnelList> getExternaTunnelListIdentifier() {
316         return InstanceIdentifier
317                 .builder(ExternalTunnelList.class).build();
318     }
319
320     public Optional<ExternalTunnelList> getExternalTunnelList() {
321         InstanceIdentifier<ExternalTunnelList> externalTunnelListId = getExternaTunnelListIdentifier();
322         ExternalTunnelList externalTunnelList = null;
323         try {
324             externalTunnelList = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
325                     externalTunnelListId).orNull();
326         } catch (ReadFailedException e) {
327             LOG.error("getExternalTunnelList: unable to read ExternalTunnelList, exception ", e);
328         }
329         return Optional.fromNullable(externalTunnelList);
330     }
331
332     public static InstanceIdentifier<DcGatewayIpList> getDcGatewayIpListIdentifier() {
333         return InstanceIdentifier
334                 .builder(DcGatewayIpList.class).build();
335     }
336
337     public Optional<DcGatewayIpList> getDcGatewayIpList() {
338         InstanceIdentifier<DcGatewayIpList> dcGatewayIpListInstanceIdentifier = getDcGatewayIpListIdentifier();
339         DcGatewayIpList dcGatewayIpListConfig = null;
340         try {
341             dcGatewayIpListConfig = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
342                     dcGatewayIpListInstanceIdentifier).orNull();
343         } catch (ReadFailedException e) {
344             LOG.error("getDcGatewayTunnelInterfaceNameList: unable to read DcGatewayTunnelList, exception ", e);
345         }
346         return Optional.fromNullable(dcGatewayIpListConfig);
347     }
348
349     public List<String> getDcGatewayTunnelInterfaceNameList() {
350         final List<String> tunnelInterfaceNameList = new ArrayList<>();
351         Optional<DcGatewayIpList> dcGatewayIpListOptional = getDcGatewayIpList();
352         if (!dcGatewayIpListOptional.isPresent()) {
353             LOG.info("No DC gateways configured while programming the l2vni table.");
354             return tunnelInterfaceNameList;
355         }
356         List<DcGatewayIp> dcGatewayIps = dcGatewayIpListOptional.get().getDcGatewayIp();
357
358         Optional<ExternalTunnelList> externalTunnelListOptional = getExternalTunnelList();
359         if (!externalTunnelListOptional.isPresent()) {
360             LOG.info("No External Tunnel Configured while programming the l2vni table.");
361             return tunnelInterfaceNameList;
362         }
363         List<ExternalTunnel> externalTunnels =
364             requireNonNullElse(externalTunnelListOptional.get().getExternalTunnel(), emptyList());
365
366         requireNonNullElse(dcGatewayIps, Collections.<DcGatewayIp>emptyList())
367                 .forEach(dcIp -> externalTunnels
368                 .stream()
369                 .filter(externalTunnel -> externalTunnel.getDestinationDevice()
370                         .contains(dcIp.getIpAddress().getIpv4Address().toString()))
371                 .forEach(externalTunnel -> tunnelInterfaceNameList.add(externalTunnel.getTunnelInterfaceName())));
372
373         return tunnelInterfaceNameList;
374     }
375
376     public void bindElanServiceToExternalTunnel(String elanName, String interfaceName) {
377         ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
378             int instructionKey = 0;
379             LOG.trace("Binding external interface {} elan {}", interfaceName, elanName);
380             List<Instruction> instructions = new ArrayList<>();
381             instructions.add(MDSALUtil.buildAndGetGotoTableInstruction(
382                     NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, ++instructionKey));
383             short elanServiceIndex =
384                     ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
385             BoundServices serviceInfo = ElanUtils.getBoundServices(
386                     ElanUtils.getElanServiceName(elanName, interfaceName), elanServiceIndex,
387                     NwConstants.ELAN_SERVICE_INDEX, NwConstants.COOKIE_ELAN_INGRESS_TABLE, instructions);
388             InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
389             if (!tx.read(bindServiceId).get().isPresent()) {
390                 tx.put(bindServiceId, serviceInfo, WriteTransaction.CREATE_MISSING_PARENTS);
391             }
392         }), LOG, "Error binding an ELAN service to an external tunnel");
393     }
394
395     public void unbindElanServiceFromExternalTunnel(String elanName, String interfaceName) {
396         ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
397             LOG.trace("UnBinding external interface {} elan {}", interfaceManager, elanName);
398             short elanServiceIndex =
399                     ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
400             InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
401             if (tx.read(bindServiceId).get().isPresent()) {
402                 tx.delete(bindServiceId);
403             }
404         }), LOG, "Error binding an ELAN service to an external tunnel");
405     }
406
407     private List<InstructionInfo> getInstructionsForExtTunnelTable(Long elanTag) {
408         List<InstructionInfo> mkInstructions = new ArrayList<>();
409         mkInstructions.add(new InstructionWriteMetadata(ElanUtils.getElanMetadataLabel(elanTag, false),
410                 ElanHelper.getElanMetadataMask()));
411         mkInstructions.add(new InstructionGotoTable(NwConstants.ELAN_DMAC_TABLE));
412         return mkInstructions;
413     }
414
415     private String getFlowRef(long tableId, long elanTag, BigInteger dpnId) {
416         return new StringBuilder().append(tableId).append(elanTag).append(dpnId).toString();
417     }
418
419     private void programEvpnL2vniFlow(ElanInstance elanInfo, BiConsumer<BigInteger, FlowEntity> flowHandler) {
420         long elanTag = elanInfo.getElanTag();
421         List<MatchInfo> mkMatches = new ArrayList<>();
422         mkMatches.add(new MatchTunnelId(BigInteger.valueOf(elanUtils.getVxlanSegmentationId(elanInfo))));
423         NWUtil.getOperativeDPNs(broker).forEach(dpnId -> {
424             LOG.debug("Updating tunnel flow to dpnid {}", dpnId);
425             List<InstructionInfo> instructions = getInstructionsForExtTunnelTable(elanTag);
426             String flowRef = getFlowRef(NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, elanTag, dpnId);
427             FlowEntity flowEntity = MDSALUtil.buildFlowEntity(
428                     dpnId,
429                     NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE,
430                     flowRef,
431                     5, // prio
432                     elanInfo.getElanInstanceName(), // flowName
433                     0, // idleTimeout
434                     0, // hardTimeout
435                     ITMConstants.COOKIE_ITM_EXTERNAL.add(BigInteger.valueOf(elanTag)),
436                     mkMatches,
437                     instructions);
438             flowHandler.accept(dpnId, flowEntity);
439         });
440     }
441
442     public void programEvpnL2vniDemuxTable(String elanName, final BiConsumer<String, String> serviceHandler,
443                                            BiConsumer<BigInteger, FlowEntity> flowHandler) {
444         ElanInstance elanInfo = elanInstanceCache.get(elanName).orNull();
445         List<String> tunnelInterfaceNameList = getDcGatewayTunnelInterfaceNameList();
446         if (tunnelInterfaceNameList.isEmpty()) {
447             LOG.info("No DC gateways tunnels while programming l2vni table for elan {}.", elanName);
448             return;
449         }
450
451         tunnelInterfaceNameList.forEach(tunnelInterfaceName -> serviceHandler.accept(elanName, tunnelInterfaceName));
452         programEvpnL2vniFlow(elanInfo, flowHandler);
453     }
454
455     @SuppressWarnings({ "unchecked", "rawtypes" })
456     public <T extends DataObject> void asyncReadAndExecute(final LogicalDatastoreType datastoreType,
457             final InstanceIdentifier<T> iid, final String jobKey, final Function<Optional<T>, Void> function) {
458         jobCoordinator.enqueueJob(jobKey, () -> {
459             SettableFuture<Optional<T>> settableFuture = SettableFuture.create();
460             List futures = Collections.singletonList(settableFuture);
461
462             try (ReadOnlyTransaction tx = broker.newReadOnlyTransaction()) {
463                 Futures.addCallback(tx.read(datastoreType, iid),
464                         new SettableFutureCallback<Optional<T>>(settableFuture) {
465                             @Override
466                             public void onSuccess(Optional<T> data) {
467                                 function.apply(data);
468                                 super.onSuccess(data);
469                             }
470                         }, MoreExecutors.directExecutor());
471
472                 return futures;
473             }
474         }, ElanConstants.JOB_MAX_RETRIES);
475     }
476 }