bug 8029 handle expired in transit entries 37/61537/9
authorK.V Suneelu Verma <k.v.suneelu.verma@ericsson.com>
Fri, 11 Aug 2017 09:23:56 +0000 (14:53 +0530)
committerAnil Vishnoi <vishnoianil@gmail.com>
Tue, 3 Oct 2017 09:08:06 +0000 (09:08 +0000)
when ref count goes to zero , locator is kept in transit state.

if the device still has references to it from other sources like tunnels
the locator will not be deleted from device.

The transactions which wait for this locator will expire.

If the intransit state expires , query the device itself to get its
status and proceed with the transactions which depends on this locator.

added a timer job which triggers every 30sec and checks for expired
intransit keys and clears them.

Change-Id: I0022da4a3a2e748629bc031246f6e6c1509d27a4
Signed-off-by: K.V Suneelu Verma <k.v.suneelu.verma@ericsson.com>
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionInstance.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepTableReader.java [new file with mode: 0644]
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transact/DependencyQueue.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transact/DependentJob.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DataChangeListenerTestBase.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DependencyQueueTest.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepDataChangeListenerTest.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/SameThreadScheduledExecutor.java [new file with mode: 0644]

index c8eab8b1e24d4a88245e2a1517468b6ed7f07aa1..30e17d2cb3ca398b1c45e03cc3bac154fb0ad62a 100644 (file)
@@ -58,6 +58,7 @@ public class HwvtepConnectionInstance {
     private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionInstance.class);
     private ConnectionInfo connectionInfo;
     private OvsdbClient client;
+    private final HwvtepTableReader hwvtepTableReader;
     private InstanceIdentifier<Node> instanceIdentifier;
     private TransactionInvoker txInvoker;
     private Map<DatabaseSchema,TransactInvoker> transactInvokers;
