From a7637041f73ce8438566702222e22030cde9e5b0 Mon Sep 17 00:00:00 2001 From: "K.V Suneelu Verma" Date: Fri, 11 Aug 2017 14:53:56 +0530 Subject: [PATCH] bug 8029 handle expired in transit entries when ref count goes to zero , locator is kept in transit state. if the device still has references to it from other sources like tunnels the locator will not be deleted from device. The transactions which wait for this locator will expire. If the intransit state expires , query the device itself to get its status and proceed with the transactions which depends on this locator. added a timer job which triggers every 30sec and checks for expired intransit keys and clears them. Change-Id: I0022da4a3a2e748629bc031246f6e6c1509d27a4 Signed-off-by: K.V Suneelu Verma --- .../HwvtepConnectionInstance.java | 8 + .../hwvtepsouthbound/HwvtepTableReader.java | 261 ++++++++++++++++++ .../transact/DependencyQueue.java | 44 ++- .../transact/DependentJob.java | 90 +++++- .../DataChangeListenerTestBase.java | 19 ++ .../hwvtepsouthbound/DependencyQueueTest.java | 7 +- .../HwvtepDataChangeListenerTest.java | 5 +- .../SameThreadScheduledExecutor.java | 31 +++ 8 files changed, 442 insertions(+), 23 deletions(-) create mode 100644 hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepTableReader.java create mode 100644 hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/SameThreadScheduledExecutor.java diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionInstance.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionInstance.java index c8eab8b1e..30e17d2cb 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionInstance.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionInstance.java @@ -58,6 +58,7 @@ public class HwvtepConnectionInstance { private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionInstance.class); private ConnectionInfo connectionInfo; private OvsdbClient client; + private final HwvtepTableReader hwvtepTableReader; private InstanceIdentifier instanceIdentifier; private TransactionInvoker txInvoker; private Map transactInvokers; @@ -79,6 +80,7 @@ public class HwvtepConnectionInstance { this.txInvoker = txInvoker; this.deviceInfo = new HwvtepDeviceInfo(this); this.dataBroker = dataBroker; + this.hwvtepTableReader = new HwvtepTableReader(this); } public synchronized void transact(TransactCommand command) { @@ -133,6 +135,8 @@ public class HwvtepConnectionInstance { GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class); Set columns = new HashSet<>(tableSchema.getColumns()); List skipColumns = HwvtepSouthboundConstants.SKIP_COLUMN_FROM_HWVTEP_TABLE.get(tableName); + skipColumns = skipColumns == null ? new ArrayList<>() : new ArrayList<>(skipColumns); + skipColumns.add(HwvtepSouthboundConstants.VERSION_COLUMN); if (skipColumns != null) { LOG.info("HwvtepSouthbound NOT monitoring columns {} in table {}", skipColumns, tableName); columns.removeAll(skipColumns); @@ -306,4 +310,8 @@ public class HwvtepConnectionInstance { public OvsdbClient getOvsdbClient() { return client; } + + public HwvtepTableReader getHwvtepTableReader() { + return hwvtepTableReader; + } } diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepTableReader.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepTableReader.java new file mode 100644 index 000000000..1ba42fa83 --- /dev/null +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepTableReader.java @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.ovsdb.hwvtepsouthbound; + +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +import org.opendaylight.ovsdb.lib.notation.Condition; +import org.opendaylight.ovsdb.lib.notation.Row; +import org.opendaylight.ovsdb.lib.notation.UUID; +import org.opendaylight.ovsdb.lib.operations.Operation; +import org.opendaylight.ovsdb.lib.operations.OperationResult; +import org.opendaylight.ovsdb.lib.operations.Select; +import org.opendaylight.ovsdb.lib.schema.DatabaseSchema; +import org.opendaylight.ovsdb.lib.schema.GenericTableSchema; +import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable; +import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils; +import org.opendaylight.ovsdb.schema.hardwarevtep.LogicalSwitch; +import org.opendaylight.ovsdb.schema.hardwarevtep.McastMacsRemote; +import org.opendaylight.ovsdb.schema.hardwarevtep.PhysicalLocator; +import org.opendaylight.ovsdb.schema.hardwarevtep.UcastMacsRemote; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LogicalSwitches; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteMcastMacs; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteUcastMacs; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint; +import org.opendaylight.yangtools.yang.binding.Identifiable; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.opendaylight.ovsdb.lib.operations.Operations.op; + +public class HwvtepTableReader { + + private static final Logger LOG = LoggerFactory.getLogger(HwvtepTableReader.class); + + private final Map>> whereClauseGetterMap = new HashMap(); + private final Map tableMap = new HashMap(); + private final Map tables = new HashMap<>(); + + private final HwvtepConnectionInstance connectionInstance; + + public HwvtepTableReader(HwvtepConnectionInstance connectionInstance) { + this.connectionInstance = connectionInstance; + DatabaseSchema dbSchema = null; + try { + dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get(); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Not able to fetch schema for database {} from device {}", + HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e); + } + + tableMap.put(RemoteMcastMacs.class, McastMacsRemote.class); + tableMap.put(RemoteUcastMacs.class, UcastMacsRemote.class); + tableMap.put(LogicalSwitches.class, LogicalSwitch.class); + tableMap.put(TerminationPoint.class, PhysicalLocator.class); + + whereClauseGetterMap.put(RemoteMcastMacs.class, new RemoteMcastMacWhereClauseGetter()); + whereClauseGetterMap.put(RemoteUcastMacs.class, new RemoteUcastMacWhereClauseGetter()); + whereClauseGetterMap.put(LogicalSwitches.class, new LogicalSwitchWhereClauseGetter()); + whereClauseGetterMap.put(TerminationPoint.class, new LocatorWhereClauseGetter()); + + tables.put(McastMacsRemote.class, TyperUtils.getTypedRowWrapper(dbSchema, McastMacsRemote.class, null)); + tables.put(UcastMacsRemote.class, TyperUtils.getTypedRowWrapper(dbSchema, UcastMacsRemote.class, null)); + tables.put(LogicalSwitch.class, TyperUtils.getTypedRowWrapper(dbSchema, LogicalSwitch.class, null)); + tables.put(PhysicalLocator.class, TyperUtils.getTypedRowWrapper(dbSchema, PhysicalLocator.class, null)); + } + + class RemoteMcastMacWhereClauseGetter implements Function> { + @Override + public List apply(InstanceIdentifier iid) { + InstanceIdentifier macIid = iid; + String mac = macIid.firstKeyOf(RemoteMcastMacs.class).getMacEntryKey().getValue(); + InstanceIdentifier lsIid = (InstanceIdentifier) macIid.firstKeyOf( + RemoteMcastMacs.class).getLogicalSwitchRef().getValue(); + UUID lsUUID = connectionInstance.getDeviceInfo().getUUID(LogicalSwitches.class, lsIid); + if (lsUUID == null) { + LOG.error("Could not find uuid for ls key {}", lsIid); + return null; + } + + McastMacsRemote macTable = (McastMacsRemote) tables.get(McastMacsRemote.class); + ArrayList conditions = new ArrayList<>(); + conditions.add(macTable.getLogicalSwitchColumn().getSchema().opEqual(lsUUID)); + conditions.add(macTable.getMacColumn().getSchema().opEqual(mac)); + return conditions; + } + } + + class RemoteUcastMacWhereClauseGetter implements Function> { + @Override + public List apply(InstanceIdentifier iid) { + InstanceIdentifier macIid = iid; + String mac = macIid.firstKeyOf(RemoteUcastMacs.class).getMacEntryKey().getValue(); + InstanceIdentifier lsIid = (InstanceIdentifier) macIid.firstKeyOf( + RemoteUcastMacs.class).getLogicalSwitchRef().getValue(); + UUID lsUUID = connectionInstance.getDeviceInfo().getUUID(LogicalSwitches.class, lsIid); + if (lsUUID == null) { + LOG.error("Could not find uuid for ls key {}", lsIid); + return null; + } + + UcastMacsRemote macTable = (UcastMacsRemote) tables.get(UcastMacsRemote.class); + ArrayList conditions = new ArrayList<>(); + conditions.add(macTable.getLogicalSwitchColumn().getSchema().opEqual(lsUUID)); + conditions.add(macTable.getMacColumn().getSchema().opEqual(mac)); + return conditions; + } + } + + class LogicalSwitchWhereClauseGetter implements Function> { + @Override + public List apply(InstanceIdentifier iid) { + InstanceIdentifier lsIid = iid; + String lsName = lsIid.firstKeyOf(LogicalSwitches.class).getHwvtepNodeName().getValue(); + LogicalSwitch logicalSwitch = (LogicalSwitch) tables.get(LogicalSwitch.class); + return Lists.newArrayList(logicalSwitch.getNameColumn().getSchema().opEqual(lsName)); + } + } + + class LocatorWhereClauseGetter implements Function> { + @Override + public List apply(InstanceIdentifier iid) { + InstanceIdentifier tepIid = iid; + String locatorIp = tepIid.firstKeyOf(TerminationPoint.class).getTpId().getValue(); + locatorIp = locatorIp.substring(locatorIp.indexOf(":") + 1); + LOG.info("Locator ip to look for {}", locatorIp); + PhysicalLocator locatorTable = (PhysicalLocator) tables.get(PhysicalLocator.class); + return Lists.newArrayList(locatorTable.getDstIpColumn().getSchema().opEqual(locatorIp)); + } + } + + public Optional getHwvtepTableEntryUUID(Class cls, + InstanceIdentifier iid, + UUID existingUUID) { + try { + DatabaseSchema dbSchema = null; + TypedBaseTable globalRow = null; + Class tableClass = tableMap.get(cls); + try { + dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get(); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Not able to fetch schema for database {} from device {}", + HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e); + } + + if (dbSchema != null) { + GenericTableSchema hwvtepSchema = TyperUtils.getTableSchema(dbSchema, tableClass); + + List hwvtepTableColumn = new ArrayList<>(); + hwvtepTableColumn.addAll(hwvtepSchema.getColumns()); + Select selectOperation = op.select(hwvtepSchema); + selectOperation.setColumns(hwvtepTableColumn); + + if (existingUUID != null) { + TypedBaseTable table = tables.get(tableClass); + LOG.info("Setting uuid condition {} ", existingUUID); + selectOperation.where(table.getUuidColumn().getSchema().opEqual(existingUUID)); + } else { + if (whereClauseGetterMap.get(cls) != null) { + List conditions = whereClauseGetterMap.get(cls).apply(iid); + if (conditions != null) { + if (conditions.size() == 2) { + selectOperation.where(conditions.get(0)).and(conditions.get(1)); + } else { + selectOperation.where(conditions.get(0)); + } + } else { + LOG.error("Could not get where conditions for cls {} key {}", cls, iid); + return Optional.empty(); + } + } else { + LOG.error("Could not get where class for cls {} ", cls); + return Optional.empty(); + } + } + ArrayList operations = new ArrayList<>(); + operations.add(selectOperation); + try { + List results = connectionInstance.transact(dbSchema, operations).get(); + if (results != null && !results.isEmpty()) { + OperationResult selectResult = results.get(0); + if (selectResult.getRows() != null && !selectResult.getRows().isEmpty()) { + globalRow = TyperUtils.getTypedRowWrapper( + dbSchema, tableClass, selectResult.getRows().get(0)); + } + } + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Not able to fetch hardware_vtep table row from device {}", + connectionInstance.getConnectionInfo(), e); + } + } + LOG.trace("Fetched {} from hardware_vtep schema", globalRow); + if (globalRow != null && globalRow.getUuid() != null) { + return Optional.of(globalRow); + } + return Optional.empty(); + } catch (Throwable e) { + LOG.error("Failed to get the hwvtep row for iid {} {} ", iid, e); + return Optional.empty(); + } + } + + public List getHwvtepTableEntries(Class cls) { + try { + List tableRows = new ArrayList<>(); + DatabaseSchema dbSchema = null; + TypedBaseTable globalRow = null; + Class tableClass = tableMap.get(cls); + try { + dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Not able to fetch schema for database {} from device {}", + HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e); + } + + if (dbSchema != null) { + GenericTableSchema hwvtepSchema = TyperUtils.getTableSchema(dbSchema, tableClass); + + List hwvtepTableColumn = new ArrayList<>(); + hwvtepTableColumn.addAll(hwvtepSchema.getColumns()); + Select selectOperation = op.select(hwvtepSchema); + selectOperation.setColumns(hwvtepTableColumn); + + ArrayList operations = Lists.newArrayList(selectOperation); + try { + List results = connectionInstance.transact(dbSchema, operations).get(); + if (results != null && !results.isEmpty()) { + for (OperationResult selectResult : results) { + if (selectResult.getRows() != null && !selectResult.getRows().isEmpty()) { + for (Row row : selectResult.getRows()) { + tableRows.add(TyperUtils.getTypedRowWrapper(dbSchema, tableClass, row)); + } + } + } + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Not able to fetch hardware_vtep table row from device {}", + connectionInstance.getConnectionInfo(), e); + } + } + return tableRows; + } catch (Throwable e) { + LOG.error("Failed to get the hwvtep ", e); + } + return Collections.emptyList(); + } +} \ No newline at end of file diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transact/DependencyQueue.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transact/DependencyQueue.java index 65af6be9e..e0fca77a9 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transact/DependencyQueue.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transact/DependencyQueue.java @@ -13,34 +13,48 @@ import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionInstance; import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo; import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants; import org.opendaylight.ovsdb.lib.operations.TransactionBuilder; -import org.opendaylight.yangtools.yang.binding.Identifiable; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; public class DependencyQueue { private static final Logger LOG = LoggerFactory.getLogger(DependencyQueue.class); private static final ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("hwvtep-waiting-job-%d"). build(); - private static final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact); + private static final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact); private final LinkedBlockingQueue configWaitQueue = new LinkedBlockingQueue<>( HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY); private final LinkedBlockingQueue opWaitQueue = new LinkedBlockingQueue<>( HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY); private final HwvtepDeviceInfo deviceInfo; + private ScheduledFuture expiredTasksMonitorJob; + @SuppressWarnings("unchecked") public DependencyQueue(HwvtepDeviceInfo hwvtepDeviceInfo) { this.deviceInfo = hwvtepDeviceInfo; + expiredTasksMonitorJob = executorService.scheduleWithFixedDelay(() -> { + try { + LOG.debug("Processing dependencies"); + if (!deviceInfo.getConnectionInstance().getOvsdbClient().isActive()) { + expiredTasksMonitorJob.cancel(false); + } + deviceInfo.onOperDataAvailable(); + } catch (Throwable e) { + //If execution of one run throws error , subsequent runs are suppressed, hence catching the throwable + LOG.error("Failed to process dependencies", e); + } + }, 0, HwvtepSouthboundConstants.IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS, TimeUnit.MILLISECONDS); } /** @@ -84,22 +98,25 @@ public class DependencyQueue { final List readyJobs = getReadyJobs(queue); if (readyJobs.size() > 0) { executorService.submit(() -> hwvtepConnectionInstance.transact(new TransactCommand() { + HwvtepOperationalState operationalState; @Override public void execute(TransactionBuilder transactionBuilder) { - HwvtepOperationalState operationalState = new HwvtepOperationalState(hwvtepConnectionInstance); + this.operationalState = new HwvtepOperationalState(hwvtepConnectionInstance); for (DependentJob job : readyJobs) { job.onDependencyResolved(operationalState, transactionBuilder); } } @Override - public void onConfigUpdate(TransactionBuilder transaction, InstanceIdentifier nodeIid, - Identifiable data, InstanceIdentifier key, Object... extraData) { + public void onFailure(TransactionBuilder deviceTransaction) { + readyJobs.forEach((job) -> job.onFailure(deviceTransaction)); + operationalState.clearIntransitKeys(); } @Override - public void doDeviceTransaction(TransactionBuilder transaction, InstanceIdentifier nodeIid, - Identifiable data, InstanceIdentifier key, Object... extraData) { + public void onSuccess(TransactionBuilder deviceTransaction) { + readyJobs.forEach((job) -> job.onSuccess(deviceTransaction)); + operationalState.getDeviceInfo().onOperDataAvailable(); } })); } @@ -111,13 +128,16 @@ public class DependencyQueue { while(jobIterator.hasNext()) { DependentJob job = jobIterator.next(); long currentTime = System.currentTimeMillis(); - if (job.isExpired(currentTime)) { + + //first check if its dependencies are met later check for expired status + if (job.areDependenciesMet(deviceInfo)) { jobIterator.remove(); + readyJobs.add(job); continue; } - if (job.areDependenciesMet(deviceInfo)) { + if (job.isExpired(currentTime)) { jobIterator.remove(); - readyJobs.add(job); + continue; } } return readyJobs; diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transact/DependentJob.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transact/DependentJob.java index 478dd17b0..820ccf101 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transact/DependentJob.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transact/DependentJob.java @@ -8,18 +8,47 @@ package org.opendaylight.ovsdb.hwvtepsouthbound.transact; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiPredicate; +import java.util.function.Predicate; + import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo; import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants; import org.opendaylight.ovsdb.lib.operations.TransactionBuilder; +import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.Identifiable; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; - -import java.util.List; -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class DependentJob { + private static final Logger LOG = LoggerFactory.getLogger(DependentJob.class); + + private static final Predicate DATA_INTRANSIT + = (controllerData) -> controllerData != null && controllerData.isInTransitState(); + + private static final Predicate DATA_INTRANSIT_EXPIRED + = (controllerData) -> controllerData != null && controllerData.isInTransitState() + && controllerData.isIntransitTimeExpired(); + + //expecting the device to create the data + private static final BiPredicate> INTRANSIT_DATA_CREATED + = (controllerData, deviceData) -> controllerData.getUuid() == null && deviceData.isPresent(); + + private static final BiPredicate> INTRANSIT_DATA_NOT_CREATED + = (controllerData, deviceData) -> controllerData.getUuid() == null && !deviceData.isPresent(); + + //expecting the device to delete the data + private static final BiPredicate> INTRANSIT_DATA_DELETED + = (controllerData, deviceData) -> controllerData.getUuid() != null && !deviceData.isPresent(); + + private static final BiPredicate> INTRANSIT_DATA_NOT_DELETED + = (controllerData, deviceData) -> controllerData.getUuid() != null && deviceData.isPresent(); + private final long expiryTime; private final InstanceIdentifier key; private final T data; @@ -79,6 +108,16 @@ public abstract class DependentJob { return data; } + public boolean isConfigWaitingJob() { + return true; + } + + public void onFailure(TransactionBuilder deviceTransaction) { + } + + public void onSuccess(TransactionBuilder deviceTransaction) { + } + public abstract static class ConfigWaitingJob extends DependentJob { public ConfigWaitingJob(InstanceIdentifier key, T data, Map dependencies) { @@ -99,8 +138,47 @@ public abstract class DependentJob { @Override protected boolean isDependencyMet(HwvtepDeviceInfo deviceInfo, Class cls, InstanceIdentifier iid) { - HwvtepDeviceInfo.DeviceData deviceData = deviceInfo.getDeviceOperData(cls, iid); - return deviceData == null || deviceData.getStatus() != HwvtepDeviceInfo.DeviceDataStatus.IN_TRANSIT; + boolean depenencyMet = true; + HwvtepDeviceInfo.DeviceData controllerData = deviceInfo.getDeviceOperData(cls, iid); + + if (DATA_INTRANSIT_EXPIRED.test(controllerData)) { + LOG.info("Intransit state expired for key: {} --- dependency {}", iid, getKey()); + String clsName = cls.getSimpleName(); + + //either the device acted on the selected iid/uuid and sent the updated event or it did not + //here we are querying the device directly to get the latest status on the iid + Optional latestDeviceStatus = deviceInfo.getConnectionInstance(). + getHwvtepTableReader().getHwvtepTableEntryUUID(cls, iid, controllerData.getUuid()); + + TypedBaseTable latestDeviceData = latestDeviceStatus.isPresent() ? latestDeviceStatus.get() : null; + + if (INTRANSIT_DATA_CREATED.test(controllerData, latestDeviceStatus)) { + LOG.info("Intransit expired key is actually created but update is missed/delayed {}", iid); + deviceInfo.updateDeviceOperData(cls, iid, latestDeviceStatus.get().getUuid(), latestDeviceData); + + } else if (INTRANSIT_DATA_NOT_CREATED.test(controllerData, latestDeviceStatus)) { + LOG.info("Intransit expired key is actually not created but update is missed/delayed {}", iid); + deviceInfo.clearDeviceOperData(cls, iid); + + } else if (INTRANSIT_DATA_DELETED.test(controllerData, latestDeviceStatus)) { + //also deleted from device + LOG.info("Intransit expired key is actually deleted but update is missed/delayed {}", iid); + deviceInfo.clearDeviceOperData(cls, iid); + + } else if (INTRANSIT_DATA_NOT_DELETED.test(controllerData, latestDeviceStatus)) { + //not deleted from device we will reuse existing uuid + LOG.info("Intransit expired key is actually not deleted but update is missed/delayed {}", iid); + deviceInfo.updateDeviceOperData(cls, iid, latestDeviceStatus.get().getUuid(), latestDeviceData); + } + } else if (DATA_INTRANSIT.test(controllerData)) { + //device status is still in transit + depenencyMet = false; + } + return depenencyMet; + } + + public boolean isConfigWaitingJob() { + return false; } } -} +} \ No newline at end of file diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DataChangeListenerTestBase.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DataChangeListenerTestBase.java index 5da2e45f9..a9b00b860 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DataChangeListenerTestBase.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DataChangeListenerTestBase.java @@ -11,6 +11,7 @@ package org.opendaylight.ovsdb.hwvtepsouthbound; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.commons.lang3.reflect.FieldUtils; import org.junit.After; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -57,6 +58,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.InputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.net.InetAddress; import java.util.List; @@ -129,6 +132,20 @@ public class DataChangeListenerTestBase extends AbstractDataBrokerTest { deleteNode(CONFIGURATION); } + void setFinalStatic(Class cls, String fieldName, Object newValue) throws Exception { + Field fields[] = FieldUtils.getAllFields(cls); + for (Field field : fields) { + if (fieldName.equals(field.getName())) { + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(null, newValue); + break; + } + } + } + void loadSchema() { try (InputStream resourceAsStream = DataChangeListenerTestBase.class.getResourceAsStream("hwvtep_schema.json")) { ObjectMapper mapper = new ObjectMapper(); @@ -168,6 +185,8 @@ public class DataChangeListenerTestBase extends AbstractDataBrokerTest { field(HwvtepConnectionInstance.class, "txInvoker").set(connectionInstance, transactionInvoker); field(HwvtepConnectionInstance.class, "deviceInfo").set(connectionInstance, new HwvtepDeviceInfo(connectionInstance)); field(HwvtepConnectionInstance.class, "client").set(connectionInstance, ovsdbClient); + when(connectionInstance.getOvsdbClient()).thenReturn(ovsdbClient); + when(ovsdbClient.isActive()).thenReturn(true); when(connectionInstance.getConnectionInfo()).thenReturn(connectionInfo); when(connectionInstance.getConnectionInfo().getRemoteAddress()).thenReturn(mock(InetAddress.class)); when(connectionInstance.getInstanceIdentifier()).thenReturn(nodeIid); diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DependencyQueueTest.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DependencyQueueTest.java index fab640c42..b4fd63789 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DependencyQueueTest.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DependencyQueueTest.java @@ -8,9 +8,9 @@ package org.opendaylight.ovsdb.hwvtepsouthbound; -import com.google.common.util.concurrent.MoreExecutors; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mockito; import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue; import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependentJob; import org.opendaylight.ovsdb.hwvtepsouthbound.transact.HwvtepOperationalState; @@ -28,6 +28,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hw import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint; import org.opendaylight.yangtools.yang.binding.Identifiable; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; @@ -55,7 +56,7 @@ public class DependencyQueueTest extends DataChangeListenerTestBase { InstanceIdentifier lsIid; Map, List> unMetDependencies; - void setupForTest() { + void setupForTest() throws Exception { MCAST_MAC_DATA_VALIDATOR = Whitebox.getInternalState(McastMacsRemoteUpdateCommand.class, UnMetDependencyGetter.class); opState = new HwvtepOperationalState(connectionInstance); mac = TestBuilders.buildRemoteMcastMacs(nodeIid,"FF:FF:FF:FF:FF:FF", "ls0", @@ -64,7 +65,7 @@ public class DependencyQueueTest extends DataChangeListenerTestBase { child(LogicalSwitches.class, new LogicalSwitchesKey(new HwvtepNodeName("ls0"))); macIid = nodeIid.augmentation(HwvtepGlobalAugmentation.class). child(RemoteMcastMacs.class, new RemoteMcastMacsKey(mac.getKey())); - Whitebox.setInternalState(DependencyQueue.class, "executorService", MoreExecutors.newDirectExecutorService()); + setFinalStatic(DependencyQueue.class, "executorService", PowerMockito.mock(SameThreadScheduledExecutor.class, Mockito.CALLS_REAL_METHODS)); } @Test diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepDataChangeListenerTest.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepDataChangeListenerTest.java index edb6f1271..491be0510 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepDataChangeListenerTest.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepDataChangeListenerTest.java @@ -9,13 +9,13 @@ package org.opendaylight.ovsdb.hwvtepsouthbound; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.lang3.reflect.FieldUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Matchers; +import org.mockito.Mockito; import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue; import org.opendaylight.ovsdb.lib.operations.Operations; import org.opendaylight.ovsdb.lib.schema.DatabaseSchema; @@ -29,6 +29,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hw import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint; import org.opendaylight.yangtools.yang.binding.DataObject; +import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.Logger; @@ -94,7 +95,7 @@ public class HwvtepDataChangeListenerTest extends DataChangeListenerTestBase { @Before public void setupListener() throws Exception { - setFinalStatic(DependencyQueue.class, "executorService", MoreExecutors.newDirectExecutorService()); + setFinalStatic(DependencyQueue.class, "executorService", PowerMockito.mock(SameThreadScheduledExecutor.class, Mockito.CALLS_REAL_METHODS)); opDataChangeListener = new HwvtepOperationalDataChangeListener(dataBroker, hwvtepConnectionManager, connectionInstance); } diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/SameThreadScheduledExecutor.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/SameThreadScheduledExecutor.java new file mode 100644 index 000000000..cafe1dbe6 --- /dev/null +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/SameThreadScheduledExecutor.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.ovsdb.hwvtepsouthbound; + +import com.google.common.util.concurrent.SettableFuture; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public abstract class SameThreadScheduledExecutor implements ScheduledExecutorService { + + @Override + public Future submit(Runnable runnable) { + runnable.run(); + SettableFuture ft = SettableFuture.create(); + ft.set(null); + return ft; + } +} -- 2.36.6