<parent>
<groupId>org.opendaylight.netvirt</groupId>
- <artifactId>binding-parent</artifactId>
+ <artifactId>managed-tx-parent</artifactId>
<version>0.7.0-SNAPSHOT</version>
- <relativePath>../../commons/binding-parent</relativePath>
+ <relativePath>../../commons/managed-tx-parent</relativePath>
</parent>
<artifactId>neutronvpn-impl</artifactId>
import com.google.common.base.Optional;
import com.google.common.base.Strings;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
final List<FixedIps> portIpsList = port.getFixedIps();
jobCoordinator.enqueueJob("PORT- " + portName,
() -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(confTx -> {
- List<ListenableFuture<Void>> futures = new ArrayList<>();
if (!(NeutronUtils.isPortVnicTypeNormal(port) || isPortTypeSwitchdev(port))) {
for (FixedIps ip : portIpsList) {
// remove direct port from subnetMaps config DS
package org.opendaylight.netvirt.neutronvpn;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.iana._if.type.rev140508.L2vlan;
private static final Logger LOG = LoggerFactory.getLogger(NeutronTrunkChangeListener.class);
private final DataBroker dataBroker;
+ private final ManagedNewTransactionRunner txRunner;
private final IInterfaceManager ifMgr;
private final JobCoordinator jobCoordinator;
public NeutronTrunkChangeListener(final DataBroker dataBroker, final IInterfaceManager ifMgr,
final JobCoordinator jobCoordinator) {
this.dataBroker = dataBroker;
+ this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
this.ifMgr = ifMgr;
this.jobCoordinator = jobCoordinator;
}
// Should we use parentName?
jobCoordinator.enqueueJob("PORT- " + portName, () -> {
Interface iface = ifMgr.getInterfaceInfoFromConfigDataStore(portName);
- List<ListenableFuture<Void>> futures = new ArrayList<>();
if (iface == null) {
/*
* Trunk creation requires NeutronPort to be present, by this time interface
* node as this one. Use of DSJC helps ensure the order.
*/
LOG.warn("Interface not present for Trunk SubPort: {}", subPort);
- return futures;
+ return Collections.emptyList();
}
InterfaceBuilder interfaceBuilder = new InterfaceBuilder();
IfL2vlan ifL2vlan = new IfL2vlanBuilder().setL2vlanMode(IfL2vlan.L2vlanMode.TrunkMember)
SplitHorizon splitHorizon = new SplitHorizonBuilder().setOverrideSplitHorizonProtection(true).build();
interfaceBuilder.setName(portName).setType(L2vlan.class).addAugmentation(IfL2vlan.class, ifL2vlan)
.addAugmentation(ParentRefs.class, parentRefs).addAugmentation(SplitHorizon.class, splitHorizon);
- iface = interfaceBuilder.build();
+ Interface newIface = interfaceBuilder.build();
/*
* Interface is already created for parent NeutronPort. We're updating parent refs
* and VLAN Information
*/
- WriteTransaction txn = dataBroker.newWriteOnlyTransaction();
- txn.merge(LogicalDatastoreType.CONFIGURATION, interfaceIdentifier, iface);
- LOG.trace("Creating trunk member interface {}", iface);
- futures.add(txn.submit());
- return futures;
+ return Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+ tx.merge(LogicalDatastoreType.CONFIGURATION, interfaceIdentifier, newIface);
+ LOG.trace("Creating trunk member interface {}", newIface);
+ }));
});
}
NeutronvpnUtils.buildVlanInterfaceIdentifier(subPort.getPortId().getValue());
jobCoordinator.enqueueJob("PORT- " + portName, () -> {
Interface iface = ifMgr.getInterfaceInfoFromConfigDataStore(portName);
- List<ListenableFuture<Void>> futures = new ArrayList<>();
if (iface == null) {
LOG.warn("Interface not present for SubPort {}", subPort);
- return futures;
+ return Collections.emptyList();
}
/*
* We'll reset interface back to way it was? Can IFM handle parentRef delete?
.removeAugmentation(SplitHorizon.class);
IfL2vlan ifL2vlan = new IfL2vlanBuilder().setL2vlanMode(IfL2vlan.L2vlanMode.Trunk).build();
interfaceBuilder.addAugmentation(IfL2vlan.class, ifL2vlan);
- iface = interfaceBuilder.build();
+ Interface newIface = interfaceBuilder.build();
/*
* There is no means to do an update to remove elements from a node.
* Our solution is to get existing iface, remove parentRef and VlanId,
* and this being subport delete path, don't expect any significant changes to
* corresponding Neutron Port. Deletion of NeutronPort should follow soon enough.
*/
- WriteTransaction txn = dataBroker.newWriteOnlyTransaction();
- txn.put(LogicalDatastoreType.CONFIGURATION, interfaceIdentifier, iface);
- LOG.trace("Resetting trunk member interface {}", iface);
- futures.add(txn.submit());
- return futures;
+ return Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+ tx.put(LogicalDatastoreType.CONFIGURATION, interfaceIdentifier, newIface);
+ LOG.trace("Resetting trunk member interface {}", newIface);
+ }));
});
}
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
private static long LOCK_WAIT_TIME = 10L;
private final DataBroker dataBroker;
- private final ManagedNewTransactionRunner managedNewTransactionRunner;
+ private final ManagedNewTransactionRunner txRunner;
private final NotificationPublishService notificationPublishService;
private final VpnRpcService vpnRpcService;
private final NeutronFloatingToFixedIpMappingChangeListener floatingIpMapListener;
final NeutronvpnConfig neutronvpnConfig, final IVpnManager vpnManager,
final JobCoordinator jobCoordinator, final NeutronvpnUtils neutronvpnUtils) {
this.dataBroker = dataBroker;
- this.managedNewTransactionRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
+ this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
notificationPublishService = notiPublishService;
vpnRpcService = vpnRpcSrv;
this.elanService = elanService;
// TODO Clean up the exception handling
@SuppressWarnings("checkstyle:IllegalCatch")
- protected boolean deleteVpnInterface(String infName, @Nullable String vpnId, WriteTransaction wrtConfigTxn) {
+ protected void deleteVpnInterface(String infName, @Nullable String vpnId, WriteTransaction wrtConfigTxn) {
+ if (wrtConfigTxn == null) {
+ ListenableFutures.addErrorLogging(
+ txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> deleteVpnInterface(infName, vpnId, tx)),
+ LOG, "Error deleting VPN interface {} {}", infName, vpnId);
+ return;
+ }
+
InstanceIdentifier<VpnInterface> vpnIfIdentifier =
NeutronvpnUtils.buildVpnInterfaceIdentifier(infName);
- Optional<VpnInterface> optionalVpnInterface = null;
+ Optional<VpnInterface> optionalVpnInterface;
try {
optionalVpnInterface =
SingleTransactionDataBroker.syncReadOptional(dataBroker, LogicalDatastoreType.CONFIGURATION,
vpnIfIdentifier);
} catch (ReadFailedException ex) {
LOG.error("Error during deletion of vpninterface {}", infName, ex);
- return false;
+ return;
}
if (!optionalVpnInterface.isPresent()) {
LOG.warn("Deletion of vpninterface {}, optionalVpnInterface is not present()", infName);
- return false;
- }
- boolean wrtConfigTxnPresent = true;
- if (wrtConfigTxn == null) {
- wrtConfigTxnPresent = false;
- wrtConfigTxn = dataBroker.newWriteOnlyTransaction();
+ return;
}
if (vpnId != null) {
VpnInterface vpnInterface = optionalVpnInterface.get();
&& VpnHelper.doesVpnInterfaceBelongToVpnInstance(vpnId, vpnList)) {
VpnHelper.removeVpnInterfaceVpnInstanceNamesFromList(vpnId, vpnList);
if (!vpnList.isEmpty()) {
- if (!wrtConfigTxnPresent) {
- wrtConfigTxn.submit();
- }
LOG.debug("Deleting vpn interface {} not immediately since vpnInstanceName "
+ "List not empty", infName);
- return false;
+ return;
}
VpnInterfaceBuilder vpnIfBuilder = new VpnInterfaceBuilder(optionalVpnInterface.get())
.setVpnInstanceNames(vpnList);
}
LOG.debug("Deleting vpn interface {}", infName);
wrtConfigTxn.delete(LogicalDatastoreType.CONFIGURATION, vpnIfIdentifier);
- if (!wrtConfigTxnPresent) {
- wrtConfigTxn.submit();
- }
- return true;
}
protected void removeVpnFromVpnInterface(Uuid vpnId, Port port,
final Boolean isRouterInterface = port.getDeviceOwner()
.equals(NeutronConstants.DEVICE_OWNER_ROUTER_INF) ? true : false;
jobCoordinator.enqueueJob("PORT-" + portId.getValue(), () -> singletonList(
- managedNewTransactionRunner.callWithNewWriteOnlyTransactionAndSubmit(wrtConfigTxn -> {
+ txRunner.callWithNewWriteOnlyTransactionAndSubmit(wrtConfigTxn -> {
Adjacencies portAdj = createPortIpAdjacencies(port, isRouterInterface, wrtConfigTxn, sn,
vpnIface);
if (vpnIface == null) {
for (final Uuid portId : portList) {
LOG.debug("withdrawing subnet IP {} from vpn-interface {}", sn.getSubnetIp(), portId.getValue());
final Port port = neutronvpnUtils.getNeutronPort(portId);
- jobCoordinator.enqueueJob("PORT-" + portId.getValue(), () -> {
- WriteTransaction wrtConfigTxn = dataBroker.newWriteOnlyTransaction();
- List<ListenableFuture<Void>> futures = new ArrayList<>();
- if (port != null) {
- withdrawPortIpFromVpnIface(vpnId, internetId, port, sn, wrtConfigTxn);
- } else {
- LOG.warn("Cannot proceed with withdrawPortIpFromVpnIface for port {} in subnet {} since "
- + "port is absent in Neutron config DS", portId.getValue(), subnet.getValue());
- }
- futures.add(wrtConfigTxn.submit());
- return futures;
- });
+ jobCoordinator.enqueueJob("PORT-" + portId.getValue(),
+ () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+ if (port != null) {
+ withdrawPortIpFromVpnIface(vpnId, internetId, port, sn, tx);
+ } else {
+ LOG.warn(
+ "Cannot proceed with withdrawPortIpFromVpnIface for port {} in subnet {} since "
+ + "port is absent in Neutron config DS", portId.getValue(),
+ subnet.getValue());
+ }
+ })));
}
}
//update subnet-vpn association
}
jobCoordinator.enqueueJob("VPN-" + vpn.getValue(), () -> singletonList(
- managedNewTransactionRunner.callWithNewWriteOnlyTransactionAndSubmit(wrtConfigTxn -> {
+ txRunner.callWithNewWriteOnlyTransactionAndSubmit(wrtConfigTxn -> {
if (isBeingAssociated) {
updateVpnInterface(vpn, null, neutronvpnUtils.getNeutronPort(sm.getRouterInterfacePortId()),
true, true, wrtConfigTxn);
for (Uuid port : portList) {
LOG.debug("Updating vpn-interface for port {} isBeingAssociated {}",
port.getValue(), isBeingAssociated);
- jobCoordinator.enqueueJob("PORT-" + port.getValue(), () -> {
- WriteTransaction wrtConfigTxn = dataBroker.newWriteOnlyTransaction();
- List<ListenableFuture<Void>> futures = new ArrayList<>();
- if (isBeingAssociated) {
- updateVpnInterface(vpn, null, neutronvpnUtils.getNeutronPort(port),
- isBeingAssociated, false, wrtConfigTxn);
- } else {
- removeVpnFromVpnInterface(vpn,
- neutronvpnUtils.getNeutronPort(port), wrtConfigTxn, sm);
- }
- futures.add(wrtConfigTxn.submit());
- return futures;
- });
+ jobCoordinator.enqueueJob("PORT-" + port.getValue(),
+ () -> Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+ if (isBeingAssociated) {
+ updateVpnInterface(vpn, null, neutronvpnUtils.getNeutronPort(port),
+ true, false, tx);
+ } else {
+ removeVpnFromVpnInterface(vpn, neutronvpnUtils.getNeutronPort(port), tx, sm);
+ }
+ })));
}
}
}
//Update Router Interface first synchronously.
//CAUTION: Please DONOT make the router interface VPN Movement as an asynchronous commit again !
- try {
- WriteTransaction wrtConfigTxn = dataBroker.newWriteOnlyTransaction();
- updateVpnInterface(newVpnId, oldVpnId,
- neutronvpnUtils.getNeutronPort(sn.getRouterInterfacePortId()),
- isBeingAssociated, true, wrtConfigTxn);
- wrtConfigTxn.submit().checkedGet();
- } catch (TransactionCommitFailedException e) {
- LOG.error("Failed to update router interface {} in subnet {} from oldVpnId {} to newVpnId {}, returning",
- sn.getRouterInterfacePortId().getValue(), subnet.getValue(), oldVpnId, newVpnId);
- return sn;
- }
+ ListenableFuture<Void> future =
+ txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> updateVpnInterface(newVpnId, oldVpnId,
+ neutronvpnUtils.getNeutronPort(sn.getRouterInterfacePortId()),
+ isBeingAssociated, true, tx));
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ // Check for ports on this subnet and update association of
+ // corresponding vpn-interfaces to external vpn
+ List<Uuid> portList = sn.getPortList();
+ if (portList != null) {
+ for (Uuid port : portList) {
+ LOG.debug("Updating vpn-interface for port {} isBeingAssociated {}",
+ port.getValue(), isBeingAssociated);
+ jobCoordinator.enqueueJob("PORT-" + port.getValue(), () -> Collections.singletonList(
+ txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+ tx -> updateVpnInterface(newVpnId, oldVpnId,
+ neutronvpnUtils.getNeutronPort(port), isBeingAssociated, false,
+ tx))));
+ }
+ }
+ }
- // Check for ports on this subnet and update association of
- // corresponding vpn-interfaces to external vpn
- List<Uuid> portList = sn.getPortList();
- if (portList != null) {
- for (Uuid port : portList) {
- LOG.debug("Updating vpn-interface for port {} isBeingAssociated {}",
- port.getValue(), isBeingAssociated);
- jobCoordinator.enqueueJob("PORT-" + port.getValue(), () -> {
- WriteTransaction wrtConfigTxn = dataBroker.newWriteOnlyTransaction();
- List<ListenableFuture<Void>> futures = new ArrayList<>();
- updateVpnInterface(newVpnId, oldVpnId, neutronvpnUtils.getNeutronPort(port),
- isBeingAssociated, false, wrtConfigTxn);
- futures.add(wrtConfigTxn.submit());
- return futures;
- });
+ @Override
+ public void onFailure(Throwable throwable) {
+ LOG.error(
+ "Failed to update router interface {} in subnet {} from oldVpnId {} to newVpnId {}, "
+ + "returning",
+ sn.getRouterInterfacePortId().getValue(), subnet.getValue(), oldVpnId, newVpnId, throwable);
}
- }
+ }, MoreExecutors.directExecutor());
+
return sn;
}
return;
}
- WriteTransaction wrtConfigTxn = dataBroker.newWriteOnlyTransaction();
- for (String elanInterface : extElanInterfaces) {
- createExternalVpnInterface(extNetId, elanInterface, wrtConfigTxn);
- }
- wrtConfigTxn.submit();
+ ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+ for (String elanInterface : extElanInterfaces) {
+ createExternalVpnInterface(extNetId, elanInterface, tx);
+ }
+ }), LOG, "Error creating external VPN interfaces for {}", extNetId);
}
// TODO Clean up the exception handling
LOG.error("No external ports attached for external network {}", extNetId.getValue());
return;
}
- try {
-
- WriteTransaction wrtConfigTxn = dataBroker.newWriteOnlyTransaction();
+ ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
for (String elanInterface : extElanInterfaces) {
InstanceIdentifier<VpnInterface> vpnIfIdentifier = NeutronvpnUtils
.buildVpnInterfaceIdentifier(elanInterface);
LOG.info("Removing vpn interface {}", elanInterface);
- wrtConfigTxn.delete(LogicalDatastoreType.CONFIGURATION, vpnIfIdentifier);
+ tx.delete(LogicalDatastoreType.CONFIGURATION, vpnIfIdentifier);
}
- wrtConfigTxn.submit();
-
- } catch (Exception ex) {
- LOG.error("Removal of vpninterfaces {} failed", extElanInterfaces, ex);
- }
+ }), LOG, "Error removing external VPN interfaces for {}", extNetId);
}
private void createExternalVpnInterface(Uuid vpnId, String infName, WriteTransaction wrtConfigTxn) {
LOG.error("vpn id or interface is null");
return;
}
+ if (wrtConfigTxn == null) {
+ ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+ tx -> writeVpnInterfaceToDs(vpnIdList, infName, adjacencies, isRouterInterface, tx)), LOG,
+ "Error writing VPN interface");
+ return;
+ }
List<VpnInstanceNames> vpnIdListStruct = new ArrayList<>();
for (Uuid vpnId: vpnIdList) {
VpnInstanceNames vpnInstance = VpnHelper.getVpnInterfaceVpnInstanceNames(vpnId.getValue(),
AssociatedSubnetType.V4AndV6Subnets);
vpnIdListStruct.add(vpnInstance);
}
- Boolean wrtConfigTxnPresent = true;
- if (wrtConfigTxn == null) {
- wrtConfigTxnPresent = false;
- wrtConfigTxn = dataBroker.newWriteOnlyTransaction();
- }
InstanceIdentifier<VpnInterface> vpnIfIdentifier = NeutronvpnUtils.buildVpnInterfaceIdentifier(infName);
VpnInterfaceBuilder vpnb = new VpnInterfaceBuilder().setKey(new VpnInterfaceKey(infName))
} catch (Exception ex) {
LOG.error("Creation of vpninterface {} failed", infName, ex);
}
-
- if (!wrtConfigTxnPresent) {
- wrtConfigTxn.submit();
- }
}
private void updateVpnInterfaceWithAdjacencies(Uuid vpnId, String infName, Adjacencies adjacencies,
LOG.error("vpn id or interface is null");
return;
}
-
if (wrtConfigTxn == null) {
- wrtConfigTxn = dataBroker.newWriteOnlyTransaction();
+ ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+ updateVpnInterfaceWithAdjacencies(vpnId, infName, adjacencies, tx);
+ }), LOG, "Error updating VPN interface with adjacencies");
+ return;
}
+
InstanceIdentifier<VpnInterface> vpnIfIdentifier = NeutronvpnUtils.buildVpnInterfaceIdentifier(infName);
boolean isLockAcquired = false;
try {
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
import org.opendaylight.genius.mdsalutil.MDSALUtil;
import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
+import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
import org.opendaylight.netvirt.neutronvpn.api.enums.IpVersionChoice;
import org.opendaylight.netvirt.neutronvpn.api.utils.NeutronUtils;
import org.opendaylight.yang.gen.v1.urn.huawei.params.xml.ns.yang.l3vpn.rev140815.VpnInstances;
private final ConcurrentMap<Uuid, HashMap<Uuid, Network>> qosNetworksMap = new ConcurrentHashMap<>();
private final DataBroker dataBroker;
+ private final ManagedNewTransactionRunner txRunner;
private final IdManagerService idManager;
private final JobCoordinator jobCoordinator;
private IPV6InternetDefaultRouteProgrammer ipV6InternetDefRt;
public NeutronvpnUtils(final DataBroker dataBroker, final IdManagerService idManager,
final JobCoordinator jobCoordinator, final IPV6InternetDefaultRouteProgrammer ipV6InternetDefRt) {
this.dataBroker = dataBroker;
+ this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
this.idManager = idManager;
this.jobCoordinator = jobCoordinator;
this.ipV6InternetDefRt = ipV6InternetDefRt;
if (isFinalVpnInstanceIpv6Changed) {
builder.setIpv6Configured(finalIsIpv6Configured);
}
- WriteTransaction writeTxn = dataBroker.newWriteOnlyTransaction();
- InstanceIdentifier<VpnInstanceOpDataEntry> id = InstanceIdentifier.builder(VpnInstanceOpData.class)
- .child(VpnInstanceOpDataEntry.class,
- new VpnInstanceOpDataEntryKey(vpnInstanceOpDataEntry.getVrfId())).build();
- writeTxn.merge(LogicalDatastoreType.OPERATIONAL, id, builder.build(), false);
- LOG.info("updateVpnInstanceWithIpFamily: Successfully {} {} to Vpn {}",
- add ? "added" : "removed",
- ipVersion.toString(), vpnName);
- return Collections.singletonList(writeTxn.submit());
+ return Collections.singletonList(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+ InstanceIdentifier<VpnInstanceOpDataEntry> id = InstanceIdentifier.builder(VpnInstanceOpData.class)
+ .child(VpnInstanceOpDataEntry.class,
+ new VpnInstanceOpDataEntryKey(vpnInstanceOpDataEntry.getVrfId())).build();
+ tx.merge(LogicalDatastoreType.OPERATIONAL, id, builder.build(), false);
+ LOG.info("updateVpnInstanceWithIpFamily: Successfully {} {} to Vpn {}",
+ add ? "added" : "removed",
+ ipVersion.toString(), vpnName);
+ }));
});
- return;
}
/**
}
VpnInstanceOpDataEntryBuilder builder = new VpnInstanceOpDataEntryBuilder(vpnInstanceOpDataEntry);
builder.setBgpvpnType(choice);
- WriteTransaction writeTxn = dataBroker.newWriteOnlyTransaction();
- writeTxn.merge(LogicalDatastoreType.OPERATIONAL, id, builder.build(), false);
- LOG.debug("updateVpnInstanceOpWithType: sent merge to operDS BgpvpnType {} for {}", choice, vpn.getValue());
- try {
- writeTxn.submit().get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("updateVpnInstanceOpWithType: on merge execution, error: {}", e);
- }
- return;
+ ListenableFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(tx -> {
+ tx.merge(LogicalDatastoreType.OPERATIONAL, id, builder.build(), false);
+ LOG.debug("updateVpnInstanceOpWithType: sent merge to operDS BgpvpnType {} for {}", choice, vpn.getValue());
+ }), LOG, "Error updating VPN instance op {} with type {}", vpn, choice);
}
@Nonnull
package org.opendaylight.netvirt.neutronvpn.evpn.utils;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collections;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
+import org.opendaylight.infrautils.utils.concurrent.ListenableFutures;
import org.opendaylight.netvirt.elanmanager.api.ElanHelper;
import org.opendaylight.netvirt.vpnmanager.api.IVpnManager;
import org.opendaylight.netvirt.vpnmanager.api.VpnHelper;
}
private final DataBroker dataBroker;
+ private final ManagedNewTransactionRunner txRunner;
private final IVpnManager vpnManager;
private final JobCoordinator jobCoordinator;
public NeutronEvpnUtils(DataBroker broker, IVpnManager vpnManager, JobCoordinator jobCoordinator) {
this.dataBroker = broker;
+ this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
this.vpnManager = vpnManager;
this.jobCoordinator = jobCoordinator;
}
public void updateElanWithVpnInfo(String elanInstanceName, VpnInstance vpnInstance, Operation operation) {
String vpnName = vpnInstance.getVpnInstanceName();
InstanceIdentifier<ElanInstance> elanIid = ElanHelper.getElanInstanceConfigurationDataPath(elanInstanceName);
- ReadWriteTransaction transaction = dataBroker.newReadWriteTransaction();
- Optional<ElanInstance> elanInstanceOptional = Optional.absent();
- try {
- elanInstanceOptional = transaction.read(LogicalDatastoreType.CONFIGURATION, elanIid).checkedGet();
- } catch (ReadFailedException e) {
- LOG.error("updateElanWithVpnInfo throws ReadFailedException e {}", e);
- }
- if (!elanInstanceOptional.isPresent()) {
- return;
- }
+ ListenableFutures.addErrorLogging(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+ Optional<ElanInstance> elanInstanceOptional =
+ tx.read(LogicalDatastoreType.CONFIGURATION, elanIid).checkedGet();
+ if (!elanInstanceOptional.isPresent()) {
+ return;
+ }
- EvpnAugmentationBuilder evpnAugmentationBuilder = new EvpnAugmentationBuilder();
- ElanInstanceBuilder elanInstanceBuilder = new ElanInstanceBuilder(elanInstanceOptional.get());
- if (elanInstanceBuilder.getAugmentation(EvpnAugmentation.class) != null) {
- evpnAugmentationBuilder =
- new EvpnAugmentationBuilder(elanInstanceBuilder.getAugmentation(EvpnAugmentation.class));
- }
- if (operation == Operation.ADD) {
- evpnAugmentationBuilder.setEvpnName(vpnName);
- LOG.debug("Writing Elan-EvpnAugmentation with key {}", elanInstanceName);
- } else {
- evpnAugmentationBuilder.setEvpnName(null);
- LOG.debug("Deleting Elan-EvpnAugmentation with key {}", elanInstanceName);
- }
+ EvpnAugmentationBuilder evpnAugmentationBuilder = new EvpnAugmentationBuilder();
+ ElanInstanceBuilder elanInstanceBuilder = new ElanInstanceBuilder(elanInstanceOptional.get());
+ if (elanInstanceBuilder.getAugmentation(EvpnAugmentation.class) != null) {
+ evpnAugmentationBuilder =
+ new EvpnAugmentationBuilder(elanInstanceBuilder.getAugmentation(EvpnAugmentation.class));
+ }
+ if (operation == Operation.ADD) {
+ evpnAugmentationBuilder.setEvpnName(vpnName);
+ LOG.debug("Writing Elan-EvpnAugmentation with key {}", elanInstanceName);
+ } else {
+ evpnAugmentationBuilder.setEvpnName(null);
+ LOG.debug("Deleting Elan-EvpnAugmentation with key {}", elanInstanceName);
+ }
- elanInstanceBuilder.addAugmentation(EvpnAugmentation.class, evpnAugmentationBuilder.build());
- transaction.put(LogicalDatastoreType.CONFIGURATION, elanIid, elanInstanceBuilder.build(),
- WriteTransaction.CREATE_MISSING_PARENTS);
- transaction.submit();
+ elanInstanceBuilder.addAugmentation(EvpnAugmentation.class, evpnAugmentationBuilder.build());
+ tx.put(LogicalDatastoreType.CONFIGURATION, elanIid, elanInstanceBuilder.build(),
+ WriteTransaction.CREATE_MISSING_PARENTS);
+ }), LOG, "Error updating ELAN with VPN info {}, {}, {}", elanInstanceName, vpnInstance, operation);
}
public void updateVpnWithElanInfo(VpnInstance vpnInstance, String elanInstanceName, Operation operation) {
InstanceIdentifier<EvpnRdToNetwork> rdToNetworkIdentifier = getRdToNetworkIdentifier(rd);
- jobCoordinator.enqueueJob("EVPN_ASSOCIATE-" + rd, () -> {
- ReadWriteTransaction transaction = dataBroker.newReadWriteTransaction();
- List<ListenableFuture<Void>> futures = new ArrayList<>();
- if (operation == Operation.DELETE) {
- LOG.debug("Deleting Evpn-Network with key {}", rd);
- transaction.delete(LogicalDatastoreType.CONFIGURATION, rdToNetworkIdentifier);
- } else {
- EvpnRdToNetworkBuilder evpnRdToNetworkBuilder = new EvpnRdToNetworkBuilder().setKey(
- new EvpnRdToNetworkKey(rd));
- evpnRdToNetworkBuilder.setRd(rd);
- evpnRdToNetworkBuilder.setNetworkId(elanInstanceName);
- LOG.info("updating Evpn {} with elaninstance {} and rd {}",
- vpnInstance.getVpnInstanceName(), elanInstanceName, rd);
- transaction.put(LogicalDatastoreType.CONFIGURATION, rdToNetworkIdentifier,
- evpnRdToNetworkBuilder.build(), WriteTransaction.CREATE_MISSING_PARENTS);
- }
- futures.add(transaction.submit());
- return futures;
- });
+ jobCoordinator.enqueueJob("EVPN_ASSOCIATE-" + rd,
+ () -> Collections.singletonList(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+ if (operation == Operation.DELETE) {
+ LOG.debug("Deleting Evpn-Network with key {}", rd);
+ tx.delete(LogicalDatastoreType.CONFIGURATION, rdToNetworkIdentifier);
+ } else {
+ EvpnRdToNetworkBuilder evpnRdToNetworkBuilder = new EvpnRdToNetworkBuilder().setKey(
+ new EvpnRdToNetworkKey(rd));
+ evpnRdToNetworkBuilder.setRd(rd);
+ evpnRdToNetworkBuilder.setNetworkId(elanInstanceName);
+ LOG.info("updating Evpn {} with elaninstance {} and rd {}",
+ vpnInstance.getVpnInstanceName(), elanInstanceName, rd);
+ tx.put(LogicalDatastoreType.CONFIGURATION, rdToNetworkIdentifier,
+ evpnRdToNetworkBuilder.build(), WriteTransaction.CREATE_MISSING_PARENTS);
+ }
+ })));
}
public void updateElanAndVpn(VpnInstance vpnInstance, String subnetVpn, Operation operation) {
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
import org.opendaylight.genius.mdsalutil.MDSALUtil;
import org.opendaylight.genius.utils.SystemPropertyReader;
import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
public class L2GatewayListener extends AsyncClusteredDataTreeChangeListenerBase<L2gateway, L2GatewayListener> {
private static final Logger LOG = LoggerFactory.getLogger(L2GatewayListener.class);
private final DataBroker dataBroker;
+ private final ManagedNewTransactionRunner txRunner;
private final ItmRpcService itmRpcService;
private final IL2gwService l2gwService;
private final EntityOwnershipUtils entityOwnershipUtils;
final ItmRpcService itmRpcService, final IL2gwService l2gwService,
final JobCoordinator jobCoordinator, final L2GatewayCache l2GatewayCache) {
this.dataBroker = dataBroker;
+ this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
this.entityOwnershipUtils = new EntityOwnershipUtils(entityOwnershipService);
this.itmRpcService = itmRpcService;
this.l2gwService = l2gwService;
LOG.info("Removing L2gateway with ID: {}", input.getUuid());
List<L2gatewayConnection> connections = l2gwService
.getL2GwConnectionsByL2GatewayId(input.getUuid());
- try {
- ReadWriteTransaction tx = this.dataBroker.newReadWriteTransaction();
+ Futures.addCallback(txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
for (L2gatewayConnection connection : connections) {
InstanceIdentifier<L2gatewayConnection> iid = InstanceIdentifier.create(Neutron.class)
.child(L2gatewayConnections.class).child(L2gatewayConnection.class, connection.getKey());
tx.delete(LogicalDatastoreType.CONFIGURATION, iid);
}
- tx.submit().checkedGet();
- } catch (TransactionCommitFailedException e) {
- LOG.error("Failed to delete associated l2gwconnection while deleting l2gw {} with id beacause of {}",
- input.getUuid(), e.getLocalizedMessage());
- //TODO :retry
- }
- List<Devices> l2Devices = input.getDevices();
- for (Devices l2Device : l2Devices) {
- LOG.trace("Removing L2gateway device: {}", l2Device);
- removeL2Device(l2Device, input);
- }
+ }), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ List<Devices> l2Devices = input.getDevices();
+ for (Devices l2Device : l2Devices) {
+ LOG.trace("Removing L2gateway device: {}", l2Device);
+ removeL2Device(l2Device, input);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ LOG.error("Failed to delete associated l2gwconnection while deleting l2gw {} with id",
+ input.getUuid(), throwable);
+ }
+ }, MoreExecutors.directExecutor());
}
@Override
return;
}
jobCoordinator.enqueueJob("l2gw.update", () -> {
- ReadWriteTransaction transaction = dataBroker.newReadWriteTransaction();
- DeviceInterfaces updatedDeviceInterfaces = new DeviceInterfaces(update);
- List<ListenableFuture<Void>> fts = new ArrayList<>();
- original.getDevices()
- .stream()
- .filter((originalDevice) -> originalDevice.getInterfaces() != null)
- .forEach((originalDevice) -> {
- String deviceName = originalDevice.getDeviceName();
- L2GatewayDevice l2GwDevice = l2GatewayCache.get(deviceName);
- NodeId physicalSwitchNodeId = HwvtepSouthboundUtils.createManagedNodeId(
- new NodeId(l2GwDevice.getHwvtepNodeId()), deviceName);
- originalDevice.getInterfaces()
- .stream()
- .filter((intf) -> !updatedDeviceInterfaces.containsInterface(
- deviceName, intf.getInterfaceName()))
- .forEach((intf) -> connections.forEach((connection) -> {
- Integer vlanId = connection.getSegmentId();
- if (intf.getSegmentationIds() != null
- && !intf.getSegmentationIds().isEmpty()) {
- for (Integer vlan : intf.getSegmentationIds()) {
- HwvtepUtils.deleteVlanBinding(transaction,
- physicalSwitchNodeId, intf.getInterfaceName(), vlan);
+ ListenableFuture<Void> future = txRunner.callWithNewReadWriteTransactionAndSubmit(tx -> {
+ DeviceInterfaces updatedDeviceInterfaces = new DeviceInterfaces(update);
+ original.getDevices()
+ .stream()
+ .filter((originalDevice) -> originalDevice.getInterfaces() != null)
+ .forEach((originalDevice) -> {
+ String deviceName = originalDevice.getDeviceName();
+ L2GatewayDevice l2GwDevice = l2GatewayCache.get(deviceName);
+ NodeId physicalSwitchNodeId = HwvtepSouthboundUtils.createManagedNodeId(
+ new NodeId(l2GwDevice.getHwvtepNodeId()), deviceName);
+ originalDevice.getInterfaces()
+ .stream()
+ .filter((intf) -> !updatedDeviceInterfaces.containsInterface(
+ deviceName, intf.getInterfaceName()))
+ .forEach((intf) -> connections.forEach((connection) -> {
+ Integer vlanId = connection.getSegmentId();
+ if (intf.getSegmentationIds() != null
+ && !intf.getSegmentationIds().isEmpty()) {
+ for (Integer vlan : intf.getSegmentationIds()) {
+ HwvtepUtils.deleteVlanBinding(tx,
+ physicalSwitchNodeId, intf.getInterfaceName(), vlan);
+ }
+ } else {
+ LOG.debug("Deleting vlan binding {} {} {}",
+ physicalSwitchNodeId, intf.getInterfaceName(), vlanId);
+ HwvtepUtils.deleteVlanBinding(tx, physicalSwitchNodeId,
+ intf.getInterfaceName(), vlanId);
}
- } else {
- LOG.debug("Deleting vlan binding {} {} {}",
- physicalSwitchNodeId, intf.getInterfaceName(), vlanId);
- HwvtepUtils.deleteVlanBinding(transaction, physicalSwitchNodeId,
- intf.getInterfaceName(), vlanId);
- }
- }));
- });
- fts.add(transaction.submit());
- Futures.addCallback(fts.get(0), new FutureCallback<Void>() {
+ }));
+ });
+ });
+ Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(Void success) {
LOG.debug("Successfully deleted vlan bindings for l2gw update {}", update);
LOG.error("Failed to delete vlan bindings as part of l2gw udpate {}", update);
}
}, MoreExecutors.directExecutor());
- return fts;
+ return Collections.singletonList(future);
}, SystemPropertyReader.getDataStoreJobCoordinatorMaxRetries());
}