@@ -79,6 +80,7 @@ public class HwvtepConnectionInstance {
         this.txInvoker = txInvoker;
         this.deviceInfo = new HwvtepDeviceInfo(this);
         this.dataBroker = dataBroker;
+        this.hwvtepTableReader = new HwvtepTableReader(this);
     }
 
     public synchronized void transact(TransactCommand command) {
@@ -133,6 +135,8 @@ public class HwvtepConnectionInstance {
                     GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
                     Set<String> columns = new HashSet<>(tableSchema.getColumns());
                     List<String> skipColumns = HwvtepSouthboundConstants.SKIP_COLUMN_FROM_HWVTEP_TABLE.get(tableName);
+                    skipColumns = skipColumns == null ? new ArrayList<>() : new ArrayList<>(skipColumns);
+                    skipColumns.add(HwvtepSouthboundConstants.VERSION_COLUMN);
                     if (skipColumns != null) {
                         LOG.info("HwvtepSouthbound NOT monitoring columns {} in table {}", skipColumns, tableName);
                         columns.removeAll(skipColumns);
@@ -306,4 +310,8 @@ public class HwvtepConnectionInstance {
     public OvsdbClient getOvsdbClient() {
         return client;
     }
+
+    public HwvtepTableReader getHwvtepTableReader() {
+        return hwvtepTableReader;
+    }
 }
diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepTableReader.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepTableReader.java
new file mode 100644 (file)
index 0000000..1ba42fa
--- /dev/null
@@ -0,0 +1,261 @@
+/*
+ * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.hwvtepsouthbound;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+import org.opendaylight.ovsdb.lib.notation.Condition;
+import org.opendaylight.ovsdb.lib.notation.Row;
+import org.opendaylight.ovsdb.lib.notation.UUID;
+import org.opendaylight.ovsdb.lib.operations.Operation;
+import org.opendaylight.ovsdb.lib.operations.OperationResult;
+import org.opendaylight.ovsdb.lib.operations.Select;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
+import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
+import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
+import org.opendaylight.ovsdb.schema.hardwarevtep.LogicalSwitch;
+import org.opendaylight.ovsdb.schema.hardwarevtep.McastMacsRemote;
+import org.opendaylight.ovsdb.schema.hardwarevtep.PhysicalLocator;
+import org.opendaylight.ovsdb.schema.hardwarevtep.UcastMacsRemote;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LogicalSwitches;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteMcastMacs;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteUcastMacs;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
+import org.opendaylight.yangtools.yang.binding.Identifiable;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.opendaylight.ovsdb.lib.operations.Operations.op;
+
+public class HwvtepTableReader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HwvtepTableReader.class);
+
+    private final Map<Class, Function<InstanceIdentifier, List<Condition>>> whereClauseGetterMap = new HashMap();
+    private final Map<Class, Class> tableMap = new HashMap();
+    private final Map<Class, TypedBaseTable> tables = new HashMap<>();
+
+    private final HwvtepConnectionInstance connectionInstance;
+
+    public HwvtepTableReader(HwvtepConnectionInstance connectionInstance) {
+        this.connectionInstance = connectionInstance;
+        DatabaseSchema dbSchema = null;
+        try {
+            dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.warn("Not able to fetch schema for database {} from device {}",
+                    HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e);
+        }
+
+        tableMap.put(RemoteMcastMacs.class, McastMacsRemote.class);
+        tableMap.put(RemoteUcastMacs.class, UcastMacsRemote.class);
+        tableMap.put(LogicalSwitches.class, LogicalSwitch.class);
+        tableMap.put(TerminationPoint.class, PhysicalLocator.class);
+
+        whereClauseGetterMap.put(RemoteMcastMacs.class, new RemoteMcastMacWhereClauseGetter());
+        whereClauseGetterMap.put(RemoteUcastMacs.class, new RemoteUcastMacWhereClauseGetter());
+        whereClauseGetterMap.put(LogicalSwitches.class, new LogicalSwitchWhereClauseGetter());
+        whereClauseGetterMap.put(TerminationPoint.class, new LocatorWhereClauseGetter());
+
+        tables.put(McastMacsRemote.class, TyperUtils.getTypedRowWrapper(dbSchema, McastMacsRemote.class, null));
+        tables.put(UcastMacsRemote.class, TyperUtils.getTypedRowWrapper(dbSchema, UcastMacsRemote.class, null));
+        tables.put(LogicalSwitch.class, TyperUtils.getTypedRowWrapper(dbSchema, LogicalSwitch.class, null));
+        tables.put(PhysicalLocator.class, TyperUtils.getTypedRowWrapper(dbSchema, PhysicalLocator.class, null));
+    }
+
+    class RemoteMcastMacWhereClauseGetter implements Function<InstanceIdentifier, List<Condition>> {
+        @Override
+        public List<Condition> apply(InstanceIdentifier iid) {
+            InstanceIdentifier<RemoteMcastMacs> macIid = iid;
+            String mac = macIid.firstKeyOf(RemoteMcastMacs.class).getMacEntryKey().getValue();
+            InstanceIdentifier<LogicalSwitches> lsIid = (InstanceIdentifier<LogicalSwitches>) macIid.firstKeyOf(
+                    RemoteMcastMacs.class).getLogicalSwitchRef().getValue();
+            UUID lsUUID = connectionInstance.getDeviceInfo().getUUID(LogicalSwitches.class, lsIid);
+            if (lsUUID == null) {
+                LOG.error("Could not find uuid for ls key {}", lsIid);
+                return null;
+            }
+
+            McastMacsRemote macTable = (McastMacsRemote) tables.get(McastMacsRemote.class);
+            ArrayList<Condition> conditions = new ArrayList<>();
+            conditions.add(macTable.getLogicalSwitchColumn().getSchema().opEqual(lsUUID));
+            conditions.add(macTable.getMacColumn().getSchema().opEqual(mac));
+            return conditions;
+        }
+    }
+
+    class RemoteUcastMacWhereClauseGetter implements Function<InstanceIdentifier, List<Condition>> {
+        @Override
+        public List<Condition> apply(InstanceIdentifier iid) {
+            InstanceIdentifier<RemoteUcastMacs> macIid = iid;
+            String mac = macIid.firstKeyOf(RemoteUcastMacs.class).getMacEntryKey().getValue();
+            InstanceIdentifier<LogicalSwitches> lsIid = (InstanceIdentifier<LogicalSwitches>) macIid.firstKeyOf(
+                    RemoteUcastMacs.class).getLogicalSwitchRef().getValue();
+            UUID lsUUID = connectionInstance.getDeviceInfo().getUUID(LogicalSwitches.class, lsIid);
+            if (lsUUID == null) {
+                LOG.error("Could not find uuid for ls key {}", lsIid);
+                return null;
+            }
+
+            UcastMacsRemote macTable = (UcastMacsRemote) tables.get(UcastMacsRemote.class);
+            ArrayList<Condition> conditions = new ArrayList<>();
+            conditions.add(macTable.getLogicalSwitchColumn().getSchema().opEqual(lsUUID));
+            conditions.add(macTable.getMacColumn().getSchema().opEqual(mac));
+            return conditions;
+        }
+    }
+
+    class LogicalSwitchWhereClauseGetter implements Function<InstanceIdentifier, List<Condition>> {
+        @Override
+        public List<Condition> apply(InstanceIdentifier iid) {
+            InstanceIdentifier<LogicalSwitches> lsIid = iid;
+            String lsName = lsIid.firstKeyOf(LogicalSwitches.class).getHwvtepNodeName().getValue();
+            LogicalSwitch logicalSwitch = (LogicalSwitch) tables.get(LogicalSwitch.class);
+            return Lists.newArrayList(logicalSwitch.getNameColumn().getSchema().opEqual(lsName));
+        }
+    }
+
+    class LocatorWhereClauseGetter implements Function<InstanceIdentifier, List<Condition>> {
+        @Override
+        public List<Condition> apply(InstanceIdentifier iid) {
+            InstanceIdentifier<TerminationPoint> tepIid = iid;
+            String locatorIp = tepIid.firstKeyOf(TerminationPoint.class).getTpId().getValue();
+            locatorIp = locatorIp.substring(locatorIp.indexOf(":") + 1);
+            LOG.info("Locator ip to look for {}", locatorIp);
+            PhysicalLocator locatorTable = (PhysicalLocator) tables.get(PhysicalLocator.class);
+            return Lists.newArrayList(locatorTable.getDstIpColumn().getSchema().opEqual(locatorIp));
+        }
+    }
+
+    public Optional<TypedBaseTable> getHwvtepTableEntryUUID(Class<? extends Identifiable> cls,
+                                                            InstanceIdentifier iid,
+                                                            UUID existingUUID) {
+        try {
+            DatabaseSchema dbSchema = null;
+            TypedBaseTable globalRow = null;
+            Class<TypedBaseTable> tableClass = tableMap.get(cls);
+            try {
+                dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.warn("Not able to fetch schema for database {} from device {}",
+                        HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e);
+            }
+
+            if (dbSchema != null) {
+                GenericTableSchema hwvtepSchema = TyperUtils.getTableSchema(dbSchema, tableClass);
+
+                List<String> hwvtepTableColumn = new ArrayList<>();
+                hwvtepTableColumn.addAll(hwvtepSchema.getColumns());
+                Select<GenericTableSchema> selectOperation = op.select(hwvtepSchema);
+                selectOperation.setColumns(hwvtepTableColumn);
+
+                if (existingUUID != null) {
+                    TypedBaseTable table = tables.get(tableClass);
+                    LOG.info("Setting uuid condition {} ", existingUUID);
+                    selectOperation.where(table.getUuidColumn().getSchema().opEqual(existingUUID));
+                } else {
+                    if (whereClauseGetterMap.get(cls) != null) {
+                        List<Condition> conditions = whereClauseGetterMap.get(cls).apply(iid);
+                        if (conditions != null) {
+                            if (conditions.size() == 2) {
+                                selectOperation.where(conditions.get(0)).and(conditions.get(1));
+                            } else {
+                                selectOperation.where(conditions.get(0));
+                            }
+                        } else {
+                            LOG.error("Could not get where conditions for cls {} key {}", cls, iid);
+                            return Optional.empty();
+                        }
+                    } else {
+                        LOG.error("Could not get where class for cls {} ", cls);
+                        return Optional.empty();
+                    }
+                }
+                ArrayList<Operation> operations = new ArrayList<>();
+                operations.add(selectOperation);
+                try {
+                    List<OperationResult> results = connectionInstance.transact(dbSchema, operations).get();
+                    if (results != null && !results.isEmpty()) {
+                        OperationResult selectResult = results.get(0);
+                        if (selectResult.getRows() != null && !selectResult.getRows().isEmpty()) {
+                            globalRow = TyperUtils.getTypedRowWrapper(
+                                    dbSchema, tableClass, selectResult.getRows().get(0));
+                        }
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    LOG.warn("Not able to fetch hardware_vtep table row from device {}",
+                            connectionInstance.getConnectionInfo(), e);
+                }
+            }
+            LOG.trace("Fetched {} from hardware_vtep schema", globalRow);
+            if (globalRow != null && globalRow.getUuid() != null) {
+                return Optional.of(globalRow);
+            }
+            return Optional.empty();
+        } catch (Throwable e) {
+            LOG.error("Failed to get the hwvtep row for iid {} {} ", iid, e);
+            return Optional.empty();
+        }
+    }
+
+    public List<TypedBaseTable> getHwvtepTableEntries(Class<? extends Identifiable> cls) {
+        try {
+            List<TypedBaseTable> tableRows = new ArrayList<>();
+            DatabaseSchema dbSchema = null;
+            TypedBaseTable globalRow = null;
+            Class<TypedBaseTable> tableClass = tableMap.get(cls);
+            try {
+                dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.error("Not able to fetch schema for database {} from device {}",
+                        HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e);
+            }
+
+            if (dbSchema != null) {
+                GenericTableSchema hwvtepSchema = TyperUtils.getTableSchema(dbSchema, tableClass);
+
+                List<String> hwvtepTableColumn = new ArrayList<>();
+                hwvtepTableColumn.addAll(hwvtepSchema.getColumns());
+                Select<GenericTableSchema> selectOperation = op.select(hwvtepSchema);
+                selectOperation.setColumns(hwvtepTableColumn);
+
+                ArrayList<Operation> operations = Lists.newArrayList(selectOperation);
+                try {
+                    List<OperationResult> results = connectionInstance.transact(dbSchema, operations).get();
+                    if (results != null && !results.isEmpty()) {
+                        for (OperationResult selectResult : results) {
+                            if (selectResult.getRows() != null && !selectResult.getRows().isEmpty()) {
+                                for (Row<GenericTableSchema> row : selectResult.getRows()) {
+                                    tableRows.add(TyperUtils.getTypedRowWrapper(dbSchema, tableClass, row));
+                                }
+                            }
+                        }
+                    }
+                } catch (InterruptedException | ExecutionException e) {
+                    LOG.error("Not able to fetch hardware_vtep table row from device {}",
+                            connectionInstance.getConnectionInfo(), e);
+                }
+            }
+            return tableRows;
+        } catch (Throwable e) {
+            LOG.error("Failed to get the hwvtep ", e);
+        }
+        return Collections.emptyList();
+    }
+}
\ No newline at end of file
index 65af6be9ead37d85c4edf18176729371dfbf3a00..e0fca77a936afcf7361d5a8acfd326e47c509ea0 100644 (file)
@@ -13,34 +13,48 @@ import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionInstance;
 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
-import org.opendaylight.yangtools.yang.binding.Identifiable;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 public class DependencyQueue {
 
     private static final Logger LOG = LoggerFactory.getLogger(DependencyQueue.class);
     private static final ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("hwvtep-waiting-job-%d").
             build();
-    private static final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact);
+    private static final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact);
 
     private final LinkedBlockingQueue<DependentJob> configWaitQueue = new LinkedBlockingQueue<>(
             HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
     private final LinkedBlockingQueue<DependentJob> opWaitQueue = new LinkedBlockingQueue<>(
             HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
     private final HwvtepDeviceInfo deviceInfo;
+    private ScheduledFuture expiredTasksMonitorJob;
 
+    @SuppressWarnings("unchecked")
     public DependencyQueue(HwvtepDeviceInfo hwvtepDeviceInfo) {
         this.deviceInfo = hwvtepDeviceInfo;
+        expiredTasksMonitorJob = executorService.scheduleWithFixedDelay(() -> {
+            try {
+                LOG.debug("Processing dependencies");
+                if (!deviceInfo.getConnectionInstance().getOvsdbClient().isActive()) {
+                    expiredTasksMonitorJob.cancel(false);
+                }
+                deviceInfo.onOperDataAvailable();
+            } catch (Throwable e) {
+                //If execution of one run throws error , subsequent runs are suppressed, hence catching the throwable
+                LOG.error("Failed to process dependencies", e);
+            }
+        }, 0, HwvtepSouthboundConstants.IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -84,22 +98,25 @@ public class DependencyQueue {
         final List<DependentJob> readyJobs =  getReadyJobs(queue);
         if (readyJobs.size() > 0) {
             executorService.submit(() -> hwvtepConnectionInstance.transact(new TransactCommand() {
+                HwvtepOperationalState operationalState;
                 @Override
                 public void execute(TransactionBuilder transactionBuilder) {
-                    HwvtepOperationalState operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
+                    this.operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
                     for (DependentJob job : readyJobs) {
                         job.onDependencyResolved(operationalState, transactionBuilder);
                     }
                 }
 
                 @Override
-                public void onConfigUpdate(TransactionBuilder transaction, InstanceIdentifier nodeIid,
-                                           Identifiable data, InstanceIdentifier key, Object... extraData) {
+                public void onFailure(TransactionBuilder deviceTransaction) {
+                    readyJobs.forEach((job) -> job.onFailure(deviceTransaction));
+                    operationalState.clearIntransitKeys();
                 }
 
                 @Override
-                public void doDeviceTransaction(TransactionBuilder transaction, InstanceIdentifier nodeIid,
-                                                Identifiable data, InstanceIdentifier key, Object... extraData) {
+                public void onSuccess(TransactionBuilder deviceTransaction) {
+                    readyJobs.forEach((job) -> job.onSuccess(deviceTransaction));
+                    operationalState.getDeviceInfo().onOperDataAvailable();
                 }
             }));
         }
@@ -111,13 +128,16 @@ public class DependencyQueue {
         while(jobIterator.hasNext()) {
             DependentJob job = jobIterator.next();
             long currentTime = System.currentTimeMillis();
-            if (job.isExpired(currentTime)) {
+
+            //first check if its dependencies are met later check for expired status
+            if (job.areDependenciesMet(deviceInfo)) {
                 jobIterator.remove();
+                readyJobs.add(job);
                 continue;
             }
-            if (job.areDependenciesMet(deviceInfo)) {
+            if (job.isExpired(currentTime)) {
                 jobIterator.remove();
-                readyJobs.add(job);
+                continue;
             }
         }
         return readyJobs;
index 478dd17b0dd6873e342b40cdc929700b4561d4d8..820ccf10151f0d8b88d9dae22ba62c9306644f56 100644 (file)
@@ -8,18 +8,47 @@
 
 package org.opendaylight.ovsdb.hwvtepsouthbound.transact;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiPredicate;
+import java.util.function.Predicate;
+
 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
+import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.Identifiable;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-import java.util.List;
-import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class DependentJob<T extends Identifiable> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DependentJob.class);
+
+    private static final Predicate<HwvtepDeviceInfo.DeviceData> DATA_INTRANSIT
+            = (controllerData) -> controllerData != null && controllerData.isInTransitState();
+
+    private static final Predicate<HwvtepDeviceInfo.DeviceData> DATA_INTRANSIT_EXPIRED
+            = (controllerData) -> controllerData != null && controllerData.isInTransitState()
+            && controllerData.isIntransitTimeExpired();
+
+    //expecting the device to create the data
+    private static final BiPredicate<HwvtepDeviceInfo.DeviceData, Optional<TypedBaseTable>> INTRANSIT_DATA_CREATED
+            = (controllerData, deviceData) -> controllerData.getUuid() == null && deviceData.isPresent();
+
+    private static final BiPredicate<HwvtepDeviceInfo.DeviceData, Optional<TypedBaseTable>> INTRANSIT_DATA_NOT_CREATED
+            = (controllerData, deviceData) -> controllerData.getUuid() == null && !deviceData.isPresent();
+
+    //expecting the device to delete the data
+    private static final BiPredicate<HwvtepDeviceInfo.DeviceData, Optional<TypedBaseTable>> INTRANSIT_DATA_DELETED
+            = (controllerData, deviceData) -> controllerData.getUuid() != null && !deviceData.isPresent();
+
+    private static final BiPredicate<HwvtepDeviceInfo.DeviceData, Optional<TypedBaseTable>> INTRANSIT_DATA_NOT_DELETED
+            = (controllerData, deviceData) -> controllerData.getUuid() != null && deviceData.isPresent();
+
     private final long expiryTime;
     private final InstanceIdentifier key;
     private final T data;
@@ -79,6 +108,16 @@ public abstract class DependentJob<T extends Identifiable> {
         return data;
     }
 
+    public boolean isConfigWaitingJob() {
+        return true;
+    }
+
+    public void onFailure(TransactionBuilder deviceTransaction) {
+    }
+
+    public void onSuccess(TransactionBuilder deviceTransaction) {
+    }
+
     public abstract static class ConfigWaitingJob<T extends Identifiable> extends DependentJob {
 
         public ConfigWaitingJob(InstanceIdentifier key, T data, Map dependencies) {
@@ -99,8 +138,47 @@ public abstract class DependentJob<T extends Identifiable> {
 
         @Override
         protected boolean isDependencyMet(HwvtepDeviceInfo deviceInfo, Class cls, InstanceIdentifier iid) {
-            HwvtepDeviceInfo.DeviceData deviceData = deviceInfo.getDeviceOperData(cls, iid);
-            return deviceData == null || deviceData.getStatus() != HwvtepDeviceInfo.DeviceDataStatus.IN_TRANSIT;
+            boolean depenencyMet = true;
+            HwvtepDeviceInfo.DeviceData controllerData = deviceInfo.getDeviceOperData(cls, iid);
+
+            if (DATA_INTRANSIT_EXPIRED.test(controllerData)) {
+                LOG.info("Intransit state expired for key: {} --- dependency {}", iid, getKey());
+                String clsName = cls.getSimpleName();
+
+                //either the device acted on the selected iid/uuid and sent the updated event or it did not
+                //here we are querying the device directly to get the latest status on the iid
+                Optional<TypedBaseTable> latestDeviceStatus = deviceInfo.getConnectionInstance().
+                        getHwvtepTableReader().getHwvtepTableEntryUUID(cls, iid, controllerData.getUuid());
+
+                TypedBaseTable latestDeviceData = latestDeviceStatus.isPresent() ? latestDeviceStatus.get() : null;
+
+                if (INTRANSIT_DATA_CREATED.test(controllerData, latestDeviceStatus)) {
+                    LOG.info("Intransit expired key is actually created but update is missed/delayed {}", iid);
+                    deviceInfo.updateDeviceOperData(cls, iid, latestDeviceStatus.get().getUuid(), latestDeviceData);
+
+                } else if (INTRANSIT_DATA_NOT_CREATED.test(controllerData, latestDeviceStatus)) {
+                    LOG.info("Intransit expired key is actually not created but update is missed/delayed {}", iid);
+                    deviceInfo.clearDeviceOperData(cls, iid);
+
+                } else if (INTRANSIT_DATA_DELETED.test(controllerData, latestDeviceStatus)) {
+                    //also deleted from device
+                    LOG.info("Intransit expired key is actually deleted but update is missed/delayed {}", iid);
+                    deviceInfo.clearDeviceOperData(cls, iid);
+
+                } else if (INTRANSIT_DATA_NOT_DELETED.test(controllerData, latestDeviceStatus)) {
+                    //not deleted from device we will reuse existing uuid
+                    LOG.info("Intransit expired key is actually not deleted but update is missed/delayed {}", iid);
+                    deviceInfo.updateDeviceOperData(cls, iid, latestDeviceStatus.get().getUuid(), latestDeviceData);
+                }
+            } else if (DATA_INTRANSIT.test(controllerData)) {
+                //device status is still in transit
+                depenencyMet = false;
+            }
+            return depenencyMet;
+        }
+
+        public boolean isConfigWaitingJob() {
+            return false;
         }
     }
-}
+}
\ No newline at end of file
index 5da2e45f992f70ff54d7dd65f7d55e07bee4eabe..a9b00b86089b1174aaaf3d3f0a5ce26b9d206600 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.ovsdb.hwvtepsouthbound;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.mockito.ArgumentCaptor;
@@ -57,6 +58,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.net.InetAddress;
 import java.util.List;
 
@@ -129,6 +132,20 @@ public class DataChangeListenerTestBase extends AbstractDataBrokerTest {
         deleteNode(CONFIGURATION);
     }
 
+    void setFinalStatic(Class cls, String fieldName, Object newValue) throws Exception {
+        Field fields[] = FieldUtils.getAllFields(cls);
+        for (Field field : fields) {
+            if (fieldName.equals(field.getName())) {
+                field.setAccessible(true);
+                Field modifiersField = Field.class.getDeclaredField("modifiers");
+                modifiersField.setAccessible(true);
+                modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+                field.set(null, newValue);
+                break;
+            }
+        }
+    }
+
     void loadSchema() {
         try (InputStream resourceAsStream = DataChangeListenerTestBase.class.getResourceAsStream("hwvtep_schema.json")) {
             ObjectMapper mapper = new ObjectMapper();
@@ -168,6 +185,8 @@ public class DataChangeListenerTestBase extends AbstractDataBrokerTest {
         field(HwvtepConnectionInstance.class, "txInvoker").set(connectionInstance, transactionInvoker);
         field(HwvtepConnectionInstance.class, "deviceInfo").set(connectionInstance, new HwvtepDeviceInfo(connectionInstance));
         field(HwvtepConnectionInstance.class, "client").set(connectionInstance, ovsdbClient);
+        when(connectionInstance.getOvsdbClient()).thenReturn(ovsdbClient);
+        when(ovsdbClient.isActive()).thenReturn(true);
         when(connectionInstance.getConnectionInfo()).thenReturn(connectionInfo);
         when(connectionInstance.getConnectionInfo().getRemoteAddress()).thenReturn(mock(InetAddress.class));
         when(connectionInstance.getInstanceIdentifier()).thenReturn(nodeIid);
index fab640c4299d0ce9e68aa0844452f32f422052d7..b4fd63789786102e1276b1139b7d33e28f12528b 100644 (file)
@@ -8,9 +8,9 @@
 
 package org.opendaylight.ovsdb.hwvtepsouthbound;
 
-import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mockito;
 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue;
 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependentJob;
 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.HwvtepOperationalState;
@@ -28,6 +28,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hw
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
 import org.opendaylight.yangtools.yang.binding.Identifiable;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
@@ -55,7 +56,7 @@ public class DependencyQueueTest extends DataChangeListenerTestBase {
     InstanceIdentifier<LogicalSwitches> lsIid;
     Map<Class<? extends Identifiable>, List<InstanceIdentifier>> unMetDependencies;
 
-    void setupForTest() {
+    void setupForTest() throws Exception {
         MCAST_MAC_DATA_VALIDATOR = Whitebox.getInternalState(McastMacsRemoteUpdateCommand.class, UnMetDependencyGetter.class);
         opState = new HwvtepOperationalState(connectionInstance);
         mac = TestBuilders.buildRemoteMcastMacs(nodeIid,"FF:FF:FF:FF:FF:FF", "ls0",
@@ -64,7 +65,7 @@ public class DependencyQueueTest extends DataChangeListenerTestBase {
                 child(LogicalSwitches.class, new LogicalSwitchesKey(new HwvtepNodeName("ls0")));
         macIid = nodeIid.augmentation(HwvtepGlobalAugmentation.class).
                 child(RemoteMcastMacs.class, new RemoteMcastMacsKey(mac.getKey()));
-        Whitebox.setInternalState(DependencyQueue.class, "executorService", MoreExecutors.newDirectExecutorService());
+        setFinalStatic(DependencyQueue.class, "executorService", PowerMockito.mock(SameThreadScheduledExecutor.class, Mockito.CALLS_REAL_METHODS));
     }
 
     @Test
index edb6f1271fc7ed470b72564fec2b82409038e4dc..491be05107f9894dff5182aa844cfa153fb6b907 100644 (file)
@@ -9,13 +9,13 @@
 package org.opendaylight.ovsdb.hwvtepsouthbound;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Matchers;
+import org.mockito.Mockito;
 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue;
 import org.opendaylight.ovsdb.lib.operations.Operations;
 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
@@ -29,6 +29,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hw
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
 import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
@@ -94,7 +95,7 @@ public class HwvtepDataChangeListenerTest extends DataChangeListenerTestBase {
 
     @Before
     public void setupListener() throws Exception {
-        setFinalStatic(DependencyQueue.class, "executorService", MoreExecutors.newDirectExecutorService());
+        setFinalStatic(DependencyQueue.class, "executorService", PowerMockito.mock(SameThreadScheduledExecutor.class, Mockito.CALLS_REAL_METHODS));
         opDataChangeListener = new HwvtepOperationalDataChangeListener(dataBroker, hwvtepConnectionManager, connectionInstance);
     }
 
diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/SameThreadScheduledExecutor.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/SameThreadScheduledExecutor.java
new file mode 100644 (file)
index 0000000..cafe1db
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.hwvtepsouthbound;
+
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public abstract class SameThreadScheduledExecutor implements ScheduledExecutorService {
+
+    @Override
+    public Future<?> submit(Runnable runnable) {
+        runnable.run();
+        SettableFuture ft = SettableFuture.create();
+        ft.set(null);
+        return ft;
+    }
+}