Switch to command pattern and using transaction chains 75/15675/1
authorEd Warnicke <eaw@cisco.com>
Mon, 23 Feb 2015 21:55:49 +0000 (14:55 -0700)
committerEd Warnicke <eaw@cisco.com>
Tue, 24 Feb 2015 16:34:10 +0000 (09:34 -0700)
This patch does a few things:

1)  Switch to using command pattern with transaction chains
    to provider robust writing to the operational store
2)  Moved over the existing operational writing to using the command
    pattern
3)  Removed the bridges from the ovsdb node as they need to be
    separate topological nodes.

Change-Id: I9796d90a7cee483271134dc5abbc6097df4ce567
Signed-off-by: Ed Warnicke <eaw@cisco.com>
19 files changed:
southbound/southbound-api/src/main/yang/ovsdb.yang
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbBridgeOperDataCollector.java [deleted file]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbClientKey.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionInstance.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionManager.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbDataCollectionOperation.java [deleted file]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbMonitorCallback.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbOperationalDataCollectionManager.java [deleted file]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbOperationalDataCollectionManagerImpl.java [deleted file]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/SouthboundMapper.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/SouthboundProvider.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/OvsdbBridgeUpdateCommand.java [new file with mode: 0644]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/OvsdbNodeCreateCommand.java [new file with mode: 0644]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/OvsdbNodeRemoveCommand.java [new file with mode: 0644]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/OvsdbOperationalCommandAggregator.java [new file with mode: 0644]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionCommand.java [new file with mode: 0644]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvoker.java [new file with mode: 0644]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java [new file with mode: 0644]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionUtils.java [new file with mode: 0644]

