Changing the monitor operation in the library layer to return TableUpdates as per... 13/8713/1
authorMadhu Venugopal <mavenugo@gmail.com>
Mon, 7 Jul 2014 01:24:31 +0000 (18:24 -0700)
committerMadhu Venugopal <mavenugo@gmail.com>
Mon, 7 Jul 2014 01:40:48 +0000 (18:40 -0700)
Existing implementation pushes the Monitor response also as an asynchronous update() response.
Though it has similar effect, it does not comply with the RFC. Hence changing the monitor
reply as a synchronous response (using Future ofcourse) and continuing to use the update() as such.

Change-Id: I10bf114c52f37295efae70c8b9763570bcc11ce0
Signed-off-by: Madhu Venugopal <mavenugo@gmail.com>
library/src/main/java/org/opendaylight/ovsdb/lib/OvsdbClient.java
library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbClientImpl.java
library/src/test/java/org/opendaylight/ovsdb/lib/OvsdbClientTestIT.java
plugin/src/main/java/org/opendaylight/ovsdb/plugin/ConnectionService.java
schemas/Open_vSwitch/src/test/java/org/opendaylight/ovsdb/schema/openvswitch/MonitorTestCases.java

index fc0232b209878c3a20c25470e8ccb58c2f1b69ed..b2986dabdc4ae66cf2ae193238a817a5a2c4c0c1 100644 (file)
@@ -15,6 +15,7 @@ package org.opendaylight.ovsdb.lib;
 import java.util.List;
 
 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
 import org.opendaylight.ovsdb.lib.notation.Row;
 import org.opendaylight.ovsdb.lib.operations.Operation;
 import org.opendaylight.ovsdb.lib.operations.OperationResult;
