when ref count goes to zero , locator is kept in transit state.
if the device still has references to it from other sources like tunnels
the locator will not be deleted from device.
The transactions which wait for this locator will expire.
If the intransit state expires , query the device itself to get its
status and proceed with the transactions which depends on this locator.
added a timer job which triggers every 30sec and checks for expired
intransit keys and clears them.
Change-Id: I0022da4a3a2e748629bc031246f6e6c1509d27a4
Signed-off-by: K.V Suneelu Verma <k.v.suneelu.verma@ericsson.com>
private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionInstance.class);
private ConnectionInfo connectionInfo;
private OvsdbClient client;
+ private final HwvtepTableReader hwvtepTableReader;
private InstanceIdentifier<Node> instanceIdentifier;
private TransactionInvoker txInvoker;
private Map<DatabaseSchema,TransactInvoker> transactInvokers;
this.txInvoker = txInvoker;
this.deviceInfo = new HwvtepDeviceInfo(this);
this.dataBroker = dataBroker;
+ this.hwvtepTableReader = new HwvtepTableReader(this);
}
public synchronized void transact(TransactCommand command) {
GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
Set<String> columns = new HashSet<>(tableSchema.getColumns());
List<String> skipColumns = HwvtepSouthboundConstants.SKIP_COLUMN_FROM_HWVTEP_TABLE.get(tableName);
+ skipColumns = skipColumns == null ? new ArrayList<>() : new ArrayList<>(skipColumns);
+ skipColumns.add(HwvtepSouthboundConstants.VERSION_COLUMN);
if (skipColumns != null) {
LOG.info("HwvtepSouthbound NOT monitoring columns {} in table {}", skipColumns, tableName);
columns.removeAll(skipColumns);
public OvsdbClient getOvsdbClient() {
return client;
}
+
+ public HwvtepTableReader getHwvtepTableReader() {
+ return hwvtepTableReader;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.hwvtepsouthbound;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+import org.opendaylight.ovsdb.lib.notation.Condition;
+import org.opendaylight.ovsdb.lib.notation.Row;
+import org.opendaylight.ovsdb.lib.notation.UUID;
+import org.opendaylight.ovsdb.lib.operations.Operation;
+import org.opendaylight.ovsdb.lib.operations.OperationResult;
+import org.opendaylight.ovsdb.lib.operations.Select;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
+import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
+import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
+import org.opendaylight.ovsdb.schema.hardwarevtep.LogicalSwitch;
+import org.opendaylight.ovsdb.schema.hardwarevtep.McastMacsRemote;
+import org.opendaylight.ovsdb.schema.hardwarevtep.PhysicalLocator;
+import org.opendaylight.ovsdb.schema.hardwarevtep.UcastMacsRemote;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LogicalSwitches;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteMcastMacs;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteUcastMacs;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
+import org.opendaylight.yangtools.yang.binding.Identifiable;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.opendaylight.ovsdb.lib.operations.Operations.op;
+
+public class HwvtepTableReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HwvtepTableReader.class);
+
+ private final Map<Class, Function<InstanceIdentifier, List<Condition>>> whereClauseGetterMap = new HashMap();
+ private final Map<Class, Class> tableMap = new HashMap();
+ private final Map<Class, TypedBaseTable> tables = new HashMap<>();
+
+ private final HwvtepConnectionInstance connectionInstance;
+
+ public HwvtepTableReader(HwvtepConnectionInstance connectionInstance) {
+ this.connectionInstance = connectionInstance;
+ DatabaseSchema dbSchema = null;
+ try {
+ dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Not able to fetch schema for database {} from device {}",
+ HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e);
+ }
+
+ tableMap.put(RemoteMcastMacs.class, McastMacsRemote.class);
+ tableMap.put(RemoteUcastMacs.class, UcastMacsRemote.class);
+ tableMap.put(LogicalSwitches.class, LogicalSwitch.class);
+ tableMap.put(TerminationPoint.class, PhysicalLocator.class);
+
+ whereClauseGetterMap.put(RemoteMcastMacs.class, new RemoteMcastMacWhereClauseGetter());
+ whereClauseGetterMap.put(RemoteUcastMacs.class, new RemoteUcastMacWhereClauseGetter());
+ whereClauseGetterMap.put(LogicalSwitches.class, new LogicalSwitchWhereClauseGetter());
+ whereClauseGetterMap.put(TerminationPoint.class, new LocatorWhereClauseGetter());
+
+ tables.put(McastMacsRemote.class, TyperUtils.getTypedRowWrapper(dbSchema, McastMacsRemote.class, null));
+ tables.put(UcastMacsRemote.class, TyperUtils.getTypedRowWrapper(dbSchema, UcastMacsRemote.class, null));
+ tables.put(LogicalSwitch.class, TyperUtils.getTypedRowWrapper(dbSchema, LogicalSwitch.class, null));
+ tables.put(PhysicalLocator.class, TyperUtils.getTypedRowWrapper(dbSchema, PhysicalLocator.class, null));
+ }
+
+ class RemoteMcastMacWhereClauseGetter implements Function<InstanceIdentifier, List<Condition>> {
+ @Override
+ public List<Condition> apply(InstanceIdentifier iid) {
+ InstanceIdentifier<RemoteMcastMacs> macIid = iid;
+ String mac = macIid.firstKeyOf(RemoteMcastMacs.class).getMacEntryKey().getValue();
+ InstanceIdentifier<LogicalSwitches> lsIid = (InstanceIdentifier<LogicalSwitches>) macIid.firstKeyOf(
+ RemoteMcastMacs.class).getLogicalSwitchRef().getValue();
+ UUID lsUUID = connectionInstance.getDeviceInfo().getUUID(LogicalSwitches.class, lsIid);
+ if (lsUUID == null) {
+ LOG.error("Could not find uuid for ls key {}", lsIid);
+ return null;
+ }
+
+ McastMacsRemote macTable = (McastMacsRemote) tables.get(McastMacsRemote.class);
+ ArrayList<Condition> conditions = new ArrayList<>();
+ conditions.add(macTable.getLogicalSwitchColumn().getSchema().opEqual(lsUUID));
+ conditions.add(macTable.getMacColumn().getSchema().opEqual(mac));
+ return conditions;
+ }
+ }
+
+ class RemoteUcastMacWhereClauseGetter implements Function<InstanceIdentifier, List<Condition>> {
+ @Override
+ public List<Condition> apply(InstanceIdentifier iid) {
+ InstanceIdentifier<RemoteUcastMacs> macIid = iid;
+ String mac = macIid.firstKeyOf(RemoteUcastMacs.class).getMacEntryKey().getValue();
+ InstanceIdentifier<LogicalSwitches> lsIid = (InstanceIdentifier<LogicalSwitches>) macIid.firstKeyOf(
+ RemoteUcastMacs.class).getLogicalSwitchRef().getValue();
+ UUID lsUUID = connectionInstance.getDeviceInfo().getUUID(LogicalSwitches.class, lsIid);
+ if (lsUUID == null) {
+ LOG.error("Could not find uuid for ls key {}", lsIid);
+ return null;
+ }
+
+ UcastMacsRemote macTable = (UcastMacsRemote) tables.get(UcastMacsRemote.class);
+ ArrayList<Condition> conditions = new ArrayList<>();
+ conditions.add(macTable.getLogicalSwitchColumn().getSchema().opEqual(lsUUID));
+ conditions.add(macTable.getMacColumn().getSchema().opEqual(mac));
+ return conditions;
+ }
+ }
+
+ class LogicalSwitchWhereClauseGetter implements Function<InstanceIdentifier, List<Condition>> {
+ @Override
+ public List<Condition> apply(InstanceIdentifier iid) {
+ InstanceIdentifier<LogicalSwitches> lsIid = iid;
+ String lsName = lsIid.firstKeyOf(LogicalSwitches.class).getHwvtepNodeName().getValue();
+ LogicalSwitch logicalSwitch = (LogicalSwitch) tables.get(LogicalSwitch.class);
+ return Lists.newArrayList(logicalSwitch.getNameColumn().getSchema().opEqual(lsName));
+ }
+ }
+
+ class LocatorWhereClauseGetter implements Function<InstanceIdentifier, List<Condition>> {
+ @Override
+ public List<Condition> apply(InstanceIdentifier iid) {
+ InstanceIdentifier<TerminationPoint> tepIid = iid;
+ String locatorIp = tepIid.firstKeyOf(TerminationPoint.class).getTpId().getValue();
+ locatorIp = locatorIp.substring(locatorIp.indexOf(":") + 1);
+ LOG.info("Locator ip to look for {}", locatorIp);
+ PhysicalLocator locatorTable = (PhysicalLocator) tables.get(PhysicalLocator.class);
+ return Lists.newArrayList(locatorTable.getDstIpColumn().getSchema().opEqual(locatorIp));
+ }
+ }
+
+ public Optional<TypedBaseTable> getHwvtepTableEntryUUID(Class<? extends Identifiable> cls,
+ InstanceIdentifier iid,
+ UUID existingUUID) {
+ try {
+ DatabaseSchema dbSchema = null;
+ TypedBaseTable globalRow = null;
+ Class<TypedBaseTable> tableClass = tableMap.get(cls);
+ try {
+ dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Not able to fetch schema for database {} from device {}",
+ HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e);
+ }
+
+ if (dbSchema != null) {
+ GenericTableSchema hwvtepSchema = TyperUtils.getTableSchema(dbSchema, tableClass);
+
+ List<String> hwvtepTableColumn = new ArrayList<>();
+ hwvtepTableColumn.addAll(hwvtepSchema.getColumns());
+ Select<GenericTableSchema> selectOperation = op.select(hwvtepSchema);
+ selectOperation.setColumns(hwvtepTableColumn);
+
+ if (existingUUID != null) {
+ TypedBaseTable table = tables.get(tableClass);
+ LOG.info("Setting uuid condition {} ", existingUUID);
+ selectOperation.where(table.getUuidColumn().getSchema().opEqual(existingUUID));
+ } else {
+ if (whereClauseGetterMap.get(cls) != null) {
+ List<Condition> conditions = whereClauseGetterMap.get(cls).apply(iid);
+ if (conditions != null) {
+ if (conditions.size() == 2) {
+ selectOperation.where(conditions.get(0)).and(conditions.get(1));
+ } else {
+ selectOperation.where(conditions.get(0));
+ }
+ } else {
+ LOG.error("Could not get where conditions for cls {} key {}", cls, iid);
+ return Optional.empty();
+ }
+ } else {
+ LOG.error("Could not get where class for cls {} ", cls);
+ return Optional.empty();
+ }
+ }
+ ArrayList<Operation> operations = new ArrayList<>();
+ operations.add(selectOperation);
+ try {
+ List<OperationResult> results = connectionInstance.transact(dbSchema, operations).get();
+ if (results != null && !results.isEmpty()) {
+ OperationResult selectResult = results.get(0);
+ if (selectResult.getRows() != null && !selectResult.getRows().isEmpty()) {
+ globalRow = TyperUtils.getTypedRowWrapper(
+ dbSchema, tableClass, selectResult.getRows().get(0));
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Not able to fetch hardware_vtep table row from device {}",
+ connectionInstance.getConnectionInfo(), e);
+ }
+ }
+ LOG.trace("Fetched {} from hardware_vtep schema", globalRow);
+ if (globalRow != null && globalRow.getUuid() != null) {
+ return Optional.of(globalRow);
+ }
+ return Optional.empty();
+ } catch (Throwable e) {
+ LOG.error("Failed to get the hwvtep row for iid {} {} ", iid, e);
+ return Optional.empty();
+ }
+ }
+
+ public List<TypedBaseTable> getHwvtepTableEntries(Class<? extends Identifiable> cls) {
+ try {
+ List<TypedBaseTable> tableRows = new ArrayList<>();
+ DatabaseSchema dbSchema = null;
+ TypedBaseTable globalRow = null;
+ Class<TypedBaseTable> tableClass = tableMap.get(cls);
+ try {
+ dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Not able to fetch schema for database {} from device {}",
+ HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e);
+ }
+
+ if (dbSchema != null) {
+ GenericTableSchema hwvtepSchema = TyperUtils.getTableSchema(dbSchema, tableClass);
+
+ List<String> hwvtepTableColumn = new ArrayList<>();
+ hwvtepTableColumn.addAll(hwvtepSchema.getColumns());
+ Select<GenericTableSchema> selectOperation = op.select(hwvtepSchema);
+ selectOperation.setColumns(hwvtepTableColumn);
+
+ ArrayList<Operation> operations = Lists.newArrayList(selectOperation);
+ try {
+ List<OperationResult> results = connectionInstance.transact(dbSchema, operations).get();
+ if (results != null && !results.isEmpty()) {
+ for (OperationResult selectResult : results) {
+ if (selectResult.getRows() != null && !selectResult.getRows().isEmpty()) {
+ for (Row<GenericTableSchema> row : selectResult.getRows()) {
+ tableRows.add(TyperUtils.getTypedRowWrapper(dbSchema, tableClass, row));
+ }
+ }
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Not able to fetch hardware_vtep table row from device {}",
+ connectionInstance.getConnectionInfo(), e);
+ }
+ }
+ return tableRows;
+ } catch (Throwable e) {
+ LOG.error("Failed to get the hwvtep ", e);
+ }
+ return Collections.emptyList();
+ }
+}
\ No newline at end of file
import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
-import org.opendaylight.yangtools.yang.binding.Identifiable;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
public class DependencyQueue {
private static final Logger LOG = LoggerFactory.getLogger(DependencyQueue.class);
private static final ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("hwvtep-waiting-job-%d").
build();
- private static final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact);
+ private static final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact);
private final LinkedBlockingQueue<DependentJob> configWaitQueue = new LinkedBlockingQueue<>(
HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
private final LinkedBlockingQueue<DependentJob> opWaitQueue = new LinkedBlockingQueue<>(
HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
private final HwvtepDeviceInfo deviceInfo;
+ private ScheduledFuture expiredTasksMonitorJob;
+ @SuppressWarnings("unchecked")
public DependencyQueue(HwvtepDeviceInfo hwvtepDeviceInfo) {
this.deviceInfo = hwvtepDeviceInfo;
+ expiredTasksMonitorJob = executorService.scheduleWithFixedDelay(() -> {
+ try {
+ LOG.debug("Processing dependencies");
+ if (!deviceInfo.getConnectionInstance().getOvsdbClient().isActive()) {
+ expiredTasksMonitorJob.cancel(false);
+ }
+ deviceInfo.onOperDataAvailable();
+ } catch (Throwable e) {
+ //If execution of one run throws error , subsequent runs are suppressed, hence catching the throwable
+ LOG.error("Failed to process dependencies", e);
+ }
+ }, 0, HwvtepSouthboundConstants.IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
}
/**
final List<DependentJob> readyJobs = getReadyJobs(queue);
if (readyJobs.size() > 0) {
executorService.submit(() -> hwvtepConnectionInstance.transact(new TransactCommand() {
+ HwvtepOperationalState operationalState;
@Override
public void execute(TransactionBuilder transactionBuilder) {
- HwvtepOperationalState operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
+ this.operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
for (DependentJob job : readyJobs) {
job.onDependencyResolved(operationalState, transactionBuilder);
}
}
@Override
- public void onConfigUpdate(TransactionBuilder transaction, InstanceIdentifier nodeIid,
- Identifiable data, InstanceIdentifier key, Object... extraData) {
+ public void onFailure(TransactionBuilder deviceTransaction) {
+ readyJobs.forEach((job) -> job.onFailure(deviceTransaction));
+ operationalState.clearIntransitKeys();
}
@Override
- public void doDeviceTransaction(TransactionBuilder transaction, InstanceIdentifier nodeIid,
- Identifiable data, InstanceIdentifier key, Object... extraData) {
+ public void onSuccess(TransactionBuilder deviceTransaction) {
+ readyJobs.forEach((job) -> job.onSuccess(deviceTransaction));
+ operationalState.getDeviceInfo().onOperDataAvailable();
}
}));
}
while(jobIterator.hasNext()) {
DependentJob job = jobIterator.next();
long currentTime = System.currentTimeMillis();
- if (job.isExpired(currentTime)) {
+
+ //first check if its dependencies are met later check for expired status
+ if (job.areDependenciesMet(deviceInfo)) {
jobIterator.remove();
+ readyJobs.add(job);
continue;
}
- if (job.areDependenciesMet(deviceInfo)) {
+ if (job.isExpired(currentTime)) {
jobIterator.remove();
- readyJobs.add(job);
+ continue;
}
}
return readyJobs;
package org.opendaylight.ovsdb.hwvtepsouthbound.transact;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiPredicate;
+import java.util.function.Predicate;
+
import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
+import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.Identifiable;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-import java.util.List;
-import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class DependentJob<T extends Identifiable> {
+ private static final Logger LOG = LoggerFactory.getLogger(DependentJob.class);
+
+ private static final Predicate<HwvtepDeviceInfo.DeviceData> DATA_INTRANSIT
+ = (controllerData) -> controllerData != null && controllerData.isInTransitState();
+
+ private static final Predicate<HwvtepDeviceInfo.DeviceData> DATA_INTRANSIT_EXPIRED
+ = (controllerData) -> controllerData != null && controllerData.isInTransitState()
+ && controllerData.isIntransitTimeExpired();
+
+ //expecting the device to create the data
+ private static final BiPredicate<HwvtepDeviceInfo.DeviceData, Optional<TypedBaseTable>> INTRANSIT_DATA_CREATED
+ = (controllerData, deviceData) -> controllerData.getUuid() == null && deviceData.isPresent();
+
+ private static final BiPredicate<HwvtepDeviceInfo.DeviceData, Optional<TypedBaseTable>> INTRANSIT_DATA_NOT_CREATED
+ = (controllerData, deviceData) -> controllerData.getUuid() == null && !deviceData.isPresent();
+
+ //expecting the device to delete the data
+ private static final BiPredicate<HwvtepDeviceInfo.DeviceData, Optional<TypedBaseTable>> INTRANSIT_DATA_DELETED
+ = (controllerData, deviceData) -> controllerData.getUuid() != null && !deviceData.isPresent();
+
+ private static final BiPredicate<HwvtepDeviceInfo.DeviceData, Optional<TypedBaseTable>> INTRANSIT_DATA_NOT_DELETED
+ = (controllerData, deviceData) -> controllerData.getUuid() != null && deviceData.isPresent();
+
private final long expiryTime;
private final InstanceIdentifier key;
private final T data;
return data;
}
+ public boolean isConfigWaitingJob() {
+ return true;
+ }
+
+ public void onFailure(TransactionBuilder deviceTransaction) {
+ }
+
+ public void onSuccess(TransactionBuilder deviceTransaction) {
+ }
+
public abstract static class ConfigWaitingJob<T extends Identifiable> extends DependentJob {
public ConfigWaitingJob(InstanceIdentifier key, T data, Map dependencies) {
@Override
protected boolean isDependencyMet(HwvtepDeviceInfo deviceInfo, Class cls, InstanceIdentifier iid) {
- HwvtepDeviceInfo.DeviceData deviceData = deviceInfo.getDeviceOperData(cls, iid);
- return deviceData == null || deviceData.getStatus() != HwvtepDeviceInfo.DeviceDataStatus.IN_TRANSIT;
+ boolean depenencyMet = true;
+ HwvtepDeviceInfo.DeviceData controllerData = deviceInfo.getDeviceOperData(cls, iid);
+
+ if (DATA_INTRANSIT_EXPIRED.test(controllerData)) {
+ LOG.info("Intransit state expired for key: {} --- dependency {}", iid, getKey());
+ String clsName = cls.getSimpleName();
+
+ //either the device acted on the selected iid/uuid and sent the updated event or it did not
+ //here we are querying the device directly to get the latest status on the iid
+ Optional<TypedBaseTable> latestDeviceStatus = deviceInfo.getConnectionInstance().
+ getHwvtepTableReader().getHwvtepTableEntryUUID(cls, iid, controllerData.getUuid());
+
+ TypedBaseTable latestDeviceData = latestDeviceStatus.isPresent() ? latestDeviceStatus.get() : null;
+
+ if (INTRANSIT_DATA_CREATED.test(controllerData, latestDeviceStatus)) {
+ LOG.info("Intransit expired key is actually created but update is missed/delayed {}", iid);
+ deviceInfo.updateDeviceOperData(cls, iid, latestDeviceStatus.get().getUuid(), latestDeviceData);
+
+ } else if (INTRANSIT_DATA_NOT_CREATED.test(controllerData, latestDeviceStatus)) {
+ LOG.info("Intransit expired key is actually not created but update is missed/delayed {}", iid);
+ deviceInfo.clearDeviceOperData(cls, iid);
+
+ } else if (INTRANSIT_DATA_DELETED.test(controllerData, latestDeviceStatus)) {
+ //also deleted from device
+ LOG.info("Intransit expired key is actually deleted but update is missed/delayed {}", iid);
+ deviceInfo.clearDeviceOperData(cls, iid);
+
+ } else if (INTRANSIT_DATA_NOT_DELETED.test(controllerData, latestDeviceStatus)) {
+ //not deleted from device we will reuse existing uuid
+ LOG.info("Intransit expired key is actually not deleted but update is missed/delayed {}", iid);
+ deviceInfo.updateDeviceOperData(cls, iid, latestDeviceStatus.get().getUuid(), latestDeviceData);
+ }
+ } else if (DATA_INTRANSIT.test(controllerData)) {
+ //device status is still in transit
+ depenencyMet = false;
+ }
+ return depenencyMet;
+ }
+
+ public boolean isConfigWaitingJob() {
+ return false;
}
}
-}
+}
\ No newline at end of file
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.net.InetAddress;
import java.util.List;
deleteNode(CONFIGURATION);
}
+ void setFinalStatic(Class cls, String fieldName, Object newValue) throws Exception {
+ Field fields[] = FieldUtils.getAllFields(cls);
+ for (Field field : fields) {
+ if (fieldName.equals(field.getName())) {
+ field.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+ field.set(null, newValue);
+ break;
+ }
+ }
+ }
+
void loadSchema() {
try (InputStream resourceAsStream = DataChangeListenerTestBase.class.getResourceAsStream("hwvtep_schema.json")) {
ObjectMapper mapper = new ObjectMapper();
field(HwvtepConnectionInstance.class, "txInvoker").set(connectionInstance, transactionInvoker);
field(HwvtepConnectionInstance.class, "deviceInfo").set(connectionInstance, new HwvtepDeviceInfo(connectionInstance));
field(HwvtepConnectionInstance.class, "client").set(connectionInstance, ovsdbClient);
+ when(connectionInstance.getOvsdbClient()).thenReturn(ovsdbClient);
+ when(ovsdbClient.isActive()).thenReturn(true);
when(connectionInstance.getConnectionInfo()).thenReturn(connectionInfo);
when(connectionInstance.getConnectionInfo().getRemoteAddress()).thenReturn(mock(InetAddress.class));
when(connectionInstance.getInstanceIdentifier()).thenReturn(nodeIid);
package org.opendaylight.ovsdb.hwvtepsouthbound;
-import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mockito;
import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue;
import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependentJob;
import org.opendaylight.ovsdb.hwvtepsouthbound.transact.HwvtepOperationalState;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
import org.opendaylight.yangtools.yang.binding.Identifiable;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
InstanceIdentifier<LogicalSwitches> lsIid;
Map<Class<? extends Identifiable>, List<InstanceIdentifier>> unMetDependencies;
- void setupForTest() {
+ void setupForTest() throws Exception {
MCAST_MAC_DATA_VALIDATOR = Whitebox.getInternalState(McastMacsRemoteUpdateCommand.class, UnMetDependencyGetter.class);
opState = new HwvtepOperationalState(connectionInstance);
mac = TestBuilders.buildRemoteMcastMacs(nodeIid,"FF:FF:FF:FF:FF:FF", "ls0",
child(LogicalSwitches.class, new LogicalSwitchesKey(new HwvtepNodeName("ls0")));
macIid = nodeIid.augmentation(HwvtepGlobalAugmentation.class).
child(RemoteMcastMacs.class, new RemoteMcastMacsKey(mac.getKey()));
- Whitebox.setInternalState(DependencyQueue.class, "executorService", MoreExecutors.newDirectExecutorService());
+ setFinalStatic(DependencyQueue.class, "executorService", PowerMockito.mock(SameThreadScheduledExecutor.class, Mockito.CALLS_REAL_METHODS));
}
@Test
package org.opendaylight.ovsdb.hwvtepsouthbound;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
+import org.mockito.Mockito;
import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue;
import org.opendaylight.ovsdb.lib.operations.Operations;
import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
@Before
public void setupListener() throws Exception {
- setFinalStatic(DependencyQueue.class, "executorService", MoreExecutors.newDirectExecutorService());
+ setFinalStatic(DependencyQueue.class, "executorService", PowerMockito.mock(SameThreadScheduledExecutor.class, Mockito.CALLS_REAL_METHODS));
opDataChangeListener = new HwvtepOperationalDataChangeListener(dataBroker, hwvtepConnectionManager, connectionInstance);
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.hwvtepsouthbound;
+
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public abstract class SameThreadScheduledExecutor implements ScheduledExecutorService {
+
+ @Override
+ public Future<?> submit(Runnable runnable) {
+ runnable.run();
+ SettableFuture ft = SettableFuture.create();
+ ft.set(null);
+ return ft;
+ }
+}