index 227c9f7504e53069d7ddc989c0f9a5ac8e6949ad..31d1e4f5705a84795fefdd435ccdc6fb036a461b 100644 (file)
@@ -46,17 +46,6 @@ module ovsdb {
         leaf-list managed-node-entry {
             type ovsdb-bridge-ref;
         }
-        // New fields below
-        list bridge {
-            key "bridge-uuid";
-            leaf bridge-uuid {
-                description "The unique identifier of the bridge";
-                type yang:uuid;
-            }
-            leaf bridge-ref {
-                type ovsdb-bridge-ref;
-            }
-        }
     }
 
     augment "/topo:network-topology/topo:topology/topo:node" {
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbBridgeOperDataCollector.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbBridgeOperDataCollector.java
deleted file mode 100644 (file)
index f71214b..0000000
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.ovsdb.southbound;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.ovsdb.lib.OvsdbClient;
-import org.opendaylight.ovsdb.lib.notation.Row;
-import org.opendaylight.ovsdb.lib.notation.UUID;
-import org.opendaylight.ovsdb.lib.operations.OperationResult;
-import org.opendaylight.ovsdb.lib.operations.Operations;
-import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
-import org.opendaylight.ovsdb.lib.schema.ColumnSchema;
-import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
-import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
-import org.opendaylight.ovsdb.lib.schema.TableSchema;
-import org.opendaylight.ovsdb.schema.openvswitch.Bridge;
-import org.opendaylight.ovsdb.southbound.OvsdbSchemaContants.OVSDBSCHEMATABLES;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.Uuid;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbManagedNodeAugmentation;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbManagedNodeAugmentationBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentationBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeRef;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
-*
-* @author Anil Vishnoi (avishnoi@brocade.com)
-*
-*/
-
-public class OvsdbBridgeOperDataCollector extends OvsdbDataCollectionOperation {
-
-    private static final Logger LOG = LoggerFactory.getLogger(OvsdbBridgeOperDataCollector.class);
-
-    public OvsdbBridgeOperDataCollector(final OperationType operType,
-            final OvsdbClient ovsdbClient, final DataBroker db) {
-        super(operType, ovsdbClient, db);
-    }
-
-    @Override
-    public void fetchAndStoreOperData(final OvsdbClient ovsdbClient, final DataBroker db) {
-        DatabaseSchema dbSchema = ovsdbClient.getDatabaseSchema(OvsdbSchemaContants.databaseName);
-        TransactionBuilder transactionBuilder = ovsdbClient.transactBuilder(dbSchema);
-        TableSchema<GenericTableSchema> bridgeTableSchema = dbSchema.table(OVSDBSCHEMATABLES.BRIDGE.getTableName(), GenericTableSchema.class);
-
-        ColumnSchema<GenericTableSchema, String> name = bridgeTableSchema.column("name", String.class);
-        ColumnSchema<GenericTableSchema, UUID> _uuid = bridgeTableSchema.column("_uuid", UUID.class);
-
-        transactionBuilder.add(Operations.op.select(bridgeTableSchema)
-                .column(name)
-                .column(_uuid));
-        ListenableFuture<List<OperationResult>> results = transactionBuilder.execute();
-
-        Futures.addCallback(results, new FutureCallback<List<OperationResult>>(){
-
-            @Override
-            public void onFailure(Throwable arg0) {
-                LOG.error("Read operation failure while reading {} database from ovsdb node "
-                        + "running on  {}",OVSDBSCHEMATABLES.BRIDGE.getTableName(),ovsdbClient.getConnectionInfo().toString());
-
-            }
-
-            @Override
-            public void onSuccess(List<OperationResult> arg0) {
-                LOG.info("Successfully read operational data from {} database of ovsdb node "
-                        + "running on {}",OVSDBSCHEMATABLES.BRIDGE.getTableName(),ovsdbClient.getConnectionInfo().toString());
-
-                for(OperationResult result : arg0){
-                    if(result.getError() != null){
-                        LOG.error("Error occured while fetching bridge operational data from ovsdb database : {}",result.getDetails());
-                    }else{
-                        for(Row<GenericTableSchema> row : result.getRows()){
-                            ReadWriteTransaction rwTransaction = db.newReadWriteTransaction();
-
-                            LOG.info("Row data {}",row.toString());
-                            Bridge bridge = ovsdbClient.getTypedRowWrapper(Bridge.class, row);
-                            final InstanceIdentifier<Node> nodePath = SouthboundMapper.createInstanceIdentifier(ovsdbClient);
-                            Optional<Node> node = Optional.absent();
-                            try{
-                                node = rwTransaction.read(LogicalDatastoreType.OPERATIONAL, nodePath).checkedGet();
-                            }catch (final ReadFailedException e) {
-                                LOG.info("Read Operational/DS for Node fail! {}", nodePath, e);
-                            }
-                            if(node.isPresent()){
-                                LOG.info("Node {} is present",node);
-                                NodeBuilder managedNodeBuilder = new NodeBuilder();
-                                NodeId manageNodeId = SouthboundMapper.createManagedNodeId(ovsdbClient.getConnectionInfo(), bridge.getUuid());
-                                managedNodeBuilder.setNodeId(manageNodeId);
-
-                                OvsdbManagedNodeAugmentationBuilder ovsdbManagedNodeBuilder = new OvsdbManagedNodeAugmentationBuilder();
-                                ovsdbManagedNodeBuilder.setBridgeName(bridge.getName());
-                                ovsdbManagedNodeBuilder.setBridgeUuid(new Uuid(bridge.getUuid().toString()));
-                                ovsdbManagedNodeBuilder.setManagedBy(new OvsdbNodeRef(nodePath));
-                                managedNodeBuilder.addAugmentation(OvsdbManagedNodeAugmentation.class, ovsdbManagedNodeBuilder.build());
-
-                                InstanceIdentifier<Node> managedNodePath = InstanceIdentifier
-                                        .create(NetworkTopology.class)
-                                        .child(Topology.class, new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
-                                        .child(Node.class,new NodeKey(manageNodeId));
-
-                                LOG.info("Store managed node augmentation data {}",ovsdbManagedNodeBuilder.toString());
-                                rwTransaction.put(LogicalDatastoreType.OPERATIONAL, managedNodePath, managedNodeBuilder.build());
-
-                                //Update node with managed node reference
-                                NodeBuilder nodeBuilder = new NodeBuilder();
-                                nodeBuilder.setNodeId(SouthboundMapper.createNodeId(ovsdbClient.getConnectionInfo()));
-
-                                OvsdbNodeAugmentationBuilder ovsdbNodeBuilder = new OvsdbNodeAugmentationBuilder();
-                                List<OvsdbBridgeRef> managedNodes = new ArrayList<OvsdbBridgeRef>();
-                                managedNodes.add(new OvsdbBridgeRef(managedNodePath));
-                                ovsdbNodeBuilder.setManagedNodeEntry(managedNodes);
-
-                                nodeBuilder.addAugmentation(OvsdbNodeAugmentation.class, ovsdbNodeBuilder.build());
-
-                                LOG.info("Update node with managed node ref {}",ovsdbNodeBuilder.toString());
-                                rwTransaction.merge(LogicalDatastoreType.OPERATIONAL, nodePath, nodeBuilder.build());
-
-                                Futures.addCallback(rwTransaction.submit(),new FutureCallback<Void>(){
-
-                                    @Override
-                                    public void onFailure(Throwable arg0) {
-                                        LOG.info("Write to Operational Data Store for managed node {} failed with exception {}!",nodePath,arg0 );
-                                    }
-
-                                    @Override
-                                    public void onSuccess(Void arg0) {
-                                        LOG.info("Managed node's operational data stored successfully to md-sal operational data store.");
-                                    }
-
-                                });
-                            }else{
-                                LOG.info("Node is not present in the operational data store, skipping bridge operational data write to data store");
-                            }
-
-                        }
-                    }
-                }
-            }
-
-        });
-
-    }
-
-    @Override
-    public void fetchAndUpdateOperData(final OvsdbClient ovsdbClient, final DataBroker db) {
-        final ReadWriteTransaction rwTransaction = db.newReadWriteTransaction();
-        DatabaseSchema dbSchema = ovsdbClient.getDatabaseSchema(OvsdbSchemaContants.databaseName);
-        TransactionBuilder transactionBuilder = ovsdbClient.transactBuilder(dbSchema);
-        TableSchema<GenericTableSchema> childTableSchema = dbSchema.table(OVSDBSCHEMATABLES.BRIDGE.getTableName(), GenericTableSchema.class);
-        transactionBuilder.add(Operations.op.select(childTableSchema)).build();
-        ListenableFuture<List<OperationResult>> results = transactionBuilder.execute();
-
-        Futures.addCallback(results, new FutureCallback<List<OperationResult>>(){
-
-            @Override
-            public void onFailure(Throwable arg0) {
-                LOG.error("Read operation failure while reading {} database from ovsdb node "
-                        + "running on  {}",OVSDBSCHEMATABLES.BRIDGE.getTableName(),ovsdbClient.getConnectionInfo().toString());
-
-            }
-
-            @Override
-            public void onSuccess(List<OperationResult> arg0) {
-                LOG.info("Successfully read operational data from {} database of ovsdb node "
-                        + "running on {}",OVSDBSCHEMATABLES.BRIDGE.getTableName(),ovsdbClient.getConnectionInfo().toString());
-
-                for(OperationResult result : arg0){
-                    if(result.getError() != null){
-                        LOG.error("Error occured while fetching bridge operational data from ovsdb database : {}",result.getDetails());
-                    }else{
-                        for(Row<GenericTableSchema> row : result.getRows()){
-                            Bridge bridge = ovsdbClient.getTypedRowWrapper(Bridge.class, row);
-                            final InstanceIdentifier<Node> nodeIdent = SouthboundMapper.createInstanceIdentifier(ovsdbClient);
-                            Optional<Node> node = Optional.absent();
-
-                            try{
-                                node = rwTransaction.read(LogicalDatastoreType.OPERATIONAL, nodeIdent).checkedGet();
-                            }catch (final ReadFailedException e) {
-                                LOG.info("Read Operational/DS for Node fail! {}", nodeIdent, e);
-                            }
-                            if(node.isPresent()){
-                                LOG.info("Node {} is present",node);
-                                NodeBuilder managedNodeBuilder = new NodeBuilder();
-                                NodeId manageNodeId = SouthboundMapper.createManagedNodeId(ovsdbClient.getConnectionInfo(), bridge.getUuid());
-                                managedNodeBuilder.setNodeId(manageNodeId);
-
-                                OvsdbManagedNodeAugmentationBuilder ovsdbManagedNodeBuilder = new OvsdbManagedNodeAugmentationBuilder();
-                                ovsdbManagedNodeBuilder.setBridgeName(bridge.getName());
-                                ovsdbManagedNodeBuilder.setBridgeUuid(new Uuid(bridge.getUuid().toString()));
-                                ovsdbManagedNodeBuilder.setManagedBy(new OvsdbNodeRef(nodeIdent));
-                                managedNodeBuilder.addAugmentation(OvsdbManagedNodeAugmentation.class, ovsdbManagedNodeBuilder.build());
-
-                                InstanceIdentifier<Node> managedNodePath = InstanceIdentifier
-                                        .create(NetworkTopology.class)
-                                        .child(Topology.class, new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
-                                        .child(Node.class,new NodeKey(manageNodeId));
-
-                                LOG.info("Store managed node augmentation data {}",ovsdbManagedNodeBuilder.toString());
-                                rwTransaction.merge(LogicalDatastoreType.OPERATIONAL, managedNodePath, managedNodeBuilder.build());
-                                Futures.addCallback(rwTransaction.submit(),new FutureCallback<Void>(){
-
-                                    @Override
-                                    public void onFailure(Throwable arg0) {
-                                        LOG.info("Write to Operational Data Store for managed node {} failed with exception {}!",nodeIdent,arg0 );
-                                    }
-
-                                    @Override
-                                    public void onSuccess(Void arg0) {
-                                        LOG.info("Managed node operational data stored successfully to md-sal operational data store.");
-                                    }
-
-                                });
-                            }
-
-                        }
-                    }
-                }
-            }
-
-        });
-
-    }
-
-    @Override
-    public void deleteOperData(OvsdbClient ovsdbClient, DataBroker db) {
-        /* TODO Operational data of managed node need to be deleted in following two scenario
-         * 1) When node is disconnected
-         * 2) When any specific managed node will be deleted.
-         * In case of 1), all the managed node data will get deleted with node data.
-         * In case of 2), whenever user delete any bridge, ovsdb south bound plugin
-         * should get the notification and it should update the data using fetchAndUpdateOperData
-         */
-
-    }
-}
\ No newline at end of file
index 3056369b09908d4154e1104e704de656ae7535dc..46e1889f0b478989b5a807afd6368bd7d1f83aa8 100644 (file)
@@ -77,7 +77,7 @@ public class OvsdbClientKey {
         return true;
     }
 
-    InstanceIdentifier<Node> toInstanceIndentifier() {
+    public InstanceIdentifier<Node> toInstanceIndentifier() {
         return SouthboundMapper.createInstanceIdentifier(ipaddress,port);
     }
 }
index 5d7df98b990be61af6e710b0ce9f87b106d677b8..dd79aecfdc60dead7a27fcd2cb770c82392567f3 100644 (file)
@@ -30,6 +30,8 @@ import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
 import org.opendaylight.ovsdb.lib.schema.TableSchema;
 import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
+import org.opendaylight.ovsdb.southbound.transactions.md.OvsdbNodeCreateCommand;
+import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,12 +42,19 @@ public class OvsdbConnectionInstance implements OvsdbClient {
     private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionInstance.class);
     private OvsdbClient client;
     private OvsdbClientKey key;
+    private TransactionInvoker txInvoker;
     private MonitorCallBack callback;
 
-    OvsdbConnectionInstance(OvsdbClientKey key,OvsdbClient client) {
+    OvsdbConnectionInstance(OvsdbClientKey key,OvsdbClient client,TransactionInvoker txInvoker) {
         this.key = key;
         this.client = client;
-        this.callback = new OvsdbMonitorCallback();
+        this.txInvoker = txInvoker;
+        txInvoker.invoke(new OvsdbNodeCreateCommand(key, null,null));
+        registerCallBack();
+    }
+
+    private void registerCallBack() {
+        this.callback = new OvsdbMonitorCallback(key,txInvoker);
         try {
             List<String> databases = getDatabases().get();
             if(databases != null) {
index 55ebdd1b31b3e6a050ea855d2ac9d5bfd4b7b0da..d41c34aabb9929adba570d9be85ed17cdd394a7f 100644 (file)
@@ -14,13 +14,13 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.ovsdb.lib.OvsdbClient;
 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
 import org.opendaylight.ovsdb.lib.impl.OvsdbConnectionService;
-import org.opendaylight.ovsdb.southbound.OvsdbDataCollectionOperation.OperationType;
+import org.opendaylight.ovsdb.southbound.transactions.md.OvsdbNodeRemoveCommand;
+import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.overlay.rev150105.IpPortLocator;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAttributes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbManagedNodeAugmentation;
@@ -32,20 +32,17 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 
 public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoCloseable {
     Map<OvsdbClientKey,OvsdbConnectionInstance> clients = new ConcurrentHashMap<OvsdbClientKey,OvsdbConnectionInstance>();
     private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionManager.class);
 
-    DataBroker db;
+    private DataBroker db;
+    private TransactionInvoker txInvoker;
 
-    private OvsdbOperationalDataCollectionManager ovsdbOperDataCollectionManager;
-
-    public OvsdbConnectionManager(DataBroker db) {
+    public OvsdbConnectionManager(DataBroker db,TransactionInvoker txInvoker) {
         this.db = db;
-        ovsdbOperDataCollectionManager = new OvsdbOperationalDataCollectionManagerImpl();
+        this.txInvoker = txInvoker;
     }
 
     @Override
@@ -53,27 +50,8 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
         LOG.info("OVSDB Connection from {}:{}",externalClient.getConnectionInfo().getRemoteAddress(),
                 externalClient.getConnectionInfo().getRemotePort());
         OvsdbClientKey key = new OvsdbClientKey(externalClient);
-        OvsdbConnectionInstance client = new OvsdbConnectionInstance(key,externalClient);
+        OvsdbConnectionInstance client = new OvsdbConnectionInstance(key,externalClient,txInvoker);
         clients.put(key, client);
-        WriteTransaction transaction = db.newWriteOnlyTransaction();
-        transaction.put(LogicalDatastoreType.OPERATIONAL, key.toInstanceIndentifier(),
-                SouthboundMapper.createNode(client));
-
-        // Hook it to bridge operational data collector
-        Futures.addCallback(transaction.submit(), new FutureCallback<Void>(){
-
-            @Override
-            public void onFailure(Throwable arg0) {
-                LOG.info("Transaction failed while writing Node data to operational data store");
-            }
-
-            @Override
-            public void onSuccess(Void arg0) {
-                LOG.info("Node data is stored successfully to operational data store");
-                ovsdbOperDataCollectionManager.enqueue(new OvsdbBridgeOperDataCollector(OperationType.FETCH_OVSDB_OPER_DATA,externalClient,db));
-            }
-
-        });
     }
 
     @Override
@@ -81,10 +59,7 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
         LOG.info("OVSDB Disconnect from {}:{}",client.getConnectionInfo().getRemoteAddress(),
                 client.getConnectionInfo().getRemotePort());
         OvsdbClientKey key = new OvsdbClientKey(client);
-        WriteTransaction transaction = db.newWriteOnlyTransaction();
-        transaction.delete(LogicalDatastoreType.OPERATIONAL, key.toInstanceIndentifier());
-        // TODO - Check the future and retry if needed
-        transaction.submit();
+        txInvoker.invoke(new OvsdbNodeRemoveCommand(key,null,null));
         clients.remove(key);
     }
 
@@ -93,9 +68,6 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
         // TODO use transaction chains to handle ordering issues between disconnected and connected when writing to the operational store
         InetAddress ip = SouthboundMapper.createInetAddress(ovsdbNode.getIp());
         OvsdbClient client = OvsdbConnectionService.getService().connect(ip, ovsdbNode.getPort().getValue().intValue());
-        OvsdbClientKey key = new OvsdbClientKey(client);
-        OvsdbConnectionInstance instance = new OvsdbConnectionInstance(key,client);
-        clients.put(key, instance);
         connected(client); // For connections from the controller to the ovs instance, the library doesn't call this method for us
         return client;
     }
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbDataCollectionOperation.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbDataCollectionOperation.java
deleted file mode 100644 (file)
index 7836651..0000000
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.ovsdb.southbound;
-
-import java.util.concurrent.Callable;
-
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.ovsdb.lib.OvsdbClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Main responsibility of the class is to define the interface that various
- * config operataion can extend to. It define common interface for data store
- * operations.
- *
- * @author Anil Vishnoi (avishnoi@brocade.com)
- *
- */
-public abstract class OvsdbDataCollectionOperation implements Callable<Object>{
-    private static final Logger LOG = LoggerFactory.getLogger(OvsdbDataCollectionOperation.class);
-
-    public enum OperationType {
-        FETCH_OVSDB_OPER_DATA,
-        REFERESH_OVSDB_OPER_DATA,
-        DELETE_OVSDB_OPER_DATA
-    }
-
-    private OperationType operType = OperationType.FETCH_OVSDB_OPER_DATA;
-    private OvsdbClient ovsdbClient = null;
-    private DataBroker db;
-
-
-    public OvsdbDataCollectionOperation(final OperationType operType, final OvsdbClient ovsdbClient, final DataBroker db){
-        Preconditions.checkNotNull(ovsdbClient);
-        this.ovsdbClient = ovsdbClient;
-        this.operType = operType;
-        this.db = db;
-    }
-    @Override
-    public Object call(){
-        if( operType == OperationType.FETCH_OVSDB_OPER_DATA ){
-            LOG.debug("Fetch ovsdb operational data from {} and store it in the md-sal data store.",this.ovsdbClient);
-            this.fetchAndStoreOperData(ovsdbClient, db);
-        }else if( operType == OperationType.REFERESH_OVSDB_OPER_DATA ){
-            LOG.debug("Fetch ovsdb operational data from {} and update the existing data in the md-sal data store.",this.ovsdbClient);
-            this.fetchAndUpdateOperData(ovsdbClient, db);
-        }else if( operType == OperationType.DELETE_OVSDB_OPER_DATA ){
-            LOG.debug("Delete operational data fetched from {}",this.ovsdbClient);
-            this.deleteOperData(ovsdbClient, db);
-        }
-        return operType;
-    }
-
-    public abstract void fetchAndStoreOperData(final OvsdbClient ovsdbClient, final DataBroker db);
-
-    public abstract void fetchAndUpdateOperData(final OvsdbClient ovsdbClient, final DataBroker db);
-
-    public abstract void deleteOperData( final OvsdbClient ovsdbClient, final DataBroker db);
-}
index ea50e9e73f587c7307bbcb0ce70f5da7115c020b..c2da87119b25a0f26210eec4ef138003560fe9e9 100644 (file)
@@ -8,32 +8,28 @@
 package org.opendaylight.ovsdb.southbound;
 
 import org.opendaylight.ovsdb.lib.MonitorCallBack;
-import org.opendaylight.ovsdb.lib.message.TableUpdate;
 import org.opendaylight.ovsdb.lib.message.TableUpdates;
-import org.opendaylight.ovsdb.lib.notation.Row;
-import org.opendaylight.ovsdb.lib.notation.UUID;
-import org.opendaylight.ovsdb.lib.schema.ColumnSchema;
 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
-import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
+import org.opendaylight.ovsdb.southbound.transactions.md.OvsdbOperationalCommandAggregator;
+import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class OvsdbMonitorCallback implements MonitorCallBack {
 
     private static final Logger LOG = LoggerFactory.getLogger(OvsdbMonitorCallback.class);
+    private TransactionInvoker txInvoker;
+    private OvsdbClientKey key;
+
+    OvsdbMonitorCallback(OvsdbClientKey key,TransactionInvoker txInvoker) {
+        this.txInvoker = txInvoker;
+        this.key = key;
+    }
+
     @Override
     public void update(TableUpdates result, DatabaseSchema dbSchema) {
         LOG.debug("result: {} dbSchema: {}",result,dbSchema);
-        GenericTableSchema bridge = dbSchema.table("Bridge", GenericTableSchema.class);
-        ColumnSchema<GenericTableSchema, String> name = bridge.column("name", String.class);
-        TableUpdate<GenericTableSchema> update = result.getUpdate(bridge);
-        if (update != null) {
-            for (UUID uuid : update.getRows().keySet()) {
-                Row<GenericTableSchema> schemaRow = update.getNew(uuid);
-                String bridgeName = schemaRow.getColumn(name).getData();
-                LOG.trace("Bridge name: {}", bridgeName);
-            }
-        }
+        txInvoker.invoke(new OvsdbOperationalCommandAggregator(key, result, dbSchema));
     }
 
     @Override
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbOperationalDataCollectionManager.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbOperationalDataCollectionManager.java
deleted file mode 100644 (file)
index f42a9f6..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.ovsdb.southbound;
-
-/**
- *
- * @author Anil Vishnoi (avishnoi@brocade.com)
- *
- */
-public interface OvsdbOperationalDataCollectionManager {
-
-       public void enqueue(final OvsdbDataCollectionOperation ovsdbDataStoreOper);
-}
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbOperationalDataCollectionManagerImpl.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbOperationalDataCollectionManagerImpl.java
deleted file mode 100644 (file)
index d604755..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.ovsdb.southbound;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class OvsdbOperationalDataCollectionManagerImpl implements
-        OvsdbOperationalDataCollectionManager {
-    private static final Logger LOG = LoggerFactory.getLogger(OvsdbOperationalDataCollectionManagerImpl.class);
-
-    private final ExecutorService statDataStoreOperationServ;
-
-    public OvsdbOperationalDataCollectionManagerImpl(){
-        /*
-         * Using single thread executor as of now. If going forward we figure out
-         * that there is no order dependency between ovsdb operational data, we
-         * can move to multi thread executor service.
-         */
-        ThreadFactory threadFact;
-        threadFact = new ThreadFactoryBuilder().setNameFormat("ovsdb-oper-data-collection-manager-%d").build();
-        statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
-    }
-
-    @Override
-    public void enqueue(OvsdbDataCollectionOperation ovsdbDataStoreOper) {
-        LOG.info("Enqueued task {} for execution",ovsdbDataStoreOper);
-        statDataStoreOperationServ.submit(ovsdbDataStoreOper);
-    }
-}
\ No newline at end of file
index e244722f644c92a5e950fb8ba31216332c76e3fd..0cb54f4560a7e4f0e061754750339ce549fcd86c 100644 (file)
@@ -42,11 +42,21 @@ public class SouthboundMapper {
         nodeBuilder.addAugmentation(OvsdbNodeAugmentation.class, createOvsdbAugmentation(client));
         return nodeBuilder.build();
     }
+    public static Node createNode(OvsdbClientKey key) {
+        NodeBuilder nodeBuilder = new NodeBuilder();
+        nodeBuilder.setNodeId(createNodeId(key.getIp(),key.getPort()));
+        nodeBuilder.addAugmentation(OvsdbNodeAugmentation.class, createOvsdbAugmentation(key));
+        return nodeBuilder.build();
+    }
 
     public static OvsdbNodeAugmentation createOvsdbAugmentation(OvsdbClient client) {
+        return createOvsdbAugmentation(new OvsdbClientKey(client));
+    }
+
+    public static OvsdbNodeAugmentation createOvsdbAugmentation(OvsdbClientKey key) {
         OvsdbNodeAugmentationBuilder ovsdbNodeBuilder = new OvsdbNodeAugmentationBuilder();
-        ovsdbNodeBuilder.setIp(createIpAddress(client.getConnectionInfo().getRemoteAddress()));
-        ovsdbNodeBuilder.setPort(new PortNumber(client.getConnectionInfo().getRemotePort()));
+        ovsdbNodeBuilder.setIp(key.getIp());
+        ovsdbNodeBuilder.setPort(key.getPort());
         return ovsdbNodeBuilder.build();
     }
 
@@ -93,9 +103,18 @@ public class SouthboundMapper {
     }
 
     public static NodeId createManagedNodeId(OvsdbConnectionInfo connectionInfo, UUID managedNodeId) {
-        return new NodeId(createNodeId(createIpAddress(connectionInfo.getRemoteAddress()),
-                new PortNumber(connectionInfo.getRemotePort())).getValue()
-                + "/"+SouthboundConstants.BRIDGE_URI_PREFIX+":"+managedNodeId.toString());
+        return createManagedNodeId(createIpAddress(connectionInfo.getRemoteAddress()),
+                new PortNumber(connectionInfo.getRemotePort()),
+                managedNodeId);
+    }
+
+    public static NodeId createManagedNodeId(OvsdbClientKey key, UUID managedModeId) {
+        return createManagedNodeId(key.getIp(),key.getPort(),managedModeId);
+    }
+
+    public static NodeId createManagedNodeId(IpAddress ip, PortNumber port, UUID managedModeId) {
+        return new NodeId(createNodeId(ip,port)
+                + "/"+SouthboundConstants.BRIDGE_URI_PREFIX+":"+managedModeId.toString());
     }
 
     public static NodeId createNodeId(IpAddress ip, PortNumber port) {
index c6a55798f26d76cf6654a065f17dad855336fe3f..88713a4f22066cf1ea1b6080ed9fd49eb4fc42a4 100644 (file)
@@ -15,6 +15,8 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderCo
 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
 import org.opendaylight.ovsdb.lib.OvsdbConnection;
 import org.opendaylight.ovsdb.lib.impl.OvsdbConnectionService;
+import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
+import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvokerImpl;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
@@ -33,14 +35,15 @@ public class SouthboundProvider implements BindingAwareProvider, AutoCloseable {
     private DataBroker db;
     private OvsdbConnectionManager cm;
     private OvsdbNodeDataChangeListener ovsdbNodeListener;
-
+    private TransactionInvoker txInvoker;
 
 
     @Override
     public void onSessionInitiated(ProviderContext session) {
         LOG.info("SouthboundProvider Session Initiated");
         db = session.getSALService(DataBroker.class);
-        cm = new OvsdbConnectionManager(db);
+        this.txInvoker = new TransactionInvokerImpl(db);
+        cm = new OvsdbConnectionManager(db,txInvoker);
         ovsdbNodeListener = new OvsdbNodeDataChangeListener(db, cm);
         initializeOvsdbTopology(LogicalDatastoreType.OPERATIONAL);
         initializeOvsdbTopology(LogicalDatastoreType.CONFIGURATION);
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/OvsdbBridgeUpdateCommand.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/OvsdbBridgeUpdateCommand.java
new file mode 100644 (file)
index 0000000..6afe67b
--- /dev/null
@@ -0,0 +1,100 @@
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
+import org.opendaylight.ovsdb.schema.openvswitch.Bridge;
+import org.opendaylight.ovsdb.southbound.OvsdbClientKey;
+import org.opendaylight.ovsdb.southbound.SouthboundConstants;
+import org.opendaylight.ovsdb.southbound.SouthboundMapper;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.Uuid;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbManagedNodeAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbManagedNodeAugmentationBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentationBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeRef;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+public class OvsdbBridgeUpdateCommand implements TransactionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(OvsdbBridgeUpdateCommand.class);
+
+    private TableUpdates updates;
+    private DatabaseSchema dbSchema;
+
+    private OvsdbClientKey key;
+
+    public OvsdbBridgeUpdateCommand(OvsdbClientKey key,TableUpdates updates, DatabaseSchema dbSchema) {
+        this.updates = updates;
+        this.dbSchema = dbSchema;
+        this.key = key;
+    }
+    @Override
+    public void execute(ReadWriteTransaction transaction) {
+        List<TypedBaseTable<?>> updatedRows = TransactionUtils.extractRowsUpdated(Bridge.class, updates, dbSchema);
+        for(TypedBaseTable<?> updatedRow : updatedRows) {
+            if(updatedRow instanceof Bridge) {
+                Bridge bridge = (Bridge)updatedRow;
+                final InstanceIdentifier<Node> nodePath = key.toInstanceIndentifier();
+                Optional<Node> node = Optional.absent();
+                try{
+                    node = transaction.read(LogicalDatastoreType.OPERATIONAL, nodePath).checkedGet();
+                }catch (final ReadFailedException e) {
+                    LOG.debug("Read Operational/DS for Node fail! {}", nodePath, e);
+                }
+                if(node.isPresent()){
+                    LOG.info("Node {} is present",node);
+                    NodeBuilder managedNodeBuilder = new NodeBuilder();
+                    NodeId manageNodeId = SouthboundMapper.createManagedNodeId(key, bridge.getUuid());
+                    managedNodeBuilder.setNodeId(manageNodeId);
+
+                    OvsdbManagedNodeAugmentationBuilder ovsdbManagedNodeBuilder = new OvsdbManagedNodeAugmentationBuilder();
+                    ovsdbManagedNodeBuilder.setBridgeName(bridge.getName());
+                    ovsdbManagedNodeBuilder.setBridgeUuid(new Uuid(bridge.getUuid().toString()));
+                    ovsdbManagedNodeBuilder.setManagedBy(new OvsdbNodeRef(nodePath));
+                    managedNodeBuilder.addAugmentation(OvsdbManagedNodeAugmentation.class, ovsdbManagedNodeBuilder.build());
+
+                    InstanceIdentifier<Node> managedNodePath = InstanceIdentifier
+                            .create(NetworkTopology.class)
+                            .child(Topology.class, new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
+                            .child(Node.class,new NodeKey(manageNodeId));
+
+                    LOG.debug("Store managed node augmentation data {}",ovsdbManagedNodeBuilder.toString());
+                    transaction.put(LogicalDatastoreType.OPERATIONAL, managedNodePath, managedNodeBuilder.build());
+
+                    //Update node with managed node reference
+                    NodeBuilder nodeBuilder = new NodeBuilder();
+                    nodeBuilder.setNodeId(SouthboundMapper.createNodeId(key.getIp(),key.getPort()));
+
+                    OvsdbNodeAugmentationBuilder ovsdbNodeBuilder = new OvsdbNodeAugmentationBuilder();
+                    List<OvsdbBridgeRef> managedNodes = new ArrayList<OvsdbBridgeRef>();
+                    managedNodes.add(new OvsdbBridgeRef(managedNodePath));
+                    ovsdbNodeBuilder.setManagedNodeEntry(managedNodes);
+
+                    nodeBuilder.addAugmentation(OvsdbNodeAugmentation.class, ovsdbNodeBuilder.build());
+
+                    LOG.debug("Update node with managed node ref {}",ovsdbNodeBuilder.toString());
+                    transaction.merge(LogicalDatastoreType.OPERATIONAL, nodePath, nodeBuilder.build());
+
+                }
+            }
+        }
+    }
+}
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/OvsdbNodeCreateCommand.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/OvsdbNodeCreateCommand.java
new file mode 100644 (file)
index 0000000..08c36a2
--- /dev/null
@@ -0,0 +1,24 @@
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.southbound.OvsdbClientKey;
+import org.opendaylight.ovsdb.southbound.SouthboundMapper;
+
+public class OvsdbNodeCreateCommand implements TransactionCommand {
+
+    private OvsdbClientKey key;
+
+    public OvsdbNodeCreateCommand(OvsdbClientKey key,TableUpdates updates,DatabaseSchema dbSchema) {
+        this.key = key;
+    }
+
+    @Override
+    public void execute(ReadWriteTransaction transaction) {
+        transaction.put(LogicalDatastoreType.OPERATIONAL, key.toInstanceIndentifier(),
+                SouthboundMapper.createNode(key));
+    }
+
+}
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/OvsdbNodeRemoveCommand.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/OvsdbNodeRemoveCommand.java
new file mode 100644 (file)
index 0000000..2b1be53
--- /dev/null
@@ -0,0 +1,46 @@
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.southbound.OvsdbClientKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+
+public class OvsdbNodeRemoveCommand implements TransactionCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(OvsdbNodeRemoveCommand.class);
+
+    private OvsdbClientKey key;
+
+    public OvsdbNodeRemoveCommand(OvsdbClientKey key,TableUpdates updates,DatabaseSchema dbSchema) {
+        this.key = key;
+    }
+
+    @Override
+    public void execute(ReadWriteTransaction transaction) {
+        CheckedFuture<Optional<Node>, ReadFailedException> ovsdbNodeFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, key.toInstanceIndentifier());
+        Optional<Node> ovsdbNodeOptional;
+        try {
+            ovsdbNodeOptional = ovsdbNodeFuture.get();
+            if(ovsdbNodeOptional.isPresent()) {
+                Node ovsdbNode = ovsdbNodeOptional.get();
+                OvsdbNodeAugmentation ovsdbNodeAugmentation = ovsdbNode.getAugmentation(OvsdbNodeAugmentation.class);
+                for(OvsdbBridgeRef managedNode: ovsdbNodeAugmentation.getManagedNodeEntry()) {
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, managedNode.getValue());
+                }
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, key.toInstanceIndentifier());
+            }
+        } catch (Exception e) {
+            LOG.warn("Failure to delete ovsdbNode {}",e);
+        }
+    }
+
+}
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/OvsdbOperationalCommandAggregator.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/OvsdbOperationalCommandAggregator.java
new file mode 100644 (file)
index 0000000..1450ec4
--- /dev/null
@@ -0,0 +1,26 @@
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.southbound.OvsdbClientKey;
+
+public class OvsdbOperationalCommandAggregator implements TransactionCommand {
+
+
+    private List<TransactionCommand> commands = new ArrayList<TransactionCommand>();
+
+    public OvsdbOperationalCommandAggregator(OvsdbClientKey key,TableUpdates updates, DatabaseSchema dbSchema) {
+        commands.add(new OvsdbBridgeUpdateCommand(key, updates,  dbSchema));
+    }
+
+    @Override
+    public void execute(ReadWriteTransaction transaction) {
+        for(TransactionCommand command: commands) {
+            command.execute(transaction);
+        }
+    }
+}
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionCommand.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionCommand.java
new file mode 100644 (file)
index 0000000..069414d
--- /dev/null
@@ -0,0 +1,9 @@
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+
+public interface TransactionCommand {
+
+    public void execute(ReadWriteTransaction transaction);
+
+}
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvoker.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvoker.java
new file mode 100644 (file)
index 0000000..892a581
--- /dev/null
@@ -0,0 +1,7 @@
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+public interface TransactionInvoker {
+
+    public void invoke(TransactionCommand command);
+
+}
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java
new file mode 100644 (file)
index 0000000..3e0383f
--- /dev/null
@@ -0,0 +1,148 @@
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
+    private static final int QUEUE_SIZE = 10000;
+    private BindingTransactionChain chain;
+    private DataBroker db;
+    private BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<TransactionCommand>(QUEUE_SIZE);
+    private BlockingQueue<ReadWriteTransaction> successfulTransactionQueue = new LinkedBlockingQueue<ReadWriteTransaction>(QUEUE_SIZE);
+    private BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<AsyncTransaction<?, ?>>(QUEUE_SIZE);
+    private ExecutorService executor;
+    private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<ReadWriteTransaction,TransactionCommand>();
+    private List<ReadWriteTransaction> pendingTransactions = new ArrayList<ReadWriteTransaction>();
+
+    public TransactionInvokerImpl(DataBroker db) {
+        this.db = db;
+        this.chain = db.createTransactionChain(this);
+        ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
+        executor = Executors.newSingleThreadExecutor(threadFact);
+        executor.submit(this);
+    }
+
+    @Override
+    public void invoke(final TransactionCommand command) {
+        // TODO what do we do if queue is full?
+        inputQueue.offer(command);
+    }
+
+    @Override
+    public void onTransactionChainFailed(TransactionChain<?, ?> chain,
+            AsyncTransaction<?, ?> transaction, Throwable cause) {
+        failedTransactionQueue.offer(transaction);
+    }
+
+    @Override
+    public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+        // NO OP
+
+    }
+
+    @Override
+    public void run() {
+        while(true) {
+            forgetSuccessfulTransactions();
+            try {
+                List<TransactionCommand> commands = extractCommands();
+                for(TransactionCommand command: commands) {
+                    final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+                    recordPendingTransaction(command, transaction);
+                    command.execute(transaction);
+                    Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+                        @Override
+                        public void onSuccess(final Void result) {
+                            successfulTransactionQueue.offer(transaction);
+                        }
+
+                        @Override
+                        public void onFailure(final Throwable t) {
+                            // NOOP - handled by failure of transaction chain
+                        }
+                    });
+                }
+            } catch (Exception e) {
+
+            }
+        }
+    }
+
+    private List<TransactionCommand> extractResubmitCommands() {
+        AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
+        List<TransactionCommand> commands = new ArrayList<TransactionCommand>();
+        if(transaction != null) {
+            int index = pendingTransactions.lastIndexOf(transaction);
+            List<ReadWriteTransaction> transactions = pendingTransactions.subList(index, pendingTransactions.size()-1);
+            for(ReadWriteTransaction tx: transactions) {
+                commands.add(transactionToCommand.get(tx));
+            }
+            resetTransactionQueue();
+        }
+        return commands;
+    }
+
+    private void resetTransactionQueue() {
+        chain.close();
+        chain = db.createTransactionChain(this);
+        pendingTransactions = new ArrayList<ReadWriteTransaction>();
+        transactionToCommand = new HashMap<ReadWriteTransaction,TransactionCommand>();
+        failedTransactionQueue.clear();
+        successfulTransactionQueue.clear();
+    }
+
+    private void recordPendingTransaction(TransactionCommand command,
+            final ReadWriteTransaction transaction) {
+        transactionToCommand.put(transaction, command);
+        pendingTransactions.add(transaction);
+    }
+
+    private List<TransactionCommand> extractCommands() throws InterruptedException {
+        List<TransactionCommand> commands = extractResubmitCommands();
+        commands.addAll(extractCommandsFromQueue());
+        return commands;
+    }
+
+    private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
+        List<TransactionCommand> result = new ArrayList<TransactionCommand>();
+        TransactionCommand command = inputQueue.take();
+        while(command != null) {
+            result.add(command);
+            command = inputQueue.poll();
+        }
+        return result;
+    }
+
+    private void forgetSuccessfulTransactions() {
+        ReadWriteTransaction transaction = successfulTransactionQueue.poll();
+        while(transaction != null) {
+            pendingTransactions.remove(transaction);
+            transactionToCommand.remove(transaction);
+            transaction = successfulTransactionQueue.poll();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        // TODO Auto-generated method stub
+    }
+}
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionUtils.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionUtils.java
new file mode 100644 (file)
index 0000000..36cdbf7
--- /dev/null
@@ -0,0 +1,76 @@
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.opendaylight.ovsdb.lib.message.TableUpdate;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
+import org.opendaylight.ovsdb.lib.notation.Row;
+import org.opendaylight.ovsdb.lib.notation.UUID;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
+import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
+import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
+
+import com.google.common.base.Preconditions;
+
+public class TransactionUtils {
+
+    public static List<TypedBaseTable<?>> extractRowsUpdated(Class<? extends TypedBaseTable<?>> klazz,TableUpdates updates,DatabaseSchema dbSchema) {
+        Preconditions.checkNotNull(klazz);
+        Preconditions.checkNotNull(updates);
+        Preconditions.checkNotNull(dbSchema);
+        List<TypedBaseTable<?>> result = new ArrayList<TypedBaseTable<?>>();
+
+        List<TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema>> rowUpdates = extractRowUpdates(klazz,updates,dbSchema);
+        for (TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema> rowUpdate : rowUpdates) {
+            if(rowUpdate != null) {
+                if(rowUpdate.getNew() != null) {
+                    Row<GenericTableSchema> row = rowUpdate.getNew();
+                    result.add(TyperUtils.getTypedRowWrapper(dbSchema,klazz,row));
+                }
+            }
+        }
+        return result;
+    }
+
+    public static List<TypedBaseTable<?>> extractRowsRemoved(Class<? extends TypedBaseTable<?>> klazz,TableUpdates updates,DatabaseSchema dbSchema) {
+        Preconditions.checkNotNull(klazz);
+        Preconditions.checkNotNull(updates);
+        Preconditions.checkNotNull(dbSchema);
+        List<TypedBaseTable<?>> result = new ArrayList<TypedBaseTable<?>>();
+
+        List<TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema>> rowUpdates = extractRowUpdates(klazz,updates,dbSchema);
+        for (TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema> rowUpdate : rowUpdates) {
+            if(rowUpdate != null) {
+                if(rowUpdate.getNew() == null && rowUpdate.getOld() != null) {
+                    Row<GenericTableSchema> row = rowUpdate.getOld();
+                    result.add(TyperUtils.getTypedRowWrapper(dbSchema,klazz,row));
+                }
+            }
+        }
+        return result;
+    }
+
+    public static List<TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema>> extractRowUpdates(Class<? extends TypedBaseTable<?>> klazz,TableUpdates updates,DatabaseSchema dbSchema) {
+        Preconditions.checkNotNull(klazz);
+        Preconditions.checkNotNull(updates);
+        Preconditions.checkNotNull(dbSchema);
+        List<TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema>> result = new ArrayList<TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema>>();
+        TableUpdate<GenericTableSchema> update = updates.getUpdate(TyperUtils.getTableSchema(dbSchema, klazz));
+        if(update != null) {
+            Map<UUID, TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema>> rows = update.getRows();
+            if(rows != null) {
+                for(TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema> rowUpdate : rows.values()) {
+                    if(rowUpdate != null) {
+                        result.add(rowUpdate);
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+}
+