Refactor elanInstanceLocalCache to a non-static singleton
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / evpn / utils / EvpnUtils.java
1 /*
2  * Copyright © 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netvirt.elan.evpn.utils;
9
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import com.google.common.util.concurrent.SettableFuture;
14 import java.math.BigInteger;
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.List;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.Future;
20 import java.util.function.BiConsumer;
21 import java.util.function.BiPredicate;
22 import java.util.function.Function;
23 import java.util.function.Predicate;
24 import javax.inject.Inject;
25 import javax.inject.Singleton;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
31 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
32 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
33 import org.opendaylight.genius.itm.globals.ITMConstants;
34 import org.opendaylight.genius.mdsalutil.FlowEntity;
35 import org.opendaylight.genius.mdsalutil.InstructionInfo;
36 import org.opendaylight.genius.mdsalutil.MDSALUtil;
37 import org.opendaylight.genius.mdsalutil.MatchInfo;
38 import org.opendaylight.genius.mdsalutil.NWUtil;
39 import org.opendaylight.genius.mdsalutil.NwConstants;
40 import org.opendaylight.genius.mdsalutil.instructions.InstructionGotoTable;
41 import org.opendaylight.genius.mdsalutil.instructions.InstructionWriteMetadata;
42 import org.opendaylight.genius.mdsalutil.matches.MatchTunnelId;
43 import org.opendaylight.genius.utils.ServiceIndex;
44 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
45 import org.opendaylight.netvirt.bgpmanager.api.IBgpManager;
46 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
47 import org.opendaylight.netvirt.elan.l2gw.utils.SettableFutureCallback;
48 import org.opendaylight.netvirt.elan.utils.ElanConstants;
49 import org.opendaylight.netvirt.elan.utils.ElanUtils;
50 import org.opendaylight.netvirt.elanmanager.api.ElanHelper;
51 import org.opendaylight.netvirt.vpnmanager.api.IVpnManager;
52 import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.vpn.instances.VpnInstance;
53 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.servicebinding.rev160406.service.bindings.services.info.BoundServices;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.op.rev160406.ExternalTunnelList;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.op.rev160406.external.tunnel.list.ExternalTunnel;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.DcGatewayIpList;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.dc.gateway.ip.list.DcGatewayIp;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetDpnEndpointIpsInputBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetDpnEndpointIpsOutput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.EvpnAugmentation;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.elan.instances.ElanInstance;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.forwarding.entries.MacEntry;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.VrfEntryBase;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.rev150602.neutron.vpn.portip.port.data.VpnPortipToPort;
68 import org.opendaylight.yangtools.yang.binding.DataObject;
69 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
70 import org.opendaylight.yangtools.yang.common.RpcResult;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74 @Singleton
75 public class EvpnUtils {
76
77     private static final Logger LOG = LoggerFactory.getLogger(EvpnUtils.class);
78
79     private final BiPredicate<String, String> isNetAttach = (var1, var2) -> (var1 == null && var2 != null);
80     private final BiPredicate<String, String> isNetDetach = (var1, var2) -> (var1 != null && var2 == null);
81     private final Predicate<MacEntry> isIpv4PrefixAvailable = (macEntry) -> (macEntry != null
82         && macEntry.getIpPrefix() != null && macEntry.getIpPrefix().getIpv4Address() != null);
83     private final DataBroker broker;
84     private final IInterfaceManager interfaceManager;
85     private final ElanUtils elanUtils;
86     private final ItmRpcService itmRpcService;
87     private final JobCoordinator jobCoordinator;
88     private final IBgpManager bgpManager;
89     private final IVpnManager vpnManager;
90     private final ElanInstanceCache elanInstanceCache;
91
92     @Inject
93     public EvpnUtils(DataBroker broker, IInterfaceManager interfaceManager, ElanUtils elanUtils,
94             ItmRpcService itmRpcService, IVpnManager vpnManager, IBgpManager bgpManager,
95             JobCoordinator jobCoordinator, ElanInstanceCache elanInstanceCache) {
96         this.broker = broker;
97         this.interfaceManager = interfaceManager;
98         this.elanUtils = elanUtils;
99         this.itmRpcService = itmRpcService;
100         this.vpnManager = vpnManager;
101         this.bgpManager = bgpManager;
102         this.jobCoordinator = jobCoordinator;
103         this.elanInstanceCache = elanInstanceCache;
104     }
105
106     public boolean isWithdrawEvpnRT2Routes(ElanInstance original, ElanInstance update) {
107         return isNetDetach.test(getEvpnNameFromElan(original), getEvpnNameFromElan(update));
108     }
109
110     public boolean isAdvertiseEvpnRT2Routes(ElanInstance original, ElanInstance update) {
111         return isNetAttach.test(getEvpnNameFromElan(original), getEvpnNameFromElan(update));
112     }
113
114     @SuppressWarnings("checkstyle:IllegalCatch")
115     public void advertiseEvpnRT2Routes(EvpnAugmentation evpnAugmentation, String elanName)  {
116         if (evpnAugmentation == null || evpnAugmentation.getEvpnName() == null) {
117             return;
118         }
119         String evpnName = evpnAugmentation.getEvpnName();
120         List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
121         if (macEntries == null || macEntries.isEmpty()) {
122             LOG.trace("advertiseEvpnRT2Routes no elan mac entries found for {}", elanName);
123             return;
124         }
125         String rd = vpnManager.getVpnRd(broker, evpnName);
126         ElanInstance elanInfo = elanInstanceCache.get(elanName).orNull();
127         macEntries.stream().filter(isIpv4PrefixAvailable).forEach(macEntry -> {
128             InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
129             if (interfaceInfo == null) {
130                 LOG.debug("advertiseEvpnRT2Routes, interfaceInfo is null for interface {}", macEntry.getInterface());
131                 return;
132             }
133             advertisePrefix(elanInfo, rd, macEntry.getMacAddress().getValue(),
134                     macEntry.getIpPrefix().getIpv4Address().getValue(),
135                     interfaceInfo.getInterfaceName(), interfaceInfo.getDpId());
136         });
137     }
138
139     public String getEndpointIpAddressForDPN(BigInteger dpnId) {
140
141         Future<RpcResult<GetDpnEndpointIpsOutput>> result = itmRpcService.getDpnEndpointIps(
142                 new GetDpnEndpointIpsInputBuilder()
143                         .setSourceDpid(dpnId)
144                         .build());
145         RpcResult<GetDpnEndpointIpsOutput> rpcResult = null;
146         try {
147             rpcResult = result.get();
148         } catch (InterruptedException e) {
149             LOG.error("getnextHopIpFromRpcOutput : InterruptedException for dpnid {}", e, dpnId);
150             return null;
151         } catch (ExecutionException e) {
152             LOG.error("getnextHopIpFromRpcOutput : ExecutionException for dpnid {}", e, dpnId);
153             return null;
154         }
155         if (!rpcResult.isSuccessful()) {
156             LOG.warn("RPC Call to getDpnEndpointIps returned with Errors {}", rpcResult.getErrors());
157             return null;
158         }
159
160         List<IpAddress> nexthopIpList = rpcResult.getResult().getNexthopipList();
161         return nexthopIpList.get(0).getIpv4Address().getValue();
162     }
163
164     public Optional<String> getGatewayMacAddressForInterface(String vpnName,
165                                                                                     String ifName, String ipAddress) {
166         VpnPortipToPort gwPort = vpnManager.getNeutronPortFromVpnPortFixedIp(broker, vpnName, ipAddress);
167         return Optional.of(gwPort != null && gwPort.isSubnetIp()
168                 ? gwPort.getMacAddress()
169                 : interfaceManager.getInterfaceInfoFromOperationalDataStore(ifName).getMacAddress());
170     }
171
172     public String getL3vpnNameFromElan(ElanInstance elanInfo) {
173         if (elanInfo == null) {
174             LOG.debug("getL3vpnNameFromElan :elanInfo is NULL");
175             return null;
176         }
177         EvpnAugmentation evpnAugmentation = elanInfo.getAugmentation(EvpnAugmentation.class);
178         return evpnAugmentation != null ? evpnAugmentation.getL3vpnName() : null;
179     }
180
181     public static String getEvpnNameFromElan(ElanInstance elanInfo) {
182         if (elanInfo == null) {
183             LOG.debug("getEvpnNameFromElan :elanInfo is NULL");
184             return null;
185         }
186         EvpnAugmentation evpnAugmentation = elanInfo.getAugmentation(EvpnAugmentation.class);
187         return evpnAugmentation != null ? evpnAugmentation.getEvpnName() : null;
188     }
189
190     public String getEvpnRd(ElanInstance elanInfo) {
191         String evpnName = getEvpnNameFromElan(elanInfo);
192         if (evpnName == null) {
193             LOG.debug("getEvpnRd : evpnName is NULL for elanInfo {}", elanInfo);
194             return null;
195         }
196         return vpnManager.getVpnRd(broker, evpnName);
197     }
198
199     public void advertisePrefix(ElanInstance elanInfo, String macAddress, String prefix,
200                                  String interfaceName, BigInteger dpnId) {
201         String rd = getEvpnRd(elanInfo);
202         advertisePrefix(elanInfo, rd, macAddress, prefix, interfaceName, dpnId);
203     }
204
205     @SuppressWarnings("checkstyle:IllegalCatch")
206     public void advertisePrefix(ElanInstance elanInfo, String rd,
207                                  String macAddress, String prefix, String interfaceName, BigInteger dpnId) {
208         if (rd == null) {
209             LOG.debug("advertisePrefix : rd is NULL for elanInfo {}, macAddress {}", elanInfo, macAddress);
210             return;
211         }
212         String nextHop = getEndpointIpAddressForDPN(dpnId);
213         if (nextHop == null) {
214             LOG.debug("Failed to get the dpn tep ip for dpn {}", dpnId);
215             return;
216         }
217         int vpnLabel = 0;
218         long l2vni = elanInfo.getSegmentationId();
219         long l3vni = 0;
220         String gatewayMacAddr = null;
221         String l3VpName = getL3vpnNameFromElan(elanInfo);
222         if (l3VpName != null) {
223             VpnInstance l3VpnInstance = vpnManager.getVpnInstance(broker, l3VpName);
224             l3vni = l3VpnInstance.getL3vni();
225             com.google.common.base.Optional<String> gatewayMac = getGatewayMacAddressForInterface(l3VpName,
226                     interfaceName, prefix);
227             gatewayMacAddr = gatewayMac.isPresent() ? gatewayMac.get() : null;
228
229         }
230         LOG.info("Advertising routes with rd {},  macAddress {}, prefix {}, nextHop {},"
231                         + " vpnLabel {}, l3vni {}, l2vni {}, gatewayMac {}", rd, macAddress, prefix, nextHop,
232                 vpnLabel, l3vni, l2vni, gatewayMacAddr);
233         try {
234             bgpManager.advertisePrefix(rd, macAddress, prefix, nextHop,
235                     VrfEntryBase.EncapType.Vxlan, vpnLabel, l3vni, l2vni, gatewayMacAddr);
236         } catch (Exception e) {
237             LOG.error("Failed to advertisePrefix", e);
238         }
239     }
240
241     public void advertisePrefix(ElanInstance elanInfo, MacEntry macEntry) {
242         InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
243         if (interfaceInfo == null) {
244             LOG.debug("advertisePrefix, interfaceInfo is null for interface {}", macEntry.getInterface());
245             return;
246         }
247
248         if (!isIpv4PrefixAvailable.test(macEntry)) {
249             LOG.debug("advertisePrefix macEntry does not have IPv4 prefix {}", macEntry);
250             return;
251         }
252         advertisePrefix(elanInfo, macEntry.getMacAddress().getValue(),
253                 macEntry.getIpPrefix().getIpv4Address().getValue(),
254                 interfaceInfo.getInterfaceName(), interfaceInfo.getDpId());
255     }
256
257     public void withdrawEvpnRT2Routes(EvpnAugmentation evpnAugmentation, String elanName) {
258         if (evpnAugmentation == null || evpnAugmentation.getEvpnName() == null) {
259             LOG.trace("withdrawEvpnRT2Routes, evpnAugmentation is null");
260             return;
261         }
262
263         String evpnName = evpnAugmentation.getEvpnName();
264         String rd = vpnManager.getVpnRd(broker, evpnName);
265         if (rd == null) {
266             LOG.debug("withdrawEvpnRT2Routes : rd is null ", elanName);
267             return;
268         }
269         List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
270         if (macEntries == null || macEntries.isEmpty()) {
271             LOG.debug("withdrawEvpnRT2Routes : macEntries  is empty for elan {} ", elanName);
272             return;
273         }
274         for (MacEntry macEntry : macEntries) {
275             if (!isIpv4PrefixAvailable.test(macEntry)) {
276                 LOG.debug("withdrawEvpnRT2Routes macEntry does not have IPv4 prefix {}", macEntry);
277                 continue;
278             }
279             String prefix = macEntry.getIpPrefix().getIpv4Address().getValue();
280             LOG.info("Withdrawing routes with rd {}, prefix {}", rd, prefix);
281             bgpManager.withdrawPrefix(rd, prefix);
282         }
283     }
284
285     public void withdrawPrefix(ElanInstance elanInfo, String prefix) {
286         String rd = getEvpnRd(elanInfo);
287         if (rd == null) {
288             return;
289         }
290         bgpManager.withdrawPrefix(rd, prefix);
291     }
292
293     public void withdrawPrefix(ElanInstance elanInfo, MacEntry macEntry) {
294         if (!isIpv4PrefixAvailable.test(macEntry)) {
295             LOG.debug("withdrawPrefix macEntry does not have IPv4 prefix {}", macEntry);
296             return;
297         }
298         withdrawPrefix(elanInfo, macEntry.getIpPrefix().getIpv4Address().getValue());
299     }
300
301     public static InstanceIdentifier<ExternalTunnelList> getExternaTunnelListIdentifier() {
302         return InstanceIdentifier
303                 .builder(ExternalTunnelList.class).build();
304     }
305
306     public Optional<ExternalTunnelList> getExternalTunnelList() {
307         InstanceIdentifier<ExternalTunnelList> externalTunnelListId = getExternaTunnelListIdentifier();
308         ExternalTunnelList externalTunnelList = null;
309         try {
310             externalTunnelList = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
311                     externalTunnelListId).orNull();
312         } catch (ReadFailedException e) {
313             LOG.error("getExternalTunnelList: unable to read ExternalTunnelList, exception ", e);
314         }
315         return Optional.fromNullable(externalTunnelList);
316     }
317
318     public static InstanceIdentifier<DcGatewayIpList> getDcGatewayIpListIdentifier() {
319         return InstanceIdentifier
320                 .builder(DcGatewayIpList.class).build();
321     }
322
323     public Optional<DcGatewayIpList> getDcGatewayIpList() {
324         InstanceIdentifier<DcGatewayIpList> dcGatewayIpListInstanceIdentifier = getDcGatewayIpListIdentifier();
325         DcGatewayIpList dcGatewayIpListConfig = null;
326         try {
327             dcGatewayIpListConfig = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
328                     dcGatewayIpListInstanceIdentifier).orNull();
329         } catch (ReadFailedException e) {
330             LOG.error("getDcGatewayTunnelInterfaceNameList: unable to read DcGatewayTunnelList, exception ", e);
331         }
332         return Optional.fromNullable(dcGatewayIpListConfig);
333     }
334
335     public List<String> getDcGatewayTunnelInterfaceNameList() {
336         final List<String> tunnelInterfaceNameList = new ArrayList<>();
337         Optional<DcGatewayIpList> dcGatewayIpListOptional = getDcGatewayIpList();
338         if (!dcGatewayIpListOptional.isPresent()) {
339             LOG.info("No DC gateways configured while programming the l2vni table.");
340             return tunnelInterfaceNameList;
341         }
342         List<DcGatewayIp> dcGatewayIps = dcGatewayIpListOptional.get().getDcGatewayIp();
343
344         Optional<ExternalTunnelList> externalTunnelListOptional = getExternalTunnelList();
345         if (!externalTunnelListOptional.isPresent()) {
346             LOG.info("No External Tunnel Configured while programming the l2vni table.");
347             return tunnelInterfaceNameList;
348         }
349         List<ExternalTunnel> externalTunnels = externalTunnelListOptional.get().getExternalTunnel();
350
351         dcGatewayIps
352                 .forEach(dcIp -> externalTunnels
353                 .stream()
354                 .filter(externalTunnel -> externalTunnel.getDestinationDevice()
355                         .contains(dcIp.getIpAddress().getIpv4Address().toString()))
356                 .forEach(externalTunnel -> tunnelInterfaceNameList.add(externalTunnel.getTunnelInterfaceName())));
357
358         return tunnelInterfaceNameList;
359     }
360
361     public void bindElanServiceToExternalTunnel(String elanName, String interfaceName) {
362         WriteTransaction tx = broker.newWriteOnlyTransaction();
363         int instructionKey = 0;
364         LOG.trace("Binding external interface {} elan {}", interfaceName, elanName);
365         List<Instruction> instructions = new ArrayList<>();
366         instructions.add(MDSALUtil.buildAndGetGotoTableInstruction(
367                 NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, ++instructionKey));
368         short elanServiceIndex = ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
369         BoundServices serviceInfo = ElanUtils.getBoundServices(
370                 ElanUtils.getElanServiceName(elanName, interfaceName), elanServiceIndex,
371                 NwConstants.ELAN_SERVICE_INDEX, NwConstants.COOKIE_ELAN_INGRESS_TABLE, instructions);
372         InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
373         Optional<BoundServices> existingElanService = elanUtils.read(broker, LogicalDatastoreType.CONFIGURATION,
374                 bindServiceId);
375         if (!existingElanService.isPresent()) {
376             tx.put(LogicalDatastoreType.CONFIGURATION, bindServiceId, serviceInfo, true);
377             tx.submit();
378         }
379     }
380
381     public void unbindElanServiceFromExternalTunnel(String elanName, String interfaceName) {
382         WriteTransaction tx = broker.newWriteOnlyTransaction();
383         LOG.trace("UnBinding external interface {} elan {}", interfaceManager, elanName);
384         short elanServiceIndex = ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
385         InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
386         Optional<BoundServices> existingElanService = elanUtils.read(broker, LogicalDatastoreType.CONFIGURATION,
387                 bindServiceId);
388         if (!existingElanService.isPresent()) {
389             tx.delete(LogicalDatastoreType.CONFIGURATION, bindServiceId);
390             tx.submit();
391         }
392     }
393
394     private List<InstructionInfo> getInstructionsForExtTunnelTable(Long elanTag) {
395         List<InstructionInfo> mkInstructions = new ArrayList<>();
396         mkInstructions.add(new InstructionWriteMetadata(ElanUtils.getElanMetadataLabel(elanTag, false),
397                 ElanHelper.getElanMetadataMask()));
398         mkInstructions.add(new InstructionGotoTable(NwConstants.ELAN_DMAC_TABLE));
399         return mkInstructions;
400     }
401
402     private String getFlowRef(long tableId, long elanTag, BigInteger dpnId) {
403         return new StringBuilder().append(tableId).append(elanTag).append(dpnId).toString();
404     }
405
406     private void programEvpnL2vniFlow(ElanInstance elanInfo, BiConsumer<BigInteger, FlowEntity> flowHandler) {
407         long elanTag = elanInfo.getElanTag();
408         List<MatchInfo> mkMatches = new ArrayList<>();
409         mkMatches.add(new MatchTunnelId(BigInteger.valueOf(elanInfo.getSegmentationId())));
410         NWUtil.getOperativeDPNs(broker).forEach(dpnId -> {
411             LOG.debug("Updating tunnel flow to dpnid {}", dpnId);
412             List<InstructionInfo> instructions = getInstructionsForExtTunnelTable(elanTag);
413             String flowRef = getFlowRef(NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, elanTag, dpnId);
414             FlowEntity flowEntity = MDSALUtil.buildFlowEntity(
415                     dpnId,
416                     NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE,
417                     flowRef,
418                     5, // prio
419                     elanInfo.getElanInstanceName(), // flowName
420                     0, // idleTimeout
421                     0, // hardTimeout
422                     ITMConstants.COOKIE_ITM_EXTERNAL.add(BigInteger.valueOf(elanTag)),
423                     mkMatches,
424                     instructions);
425             flowHandler.accept(dpnId, flowEntity);
426         });
427     }
428
429     public void programEvpnL2vniDemuxTable(String elanName, final BiConsumer<String, String> serviceHandler,
430                                            BiConsumer<BigInteger, FlowEntity> flowHandler) {
431         ElanInstance elanInfo = elanInstanceCache.get(elanName).orNull();
432         List<String> tunnelInterfaceNameList = getDcGatewayTunnelInterfaceNameList();
433         if (tunnelInterfaceNameList.isEmpty()) {
434             LOG.info("No DC gateways tunnels while programming l2vni table for elan {}.", elanName);
435             return;
436         }
437
438         tunnelInterfaceNameList.forEach(tunnelInterfaceName -> {
439             serviceHandler.accept(elanName, tunnelInterfaceName);
440         });
441         programEvpnL2vniFlow(elanInfo, flowHandler);
442     }
443
444     @SuppressWarnings({ "unchecked", "rawtypes" })
445     public <T extends DataObject> void asyncReadAndExecute(final LogicalDatastoreType datastoreType,
446             final InstanceIdentifier<T> iid, final String jobKey, final Function<Optional<T>, Void> function) {
447         jobCoordinator.enqueueJob(jobKey, () -> {
448             SettableFuture<Optional<T>> settableFuture = SettableFuture.create();
449             List futures = Collections.singletonList(settableFuture);
450
451             ReadWriteTransaction tx = broker.newReadWriteTransaction();
452
453             Futures.addCallback(tx.read(datastoreType, iid), new SettableFutureCallback<Optional<T>>(settableFuture) {
454                 @Override
455                 public void onSuccess(Optional<T> data) {
456                     function.apply(data);
457                     super.onSuccess(data);
458                 }
459             }, MoreExecutors.directExecutor());
460
461             return futures;
462         }, ElanConstants.JOB_MAX_RETRIES);
463     }
464 }