import static org.opendaylight.mdsal.binding.util.Datastore.CONFIGURATION;
import static org.opendaylight.mdsal.binding.util.Datastore.OPERATIONAL;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
private final InterfaceMetaUtils interfaceMetaUtils;
private final PortNameCache portNameCache;
private final InterfacemgrProvider interfacemgrProvider;
+ private final MigrationInProgressCache migrationInProgressCache;
@Inject
public InterfaceInventoryStateListener(@Reference final DataBroker dataBroker,
super(dataBroker, LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class)
.child(Node.class).child(NodeConnector.class)
.augmentation(FlowCapableNodeConnector.class),
- Executors.newSingleThreadExecutor("InterfaceInventoryStateListener", LOG));
+ Executors.newFixedThreadPool(1, "InterfaceInventoryStateListener", LOG));
this.dataBroker = dataBroker;
this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
this.idManager = idManagerService;
this.interfaceMetaUtils = interfaceMetaUtils;
this.portNameCache = portNameCache;
this.interfacemgrProvider = interfacemgrProvider;
+ this.migrationInProgressCache = new MigrationInProgressCache();
serviceRecoveryRegistry.addRecoverableListener(interfaceServiceRecoveryHandler.buildServiceRegistryKey(),
this);
}
EVENT_LOGGER.debug("IFM-InterfaceInventoryState Entity Owner,ADD {},{}", portName, nodeConnectorId.getValue());
if (InterfaceManagerCommonUtils.isNovaPort(portName) || InterfaceManagerCommonUtils.isK8SPort(portName)) {
+ Optional<NodeConnectorId> nodeConnectorIdFromCache = migrationInProgressCache.get(interfaceName);
+ if (nodeConnectorIdFromCache.isPresent() && nodeConnectorIdFromCache.get().equals(nodeConnectorId)) {
+ LOG.error("NodeConnectorId is changed. Dropping Port update event for {} from NodeConnectorId {}",
+ fcNodeConnectorNew.getName(), nodeConnectorId);
+ return;
+ }
NodeConnectorId nodeConnectorIdOld = null;
org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang
.ietf.interfaces.rev140508.interfaces.state.Interface interfaceState = interfaceManagerCommonUtils
LOG.warn("Port number update detected for {}", fcNodeConnectorNew.getName());
}
//VM Migration or Port Number Update: Delete existing interface entry for older DPN
+ migrationInProgressCache.put(interfaceName, nodeConnectorIdOld);
LOG.trace("Removing entry for port id {} from map",nodeConnectorIdOld.getValue());
portNameCache.remove(nodeConnectorIdOld.getValue());
EVENT_LOGGER.debug("IFM-VMMigration,{}", portName);
}
InterfaceStateAddWorker ifStateAddWorker = new InterfaceStateAddWorker(idManager, nodeConnectorId,
- fcNodeConnectorNew, portName);
+ fcNodeConnectorNew, portName, interfaceName, IfmConstants.JOB_MAX_RETRIES);
coordinator.enqueueJob(portName, ifStateAddWorker, IfmConstants.JOB_MAX_RETRIES);
}
private final FlowCapableNodeConnector fcNodeConnectorNew;
private final String interfaceName;
private final IdManagerService idManager;
+ private final String migrationCacheKey;
+ private int maxRetries;
InterfaceStateAddWorker(IdManagerService idManager, NodeConnectorId nodeConnectorId,
- FlowCapableNodeConnector fcNodeConnectorNew, String portName) {
+ FlowCapableNodeConnector fcNodeConnectorNew, String portName, String migrationCacheKey,
+ int maxRetries) {
this.nodeConnectorId = nodeConnectorId;
this.fcNodeConnectorNew = fcNodeConnectorNew;
this.interfaceName = portName;
this.idManager = idManager;
+ this.migrationCacheKey = migrationCacheKey;
+ this.maxRetries = maxRetries;
}
@Override
getInterfaceChildEntries(interfaceName);
for (InterfaceChildEntry interfaceChildEntry : interfaceChildEntries.values()) {
InterfaceStateAddWorker interfaceStateAddWorker = new InterfaceStateAddWorker(idManager,
- nodeConnectorId, fcNodeConnectorNew, interfaceChildEntry.getChildInterface());
+ nodeConnectorId, fcNodeConnectorNew, interfaceChildEntry.getChildInterface(), null, 0);
coordinator.enqueueJob(interfaceName, interfaceStateAddWorker);
}
+
+ if (migrationCacheKey != null && futures != null && !futures.isEmpty()) {
+ ListenableFuture<List<Object>> completedFuture = Futures.allAsList(futures);
+ Futures.addCallback(completedFuture, new FutureCallback<List<Object>>() {
+ @Override
+ public void onFailure(Throwable error) {
+ maxRetries--;
+ if (maxRetries == 0) {
+ migrationInProgressCache.remove(migrationCacheKey);
+ LOG.error("OvsInterfaceStateAddHelper addState failed for interface {}",
+ migrationCacheKey);
+ }
+ }
+
+ @Override
+ public void onSuccess(List<Object> result) {
+ migrationInProgressCache.remove(migrationCacheKey);
+ }
+ }, MoreExecutors.directExecutor());
+ }
return futures;
}