Existing implementation pushes the Monitor response also as an asynchronous update() response.
Though it has similar effect, it does not comply with the RFC. Hence changing the monitor
reply as a synchronous response (using Future ofcourse) and continuing to use the update() as such.
Change-Id: I10bf114c52f37295efae70c8b9763570bcc11ce0
Signed-off-by: Madhu Venugopal <mavenugo@gmail.com>
import java.util.List;
import org.opendaylight.ovsdb.lib.message.MonitorRequest;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
import org.opendaylight.ovsdb.lib.notation.Row;
import org.opendaylight.ovsdb.lib.operations.Operation;
import org.opendaylight.ovsdb.lib.operations.OperationResult;
* handle is used to later cancel ({@link #cancelMonitor(MonitorHandle)}) the monitor.
* @param callback receives the monitor response
*/
- public <E extends TableSchema<E>> MonitorHandle monitor(DatabaseSchema schema, List<MonitorRequest<E>> monitorRequests,
+ public <E extends TableSchema<E>> TableUpdates monitor(DatabaseSchema schema, List<MonitorRequest<E>> monitorRequests,
MonitorCallBack callback);
/**
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
logger.info("callback received with context {}, but no known handler. Ignoring!", key);
return;
}
- _transformingCallback(updateNotification.getUpdates(), monitorCallBack, callbackContext.schema);
+ TableUpdates updates = _transformingCallback(updateNotification.getUpdates(), callbackContext.schema);
+ monitorCallBack.update(updates, callbackContext.schema);
}
@Override
}
- protected void _transformingCallback(JsonNode tableUpdatesJson, MonitorCallBack monitorCallBack, DatabaseSchema dbSchema) {
+ protected TableUpdates _transformingCallback(JsonNode tableUpdatesJson, DatabaseSchema dbSchema) {
//todo(ashwin): we should move all the JSON parsing logic to a utility class
if (tableUpdatesJson instanceof ObjectNode) {
Map<String, TableUpdate> tableUpdateMap = Maps.newHashMap();
tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
}
- TableUpdates updates = new TableUpdates(tableUpdateMap);
- monitorCallBack.update(updates, dbSchema);
+ return new TableUpdates(tableUpdateMap);
}
+ return null;
}
@Override
}
@Override
- public <E extends TableSchema<E>> MonitorHandle monitor(final DatabaseSchema dbSchema,
+ public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
List<MonitorRequest<E>> monitorRequest,
final MonitorCallBack callback) {
return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
}
});
- Futures.addCallback(monitor, new FutureCallback<JsonNode>() {
- @Override
- public void onSuccess(JsonNode result) {
- _transformingCallback(result, callback, dbSchema);
- }
-
- @Override
- public void onFailure(Throwable t) {
- callback.exception(t);
- }
- });
-
- return monitorHandle;
+ JsonNode result;
+ try {
+ result = monitor.get();
+ } catch (InterruptedException | ExecutionException e) {
+ return null;
+ }
+ TableUpdates updates = _transformingCallback(result, dbSchema);
+ return updates;
}
private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
final List<Object> results = Lists.newArrayList();
- MonitorHandle monitor = ovs.monitor(dbSchema, monitorRequests, new MonitorCallBack() {
+ TableUpdates updates = ovs.monitor(dbSchema, monitorRequests, new MonitorCallBack() {
@Override
public void update(TableUpdates result, DatabaseSchema dbSchema) {
results.add(result);
System.out.println("t = " + t);
}
});
-
+ if (updates != null) results.add(updates);
for (int i = 0; i < 3 ; i++) { //wait 3 seconds to get a result
System.out.println("waiting on monitor response for Bridge Table...");
if (!results.isEmpty()) break;
Assert.assertTrue(!results.isEmpty());
Object result = results.get(0);
Assert.assertTrue(result instanceof TableUpdates);
- TableUpdates updates = (TableUpdates) result;
+ updates = (TableUpdates) result;
TableUpdate<GenericTableSchema> update = updates.getUpdate(bridge);
Row<GenericTableSchema> aNew = update.getNew();
if (filter) {
final List<Object> results = Lists.newArrayList();
- MonitorHandle monitor = ovs.monitor(dbSchema, monitorRequests, new MonitorCallBack() {
+ TableUpdates updates = ovs.monitor(dbSchema, monitorRequests, new MonitorCallBack() {
@Override
public void update(TableUpdates result, DatabaseSchema dbSchema) {
results.add(result);
}
});
+ if (updates != null) results.add(updates);
for (int i = 0; i < 3 ; i++) { //wait 5 seconds to get a result
- System.out.println("waiting on monitor response for open_vSwtich Table...");
if (!results.isEmpty()) break;
+ System.out.println("waiting on monitor response for open_vSwtich Table...");
Thread.sleep(1000);
}
Assert.assertTrue(!results.isEmpty());
Object result = results.get(0); // open_vSwitch table has just 1 row.
Assert.assertTrue(result instanceof TableUpdates);
- TableUpdates updates = (TableUpdates) result;
+ updates = (TableUpdates) result;
TableUpdate<GenericTableSchema> update = updates.getUpdate(ovsTable);
Assert.assertNotNull(update.getUuid());
return update.getUuid();
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
import org.opendaylight.ovsdb.lib.MonitorCallBack;
-import org.opendaylight.ovsdb.lib.MonitorHandle;
import org.opendaylight.ovsdb.lib.OvsdbClient;
import org.opendaylight.ovsdb.lib.OvsdbConnection;
import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
import org.opendaylight.ovsdb.lib.schema.TableSchema;
-import org.opendaylight.ovsdb.schema.openvswitch.OpenVSwitch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListenableFuture;
/**
props.add(l4Port);
inventoryServiceInternal.addNode(connection.getNode(), props);
- OpenVSwitch openVSwitch = connection.getClient().getTypedRowWrapper(OpenVSwitch.class, null);
- List<String> dbNames = Arrays.asList(openVSwitch.getSchema().getName());
- ListenableFuture<DatabaseSchema> dbSchemaF = client.getSchema(OvsVswitchdSchemaConstants.DATABASE_NAME);
- DatabaseSchema databaseSchema = dbSchemaF.get();
- this.monitorTables(connection.getNode());
- inventoryServiceInternal.notifyNodeAdded(connection.getNode());
- }
-
- public void monitorTables(Node node) throws ExecutionException, InterruptedException, IOException {
- OvsdbClient client = ovsdbConnections.get(node.getID()).getClient();
List<String> databases = client.getDatabases().get();
if (databases == null) {
logger.error("Unable to get Databases for the ovsdb connection : {}", client.getConnectionInfo());
}
for (String database : databases) {
DatabaseSchema dbSchema = client.getSchema(database).get();
- if (dbSchema == null) {
- logger.error("Unable to get Database Schema for the ovsdb connection : {} , database : {}", client.getConnectionInfo(), database);
- return;
- }
- Set<String> tables = dbSchema.getTables();
- if (tables == null) {
- logger.warn("Database {} without any tables. Strange !", database);
- continue;
- }
- List<MonitorRequest<GenericTableSchema>> monitorRequests = Lists.newArrayList();
- for (String tableName : tables) {
- GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
- monitorRequests.add(this.getAllColumnsMonitorRequest(tableSchema));
- }
- MonitorHandle monitor = client.monitor(dbSchema, monitorRequests, new UpdateMonitor(node));
+ TableUpdates updates = this.monitorTables(connection.getNode(), dbSchema);
+ inventoryServiceInternal.processTableUpdates(connection.getNode(), dbSchema.getName(), updates);
+ }
+ inventoryServiceInternal.notifyNodeAdded(connection.getNode());
+ }
+
+ public TableUpdates monitorTables(Node node, DatabaseSchema dbSchema) throws ExecutionException, InterruptedException, IOException {
+ String identifier = (String) node.getID();
+ Connection connection = ovsdbConnections.get(identifier);
+ OvsdbClient client = connection.getClient();
+ if (dbSchema == null) {
+ logger.error("Unable to get Database Schema for the ovsdb connection : {} , database : {}", client.getConnectionInfo(), dbSchema.getName());
+ return null;
+ }
+ Set<String> tables = dbSchema.getTables();
+ if (tables == null) {
+ logger.warn("Database {} without any tables. Strange !", dbSchema.getName());
+ return null;
+ }
+ List<MonitorRequest<GenericTableSchema>> monitorRequests = Lists.newArrayList();
+ for (String tableName : tables) {
+ GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
+ monitorRequests.add(this.getAllColumnsMonitorRequest(tableSchema));
}
+ return client.monitor(dbSchema, monitorRequests, new UpdateMonitor(node));
}
/**
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.ovsdb.lib.MonitorCallBack;
-import org.opendaylight.ovsdb.lib.MonitorHandle;
import org.opendaylight.ovsdb.lib.message.MonitorRequest;
import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
import org.opendaylight.ovsdb.lib.message.MonitorSelect;
monitorRequests.add(this.getAllColumnsMonitorRequest(Bridge.class));
monitorRequests.add(this.getAllColumnsMonitorRequest(OpenVSwitch.class));
- MonitorHandle monitor = ovs.monitor(dbSchema, monitorRequests, new UpdateMonitor());
- Assert.assertNotNull(monitor);
+ TableUpdates updates = ovs.monitor(dbSchema, monitorRequests, new UpdateMonitor());
+ Assert.assertNotNull(updates);
+ this.updateTableCache(updates);
}
/**
}
+ private void updateTableCache(TableUpdates updates) {
+ for (String tableName : updates.getUpdates().keySet()) {
+ Map<UUID, Row> tUpdate = OpenVswitchSchemaSuiteIT.getTableCache().get(tableName);
+ TableUpdate update = updates.getUpdates().get(tableName);
+ if (update.getNew() != null) {
+ if (tUpdate == null) {
+ tUpdate = new HashMap<>();
+ OpenVswitchSchemaSuiteIT.getTableCache().put(tableName, tUpdate);
+ }
+ tUpdate.put(update.getUuid(), update.getNew());
+ } else {
+ tUpdate.remove(update.getUuid());
+ }
+ }
+ }
+
private class UpdateMonitor implements MonitorCallBack {
@Override
public void update(TableUpdates result, DatabaseSchema dbSchema) {
- for (String tableName : result.getUpdates().keySet()) {
- Map<UUID, Row> tUpdate = OpenVswitchSchemaSuiteIT.getTableCache().get(tableName);
- TableUpdate update = result.getUpdates().get(tableName);
- if (update.getNew() != null) {
- if (tUpdate == null) {
- tUpdate = new HashMap<>();
- OpenVswitchSchemaSuiteIT.getTableCache().put(tableName, tUpdate);
- }
- tUpdate.put(update.getUuid(), update.getNew());
- } else {
- tUpdate.remove(update.getUuid());
- }
- }
+ updateTableCache(result);
}
@Override