*/
package org.opendaylight.netvirt.vpnmanager;
-import com.google.common.base.Optional;
+import static java.util.Collections.emptyList;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import javax.annotation.PostConstruct;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
+import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
+import org.opendaylight.genius.infra.Datastore;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
+import org.opendaylight.genius.utils.JvmGlobalLocks;
import org.opendaylight.genius.utils.SystemPropertyReader;
import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
+import org.opendaylight.infrautils.utils.concurrent.Executors;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.netvirt.bgpmanager.api.IBgpManager;
import org.opendaylight.netvirt.fibmanager.api.IFibManager;
import org.opendaylight.netvirt.vpnmanager.api.VpnExtraRouteHelper;
+import org.opendaylight.serviceutils.tools.listener.AbstractAsyncDataTreeChangeListener;
import org.opendaylight.yang.gen.v1.urn.ericsson.params.xml.ns.yang.ebgp.rev150901.AddressFamily;
import org.opendaylight.yang.gen.v1.urn.opendaylight.genius.idmanager.rev160406.IdManagerService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3nexthop.rev150409.L3nexthop;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3nexthop.rev150409.l3nexthop.VpnNexthops;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3nexthop.rev150409.l3nexthop.VpnNexthopsKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.VpnInstanceOpData;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.prefix.to._interface.VpnIds;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn.instance.op.data.VpnInstanceOpDataEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn.instance.op.data.vpn.instance.op.data.entry.vpntargets.VpnTarget;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.l3vpn.rev130911.vpn.to.extraroutes.Vpn;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Uint32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Singleton
-public class VpnOpStatusListener extends AsyncDataTreeChangeListenerBase<VpnInstanceOpDataEntry, VpnOpStatusListener> {
+public class VpnOpStatusListener extends AbstractAsyncDataTreeChangeListener<VpnInstanceOpDataEntry> {
private static final Logger LOG = LoggerFactory.getLogger(VpnOpStatusListener.class);
private final DataBroker dataBroker;
+ private final ManagedNewTransactionRunner txRunner;
private final IBgpManager bgpManager;
private final IdManagerService idManager;
private final IFibManager fibManager;
private final IMdsalApiManager mdsalManager;
private final VpnFootprintService vpnFootprintService;
private final JobCoordinator jobCoordinator;
+ private final VpnUtil vpnUtil;
@Inject
public VpnOpStatusListener(final DataBroker dataBroker, final IBgpManager bgpManager,
final IdManagerService idManager, final IFibManager fibManager,
final IMdsalApiManager mdsalManager, final VpnFootprintService vpnFootprintService,
- final JobCoordinator jobCoordinator) {
- super(VpnInstanceOpDataEntry.class, VpnOpStatusListener.class);
+ final JobCoordinator jobCoordinator, VpnUtil vpnUtil) {
+ super(dataBroker, LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(VpnInstanceOpData.class)
+ .child(VpnInstanceOpDataEntry.class), Executors
+ .newListeningSingleThreadExecutor("VpnOpStatusListener", LOG));
this.dataBroker = dataBroker;
+ this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
this.bgpManager = bgpManager;
this.idManager = idManager;
this.fibManager = fibManager;
this.mdsalManager = mdsalManager;
this.vpnFootprintService = vpnFootprintService;
this.jobCoordinator = jobCoordinator;
+ this.vpnUtil = vpnUtil;
+ start();
}
- @PostConstruct
public void start() {
LOG.info("{} start", getClass().getSimpleName());
- registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
}
@Override
- protected InstanceIdentifier<VpnInstanceOpDataEntry> getWildCardPath() {
- return InstanceIdentifier.create(VpnInstanceOpData.class).child(VpnInstanceOpDataEntry.class);
+ @PreDestroy
+ public void close() {
+ super.close();
+ Executors.shutdownAndAwaitTermination(getExecutorService());
}
- @Override
- protected VpnOpStatusListener getDataTreeChangeListener() {
- return VpnOpStatusListener.this;
- }
@Override
- protected void remove(InstanceIdentifier<VpnInstanceOpDataEntry> identifier, VpnInstanceOpDataEntry value) {
+ public void remove(InstanceIdentifier<VpnInstanceOpDataEntry> identifier, VpnInstanceOpDataEntry value) {
LOG.info("remove: Ignoring vpn Op {} with rd {}", value.getVpnInstanceName(), value.getVrfId());
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- protected void update(InstanceIdentifier<VpnInstanceOpDataEntry> identifier,
+ public void update(InstanceIdentifier<VpnInstanceOpDataEntry> identifier,
VpnInstanceOpDataEntry original, VpnInstanceOpDataEntry update) {
LOG.info("update: Processing update for vpn {} with rd {}", update.getVpnInstanceName(), update.getVrfId());
if (update.getVpnState() == VpnInstanceOpDataEntry.VpnState.PendingDelete
final String vpnName = update.getVpnInstanceName();
final List<String> rds = update.getRd();
String primaryRd = update.getVrfId();
- final long vpnId = VpnUtil.getVpnId(dataBroker, vpnName);
+ final Uint32 vpnId = vpnUtil.getVpnId(vpnName);
jobCoordinator.enqueueJob("VPN-" + update.getVpnInstanceName(), () -> {
- WriteTransaction writeTxn = dataBroker.newWriteOnlyTransaction();
- // Clean up VpnInstanceToVpnId from Config DS
- VpnUtil.removeVpnIdToVpnInstance(dataBroker, vpnId, writeTxn);
- VpnUtil.removeVpnInstanceToVpnId(dataBroker, vpnName, writeTxn);
- LOG.trace("Removed vpnIdentifier for rd{} vpnname {}", primaryRd, vpnName);
- // Clean up FIB Entries Config DS
- fibManager.removeVrfTable(dataBroker, primaryRd, null);
- // Clean up VPNExtraRoutes Operational DS
- if (VpnUtil.isBgpVpn(vpnName, primaryRd)) {
- if (update.getType() == VpnInstanceOpDataEntry.Type.L2) {
- rds.parallelStream().forEach(rd -> bgpManager.deleteVrf(rd, false, AddressFamily.L2VPN));
- }
- if (update.isIpv4Configured()) {
- rds.parallelStream().forEach(rd -> bgpManager.deleteVrf(rd, false, AddressFamily.IPV4));
- }
- if (update.isIpv6Configured()) {
- rds.parallelStream().forEach(rd -> bgpManager.deleteVrf(rd, false, AddressFamily.IPV6));
- }
- }
- InstanceIdentifier<Vpn> vpnToExtraroute = VpnExtraRouteHelper.getVpnToExtrarouteVpnIdentifier(vpnName);
- Optional<Vpn> optVpnToExtraroute = VpnUtil.read(dataBroker,
- LogicalDatastoreType.OPERATIONAL, vpnToExtraroute);
- if (optVpnToExtraroute.isPresent()) {
- VpnUtil.removeVpnExtraRouteForVpn(dataBroker, vpnName, writeTxn);
- }
+ // Two transactions are used, one for operational, one for config; we only submit the config
+ // transaction if the operational transaction succeeds
+ ListenableFuture<Void> operationalFuture = txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+ Datastore.OPERATIONAL, operTx -> {
+ // Clean up VPNExtraRoutes Operational DS
+ if (rds != null && VpnUtil.isBgpVpn(vpnName, primaryRd)) {
+ if (update.getType() == VpnInstanceOpDataEntry.Type.L2) {
+ rds.parallelStream().forEach(rd -> bgpManager.deleteVrf(
+ rd, false, AddressFamily.L2VPN));
+ }
+ if (update.getIpAddressFamilyConfigured()
+ == VpnInstanceOpDataEntry.IpAddressFamilyConfigured.Ipv4) {
+ rds.parallelStream().forEach(rd -> bgpManager.deleteVrf(
+ rd, false, AddressFamily.IPV4));
+ }
+ if (update.getIpAddressFamilyConfigured()
+ == VpnInstanceOpDataEntry.IpAddressFamilyConfigured.Ipv6) {
+ rds.parallelStream().forEach(rd -> bgpManager.deleteVrf(
+ rd, false, AddressFamily.IPV6));
+ }
+ if (update.getIpAddressFamilyConfigured()
+ == VpnInstanceOpDataEntry.IpAddressFamilyConfigured.Ipv4AndIpv6) {
+ rds.parallelStream()
+ .forEach(rd -> bgpManager.deleteVrf(
+ rd, false, AddressFamily.IPV4));
+ rds.parallelStream()
+ .forEach(rd -> bgpManager.deleteVrf(
+ rd, false, AddressFamily.IPV6));
+ }
+ }
+ InstanceIdentifier<Vpn> vpnToExtraroute =
+ VpnExtraRouteHelper.getVpnToExtrarouteVpnIdentifier(vpnName);
+ Optional<Vpn> optVpnToExtraroute = Optional.empty();
+ try {
+ optVpnToExtraroute = SingleTransactionDataBroker.syncReadOptional(dataBroker,
+ LogicalDatastoreType.OPERATIONAL, vpnToExtraroute);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("update: Failed to read VpnToExtraRoute for vpn {}", vpnName);
+ }
+ if (optVpnToExtraroute.isPresent()) {
+ VpnUtil.removeVpnExtraRouteForVpn(vpnName, operTx);
+ }
+ if (VpnUtil.isL3VpnOverVxLan(update.getL3vni())) {
+ vpnUtil.removeExternalTunnelDemuxFlows(vpnName);
+ }
+ // Clean up PrefixToInterface Operational DS
+ Optional<VpnIds> optPrefixToIntf = Optional.empty();
+ try {
+ optPrefixToIntf = SingleTransactionDataBroker.syncReadOptional(dataBroker,
+ LogicalDatastoreType.OPERATIONAL, VpnUtil.getPrefixToInterfaceIdentifier(vpnId));
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("update: Failed to read PrefixToInterface for vpn {}", vpnName);
+ }
+ if (optPrefixToIntf.isPresent()) {
+ VpnUtil.removePrefixToInterfaceForVpnId(vpnId, operTx);
+ }
+ // Clean up L3NextHop Operational DS
+ InstanceIdentifier<VpnNexthops> vpnNextHops = InstanceIdentifier.builder(L3nexthop.class).child(
+ VpnNexthops.class, new VpnNexthopsKey(vpnId)).build();
+ Optional<VpnNexthops> optL3nexthopForVpnId = Optional.empty();
+ try {
+ optL3nexthopForVpnId = SingleTransactionDataBroker.syncReadOptional(dataBroker,
+ LogicalDatastoreType.OPERATIONAL, vpnNextHops);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("update: Failed to read VpnNextHops for vpn {}", vpnName);
+ }
+ if (optL3nexthopForVpnId.isPresent()) {
+ VpnUtil.removeL3nexthopForVpnId(vpnId, operTx);
+ }
- if (VpnUtil.isL3VpnOverVxLan(update.getL3vni())) {
- VpnUtil.removeExternalTunnelDemuxFlows(vpnName, dataBroker, mdsalManager);
- }
+ // Clean up VPNInstanceOpDataEntry
+ VpnUtil.removeVpnOpInstance(primaryRd, operTx);
+ });
- // Clean up VPNInstanceOpDataEntry
- VpnUtil.removeVpnOpInstance(dataBroker, primaryRd, writeTxn);
- // Clean up PrefixToInterface Operational DS
- VpnUtil.removePrefixToInterfaceForVpnId(dataBroker, vpnId, writeTxn);
+ Futures.addCallback(operationalFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ Futures.addCallback(txRunner.callWithNewWriteOnlyTransactionAndSubmit(
+ Datastore.CONFIGURATION, confTx -> {
+ // Clean up VpnInstanceToVpnId from Config DS
+ VpnUtil.removeVpnIdToVpnInstance(vpnId, confTx);
+ VpnUtil.removeVpnInstanceToVpnId(vpnName, confTx);
+ LOG.trace("Removed vpnIdentifier for rd{} vpnname {}", primaryRd, vpnName);
- // Clean up L3NextHop Operational DS
- VpnUtil.removeL3nexthopForVpnId(dataBroker, vpnId, writeTxn);
+ // Clean up FIB Entries Config DS
+ // FIXME: separate out to somehow?
+ final ReentrantLock lock = JvmGlobalLocks.getLockForString(vpnName);
+ lock.lock();
+ try {
+ fibManager.removeVrfTable(primaryRd, confTx);
+ } finally {
+ lock.unlock();
+ }
+ }), new VpnOpStatusListener.PostDeleteVpnInstanceWorker(vpnName),
+ MoreExecutors.directExecutor());
+ // Note: Release the of VpnId will happen in PostDeleteVpnInstancWorker only if
+ // operationalTxn/Config succeeds.
+ }
- // Release the ID used for this VPN back to IdManager
- VpnUtil.releaseId(idManager, VpnConstants.VPN_IDPOOL_NAME, vpnName);
+ @Override
+ public void onFailure(Throwable throwable) {
+ LOG.error("Error deleting VPN {}", vpnName, throwable);
+ }
+ }, MoreExecutors.directExecutor());
- List<ListenableFuture<Void>> futures = new ArrayList<>();
- futures.add(writeTxn.submit());
- return futures;
+ LOG.info("Removed vpn data for vpnname {}", vpnName);
+ return Collections.singletonList(operationalFuture);
}, SystemPropertyReader.getDataStoreJobCoordinatorMaxRetries());
} else if (update.getVpnState() == VpnInstanceOpDataEntry.VpnState.Created) {
final String vpnName = update.getVpnInstanceName();
- final List<String> rds = update.getRd();
String primaryRd = update.getVrfId();
if (!VpnUtil.isBgpVpn(vpnName, primaryRd)) {
return;
return;
}
jobCoordinator.enqueueJob("VPN-" + update.getVpnInstanceName(), () -> {
- WriteTransaction writeTxn = dataBroker.newWriteOnlyTransaction();
- long primaryRdAddFailed = rds.parallelStream().filter(rd -> {
+ //RD update case get only updated RD list
+ List<String> rds = update.getRd() != null ? new ArrayList<>(update.getRd()) : new ArrayList<>();
+ if (original.getRd() != null && original.getRd().size() != rds.size()) {
+ rds.removeAll(original.getRd());
+ }
+ rds.parallelStream().forEach(rd -> {
try {
+ List<String> importRTList = rd.equals(primaryRd) ? irtList : emptyList();
LOG.info("VpnOpStatusListener.update: updating BGPVPN for vpn {} with RD {}"
- + " Type is {}, IPv4 is {}, IPv6 is {}", vpnName, primaryRd, update.getType(),
- update.isIpv4Configured(), update.isIpv6Configured());
- if (update.getType() == VpnInstanceOpDataEntry.Type.L2) {
- bgpManager.addVrf(rd, irtList, ertList, AddressFamily.L2VPN);
- } else {
- bgpManager.deleteVrf(rd, false, AddressFamily.L2VPN);
+ + " Type is {}, IPtype is {}, iRT {}", vpnName, primaryRd, update.getType(),
+ update.getIpAddressFamilyConfigured(), importRTList);
+ int ipValue = VpnUtil.getIpFamilyValueToRemove(original,update);
+ switch (ipValue) {
+ case 4:
+ bgpManager.deleteVrf(rd, false, AddressFamily.IPV4);
+ break;
+ case 6:
+ bgpManager.deleteVrf(rd, false, AddressFamily.IPV6);
+ break;
+ case 10:
+ bgpManager.deleteVrf(rd, false, AddressFamily.IPV4);
+ bgpManager.deleteVrf(rd, false, AddressFamily.IPV6);
+ break;
+ default:
+ break;
}
- if (!original.isIpv4Configured() && update.isIpv4Configured()) {
- bgpManager.addVrf(rd, irtList, ertList, AddressFamily.IPV4);
- } else if (original.isIpv4Configured() && !update.isIpv4Configured()) {
- bgpManager.deleteVrf(rd, false, AddressFamily.IPV4);
+ /* Update vrf entry with newly added RD list. VPN does not support for
+ * deleting existing RDs
+ */
+ if (original.getRd().size() != update.getRd().size()) {
+ ipValue = VpnUtil.getIpFamilyValueToAdd(original,update);
+ switch (ipValue) {
+ case 4:
+ bgpManager.addVrf(rd, importRTList, ertList, AddressFamily.IPV4);
+ break;
+ case 6:
+ bgpManager.addVrf(rd, importRTList, ertList, AddressFamily.IPV6);
+ break;
+ case 10:
+ bgpManager.addVrf(rd, importRTList, ertList, AddressFamily.IPV4);
+ bgpManager.addVrf(rd, importRTList, ertList, AddressFamily.IPV6);
+ break;
+ default:
+ break;
+ }
}
- if (!original.isIpv6Configured() && update.isIpv6Configured()) {
- bgpManager.addVrf(rd, irtList, ertList, AddressFamily.IPV6);
- } else if (original.isIpv6Configured() && !update.isIpv6Configured()) {
- bgpManager.deleteVrf(rd, false, AddressFamily.IPV6);
- }
- } catch (Exception e) {
- LOG.error("VpnOpStatusListener.update: Exception when updating VRF to BGP"
- + " for vpn {} rd {}", vpnName, rd);
- return false;
+ } catch (RuntimeException e) {
+ LOG.error("VpnOpStatusListener.update: Exception when updating VRF to BGP for vpn {} rd {}",
+ vpnName, rd, e);
}
- return false;
- }).count();
- return Collections.emptyList();
+ });
+ return emptyList();
});
}
}
@Override
- protected void add(final InstanceIdentifier<VpnInstanceOpDataEntry> identifier,
+ public void add(final InstanceIdentifier<VpnInstanceOpDataEntry> identifier,
final VpnInstanceOpDataEntry value) {
LOG.debug("add: Ignoring vpn Op {} with rd {}", value.getVpnInstanceName(), value.getVrfId());
}
+
+ private class PostDeleteVpnInstanceWorker implements FutureCallback<Void> {
+ private final Logger log = LoggerFactory.getLogger(VpnOpStatusListener.PostDeleteVpnInstanceWorker.class);
+ String vpnName;
+
+ PostDeleteVpnInstanceWorker(String vpnName) {
+ this.vpnName = vpnName;
+ }
+
+ /**
+ * This implies that all the future instances have returned success.
+ * Release the ID used for VPN back to IdManager
+ */
+ @Override
+ public void onSuccess(Void ignored) {
+ vpnUtil.releaseId(VpnConstants.VPN_IDPOOL_NAME, vpnName);
+ log.info("onSuccess: VpnId for VpnName {} is released to IdManager successfully.", vpnName);
+ }
+
+ /**
+ * This method is used to handle failure callbacks.
+ */
+ @Override
+ public void onFailure(Throwable throwable) {
+ log.error("onFailure: Job for vpnInstance: {} failed with exception:",
+ vpnName , throwable);
+ }
+ }
}