@@ -66,7 +67,7 @@ public interface OvsdbClient {
      *                       handle is used to later cancel ({@link #cancelMonitor(MonitorHandle)}) the monitor.
      * @param callback receives the monitor response
      */
-    public <E extends TableSchema<E>> MonitorHandle monitor(DatabaseSchema schema, List<MonitorRequest<E>> monitorRequests,
+    public <E extends TableSchema<E>> TableUpdates monitor(DatabaseSchema schema, List<MonitorRequest<E>> monitorRequests,
                                  MonitorCallBack callback);
 
     /**
index a396a0e1d8be6e1e8262e767df5eee74798ca914..75198f09b5a3cd0b23cd875d131eeed23d8adb9a 100644 (file)
@@ -17,6 +17,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
@@ -52,7 +53,6 @@ import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -94,7 +94,8 @@ public class OvsdbClientImpl implements OvsdbClient {
                         logger.info("callback received with context {}, but no known handler. Ignoring!", key);
                         return;
                     }
-                    _transformingCallback(updateNotification.getUpdates(), monitorCallBack, callbackContext.schema);
+                    TableUpdates updates = _transformingCallback(updateNotification.getUpdates(), callbackContext.schema);
+                    monitorCallBack.update(updates, callbackContext.schema);
                 }
 
                 @Override
@@ -113,7 +114,7 @@ public class OvsdbClientImpl implements OvsdbClient {
     }
 
 
-    protected void _transformingCallback(JsonNode tableUpdatesJson, MonitorCallBack monitorCallBack, DatabaseSchema dbSchema) {
+    protected TableUpdates _transformingCallback(JsonNode tableUpdatesJson, DatabaseSchema dbSchema) {
         //todo(ashwin): we should move all the JSON parsing logic to a utility class
         if (tableUpdatesJson instanceof ObjectNode) {
             Map<String, TableUpdate> tableUpdateMap = Maps.newHashMap();
@@ -126,9 +127,9 @@ public class OvsdbClientImpl implements OvsdbClient {
                 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
 
             }
-            TableUpdates updates = new TableUpdates(tableUpdateMap);
-            monitorCallBack.update(updates, dbSchema);
+            return new TableUpdates(tableUpdateMap);
         }
+        return null;
     }
 
     @Override
@@ -144,7 +145,7 @@ public class OvsdbClientImpl implements OvsdbClient {
     }
 
     @Override
-    public <E extends TableSchema<E>> MonitorHandle monitor(final DatabaseSchema dbSchema,
+    public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
                                                             List<MonitorRequest<E>> monitorRequest,
                                                             final MonitorCallBack callback) {
 
@@ -165,19 +166,14 @@ public class OvsdbClientImpl implements OvsdbClient {
                 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
             }
         });
-        Futures.addCallback(monitor, new FutureCallback<JsonNode>() {
-            @Override
-            public void onSuccess(JsonNode result) {
-                _transformingCallback(result, callback, dbSchema);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                callback.exception(t);
-            }
-        });
-
-        return monitorHandle;
+        JsonNode result;
+        try {
+            result = monitor.get();
+        } catch (InterruptedException | ExecutionException e) {
+            return null;
+        }
+        TableUpdates updates = _transformingCallback(result, dbSchema);
+        return updates;
     }
 
     private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
index f8360ddf5679a9e89aecb22bf4f79ca273b3cfd7..aaa3d8ef3abb01a93a0eeebc5478058db47b4ef0 100644 (file)
@@ -95,7 +95,7 @@ public class OvsdbClientTestIT extends OvsdbTestBase {
 
         final List<Object> results = Lists.newArrayList();
 
-        MonitorHandle monitor = ovs.monitor(dbSchema, monitorRequests, new MonitorCallBack() {
+        TableUpdates updates = ovs.monitor(dbSchema, monitorRequests, new MonitorCallBack() {
             @Override
             public void update(TableUpdates result, DatabaseSchema dbSchema) {
                 results.add(result);
@@ -108,7 +108,7 @@ public class OvsdbClientTestIT extends OvsdbTestBase {
                 System.out.println("t = " + t);
             }
         });
-
+        if (updates != null) results.add(updates);
         for (int i = 0; i < 3 ; i++) { //wait 3 seconds to get a result
             System.out.println("waiting on monitor response for Bridge Table...");
             if (!results.isEmpty()) break;
@@ -118,7 +118,7 @@ public class OvsdbClientTestIT extends OvsdbTestBase {
         Assert.assertTrue(!results.isEmpty());
         Object result = results.get(0);
         Assert.assertTrue(result instanceof TableUpdates);
-        TableUpdates updates = (TableUpdates) result;
+        updates = (TableUpdates) result;
         TableUpdate<GenericTableSchema> update = updates.getUpdate(bridge);
         Row<GenericTableSchema> aNew = update.getNew();
         if (filter) {
@@ -166,7 +166,7 @@ public class OvsdbClientTestIT extends OvsdbTestBase {
 
         final List<Object> results = Lists.newArrayList();
 
-        MonitorHandle monitor = ovs.monitor(dbSchema, monitorRequests, new MonitorCallBack() {
+        TableUpdates updates = ovs.monitor(dbSchema, monitorRequests, new MonitorCallBack() {
             @Override
             public void update(TableUpdates result, DatabaseSchema dbSchema) {
                 results.add(result);
@@ -179,16 +179,17 @@ public class OvsdbClientTestIT extends OvsdbTestBase {
             }
         });
 
+        if (updates != null) results.add(updates);
         for (int i = 0; i < 3 ; i++) { //wait 5 seconds to get a result
-            System.out.println("waiting on monitor response for open_vSwtich Table...");
             if (!results.isEmpty()) break;
+            System.out.println("waiting on monitor response for open_vSwtich Table...");
             Thread.sleep(1000);
         }
 
         Assert.assertTrue(!results.isEmpty());
         Object result = results.get(0); // open_vSwitch table has just 1 row.
         Assert.assertTrue(result instanceof TableUpdates);
-        TableUpdates updates = (TableUpdates) result;
+        updates = (TableUpdates) result;
         TableUpdate<GenericTableSchema> update = updates.getUpdate(ovsTable);
         Assert.assertNotNull(update.getUuid());
         return update.getUuid();
index 8c391ce5622f78f04aa56b6df1f6cc857e37f956..0bd775517a5ade31e56186e1e5cb6489e88da832 100644 (file)
@@ -15,7 +15,6 @@ import io.netty.channel.ChannelHandler;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -32,7 +31,6 @@ import org.opendaylight.controller.sal.utils.ServiceHelper;
 import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
 import org.opendaylight.ovsdb.lib.MonitorCallBack;
-import org.opendaylight.ovsdb.lib.MonitorHandle;
 import org.opendaylight.ovsdb.lib.OvsdbClient;
 import org.opendaylight.ovsdb.lib.OvsdbConnection;
 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
@@ -44,12 +42,10 @@ import org.opendaylight.ovsdb.lib.message.TableUpdates;
 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
 import org.opendaylight.ovsdb.lib.schema.TableSchema;
-import org.opendaylight.ovsdb.schema.openvswitch.OpenVSwitch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListenableFuture;
 
 
 /**
@@ -254,16 +250,6 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
         props.add(l4Port);
         inventoryServiceInternal.addNode(connection.getNode(), props);
 
-        OpenVSwitch openVSwitch = connection.getClient().getTypedRowWrapper(OpenVSwitch.class, null);
-        List<String> dbNames = Arrays.asList(openVSwitch.getSchema().getName());
-        ListenableFuture<DatabaseSchema> dbSchemaF = client.getSchema(OvsVswitchdSchemaConstants.DATABASE_NAME);
-        DatabaseSchema databaseSchema = dbSchemaF.get();
-        this.monitorTables(connection.getNode());
-        inventoryServiceInternal.notifyNodeAdded(connection.getNode());
-    }
-
-    public void monitorTables(Node node) throws ExecutionException, InterruptedException, IOException {
-        OvsdbClient client = ovsdbConnections.get(node.getID()).getClient();
         List<String> databases = client.getDatabases().get();
         if (databases == null) {
             logger.error("Unable to get Databases for the ovsdb connection : {}", client.getConnectionInfo());
@@ -271,22 +257,31 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
         }
         for (String database : databases) {
             DatabaseSchema dbSchema = client.getSchema(database).get();
-            if (dbSchema == null) {
-                logger.error("Unable to get Database Schema for the ovsdb connection : {} , database : {}", client.getConnectionInfo(), database);
-                return;
-            }
-            Set<String> tables = dbSchema.getTables();
-            if (tables == null) {
-                logger.warn("Database {} without any tables. Strange !", database);
-                continue;
-            }
-            List<MonitorRequest<GenericTableSchema>> monitorRequests = Lists.newArrayList();
-            for (String tableName : tables) {
-                GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
-                monitorRequests.add(this.getAllColumnsMonitorRequest(tableSchema));
-            }
-            MonitorHandle monitor = client.monitor(dbSchema, monitorRequests, new UpdateMonitor(node));
+            TableUpdates updates = this.monitorTables(connection.getNode(), dbSchema);
+            inventoryServiceInternal.processTableUpdates(connection.getNode(), dbSchema.getName(), updates);
+        }
+        inventoryServiceInternal.notifyNodeAdded(connection.getNode());
+    }
+
+    public TableUpdates monitorTables(Node node, DatabaseSchema dbSchema) throws ExecutionException, InterruptedException, IOException {
+        String identifier = (String) node.getID();
+        Connection connection = ovsdbConnections.get(identifier);
+        OvsdbClient client = connection.getClient();
+        if (dbSchema == null) {
+            logger.error("Unable to get Database Schema for the ovsdb connection : {} , database : {}", client.getConnectionInfo(), dbSchema.getName());
+            return null;
+        }
+        Set<String> tables = dbSchema.getTables();
+        if (tables == null) {
+            logger.warn("Database {} without any tables. Strange !", dbSchema.getName());
+            return null;
+        }
+        List<MonitorRequest<GenericTableSchema>> monitorRequests = Lists.newArrayList();
+        for (String tableName : tables) {
+            GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
+            monitorRequests.add(this.getAllColumnsMonitorRequest(tableSchema));
         }
+        return client.monitor(dbSchema, monitorRequests, new UpdateMonitor(node));
     }
 
     /**
index 975b32e87e184789695eb2c2325d97dc557dc1b3..ee7fde930d747c31baa984cd5a1e0d4811988074 100644 (file)
@@ -23,7 +23,6 @@ import junit.framework.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.ovsdb.lib.MonitorCallBack;
-import org.opendaylight.ovsdb.lib.MonitorHandle;
 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
 import org.opendaylight.ovsdb.lib.message.MonitorSelect;
@@ -60,8 +59,9 @@ public class MonitorTestCases extends OpenVswitchSchemaTestBase {
         monitorRequests.add(this.getAllColumnsMonitorRequest(Bridge.class));
         monitorRequests.add(this.getAllColumnsMonitorRequest(OpenVSwitch.class));
 
-        MonitorHandle monitor = ovs.monitor(dbSchema, monitorRequests, new UpdateMonitor());
-        Assert.assertNotNull(monitor);
+        TableUpdates updates = ovs.monitor(dbSchema, monitorRequests, new UpdateMonitor());
+        Assert.assertNotNull(updates);
+        this.updateTableCache(updates);
     }
 
     /**
@@ -104,22 +104,26 @@ public class MonitorTestCases extends OpenVswitchSchemaTestBase {
 
     }
 
+    private void updateTableCache(TableUpdates updates) {
+        for (String tableName : updates.getUpdates().keySet()) {
+            Map<UUID, Row> tUpdate = OpenVswitchSchemaSuiteIT.getTableCache().get(tableName);
+            TableUpdate update = updates.getUpdates().get(tableName);
+            if (update.getNew() != null) {
+                if (tUpdate == null) {
+                    tUpdate = new HashMap<>();
+                    OpenVswitchSchemaSuiteIT.getTableCache().put(tableName, tUpdate);
+                }
+                tUpdate.put(update.getUuid(), update.getNew());
+            } else {
+                tUpdate.remove(update.getUuid());
+            }
+        }
+    }
+
     private class UpdateMonitor implements MonitorCallBack {
         @Override
         public void update(TableUpdates result, DatabaseSchema dbSchema) {
-            for (String tableName : result.getUpdates().keySet()) {
-                Map<UUID, Row> tUpdate = OpenVswitchSchemaSuiteIT.getTableCache().get(tableName);
-                TableUpdate update = result.getUpdates().get(tableName);
-                if (update.getNew() != null) {
-                    if (tUpdate == null) {
-                        tUpdate = new HashMap<>();
-                        OpenVswitchSchemaSuiteIT.getTableCache().put(tableName, tUpdate);
-                    }
-                    tUpdate.put(update.getUuid(), update.getNew());
-                } else {
-                    tUpdate.remove(update.getUuid());
-                }
-            }
+            updateTableCache(result);
         }
 
         @Override