2 * Copyright © 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netvirt.elan.evpn.utils;
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import com.google.common.util.concurrent.SettableFuture;
14 import java.math.BigInteger;
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.List;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.Future;
20 import java.util.function.BiConsumer;
21 import java.util.function.BiPredicate;
22 import java.util.function.Function;
23 import java.util.function.Predicate;
24 import javax.inject.Inject;
25 import javax.inject.Singleton;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
31 import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
32 import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
33 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
34 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
35 import org.opendaylight.genius.itm.globals.ITMConstants;
36 import org.opendaylight.genius.mdsalutil.FlowEntity;
37 import org.opendaylight.genius.mdsalutil.InstructionInfo;
38 import org.opendaylight.genius.mdsalutil.MDSALUtil;
39 import org.opendaylight.genius.mdsalutil.MatchInfo;
40 import org.opendaylight.genius.mdsalutil.NWUtil;
41 import org.opendaylight.genius.mdsalutil.NwConstants;
42 import org.opendaylight.genius.mdsalutil.instructions.InstructionGotoTable;
43 import org.opendaylight.genius.mdsalutil.instructions.InstructionWriteMetadata;
44 import org.opendaylight.genius.mdsalutil.matches.MatchTunnelId;
45 import org.opendaylight.genius.utils.ServiceIndex;
46 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
47 import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
48 import org.opendaylight.netvirt.bgpmanager.api.IBgpManager;
49 import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
50 import org.opendaylight.netvirt.elan.l2gw.utils.SettableFutureCallback;
51 import org.opendaylight.netvirt.elan.utils.ElanConstants;
52 import org.opendaylight.netvirt.elan.utils.ElanUtils;
53 import org.opendaylight.netvirt.elanmanager.api.ElanHelper;
54 import org.opendaylight.netvirt.vpnmanager.api.IVpnManager;
55 import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.vpn.instances.VpnInstance;
56 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.servicebinding.rev160406.service.bindings.services.info.BoundServices;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.op.rev160406.ExternalTunnelList;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.op.rev160406.external.tunnel.list.ExternalTunnel;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.DcGatewayIpList;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rev160406.dc.gateway.ip.list.DcGatewayIp;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetDpnEndpointIpsInputBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.GetDpnEndpointIpsOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.itm.rpcs.rev160406.ItmRpcService;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.EvpnAugmentation;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.elan.instances.ElanInstance;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.forwarding.entries.MacEntry;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.VrfEntryBase;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.rev150602.neutron.vpn.portip.port.data.VpnPortipToPort;
71 import org.opendaylight.yangtools.yang.binding.DataObject;
72 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
73 import org.opendaylight.yangtools.yang.common.RpcResult;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
78 public class EvpnUtils {
80 private static final Logger LOG = LoggerFactory.getLogger(EvpnUtils.class);
82 private final BiPredicate<String, String> isNetAttach = (var1, var2) -> (var1 == null && var2 != null);
83 private final BiPredicate<String, String> isNetDetach = (var1, var2) -> (var1 != null && var2 == null);
84 private final Predicate<MacEntry> isIpv4PrefixAvailable = (macEntry) -> (macEntry != null
85 && macEntry.getIpPrefix() != null && macEntry.getIpPrefix().getIpv4Address() != null);
86 private final DataBroker broker;
87 private final ManagedNewTransactionRunner txRunner;
88 private final IInterfaceManager interfaceManager;
89 private final ElanUtils elanUtils;
90 private final ItmRpcService itmRpcService;
91 private final JobCoordinator jobCoordinator;
92 private final IBgpManager bgpManager;
93 private final IVpnManager vpnManager;
94 private final ElanInstanceCache elanInstanceCache;
97 public EvpnUtils(DataBroker broker, IInterfaceManager interfaceManager, ElanUtils elanUtils,
98 ItmRpcService itmRpcService, IVpnManager vpnManager, IBgpManager bgpManager,
99 JobCoordinator jobCoordinator, ElanInstanceCache elanInstanceCache) {
100 this.broker = broker;
101 this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
102 this.interfaceManager = interfaceManager;
103 this.elanUtils = elanUtils;
104 this.itmRpcService = itmRpcService;
105 this.vpnManager = vpnManager;
106 this.bgpManager = bgpManager;
107 this.jobCoordinator = jobCoordinator;
108 this.elanInstanceCache = elanInstanceCache;
111 public boolean isWithdrawEvpnRT2Routes(ElanInstance original, ElanInstance update) {
112 return isNetDetach.test(getEvpnNameFromElan(original), getEvpnNameFromElan(update));
115 public boolean isAdvertiseEvpnRT2Routes(ElanInstance original, ElanInstance update) {
116 return isNetAttach.test(getEvpnNameFromElan(original), getEvpnNameFromElan(update));
119 @SuppressWarnings("checkstyle:IllegalCatch")
120 public void advertiseEvpnRT2Routes(EvpnAugmentation evpnAugmentation, String elanName) {
121 if (evpnAugmentation == null || evpnAugmentation.getEvpnName() == null) {
124 String evpnName = evpnAugmentation.getEvpnName();
125 List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
126 if (macEntries == null || macEntries.isEmpty()) {
127 LOG.trace("advertiseEvpnRT2Routes no elan mac entries found for {}", elanName);
130 String rd = vpnManager.getVpnRd(broker, evpnName);
131 ElanInstance elanInfo = elanInstanceCache.get(elanName).orNull();
132 macEntries.stream().filter(isIpv4PrefixAvailable).forEach(macEntry -> {
133 InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
134 if (interfaceInfo == null) {
135 LOG.debug("advertiseEvpnRT2Routes, interfaceInfo is null for interface {}", macEntry.getInterface());
138 advertisePrefix(elanInfo, rd, macEntry.getMacAddress().getValue(),
139 macEntry.getIpPrefix().getIpv4Address().getValue(),
140 interfaceInfo.getInterfaceName(), interfaceInfo.getDpId());
144 public String getEndpointIpAddressForDPN(BigInteger dpnId) {
146 Future<RpcResult<GetDpnEndpointIpsOutput>> result = itmRpcService.getDpnEndpointIps(
147 new GetDpnEndpointIpsInputBuilder()
148 .setSourceDpid(dpnId)
150 RpcResult<GetDpnEndpointIpsOutput> rpcResult = null;
152 rpcResult = result.get();
153 } catch (InterruptedException e) {
154 LOG.error("getnextHopIpFromRpcOutput : InterruptedException for dpnid {}", e, dpnId);
156 } catch (ExecutionException e) {
157 LOG.error("getnextHopIpFromRpcOutput : ExecutionException for dpnid {}", e, dpnId);
160 if (!rpcResult.isSuccessful()) {
161 LOG.warn("RPC Call to getDpnEndpointIps returned with Errors {}", rpcResult.getErrors());
165 List<IpAddress> nexthopIpList = rpcResult.getResult().getNexthopipList();
166 return nexthopIpList.get(0).getIpv4Address().getValue();
169 public Optional<String> getGatewayMacAddressForInterface(String vpnName,
170 String ifName, String ipAddress) {
171 VpnPortipToPort gwPort = vpnManager.getNeutronPortFromVpnPortFixedIp(broker, vpnName, ipAddress);
172 return Optional.of(gwPort != null && gwPort.isSubnetIp()
173 ? gwPort.getMacAddress()
174 : interfaceManager.getInterfaceInfoFromOperationalDataStore(ifName).getMacAddress());
177 public String getL3vpnNameFromElan(ElanInstance elanInfo) {
178 if (elanInfo == null) {
179 LOG.debug("getL3vpnNameFromElan :elanInfo is NULL");
182 EvpnAugmentation evpnAugmentation = elanInfo.getAugmentation(EvpnAugmentation.class);
183 return evpnAugmentation != null ? evpnAugmentation.getL3vpnName() : null;
186 public static String getEvpnNameFromElan(ElanInstance elanInfo) {
187 if (elanInfo == null) {
188 LOG.debug("getEvpnNameFromElan :elanInfo is NULL");
191 EvpnAugmentation evpnAugmentation = elanInfo.getAugmentation(EvpnAugmentation.class);
192 return evpnAugmentation != null ? evpnAugmentation.getEvpnName() : null;
195 public String getEvpnRd(ElanInstance elanInfo) {
196 String evpnName = getEvpnNameFromElan(elanInfo);
197 if (evpnName == null) {
198 LOG.debug("getEvpnRd : evpnName is NULL for elanInfo {}", elanInfo);
201 return vpnManager.getVpnRd(broker, evpnName);
204 public void advertisePrefix(ElanInstance elanInfo, String macAddress, String prefix,
205 String interfaceName, BigInteger dpnId) {
206 String rd = getEvpnRd(elanInfo);
207 advertisePrefix(elanInfo, rd, macAddress, prefix, interfaceName, dpnId);
210 @SuppressWarnings("checkstyle:IllegalCatch")
211 public void advertisePrefix(ElanInstance elanInfo, String rd,
212 String macAddress, String prefix, String interfaceName, BigInteger dpnId) {
214 LOG.debug("advertisePrefix : rd is NULL for elanInfo {}, macAddress {}", elanInfo, macAddress);
217 String nextHop = getEndpointIpAddressForDPN(dpnId);
218 if (nextHop == null) {
219 LOG.debug("Failed to get the dpn tep ip for dpn {}", dpnId);
223 long l2vni = elanUtils.getVxlanSegmentationId(elanInfo);
225 String gatewayMacAddr = null;
226 String l3VpName = getL3vpnNameFromElan(elanInfo);
227 if (l3VpName != null) {
228 VpnInstance l3VpnInstance = vpnManager.getVpnInstance(broker, l3VpName);
229 l3vni = l3VpnInstance.getL3vni();
230 com.google.common.base.Optional<String> gatewayMac = getGatewayMacAddressForInterface(l3VpName,
231 interfaceName, prefix);
232 gatewayMacAddr = gatewayMac.isPresent() ? gatewayMac.get() : null;
235 LOG.info("Advertising routes with rd {}, macAddress {}, prefix {}, nextHop {},"
236 + " vpnLabel {}, l3vni {}, l2vni {}, gatewayMac {}", rd, macAddress, prefix, nextHop,
237 vpnLabel, l3vni, l2vni, gatewayMacAddr);
239 bgpManager.advertisePrefix(rd, macAddress, prefix, nextHop,
240 VrfEntryBase.EncapType.Vxlan, vpnLabel, l3vni, l2vni, gatewayMacAddr);
241 } catch (Exception e) {
242 LOG.error("Failed to advertisePrefix", e);
246 public void advertisePrefix(ElanInstance elanInfo, MacEntry macEntry) {
247 InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
248 if (interfaceInfo == null) {
249 LOG.debug("advertisePrefix, interfaceInfo is null for interface {}", macEntry.getInterface());
253 if (!isIpv4PrefixAvailable.test(macEntry)) {
254 LOG.debug("advertisePrefix macEntry does not have IPv4 prefix {}", macEntry);
257 advertisePrefix(elanInfo, macEntry.getMacAddress().getValue(),
258 macEntry.getIpPrefix().getIpv4Address().getValue(),
259 interfaceInfo.getInterfaceName(), interfaceInfo.getDpId());
262 public void withdrawEvpnRT2Routes(EvpnAugmentation evpnAugmentation, String elanName) {
263 if (evpnAugmentation == null || evpnAugmentation.getEvpnName() == null) {
264 LOG.trace("withdrawEvpnRT2Routes, evpnAugmentation is null");
268 String evpnName = evpnAugmentation.getEvpnName();
269 String rd = vpnManager.getVpnRd(broker, evpnName);
271 LOG.debug("withdrawEvpnRT2Routes : rd is null ", elanName);
274 List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
275 if (macEntries == null || macEntries.isEmpty()) {
276 LOG.debug("withdrawEvpnRT2Routes : macEntries is empty for elan {} ", elanName);
279 for (MacEntry macEntry : macEntries) {
280 if (!isIpv4PrefixAvailable.test(macEntry)) {
281 LOG.debug("withdrawEvpnRT2Routes macEntry does not have IPv4 prefix {}", macEntry);
284 String prefix = macEntry.getIpPrefix().getIpv4Address().getValue();
285 LOG.info("Withdrawing routes with rd {}, prefix {}", rd, prefix);
286 bgpManager.withdrawPrefix(rd, prefix);
290 public void withdrawPrefix(ElanInstance elanInfo, String prefix) {
291 String rd = getEvpnRd(elanInfo);
295 bgpManager.withdrawPrefix(rd, prefix);
298 public void withdrawPrefix(ElanInstance elanInfo, MacEntry macEntry) {
299 if (!isIpv4PrefixAvailable.test(macEntry)) {
300 LOG.debug("withdrawPrefix macEntry does not have IPv4 prefix {}", macEntry);
303 withdrawPrefix(elanInfo, macEntry.getIpPrefix().getIpv4Address().getValue());
306 public static InstanceIdentifier<ExternalTunnelList> getExternaTunnelListIdentifier() {
307 return InstanceIdentifier
308 .builder(ExternalTunnelList.class).build();
311 public Optional<ExternalTunnelList> getExternalTunnelList() {
312 InstanceIdentifier<ExternalTunnelList> externalTunnelListId = getExternaTunnelListIdentifier();
313 ExternalTunnelList externalTunnelList = null;
315 externalTunnelList = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
316 externalTunnelListId).orNull();
317 } catch (ReadFailedException e) {
318 LOG.error("getExternalTunnelList: unable to read ExternalTunnelList, exception ", e);
320 return Optional.fromNullable(externalTunnelList);
323 public static InstanceIdentifier<DcGatewayIpList> getDcGatewayIpListIdentifier() {
324 return InstanceIdentifier
325 .builder(DcGatewayIpList.class).build();
328 public Optional<DcGatewayIpList> getDcGatewayIpList() {
329 InstanceIdentifier<DcGatewayIpList> dcGatewayIpListInstanceIdentifier = getDcGatewayIpListIdentifier();
330 DcGatewayIpList dcGatewayIpListConfig = null;
332 dcGatewayIpListConfig = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
333 dcGatewayIpListInstanceIdentifier).orNull();
334 } catch (ReadFailedException e) {
335 LOG.error("getDcGatewayTunnelInterfaceNameList: unable to read DcGatewayTunnelList, exception ", e);
337 return Optional.fromNullable(dcGatewayIpListConfig);
340 public List<String> getDcGatewayTunnelInterfaceNameList() {
341 final List<String> tunnelInterfaceNameList = new ArrayList<>();
342 Optional<DcGatewayIpList> dcGatewayIpListOptional = getDcGatewayIpList();
343 if (!dcGatewayIpListOptional.isPresent()) {
344 LOG.info("No DC gateways configured while programming the l2vni table.");
345 return tunnelInterfaceNameList;
347 List<DcGatewayIp> dcGatewayIps = dcGatewayIpListOptional.get().getDcGatewayIp();
349 Optional<ExternalTunnelList> externalTunnelListOptional = getExternalTunnelList();
350 if (!externalTunnelListOptional.isPresent()) {
351 LOG.info("No External Tunnel Configured while programming the l2vni table.");
352 return tunnelInterfaceNameList;
354 List<ExternalTunnel> externalTunnels = externalTunnelListOptional.get().getExternalTunnel();
357 .forEach(dcIp -> externalTunnels
359 .filter(externalTunnel -> externalTunnel.getDestinationDevice()
360 .contains(dcIp.getIpAddress().getIpv4Address().toString()))
361 .forEach(externalTunnel -> tunnelInterfaceNameList.add(externalTunnel.getTunnelInterfaceName())));
363 return tunnelInterfaceNameList;
366 public void bindElanServiceToExternalTunnel(String elanName, String interfaceName) {
367 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
368 int instructionKey = 0;
369 LOG.trace("Binding external interface {} elan {}", interfaceName, elanName);
370 List<Instruction> instructions = new ArrayList<>();
371 instructions.add(MDSALUtil.buildAndGetGotoTableInstruction(
372 NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, ++instructionKey));
373 short elanServiceIndex =
374 ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
375 BoundServices serviceInfo = ElanUtils.getBoundServices(
376 ElanUtils.getElanServiceName(elanName, interfaceName), elanServiceIndex,
377 NwConstants.ELAN_SERVICE_INDEX, NwConstants.COOKIE_ELAN_INGRESS_TABLE, instructions);
378 InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
379 if (!tx.read(LogicalDatastoreType.CONFIGURATION, bindServiceId).checkedGet().isPresent()) {
380 tx.put(LogicalDatastoreType.CONFIGURATION, bindServiceId, serviceInfo,
381 WriteTransaction.CREATE_MISSING_PARENTS);
383 }), LOG, "Error binding an ELAN service to an external tunnel");
386 public void unbindElanServiceFromExternalTunnel(String elanName, String interfaceName) {
387 ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
388 LOG.trace("UnBinding external interface {} elan {}", interfaceManager, elanName);
389 short elanServiceIndex =
390 ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
391 InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
392 if (tx.read(LogicalDatastoreType.CONFIGURATION, bindServiceId).checkedGet().isPresent()) {
393 tx.delete(LogicalDatastoreType.CONFIGURATION, bindServiceId);
395 }), LOG, "Error binding an ELAN service to an external tunnel");
398 private List<InstructionInfo> getInstructionsForExtTunnelTable(Long elanTag) {
399 List<InstructionInfo> mkInstructions = new ArrayList<>();
400 mkInstructions.add(new InstructionWriteMetadata(ElanUtils.getElanMetadataLabel(elanTag, false),
401 ElanHelper.getElanMetadataMask()));
402 mkInstructions.add(new InstructionGotoTable(NwConstants.ELAN_DMAC_TABLE));
403 return mkInstructions;
406 private String getFlowRef(long tableId, long elanTag, BigInteger dpnId) {
407 return new StringBuilder().append(tableId).append(elanTag).append(dpnId).toString();
410 private void programEvpnL2vniFlow(ElanInstance elanInfo, BiConsumer<BigInteger, FlowEntity> flowHandler) {
411 long elanTag = elanInfo.getElanTag();
412 List<MatchInfo> mkMatches = new ArrayList<>();
413 mkMatches.add(new MatchTunnelId(BigInteger.valueOf(elanUtils.getVxlanSegmentationId(elanInfo))));
414 NWUtil.getOperativeDPNs(broker).forEach(dpnId -> {
415 LOG.debug("Updating tunnel flow to dpnid {}", dpnId);
416 List<InstructionInfo> instructions = getInstructionsForExtTunnelTable(elanTag);
417 String flowRef = getFlowRef(NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, elanTag, dpnId);
418 FlowEntity flowEntity = MDSALUtil.buildFlowEntity(
420 NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE,
423 elanInfo.getElanInstanceName(), // flowName
426 ITMConstants.COOKIE_ITM_EXTERNAL.add(BigInteger.valueOf(elanTag)),
429 flowHandler.accept(dpnId, flowEntity);
433 public void programEvpnL2vniDemuxTable(String elanName, final BiConsumer<String, String> serviceHandler,
434 BiConsumer<BigInteger, FlowEntity> flowHandler) {
435 ElanInstance elanInfo = elanInstanceCache.get(elanName).orNull();
436 List<String> tunnelInterfaceNameList = getDcGatewayTunnelInterfaceNameList();
437 if (tunnelInterfaceNameList.isEmpty()) {
438 LOG.info("No DC gateways tunnels while programming l2vni table for elan {}.", elanName);
442 tunnelInterfaceNameList.forEach(tunnelInterfaceName -> {
443 serviceHandler.accept(elanName, tunnelInterfaceName);
445 programEvpnL2vniFlow(elanInfo, flowHandler);
448 @SuppressWarnings({ "unchecked", "rawtypes" })
449 public <T extends DataObject> void asyncReadAndExecute(final LogicalDatastoreType datastoreType,
450 final InstanceIdentifier<T> iid, final String jobKey, final Function<Optional<T>, Void> function) {
451 jobCoordinator.enqueueJob(jobKey, () -> {
452 SettableFuture<Optional<T>> settableFuture = SettableFuture.create();
453 List futures = Collections.singletonList(settableFuture);
455 try (ReadOnlyTransaction tx = broker.newReadOnlyTransaction()) {
456 Futures.addCallback(tx.read(datastoreType, iid),
457 new SettableFutureCallback<Optional<T>>(settableFuture) {
459 public void onSuccess(Optional<T> data) {
460 function.apply(data);
461 super.onSuccess(data);
463 }, MoreExecutors.directExecutor());
467 }, ElanConstants.JOB_MAX_RETRIES);