import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.genius.datastoreutils.AsyncDataTreeChangeListenerBase;
import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
public class VpnInterfaceManager extends AsyncDataTreeChangeListenerBase<VpnInterface, VpnInterfaceManager> {
private static final Logger LOG = LoggerFactory.getLogger(VpnInterfaceManager.class);
- private static final int VPN_INF_UPDATE_TIMER_TASK_DELAY = 1000;
- private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
private static final short DJC_MAX_RETRIES = 3;
private final DataBroker dataBroker;
private final ConcurrentHashMap<String, Runnable> vpnIntfMap = new ConcurrentHashMap<>();
- private final BlockingQueue<UpdateData> vpnInterfacesUpdateQueue = new LinkedBlockingQueue<>();
- private final ScheduledExecutorService vpnInfUpdateTaskExecutor = Executors.newScheduledThreadPool(1);
-
private final Map<String, ConcurrentLinkedQueue<UnprocessedVpnInterfaceData>> unprocessedVpnInterfaces =
new ConcurrentHashMap<>();
this.vpnManager = vpnManager;
this.arpResponderHandler = arpResponderHandler;
this.jobCoordinator = jobCoordinator;
- vpnInfUpdateTaskExecutor.scheduleWithFixedDelay(new VpnInterfaceUpdateTimerTask(),
- 0, VPN_INF_UPDATE_TIMER_TASK_DELAY, TIME_UNIT);
vpnInstanceOpDataEntryCache = new DataObjectCache<>(VpnInstanceOpDataEntry.class, dataBroker,
LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(
@PreDestroy
public void close() {
super.close();
- vpnInfUpdateTaskExecutor.shutdown();
vpnInstanceOpDataEntryCache.close();
}
try {
final BigInteger dpnId = InterfaceUtils.getDpIdFromInterface(interfaceState);
final int ifIndex = interfaceState.getIfIndex();
- jobCoordinator.enqueueJob("VPNINTERFACE-" + interfaceName + vpnName, () -> {
+ jobCoordinator.enqueueJob("VPNINTERFACE-" + interfaceName, () -> {
WriteTransaction writeConfigTxn = dataBroker.newWriteOnlyTransaction();
WriteTransaction writeOperTxn = dataBroker.newWriteOnlyTransaction();
WriteTransaction writeInvTxn = dataBroker.newWriteOnlyTransaction();
return;
}
} else if (Boolean.TRUE.equals(vpnInterface.isRouterInterface())) {
- jobCoordinator.enqueueJob("VPNINTERFACE-" + vpnInterface.getName() + vpnName,
+ jobCoordinator.enqueueJob("VPNINTERFACE-" + vpnInterface.getName(),
() -> {
WriteTransaction writeConfigTxn = dataBroker.newWriteOnlyTransaction();
createFibEntryForRouterInterface(primaryRd, vpnInterface, interfaceName,
final VpnInterface vpnInterface, final String vpnName,
final String interfaceName) {
if (Boolean.TRUE.equals(vpnInterface.isRouterInterface())) {
- jobCoordinator.enqueueJob("VPNINTERFACE-" + vpnInterface.getName() + vpnName, () -> {
+ jobCoordinator.enqueueJob("VPNINTERFACE-" + vpnInterface.getName(), () -> {
WriteTransaction writeConfigTxn = dataBroker.newWriteOnlyTransaction();
deleteFibEntryForRouterInterface(vpnInterface, writeConfigTxn, vpnName);
LOG.info("remove: Router interface {} for vpn {}", interfaceName, vpnName);
LOG.info("remove: VPN Interface remove event - intfName {} vpn {} dpn {}" ,vpnInterface.getName(),
vpnName, vpnInterface.getDpnId());
removeInterfaceFromUnprocessedList(identifier, vpnInterface);
- jobCoordinator.enqueueJob("VPNINTERFACE-" + interfaceName + vpnName,
+ jobCoordinator.enqueueJob("VPNINTERFACE-" + interfaceName,
() -> {
List<ListenableFuture<Void>> futures = new ArrayList<>(3);
ListenableFuture<Void> configFuture = txRunner
LOG.info("VPN Interface update event - intfName {}", vpnInterfaceName);
//handles switching between <internal VPN - external VPN>
- for (VpnInstanceNames vpnInterfaceVpnInstance : original.getVpnInstanceNames()) {
- String oldVpnName = vpnInterfaceVpnInstance.getVpnName();
- if (oldVpnName != null && (update.getVpnInstanceNames() == null
- || !VpnHelper.doesVpnInterfaceBelongToVpnInstance(oldVpnName, update.getVpnInstanceNames()))) {
- UpdateData updateData = new UpdateData(identifier, original, update);
- vpnInterfacesUpdateQueue.add(updateData);
- LOG.info("update: UpdateData on VPNInterface {} on dpn {} update upon VPN swap from oldVpn(s) {}"
- + "to newVpn(s) {} added to update queue",
- updateData.getOriginal().getName(), dpnId,
- VpnHelper.getVpnInterfaceVpnInstanceNamesString(original.getVpnInstanceNames()),
- VpnHelper.getVpnInterfaceVpnInstanceNamesString(update.getVpnInstanceNames()));
- return;
- }
- }
- for (VpnInstanceNames vpnInterfaceVpnInstance : update.getVpnInstanceNames()) {
- String newVpnName = vpnInterfaceVpnInstance.getVpnName();
- String primaryRd = VpnUtil.getPrimaryRd(dataBroker, newVpnName);
- if (newVpnName != null && (original.getVpnInstanceNames() == null
- || !VpnHelper.doesVpnInterfaceBelongToVpnInstance(newVpnName,
- original.getVpnInstanceNames()))) {
- if (!VpnUtil.isVpnPendingDelete(dataBroker, primaryRd)) {
- InstanceIdentifier<VpnInterfaceOpDataEntry> opIdentifier = VpnUtil
- .getVpnInterfaceOpDataEntryIdentifier(vpnInterfaceName, newVpnName);
- if (canHandleNewVpnInterface(identifier, update, newVpnName)) {
- List<Adjacency> copyNewAdjs = new ArrayList<>(newAdjs);
- List<Adjacency> copyOldAdjs = new ArrayList<>(oldAdjs);
- addVpnInterfaceToVpn(opIdentifier, update, copyOldAdjs, copyNewAdjs, identifier, newVpnName);
- }
- } else {
- UpdateData updateData = new UpdateData(identifier, original, update);
- vpnInterfacesUpdateQueue.add(updateData);
- LOG.info("update: UpdateData on VPNInterface {} on dpn {} update upon VPN swap from oldVpn(s) {}"
- + "to newVpn(s) {} added to update queue",
- updateData.getOriginal().getName(), dpnId,
- VpnHelper.getVpnInterfaceVpnInstanceNamesString(original.getVpnInstanceNames()),
- VpnHelper.getVpnInterfaceVpnInstanceNamesString(update.getVpnInstanceNames()));
- return;
- }
- }
+ if (handleVpnSwapForVpnInterface(identifier, original, update)) {
+ LOG.info("update: handled VPNInterface {} on dpn {} update"
+ + "upon VPN swap from oldVpn(s) {} to newVpn(s) {}",
+ original.getName(), dpnId,
+ VpnHelper.getVpnInterfaceVpnInstanceNamesString(original.getVpnInstanceNames()),
+ VpnHelper.getVpnInterfaceVpnInstanceNamesString(update.getVpnInstanceNames()));
+ return;
}
for (VpnInstanceNames vpnInterfaceVpnInstance : update.getVpnInstanceNames()) {
String newVpnName = vpnInterfaceVpnInstance.getVpnName();
List<Adjacency> copyOldAdjs = new ArrayList<>(oldAdjs);
String primaryRd = VpnUtil.getPrimaryRd(dataBroker, newVpnName);
if (!VpnUtil.isVpnPendingDelete(dataBroker, primaryRd)) {
- jobCoordinator.enqueueJob("VPNINTERFACE-" + vpnInterfaceName + newVpnName, () -> {
+ jobCoordinator.enqueueJob("VPNINTERFACE-" + vpnInterfaceName, () -> {
WriteTransaction writeConfigTxn = dataBroker.newWriteOnlyTransaction();
WriteTransaction writeOperTxn = dataBroker.newWriteOnlyTransaction();
InstanceIdentifier<VpnInterfaceOpDataEntry> vpnInterfaceOpIdentifier =
}
}
- @SuppressWarnings("checkstyle:IllegalCatch")
- private class VpnInterfaceUpdateTimerTask implements Runnable {
- private final Logger log = LoggerFactory.getLogger(VpnInterfaceUpdateTimerTask.class);
-
- @Override
- public void run() {
- try {
- doRun();
- } catch (ReadFailedException | RuntimeException e) {
- LOG.error("VpnInterfaceUpdateTimerTask failed", e);
- }
- }
-
- private void doRun() throws ReadFailedException {
- List<UpdateData> processQueue = new ArrayList<>();
- List<UpdateData> updateDataList = new ArrayList<>();
- vpnInterfacesUpdateQueue.drainTo(processQueue);
- int maxInterfaceList = 0;
-
- for (UpdateData updData : processQueue) {
- final VpnInterfaceKey key = updData.getIdentifier()
- .firstKeyOf(VpnInterface.class, VpnInterfaceKey.class);
- final String interfaceName = key.getName();
- Interface interfaceState = InterfaceUtils.getInterfaceStateFromOperDS(dataBroker, interfaceName);
- for (VpnInstanceNames vpnInterfaceVpnInstance : updData.getOriginal().getVpnInstanceNames()) {
- String oldVpnName = vpnInterfaceVpnInstance.getVpnName();
- if (updData.getUpdate().getVpnInstanceNames() != null
- && VpnHelper.doesVpnInterfaceBelongToVpnInstance(oldVpnName,
- updData.getUpdate().getVpnInstanceNames())) {
- continue;
- }
- log.info("run: VPN Interface update event - intfName {} remove vpnName {} running"
- + " config-driven swap removal", updData.getOriginal().getName(), oldVpnName);
- maxInterfaceList ++;
- removeVpnInterfaceCall(updData.getIdentifier(), updData.getOriginal(),
- oldVpnName, interfaceName);
- log.info("run: Processed Remove for update on VPNInterface {} upon VPN swap from old vpn {}"
- + " to newVpn(s) {}", updData.getOriginal().getName(),
- oldVpnName, VpnHelper.getVpnInterfaceVpnInstanceNamesString(updData
- .getUpdate().getVpnInstanceNames()));
- }
- updateDataList.add(updData);
- }
- /* Decide the max-wait time based on number of VpnInterfaces.
- * max-wait-time is num-of-interface * 4seconds (random choice).
- * Every 2sec poll VpnToDpnList. If VpnInterface is removed ,
- * remove it from vpnInterfaceList.
- */
- int maxWaitTime =
- maxInterfaceList * (int) (VpnConstants.PER_INTERFACE_MAX_WAIT_TIME_IN_MILLISECONDS / 1000);
- int waitTime = 2;
- Iterator<UpdateData> updateDataIterator = updateDataList.iterator();
- UpdateData updateDataSet;
- while (waitTime < maxWaitTime) {
- try {
- Thread.sleep(2000); // sleep for 2sec
- } catch (InterruptedException e) {
- // Ignored
- }
-
- while (updateDataIterator.hasNext()) {
- boolean interfaceIsRemoved = true;
- updateDataSet = updateDataIterator.next();
- for (VpnInstanceNames vpnInterfaceVpnInstance : updateDataSet.getOriginal().getVpnInstanceNames()) {
- String oldVpnName = vpnInterfaceVpnInstance.getVpnName();
- if (oldVpnName != null && updateDataSet.getUpdate().getVpnInstanceNames() != null
- && VpnHelper.doesVpnInterfaceBelongToVpnInstance(oldVpnName,
- updateDataSet.getUpdate().getVpnInstanceNames())) {
- continue;
- }
-
- boolean isPresent = isVpnIntfPresentInVpnToDpnList(updateDataSet.getOriginal(), oldVpnName);
- if (isPresent) {
- interfaceIsRemoved = false;
- }
- }
- if (interfaceIsRemoved) {
- updateDataIterator.remove();
- }
- }
- if (updateDataList.isEmpty()) {
- log.info("run: All VpnInterfaces are successfully removed from OLD VPN after time {}", waitTime);
- break;
- }
- waitTime += 2; //Increment linearly by 2sec.
- }
-
- if (updateDataList.size() > 0) {
- log.error("run: VpnInterfacesList not removed from old Vpn even after waiting {}", waitTime);
- }
- for (UpdateData updData : processQueue) {
- if (updateDataList.contains(updData)) {
- log.error("run: Failed to swap VpnInterface {} from oldVpn {} to target VPN {}"
- + "as it has not been cleaned up from the oldVpn", updData.getOriginal().getName(),
- VpnHelper.getVpnInterfaceVpnInstanceNamesString(updData.getOriginal()
- .getVpnInstanceNames()),
- VpnHelper.getVpnInterfaceVpnInstanceNamesString(updData.getUpdate()
- .getVpnInstanceNames()));
- continue;
- }
- for (VpnInstanceNames vpnInterfaceVpnInstance : updData.getUpdate().getVpnInstanceNames()) {
- String newVpnName = vpnInterfaceVpnInstance.getVpnName();
- if (updData.getOriginal().getVpnInstanceNames() != null
- && VpnHelper.doesVpnInterfaceBelongToVpnInstance(newVpnName,
- updData.getOriginal().getVpnInstanceNames())) {
- continue;
- }
- log.info("VPN Interface update event - intfName {} onto vpnName {} running config-driven"
- + " swap addition", updData.getUpdate().getName(), newVpnName);
- final Adjacencies origAdjs = updData.getOriginal().getAugmentation(Adjacencies.class);
- final List<Adjacency> oldAdjs = origAdjs != null && origAdjs.getAdjacency() != null
- ? origAdjs.getAdjacency() : new ArrayList<>();
- final Adjacencies updateAdjs = updData.getUpdate().getAugmentation(Adjacencies.class);
- final List<Adjacency> newAdjs = updateAdjs != null && updateAdjs.getAdjacency() != null
- ? updateAdjs.getAdjacency() : new ArrayList<>();
-
- addVpnInterfaceCall(updData.getIdentifier(), updData.getUpdate(),
- oldAdjs, newAdjs, newVpnName);
- log.info("run: Processed Add for update on VPNInterface {} from oldVpn(s) {} to newVpn {}"
- + " upon VPN swap", updData.getUpdate().getName(),
- VpnHelper.getVpnInterfaceVpnInstanceNamesString(updData.getOriginal()
- .getVpnInstanceNames()), newVpnName);
- }
- }
+ private boolean handleVpnSwapForVpnInterface(InstanceIdentifier<VpnInterface> identifier,
+ VpnInterface original, VpnInterface update) {
+ boolean isSwap = Boolean.FALSE;
+ final VpnInterfaceKey key = identifier.firstKeyOf(VpnInterface.class, VpnInterfaceKey.class);
+ final String interfaceName = key.getName();
+ List<String> oldVpnList = original.getVpnInstanceNames().stream()
+ .map(VpnInstanceNames::getVpnName).collect(Collectors.toList());
+ List<String> oldVpnListCopy = new ArrayList<>();
+ oldVpnListCopy.addAll(oldVpnList);
+ List<String> newVpnList = update.getVpnInstanceNames().stream()
+ .map(VpnInstanceNames::getVpnName).collect(Collectors.toList());
+ oldVpnList.removeAll(newVpnList);
+ for (String oldVpnName: oldVpnList) {
+ isSwap = Boolean.TRUE;
+ LOG.info("handleVpnSwapForVpnInterface: VPN Interface update event - intfName {} remove vpnName {} running"
+ + " config-driven swap removal", interfaceName, oldVpnName);
+ removeVpnInterfaceCall(identifier, original, oldVpnName, interfaceName);
+ LOG.info("handleVpnSwapForVpnInterface: Processed Remove for update on VPNInterface {} upon VPN swap"
+ + "from old vpn {} to newVpn(s) {}", interfaceName, oldVpnName, newVpnList);
+ }
+ //Wait for previous interface bindings to be removed
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ //Ignore
}
-
- private boolean isVpnIntfPresentInVpnToDpnList(VpnInterface vpnInterface, String vpnName)
- throws ReadFailedException {
- BigInteger dpnId = vpnInterface.getDpnId();
- String rd = VpnUtil.getVpnRd(dataBroker, vpnName);
- log.trace("isVpnIntfPresentInVpnToDpnList: GOT rd {} for VpnInterface {} VpnInstance {} ", rd ,
- vpnInterface.getName(), vpnName);
-
- if (rd == null) {
- return false;
- }
-
- Optional<VpnInstanceOpDataEntry> maybeVpnInstanceOpData = vpnInstanceOpDataEntryCache.get(
- VpnUtil.getVpnInstanceOpDataIdentifier(rd));
- if (maybeVpnInstanceOpData.isPresent()) {
- log.trace("isVpnIntfPresentInVpnToDpnList: GOT VpnInstanceOp {} for rd {} ",
- maybeVpnInstanceOpData.get(), rd);
- List<VpnToDpnList> dpnToVpns = maybeVpnInstanceOpData.get().getVpnToDpnList();
- if (dpnToVpns != null) {
- for (VpnToDpnList dpn : dpnToVpns) {
- if (dpn.getDpnId().equals(dpnId)) {
- return dpn.getVpnInterfaces().stream().anyMatch(
- vpnInterfaces -> vpnInterface.getName().equals(vpnInterfaces.getInterfaceName()));
- }
- log.info("isVpnIntfPresentInVpnToDpnList: VpnInterface {} not present in DpnId {} vpn {}",
- vpnInterface.getName(), dpn.getDpnId(), vpnName);
- }
- }
+ newVpnList.removeAll(oldVpnListCopy);
+ for (String newVpnName: newVpnList) {
+ String primaryRd = VpnUtil.getPrimaryRd(dataBroker, newVpnName);
+ isSwap = Boolean.TRUE;
+ if (!VpnUtil.isVpnPendingDelete(dataBroker, primaryRd)) {
+ LOG.info("handleVpnSwapForVpnInterface: VPN Interface update event - intfName {} onto vpnName {}"
+ + "running config-driven swap addition", interfaceName, newVpnName);
+ final Adjacencies origAdjs = original.getAugmentation(Adjacencies.class);
+ final List<Adjacency> oldAdjs = (origAdjs != null && origAdjs.getAdjacency() != null)
+ ? origAdjs.getAdjacency() : new ArrayList<>();
+ final Adjacencies updateAdjs = update.getAugmentation(Adjacencies.class);
+ final List<Adjacency> newAdjs = (updateAdjs != null && updateAdjs.getAdjacency() != null)
+ ? updateAdjs.getAdjacency() : new ArrayList<>();
+
+ addVpnInterfaceCall(identifier, update, oldAdjs, newAdjs, newVpnName);
+ LOG.info("handleVpnSwapForVpnInterface: Processed Add for update on VPNInterface {}"
+ + "from oldVpn(s) {} to newVpn {} upon VPN swap",
+ interfaceName, oldVpnListCopy, newVpnName);
}
-
- return false;
}
+ return isSwap;
}
private void updateLabelMapper(Long label, List<String> nextHopIpList) {
operationalAdjacency.getIpAddress().equals(adjacency.getIpAddress())))
.forEach(adjacency -> {
LOG.debug("Processing the vpnInterface{} for the Ajacency:{}", vpnInterface, adjacency);
- jobCoordinator.enqueueJob("VPNINTERFACE-" + vpnInterface.getInterfaceName() + vpnName,
+ jobCoordinator.enqueueJob("VPNINTERFACE-" + vpnInterface.getInterfaceName(),
() -> {
WriteTransaction writeConfigTxn = dataBroker.newWriteOnlyTransaction();
WriteTransaction writeOperTxn = dataBroker.newWriteOnlyTransaction();