MRI version bumpup for Aluminium
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / evpn / utils / EvpnUtils.java
index cb9f99afa6d8f1cd2afab9367b775cc23f10a98f..01970cbcbd8bb896e7cd652f6865f917cf2badbf 100644 (file)
@@ -7,14 +7,15 @@
  */
 package org.opendaylight.netvirt.elan.evpn.utils;
 
-import com.google.common.base.Optional;
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
@@ -23,11 +24,9 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
 import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
 import org.opendaylight.genius.itm.globals.ITMConstants;
@@ -42,13 +41,17 @@ import org.opendaylight.genius.mdsalutil.instructions.InstructionWriteMetadata;
 import org.opendaylight.genius.mdsalutil.matches.MatchTunnelId;
 import org.opendaylight.genius.utils.ServiceIndex;
 import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.netvirt.bgpmanager.api.IBgpManager;
+import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
 import org.opendaylight.netvirt.elan.l2gw.utils.SettableFutureCallback;
 import org.opendaylight.netvirt.elan.utils.ElanConstants;
 import org.opendaylight.netvirt.elan.utils.ElanUtils;
 import org.opendaylight.netvirt.elanmanager.api.ElanHelper;
 import org.opendaylight.netvirt.vpnmanager.api.IVpnManager;
-import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.vpn.instances.VpnInstance;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.interfacemanager.servicebinding.rev160406.service.bindings.services.info.BoundServices;
@@ -63,10 +66,13 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.Evpn
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.elan.instances.ElanInstance;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.elan.rev150602.forwarding.entries.MacEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.fibmanager.rev150330.VrfEntryBase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.l3vpn.rev200204.vpn.instances.VpnInstance;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.neutronvpn.rev150602.neutron.vpn.portip.port.data.VpnPortipToPort;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.Uint32;
+import org.opendaylight.yangtools.yang.common.Uint64;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,24 +86,28 @@ public class EvpnUtils {
     private final Predicate<MacEntry> isIpv4PrefixAvailable = (macEntry) -> (macEntry != null
         && macEntry.getIpPrefix() != null && macEntry.getIpPrefix().getIpv4Address() != null);
     private final DataBroker broker;
+    private final ManagedNewTransactionRunner txRunner;
     private final IInterfaceManager interfaceManager;
     private final ElanUtils elanUtils;
     private final ItmRpcService itmRpcService;
     private final JobCoordinator jobCoordinator;
     private final IBgpManager bgpManager;
     private final IVpnManager vpnManager;
+    private final ElanInstanceCache elanInstanceCache;
 
     @Inject
     public EvpnUtils(DataBroker broker, IInterfaceManager interfaceManager, ElanUtils elanUtils,
             ItmRpcService itmRpcService, IVpnManager vpnManager, IBgpManager bgpManager,
-            JobCoordinator jobCoordinator) {
+            JobCoordinator jobCoordinator, ElanInstanceCache elanInstanceCache) {
         this.broker = broker;
+        this.txRunner = new ManagedNewTransactionRunnerImpl(broker);
         this.interfaceManager = interfaceManager;
         this.elanUtils = elanUtils;
         this.itmRpcService = itmRpcService;
         this.vpnManager = vpnManager;
         this.bgpManager = bgpManager;
         this.jobCoordinator = jobCoordinator;
+        this.elanInstanceCache = elanInstanceCache;
     }
 
     public boolean isWithdrawEvpnRT2Routes(ElanInstance original, ElanInstance update) {
@@ -120,7 +130,7 @@ public class EvpnUtils {
             return;
         }
         String rd = vpnManager.getVpnRd(broker, evpnName);
-        ElanInstance elanInfo = ElanUtils.getElanInstanceByName(broker, elanName);
+        ElanInstance elanInfo = elanInstanceCache.get(elanName).orElse(null);
         macEntries.stream().filter(isIpv4PrefixAvailable).forEach(macEntry -> {
             InterfaceInfo interfaceInfo = interfaceManager.getInterfaceInfo(macEntry.getInterface());
             if (interfaceInfo == null) {
@@ -133,7 +143,8 @@ public class EvpnUtils {
         });
     }
 
-    public String getEndpointIpAddressForDPN(BigInteger dpnId) {
+    @Nullable
+    public String getEndpointIpAddressForDPN(Uint64 dpnId) {
 
         Future<RpcResult<GetDpnEndpointIpsOutput>> result = itmRpcService.getDpnEndpointIps(
                 new GetDpnEndpointIpsInputBuilder()
@@ -143,10 +154,10 @@ public class EvpnUtils {
         try {
             rpcResult = result.get();
         } catch (InterruptedException e) {
-            LOG.error("getnextHopIpFromRpcOutput : InterruptedException for dpnid {}", e, dpnId);
+            LOG.error("getnextHopIpFromRpcOutput : InterruptedException for dpnid {}", dpnId, e);
             return null;
         } catch (ExecutionException e) {
-            LOG.error("getnextHopIpFromRpcOutput : ExecutionException for dpnid {}", e, dpnId);
+            LOG.error("getnextHopIpFromRpcOutput : ExecutionException for dpnid {}", dpnId, e);
             return null;
         }
         if (!rpcResult.isSuccessful()) {
@@ -166,24 +177,27 @@ public class EvpnUtils {
                 : interfaceManager.getInterfaceInfoFromOperationalDataStore(ifName).getMacAddress());
     }
 
+    @Nullable
     public String getL3vpnNameFromElan(ElanInstance elanInfo) {
         if (elanInfo == null) {
             LOG.debug("getL3vpnNameFromElan :elanInfo is NULL");
             return null;
         }
-        EvpnAugmentation evpnAugmentation = elanInfo.getAugmentation(EvpnAugmentation.class);
+        EvpnAugmentation evpnAugmentation = elanInfo.augmentation(EvpnAugmentation.class);
         return evpnAugmentation != null ? evpnAugmentation.getL3vpnName() : null;
     }
 
+    @Nullable
     public static String getEvpnNameFromElan(ElanInstance elanInfo) {
         if (elanInfo == null) {
             LOG.debug("getEvpnNameFromElan :elanInfo is NULL");
             return null;
         }
-        EvpnAugmentation evpnAugmentation = elanInfo.getAugmentation(EvpnAugmentation.class);
+        EvpnAugmentation evpnAugmentation = elanInfo.augmentation(EvpnAugmentation.class);
         return evpnAugmentation != null ? evpnAugmentation.getEvpnName() : null;
     }
 
+    @Nullable
     public String getEvpnRd(ElanInstance elanInfo) {
         String evpnName = getEvpnNameFromElan(elanInfo);
         if (evpnName == null) {
@@ -194,14 +208,14 @@ public class EvpnUtils {
     }
 
     public void advertisePrefix(ElanInstance elanInfo, String macAddress, String prefix,
-                                 String interfaceName, BigInteger dpnId) {
+                                 String interfaceName, Uint64 dpnId) {
         String rd = getEvpnRd(elanInfo);
         advertisePrefix(elanInfo, rd, macAddress, prefix, interfaceName, dpnId);
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     public void advertisePrefix(ElanInstance elanInfo, String rd,
-                                 String macAddress, String prefix, String interfaceName, BigInteger dpnId) {
+                                 String macAddress, String prefix, String interfaceName, Uint64 dpnId) {
         if (rd == null) {
             LOG.debug("advertisePrefix : rd is NULL for elanInfo {}, macAddress {}", elanInfo, macAddress);
             return;
@@ -211,15 +225,15 @@ public class EvpnUtils {
             LOG.debug("Failed to get the dpn tep ip for dpn {}", dpnId);
             return;
         }
-        int vpnLabel = 0;
-        long l2vni = elanInfo.getSegmentationId();
-        long l3vni = 0;
+        Uint32 vpnLabel = Uint32.ZERO;
+        Uint32 l2vni = ElanUtils.getVxlanSegmentationId(elanInfo);
+        Uint32 l3vni = Uint32.ZERO;
         String gatewayMacAddr = null;
         String l3VpName = getL3vpnNameFromElan(elanInfo);
         if (l3VpName != null) {
             VpnInstance l3VpnInstance = vpnManager.getVpnInstance(broker, l3VpName);
             l3vni = l3VpnInstance.getL3vni();
-            com.google.common.base.Optional<String> gatewayMac = getGatewayMacAddressForInterface(l3VpName,
+            Optional<String> gatewayMac = getGatewayMacAddressForInterface(l3VpName,
                     interfaceName, prefix);
             gatewayMacAddr = gatewayMac.isPresent() ? gatewayMac.get() : null;
 
@@ -260,7 +274,7 @@ public class EvpnUtils {
         String evpnName = evpnAugmentation.getEvpnName();
         String rd = vpnManager.getVpnRd(broker, evpnName);
         if (rd == null) {
-            LOG.debug("withdrawEvpnRT2Routes : rd is null ", elanName);
+            LOG.debug("withdrawEvpnRT2Routes : rd is null for {}", elanName);
             return;
         }
         List<MacEntry> macEntries = elanUtils.getElanMacEntries(elanName);
@@ -305,11 +319,11 @@ public class EvpnUtils {
         ExternalTunnelList externalTunnelList = null;
         try {
             externalTunnelList = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
-                    externalTunnelListId).orNull();
-        } catch (ReadFailedException e) {
+                    externalTunnelListId).orElse(null);
+        } catch (InterruptedException | ExecutionException e) {
             LOG.error("getExternalTunnelList: unable to read ExternalTunnelList, exception ", e);
         }
-        return Optional.fromNullable(externalTunnelList);
+        return Optional.ofNullable(externalTunnelList);
     }
 
     public static InstanceIdentifier<DcGatewayIpList> getDcGatewayIpListIdentifier() {
@@ -322,11 +336,11 @@ public class EvpnUtils {
         DcGatewayIpList dcGatewayIpListConfig = null;
         try {
             dcGatewayIpListConfig = elanUtils.read2(LogicalDatastoreType.CONFIGURATION,
-                    dcGatewayIpListInstanceIdentifier).orNull();
-        } catch (ReadFailedException e) {
+                    dcGatewayIpListInstanceIdentifier).orElse(null);
+        } catch (InterruptedException | ExecutionException e) {
             LOG.error("getDcGatewayTunnelInterfaceNameList: unable to read DcGatewayTunnelList, exception ", e);
         }
-        return Optional.fromNullable(dcGatewayIpListConfig);
+        return Optional.ofNullable(dcGatewayIpListConfig);
     }
 
     public List<String> getDcGatewayTunnelInterfaceNameList() {
@@ -336,17 +350,18 @@ public class EvpnUtils {
             LOG.info("No DC gateways configured while programming the l2vni table.");
             return tunnelInterfaceNameList;
         }
-        List<DcGatewayIp> dcGatewayIps = dcGatewayIpListOptional.get().getDcGatewayIp();
+        List<DcGatewayIp> dcGatewayIps
+                = new ArrayList<DcGatewayIp>(dcGatewayIpListOptional.get().nonnullDcGatewayIp().values());
 
         Optional<ExternalTunnelList> externalTunnelListOptional = getExternalTunnelList();
         if (!externalTunnelListOptional.isPresent()) {
             LOG.info("No External Tunnel Configured while programming the l2vni table.");
             return tunnelInterfaceNameList;
         }
-        List<ExternalTunnel> externalTunnels = externalTunnelListOptional.get().getExternalTunnel();
+        List<ExternalTunnel> externalTunnels
+                = new ArrayList<ExternalTunnel>(externalTunnelListOptional.get().nonnullExternalTunnel().values());
 
-        dcGatewayIps
-                .forEach(dcIp -> externalTunnels
+        dcGatewayIps.forEach(dcIp -> externalTunnels
                 .stream()
                 .filter(externalTunnel -> externalTunnel.getDestinationDevice()
                         .contains(dcIp.getIpAddress().getIpv4Address().toString()))
@@ -356,85 +371,86 @@ public class EvpnUtils {
     }
 
     public void bindElanServiceToExternalTunnel(String elanName, String interfaceName) {
-        WriteTransaction tx = broker.newWriteOnlyTransaction();
-        int instructionKey = 0;
-        LOG.trace("Binding external interface {} elan {}", interfaceName, elanName);
-        List<Instruction> instructions = new ArrayList<>();
-        instructions.add(MDSALUtil.buildAndGetGotoTableInstruction(
-                NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, ++instructionKey));
-        short elanServiceIndex = ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
-        BoundServices serviceInfo = ElanUtils.getBoundServices(
-                ElanUtils.getElanServiceName(elanName, interfaceName), elanServiceIndex,
-                NwConstants.ELAN_SERVICE_INDEX, NwConstants.COOKIE_ELAN_INGRESS_TABLE, instructions);
-        InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
-        Optional<BoundServices> existingElanService = elanUtils.read(broker, LogicalDatastoreType.CONFIGURATION,
-                bindServiceId);
-        if (!existingElanService.isPresent()) {
-            tx.put(LogicalDatastoreType.CONFIGURATION, bindServiceId, serviceInfo, true);
-            tx.submit();
-        }
+        LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
+            int instructionKey = 0;
+            LOG.trace("Binding external interface {} elan {}", interfaceName, elanName);
+            List<Instruction> instructions = new ArrayList<>();
+            instructions.add(MDSALUtil.buildAndGetGotoTableInstruction(
+                    NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, ++instructionKey));
+            short elanServiceIndex =
+                    ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
+            BoundServices serviceInfo = ElanUtils.getBoundServices(
+                    ElanUtils.getElanServiceName(elanName, interfaceName), elanServiceIndex,
+                    NwConstants.ELAN_SERVICE_INDEX, NwConstants.COOKIE_ELAN_INGRESS_TABLE, instructions);
+            InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
+            if (!tx.read(bindServiceId).get().isPresent()) {
+                tx.mergeParentStructurePut(bindServiceId, serviceInfo);
+            }
+        }), LOG, "Error binding an ELAN service to an external tunnel");
     }
 
     public void unbindElanServiceFromExternalTunnel(String elanName, String interfaceName) {
-        WriteTransaction tx = broker.newWriteOnlyTransaction();
-        LOG.trace("UnBinding external interface {} elan {}", interfaceManager, elanName);
-        short elanServiceIndex = ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
-        InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
-        Optional<BoundServices> existingElanService = elanUtils.read(broker, LogicalDatastoreType.CONFIGURATION,
-                bindServiceId);
-        if (!existingElanService.isPresent()) {
-            tx.delete(LogicalDatastoreType.CONFIGURATION, bindServiceId);
-            tx.submit();
-        }
+        LoggingFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(CONFIGURATION, tx -> {
+            LOG.trace("UnBinding external interface {} elan {}", interfaceManager, elanName);
+            short elanServiceIndex =
+                    ServiceIndex.getIndex(NwConstants.ELAN_SERVICE_NAME, NwConstants.ELAN_SERVICE_INDEX);
+            InstanceIdentifier<BoundServices> bindServiceId = ElanUtils.buildServiceId(interfaceName, elanServiceIndex);
+            if (tx.read(bindServiceId).get().isPresent()) {
+                tx.delete(bindServiceId);
+            }
+        }), LOG, "Error binding an ELAN service to an external tunnel");
     }
 
-    private List<InstructionInfo> getInstructionsForExtTunnelTable(Long elanTag) {
+    private static List<InstructionInfo> getInstructionsForExtTunnelTable(Uint32 elanTag) {
         List<InstructionInfo> mkInstructions = new ArrayList<>();
-        mkInstructions.add(new InstructionWriteMetadata(ElanUtils.getElanMetadataLabel(elanTag, false),
+        mkInstructions.add(new InstructionWriteMetadata(ElanUtils.getElanMetadataLabel(elanTag.longValue(), false),
                 ElanHelper.getElanMetadataMask()));
         mkInstructions.add(new InstructionGotoTable(NwConstants.ELAN_DMAC_TABLE));
         return mkInstructions;
     }
 
-    private String getFlowRef(long tableId, long elanTag, BigInteger dpnId) {
-        return new StringBuilder().append(tableId).append(elanTag).append(dpnId).toString();
+    private static String getFlowRef(long tableId, long elanTag, Uint64 dpnId) {
+        return new StringBuilder().append(tableId).append(elanTag).append(dpnId.toString()).toString();
     }
 
-    private void programEvpnL2vniFlow(ElanInstance elanInfo, BiConsumer<BigInteger, FlowEntity> flowHandler) {
-        long elanTag = elanInfo.getElanTag();
+    private void programEvpnL2vniFlow(ElanInstance elanInfo, BiConsumer<Uint64, FlowEntity> flowHandler) {
+        Uint32 elanTag = elanInfo.getElanTag();
         List<MatchInfo> mkMatches = new ArrayList<>();
-        mkMatches.add(new MatchTunnelId(BigInteger.valueOf(elanInfo.getSegmentationId())));
-        NWUtil.getOperativeDPNs(broker).forEach(dpnId -> {
-            LOG.debug("Updating tunnel flow to dpnid {}", dpnId);
-            List<InstructionInfo> instructions = getInstructionsForExtTunnelTable(elanTag);
-            String flowRef = getFlowRef(NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, elanTag, dpnId);
-            FlowEntity flowEntity = MDSALUtil.buildFlowEntity(
-                    dpnId,
-                    NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE,
-                    flowRef,
-                    5, // prio
-                    elanInfo.getElanInstanceName(), // flowName
-                    0, // idleTimeout
-                    0, // hardTimeout
-                    ITMConstants.COOKIE_ITM_EXTERNAL.add(BigInteger.valueOf(elanTag)),
-                    mkMatches,
-                    instructions);
-            flowHandler.accept(dpnId, flowEntity);
-        });
+        mkMatches.add(new MatchTunnelId(Uint64.valueOf(ElanUtils.getVxlanSegmentationId(elanInfo).longValue())));
+        try {
+            NWUtil.getOperativeDPNs(broker).forEach(dpnId -> {
+                LOG.debug("Updating tunnel flow to dpnid {}", dpnId);
+                List<InstructionInfo> instructions = getInstructionsForExtTunnelTable(elanTag);
+                String flowRef = getFlowRef(NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE, elanTag.longValue(), dpnId);
+                FlowEntity flowEntity = MDSALUtil.buildFlowEntity(
+                        dpnId,
+                        NwConstants.L2VNI_EXTERNAL_TUNNEL_DEMUX_TABLE,
+                        flowRef,
+                        5, // prio
+                        elanInfo.getElanInstanceName(), // flowName
+                        0, // idleTimeout
+                        0, // hardTimeout
+                        Uint64.valueOf(ITMConstants.COOKIE_ITM_EXTERNAL.longValue() + elanTag.longValue()),
+                        mkMatches,
+                        instructions);
+                flowHandler.accept(dpnId, flowEntity);
+            });
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("programEvpnL2vniFlow: Exception while programming Evpn L2vni flow for elanInstance {}",
+                    elanInfo, e);
+        }
     }
 
     public void programEvpnL2vniDemuxTable(String elanName, final BiConsumer<String, String> serviceHandler,
-                                           BiConsumer<BigInteger, FlowEntity> flowHandler) {
-        ElanInstance elanInfo = ElanUtils.getElanInstanceByName(broker, elanName);
+                                           BiConsumer<Uint64, FlowEntity> flowHandler) {
+        ElanInstance elanInfo = elanInstanceCache.get(elanName).orElse(null);
         List<String> tunnelInterfaceNameList = getDcGatewayTunnelInterfaceNameList();
         if (tunnelInterfaceNameList.isEmpty()) {
             LOG.info("No DC gateways tunnels while programming l2vni table for elan {}.", elanName);
             return;
         }
 
-        tunnelInterfaceNameList.forEach(tunnelInterfaceName -> {
-            serviceHandler.accept(elanName, tunnelInterfaceName);
-        });
+        tunnelInterfaceNameList.forEach(tunnelInterfaceName -> serviceHandler.accept(elanName, tunnelInterfaceName));
         programEvpnL2vniFlow(elanInfo, flowHandler);
     }
 
@@ -445,17 +461,18 @@ public class EvpnUtils {
             SettableFuture<Optional<T>> settableFuture = SettableFuture.create();
             List futures = Collections.singletonList(settableFuture);
 
-            ReadWriteTransaction tx = broker.newReadWriteTransaction();
-
-            Futures.addCallback(tx.read(datastoreType, iid), new SettableFutureCallback<Optional<T>>(settableFuture) {
-                @Override
-                public void onSuccess(Optional<T> data) {
-                    function.apply(data);
-                    super.onSuccess(data);
-                }
-            }, MoreExecutors.directExecutor());
-
-            return futures;
+            try (ReadTransaction tx = broker.newReadOnlyTransaction()) {
+                Futures.addCallback(tx.read(datastoreType, iid),
+                        new SettableFutureCallback<Optional<T>>(settableFuture) {
+                            @Override
+                            public void onSuccess(Optional<T> data) {
+                                function.apply(data);
+                                super.onSuccess(data);
+                            }
+                        }, MoreExecutors.directExecutor());
+
+                return futures;
+            }
         }, ElanConstants.JOB_MAX_RETRIES);
     }
 }