Fixup Augmentable and Identifiable methods changing
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / evpn / utils / EvpnUtils.java
1 /*
2  * Copyright © 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.evpn.utils;
9
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import com.google.common.util.concurrent.SettableFuture;
14 import java.math.BigInteger;
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.List;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.Future;
20 import java.util.function.BiConsumer;
21 import java.util.function.BiPredicate;
22 import java.util.function.Function;
23 import java.util.function.Predicate;
24 import javax.inject.Inject;
25 import javax.inject.Singleton;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
31 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
32 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
33 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
34 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
35 import org.opendaylight.genius.itm.globals.ITMConstants;
36 import org.opendaylight.genius.mdsalutil.FlowEntity;
37 import org.opendaylight.genius.mdsalutil.InstructionInfo;
38 import org.opendaylight.genius.mdsalutil.MDSALUtil;
39 import org.opendaylight.genius.mdsalutil.MatchInfo;
40 import org.opendaylight.genius.mdsalutil.NWUtil;
41 import org.opendaylight.genius.mdsalutil.NwConstants;
42 import org.opendaylight.genius.mdsalutil.instructions.InstructionGotoTable;
43 import org.opendaylight.genius.mdsalutil.instructions.InstructionWriteMetadata;
44 import org.opendaylight.genius.mdsalutil.matches.MatchTunnelId;
45 import org.opendaylight.genius.utils.ServiceIndex;
46 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
47 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
48 import org.opendaylight.netvirt.bgpmanager.api.IBgpManager;
49 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
50 import org.opendaylight.netvirt.elan.l2gw.utils.SettableFutureCallback;
51 import org.opendaylight.netvirt.elan.utils.ElanConstants;
52 import org.opendaylight.netvirt.elan.utils.ElanUtils;
53 import org.opendaylight.netvirt.elanmanager.api.ElanHelper;
54 import org.opendaylight.netvirt.vpnmanager.api.IVpnManager;
55 import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.vpn.instances.VpnInstance;
56 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.servicebinding.rev160406.service.bindings.services.info.BoundServices;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.op.rev160406.ExternalTunnelList;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.op.rev160406.external.tunnel.list.ExternalTunnel;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.DcGatewayIpList;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.dc.gateway.ip.list.DcGatewayIp;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetDpnEndpointIpsInputBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetDpnEndpointIpsOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.EvpnAugmentation;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.elan.instances.ElanInstance;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.forwarding.entries.MacEntry;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.VrfEntryBase;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.rev150602.neutron.vpn.portip.port.data.VpnPortipToPort;
71 import org.opendaylight.yangtools.yang.binding.DataObject;
72 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
73 import org.opendaylight.yangtools.yang.common.RpcResult;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 @Singleton
78 public class EvpnUtils {
79
80     private static final Logger LOG = LoggerFactory.getLogger(EvpnUtils.class);
81
82     private final BiPredicate<String, String> isNetAttach = (var1, var2) -> (var1 == null && var2 != null);
83     private final BiPredicate<String, String> isNetDetach = (var1, var2) -> (var1 != null && var2 == null);
84     private final Predicate<MacEntry> isIpv4PrefixAvailable = (macEntry) -> (macEntry != null
85         && macEntry.getIpPrefix() != null && macEntry.getIpPrefix().getIpv4Address() != null);
86     private final DataBroker broker;
87     private final ManagedNewTransactionRunner txRunner;
88     private final IInterfaceManager interfaceManager;
89     private final ElanUtils elanUtils;
90     private final ItmRpcService itmRpcService;
91     private final JobCoordinator jobCoordinator;
92     private final IBgpManager bgpManager;
93     private final IVpnManager vpnManager;
94     private final ElanInstanceCache elanInstanceCache;
95
96     @Inject
97     public EvpnUtils(DataBroker broker, IInterfaceManager interfaceManager, ElanUtils elanUtils,
98             ItmRpcService itmRpcService, IVpnManager vpnManager, IBgpManager bgpManager,
99             JobCoordinator jobCoordinator, ElanInstanceCache elanInstanceCache) {
100         this.broker = broker;
101         this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
102         this.interfaceManager = interfaceManager;
103         this.elanUtils = elanUtils;
104         this.itmRpcService = itmRpcService;
105         this.vpnManager = vpnManager;
106         this.bgpManager = bgpManager;
107         this.jobCoordinator = jobCoordinator;
108         this.elanInstanceCache = elanInstanceCache;
109     }
110
111     public boolean isWithdrawEvpnRT2Routes(ElanInstance original, ElanInstance update) {
112         return isNetDetach.test(getEvpnNameFromElan(original), getEvpnNameFromElan(update));
113     }
114
115     public boolean isAdvertiseEvpnRT2Routes(ElanInstance original, ElanInstance update) {
116         return isNetAttach.test(getEvpnNameFromElan(original), getEvpnNameFromElan(update));
117     }
118
119     @SuppressWarnings("checkstyle:IllegalCatch")
120     public void advertiseEvpnRT2Routes(EvpnAugmentation evpnAugmentation, String elanName)  {
121         if (evpnAugmentation == null || evpnAugmentation.getEvpnName() == null) {
122             return;
123         }
124         String evpnName = evpnAugmentation.getEvpnName();
125         List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
126         if (macEntries == null || macEntries.isEmpty()) {
127             LOG.trace("advertiseEvpnRT2Routes no elan mac entries found for {}", elanName);
128             return;
129         }
130         String rd = vpnManager.getVpnRd(broker, evpnName);
131         ElanInstance elanInfo = elanInstanceCache.get(elanName).orNull();
132         macEntries.stream().filter(isIpv4PrefixAvailable).forEach(macEntry -> {
133             InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
134             if (interfaceInfo == null) {
135                 LOG.debug("advertiseEvpnRT2Routes, interfaceInfo is null for interface {}", macEntry.getInterface());
136                 return;
137             }
138             advertisePrefix(elanInfo, rd, macEntry.getMacAddress().getValue(),
139                     macEntry.getIpPrefix().getIpv4Address().getValue(),
140                     interfaceInfo.getInterfaceName(), interfaceInfo.getDpId());
141         });
142     }
143
144     public String getEndpointIpAddressForDPN(BigInteger dpnId) {
145
146         Future<RpcResult<GetDpnEndpointIpsOutput>> result = itmRpcService.getDpnEndpointIps(
147                 new GetDpnEndpointIpsInputBuilder()
148                         .setSourceDpid(dpnId)
149                         .build());
150         RpcResult<GetDpnEndpointIpsOutput> rpcResult = null;
151         try {
152             rpcResult = result.get();
153         } catch (InterruptedException e) {
154             LOG.error("getnextHopIpFromRpcOutput : InterruptedException for dpnid {}", dpnId, e);
155             return null;
156         } catch (ExecutionException e) {
157             LOG.error("getnextHopIpFromRpcOutput : ExecutionException for dpnid {}", dpnId, e);
158             return null;
159         }
160         if (!rpcResult.isSuccessful()) {
161             LOG.warn("RPC Call to getDpnEndpointIps returned with Errors {}", rpcResult.getErrors());
162             return null;
163         }
164
165         List<IpAddress> nexthopIpList = rpcResult.getResult().getNexthopipList();
166         return nexthopIpList.get(0).getIpv4Address().getValue();
167     }
168
169     public Optional<String> getGatewayMacAddressForInterface(String vpnName,
170                                                                                     String ifName, String ipAddress) {
171         VpnPortipToPort gwPort = vpnManager.getNeutronPortFromVpnPortFixedIp(broker, vpnName, ipAddress);
172         return Optional.of(gwPort != null && gwPort.isSubnetIp()
173                 ? gwPort.getMacAddress()
174                 : interfaceManager.getInterfaceInfoFromOperationalDataStore(ifName).getMacAddress());
175     }
176
177     public String getL3vpnNameFromElan(ElanInstance elanInfo) {
178         if (elanInfo == null) {
179             LOG.debug("getL3vpnNameFromElan :elanInfo is NULL");
180             return null;
181         }
182         EvpnAugmentation evpnAugmentation = elanInfo.augmentation(EvpnAugmentation.class);
183         return evpnAugmentation != null ? evpnAugmentation.getL3vpnName() : null;
184     }
185
186     public static String getEvpnNameFromElan(ElanInstance elanInfo) {
187         if (elanInfo == null) {
188             LOG.debug("getEvpnNameFromElan :elanInfo is NULL");
189             return null;
190         }
191         EvpnAugmentation evpnAugmentation = elanInfo.augmentation(EvpnAugmentation.class);
192         return evpnAugmentation != null ? evpnAugmentation.getEvpnName() : null;
193     }
194
195     public String getEvpnRd(ElanInstance elanInfo) {
196         String evpnName = getEvpnNameFromElan(elanInfo);
197         if (evpnName == null) {
198             LOG.debug("getEvpnRd : evpnName is NULL for elanInfo {}", elanInfo);
199             return null;
200         }
201         return vpnManager.getVpnRd(broker, evpnName);
202     }
203
204     public void advertisePrefix(ElanInstance elanInfo, String macAddress, String prefix,
205                                  String interfaceName, BigInteger dpnId) {
206         String rd = getEvpnRd(elanInfo);
207         advertisePrefix(elanInfo, rd, macAddress, prefix, interfaceName, dpnId);
208     }
209
210     @SuppressWarnings("checkstyle:IllegalCatch")
211     public void advertisePrefix(ElanInstance elanInfo, String rd,
212                                  String macAddress, String prefix, String interfaceName, BigInteger dpnId) {
213         if (rd == null) {
214             LOG.debug("advertisePrefix : rd is NULL for elanInfo {}, macAddress {}", elanInfo, macAddress);
215             return;
216         }
217         String nextHop = getEndpointIpAddressForDPN(dpnId);
218         if (nextHop == null) {
219             LOG.debug("Failed to get the dpn tep ip for dpn {}", dpnId);
220             return;
221         }
222         int vpnLabel = 0;
223         long l2vni = elanUtils.getVxlanSegmentationId(elanInfo);
224         long l3vni = 0;
225         String gatewayMacAddr = null;
226         String l3VpName = getL3vpnNameFromElan(elanInfo);
227         if (l3VpName != null) {
228             VpnInstance l3VpnInstance = vpnManager.getVpnInstance(broker, l3VpName);
229             l3vni = l3VpnInstance.getL3vni();
230             com.google.common.base.Optional<String> gatewayMac = getGatewayMacAddressForInterface(l3VpName,
231                     interfaceName, prefix);
232             gatewayMacAddr = gatewayMac.isPresent() ? gatewayMac.get() : null;
233
234         }
235         LOG.info("Advertising routes with rd {},  macAddress {}, prefix {}, nextHop {},"
236                         + " vpnLabel {}, l3vni {}, l2vni {}, gatewayMac {}", rd, macAddress, prefix, nextHop,
237                 vpnLabel, l3vni, l2vni, gatewayMacAddr);
238         try {
239             bgpManager.advertisePrefix(rd, macAddress, prefix, nextHop,
240                     VrfEntryBase.EncapType.Vxlan, vpnLabel, l3vni, l2vni, gatewayMacAddr);
241         } catch (Exception e) {
242             LOG.error("Failed to advertisePrefix", e);
243         }
244     }
245
246     public void advertisePrefix(ElanInstance elanInfo, MacEntry macEntry) {
247         InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
248         if (interfaceInfo == null) {
249             LOG.debug("advertisePrefix, interfaceInfo is null for interface {}", macEntry.getInterface());
250             return;
251         }
252
253         if (!isIpv4PrefixAvailable.test(macEntry)) {
254             LOG.debug("advertisePrefix macEntry does not have IPv4 prefix {}", macEntry);
255             return;
256         }
257         advertisePrefix(elanInfo, macEntry.getMacAddress().getValue(),
258                 macEntry.getIpPrefix().getIpv4Address().getValue(),
259                 interfaceInfo.getInterfaceName(), interfaceInfo.getDpId());
260     }
261
262     public void withdrawEvpnRT2Routes(EvpnAugmentation evpnAugmentation, String elanName) {
263         if (evpnAugmentation == null || evpnAugmentation.getEvpnName() == null) {
264             LOG.trace("withdrawEvpnRT2Routes, evpnAugmentation is null");
265             return;
266         }
267
268         String evpnName = evpnAugmentation.getEvpnName();
269         String rd = vpnManager.getVpnRd(broker, evpnName);
270         if (rd == null) {
271             LOG.debug("withdrawEvpnRT2Routes : rd is null for {}", elanName);
272             return;
273         }
274         List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
275         if (macEntries == null || macEntries.isEmpty()) {
276             LOG.debug("withdrawEvpnRT2Routes : macEntries  is empty for elan {} ", elanName);
277             return;
278         }
279         for (MacEntry macEntry : macEntries) {
280             if (!isIpv4PrefixAvailable.test(macEntry)) {
281                 LOG.debug("withdrawEvpnRT2Routes macEntry does not have IPv4 prefix {}", macEntry);
282                 continue;
283             }
284             String prefix = macEntry.getIpPrefix().getIpv4Address().getValue();
285             LOG.info("Withdrawing routes with rd {}, prefix {}", rd, prefix);
286             bgpManager.withdrawPrefix(rd, prefix);
287         }
288     }
289
290     public void withdrawPrefix(ElanInstance elanInfo, String prefix) {
291         String rd = getEvpnRd(elanInfo);
292         if (rd == null) {
293             return;
294         }
295         bgpManager.withdrawPrefix(rd, prefix);
296     }
297
298     public void withdrawPrefix(ElanInstance elanInfo, MacEntry macEntry) {
299         if (!isIpv4PrefixAvailable.test(macEntry)) {
300             LOG.debug("withdrawPrefix macEntry does not have IPv4 prefix {}", macEntry);
301             return;
302         }
303         withdrawPrefix(elanInfo, macEntry.getIpPrefix().getIpv4Address().getValue());
304     }
305
306     public static InstanceIdentifier<ExternalTunnelList> getExternaTunnelListIdentifier() {
307         return InstanceIdentifier
308                 .builder(ExternalTunnelList.class).build();
309     }
310
311     public Optional<ExternalTunnelList> getExternalTunnelList() {
312         InstanceIdentifier<ExternalTunnelList> externalTunnelListId = getExternaTunnelListIdentifier();
313         ExternalTunnelList externalTunnelList = null;
314         try {
315             externalTunnelList = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
316                     externalTunnelListId).orNull();
317         } catch (ReadFailedException e) {
318             LOG.error("getExternalTunnelList: unable to read ExternalTunnelList, exception ", e);
319         }
320         return Optional.fromNullable(externalTunnelList);
321     }
322
323     public static InstanceIdentifier<DcGatewayIpList> getDcGatewayIpListIdentifier() {
324         return InstanceIdentifier
325                 .builder(DcGatewayIpList.class).build();
326     }
327
328     public Optional<DcGatewayIpList> getDcGatewayIpList() {
329         InstanceIdentifier<DcGatewayIpList> dcGatewayIpListInstanceIdentifier = getDcGatewayIpListIdentifier();
330         DcGatewayIpList dcGatewayIpListConfig = null;
331         try {
332             dcGatewayIpListConfig = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
333                     dcGatewayIpListInstanceIdentifier).orNull();
334         } catch (ReadFailedException e) {
335             LOG.error("getDcGatewayTunnelInterfaceNameList: unable to read DcGatewayTunnelList, exception ", e);
336         }
337         return Optional.fromNullable(dcGatewayIpListConfig);
338     }
339
340     public List<String> getDcGatewayTunnelInterfaceNameList() {
341         final List<String> tunnelInterfaceNameList = new ArrayList<>();
342         Optional<DcGatewayIpList> dcGatewayIpListOptional = getDcGatewayIpList();
343         if (!dcGatewayIpListOptional.isPresent()) {
344             LOG.info("No DC gateways configured while programming the l2vni table.");
345             return tunnelInterfaceNameList;
346         }
347         List<DcGatewayIp> dcGatewayIps = dcGatewayIpListOptional.get().getDcGatewayIp();
348
349         Optional<ExternalTunnelList> externalTunnelListOptional = getExternalTunnelList();
350         if (!externalTunnelListOptional.isPresent()) {
351             LOG.info("No External Tunnel Configured while programming the l2vni table.");
352             return tunnelInterfaceNameList;
353         }
354         List<ExternalTunnel> externalTunnels = externalTunnelListOptional.get().getExternalTunnel();
355
356         dcGatewayIps
357                 .forEach(dcIp -> externalTunnels
358                 .stream()
359                 .filter(externalTunnel -> externalTunnel.getDestinationDevice()
360                         .contains(dcIp.getIpAddress().getIpv4Address().toString()))
361                 .forEach(externalTunnel -> tunnelInterfaceNameList.add(externalTunnel.getTunnelInterfaceName())));
362
363         return tunnelInterfaceNameList;
364     }
365
366     public void bindElanServiceToExternalTunnel(String elanName, String interfaceName) {
367         ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
368             int instructionKey = 0;
369             LOG.trace("Binding external interface {} elan {}", interfaceName, elanName);
370             List<Instruction> instructions = new ArrayList<>();
371             instructions.add(MDSALUtil.buildAndGetGotoTableInstruction(
372                     NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, ++instructionKey));
373             short elanServiceIndex =
374                     ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
375             BoundServices serviceInfo = ElanUtils.getBoundServices(
376                     ElanUtils.getElanServiceName(elanName, interfaceName), elanServiceIndex,
377                     NwConstants.ELAN_SERVICE_INDEX, NwConstants.COOKIE_ELAN_INGRESS_TABLE, instructions);
378             InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
379             if (!tx.read(LogicalDatastoreType.CONFIGURATION, bindServiceId).checkedGet().isPresent()) {
380                 tx.put(LogicalDatastoreType.CONFIGURATION, bindServiceId, serviceInfo,
381                         WriteTransaction.CREATE_MISSING_PARENTS);
382             }
383         }), LOG, "Error binding an ELAN service to an external tunnel");
384     }
385
386     public void unbindElanServiceFromExternalTunnel(String elanName, String interfaceName) {
387         ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
388             LOG.trace("UnBinding external interface {} elan {}", interfaceManager, elanName);
389             short elanServiceIndex =
390                     ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
391             InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
392             if (tx.read(LogicalDatastoreType.CONFIGURATION, bindServiceId).checkedGet().isPresent()) {
393                 tx.delete(LogicalDatastoreType.CONFIGURATION, bindServiceId);
394             }
395         }), LOG, "Error binding an ELAN service to an external tunnel");
396     }
397
398     private List<InstructionInfo> getInstructionsForExtTunnelTable(Long elanTag) {
399         List<InstructionInfo> mkInstructions = new ArrayList<>();
400         mkInstructions.add(new InstructionWriteMetadata(ElanUtils.getElanMetadataLabel(elanTag, false),
401                 ElanHelper.getElanMetadataMask()));
402         mkInstructions.add(new InstructionGotoTable(NwConstants.ELAN_DMAC_TABLE));
403         return mkInstructions;
404     }
405
406     private String getFlowRef(long tableId, long elanTag, BigInteger dpnId) {
407         return new StringBuilder().append(tableId).append(elanTag).append(dpnId).toString();
408     }
409
410     private void programEvpnL2vniFlow(ElanInstance elanInfo, BiConsumer<BigInteger, FlowEntity> flowHandler) {
411         long elanTag = elanInfo.getElanTag();
412         List<MatchInfo> mkMatches = new ArrayList<>();
413         mkMatches.add(new MatchTunnelId(BigInteger.valueOf(elanUtils.getVxlanSegmentationId(elanInfo))));
414         NWUtil.getOperativeDPNs(broker).forEach(dpnId -> {
415             LOG.debug("Updating tunnel flow to dpnid {}", dpnId);
416             List<InstructionInfo> instructions = getInstructionsForExtTunnelTable(elanTag);
417             String flowRef = getFlowRef(NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, elanTag, dpnId);
418             FlowEntity flowEntity = MDSALUtil.buildFlowEntity(
419                     dpnId,
420                     NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE,
421                     flowRef,
422                     5, // prio
423                     elanInfo.getElanInstanceName(), // flowName
424                     0, // idleTimeout
425                     0, // hardTimeout
426                     ITMConstants.COOKIE_ITM_EXTERNAL.add(BigInteger.valueOf(elanTag)),
427                     mkMatches,
428                     instructions);
429             flowHandler.accept(dpnId, flowEntity);
430         });
431     }
432
433     public void programEvpnL2vniDemuxTable(String elanName, final BiConsumer<String, String> serviceHandler,
434                                            BiConsumer<BigInteger, FlowEntity> flowHandler) {
435         ElanInstance elanInfo = elanInstanceCache.get(elanName).orNull();
436         List<String> tunnelInterfaceNameList = getDcGatewayTunnelInterfaceNameList();
437         if (tunnelInterfaceNameList.isEmpty()) {
438             LOG.info("No DC gateways tunnels while programming l2vni table for elan {}.", elanName);
439             return;
440         }
441
442         tunnelInterfaceNameList.forEach(tunnelInterfaceName -> serviceHandler.accept(elanName, tunnelInterfaceName));
443         programEvpnL2vniFlow(elanInfo, flowHandler);
444     }
445
446     @SuppressWarnings({ "unchecked", "rawtypes" })
447     public <T extends DataObject> void asyncReadAndExecute(final LogicalDatastoreType datastoreType,
448             final InstanceIdentifier<T> iid, final String jobKey, final Function<Optional<T>, Void> function) {
449         jobCoordinator.enqueueJob(jobKey, () -> {
450             SettableFuture<Optional<T>> settableFuture = SettableFuture.create();
451             List futures = Collections.singletonList(settableFuture);
452
453             try (ReadOnlyTransaction tx = broker.newReadOnlyTransaction()) {
454                 Futures.addCallback(tx.read(datastoreType, iid),
455                         new SettableFutureCallback<Optional<T>>(settableFuture) {
456                             @Override
457                             public void onSuccess(Optional<T> data) {
458                                 function.apply(data);
459                                 super.onSuccess(data);
460                             }
461                         }, MoreExecutors.directExecutor());
462
463                 return futures;
464             }
465         }, ElanConstants.JOB_MAX_RETRIES);
466     }
467 }