leaf-list managed-node-entry {
type ovsdb-bridge-ref;
}
- // New fields below
- list bridge {
- key "bridge-uuid";
- leaf bridge-uuid {
- description "The unique identifier of the bridge";
- type yang:uuid;
- }
- leaf bridge-ref {
- type ovsdb-bridge-ref;
- }
- }
}
augment "/topo:network-topology/topo:topology/topo:node" {
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.ovsdb.southbound;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.ovsdb.lib.OvsdbClient;
-import org.opendaylight.ovsdb.lib.notation.Row;
-import org.opendaylight.ovsdb.lib.notation.UUID;
-import org.opendaylight.ovsdb.lib.operations.OperationResult;
-import org.opendaylight.ovsdb.lib.operations.Operations;
-import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
-import org.opendaylight.ovsdb.lib.schema.ColumnSchema;
-import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
-import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
-import org.opendaylight.ovsdb.lib.schema.TableSchema;
-import org.opendaylight.ovsdb.schema.openvswitch.Bridge;
-import org.opendaylight.ovsdb.southbound.OvsdbSchemaContants.OVSDBSCHEMATABLES;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.Uuid;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbManagedNodeAugmentation;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbManagedNodeAugmentationBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentationBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeRef;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-/**
-*
-* @author Anil Vishnoi (avishnoi@brocade.com)
-*
-*/
-
-public class OvsdbBridgeOperDataCollector extends OvsdbDataCollectionOperation {
-
- private static final Logger LOG = LoggerFactory.getLogger(OvsdbBridgeOperDataCollector.class);
-
- public OvsdbBridgeOperDataCollector(final OperationType operType,
- final OvsdbClient ovsdbClient, final DataBroker db) {
- super(operType, ovsdbClient, db);
- }
-
- @Override
- public void fetchAndStoreOperData(final OvsdbClient ovsdbClient, final DataBroker db) {
- DatabaseSchema dbSchema = ovsdbClient.getDatabaseSchema(OvsdbSchemaContants.databaseName);
- TransactionBuilder transactionBuilder = ovsdbClient.transactBuilder(dbSchema);
- TableSchema<GenericTableSchema> bridgeTableSchema = dbSchema.table(OVSDBSCHEMATABLES.BRIDGE.getTableName(), GenericTableSchema.class);
-
- ColumnSchema<GenericTableSchema, String> name = bridgeTableSchema.column("name", String.class);
- ColumnSchema<GenericTableSchema, UUID> _uuid = bridgeTableSchema.column("_uuid", UUID.class);
-
- transactionBuilder.add(Operations.op.select(bridgeTableSchema)
- .column(name)
- .column(_uuid));
- ListenableFuture<List<OperationResult>> results = transactionBuilder.execute();
-
- Futures.addCallback(results, new FutureCallback<List<OperationResult>>(){
-
- @Override
- public void onFailure(Throwable arg0) {
- LOG.error("Read operation failure while reading {} database from ovsdb node "
- + "running on {}",OVSDBSCHEMATABLES.BRIDGE.getTableName(),ovsdbClient.getConnectionInfo().toString());
-
- }
-
- @Override
- public void onSuccess(List<OperationResult> arg0) {
- LOG.info("Successfully read operational data from {} database of ovsdb node "
- + "running on {}",OVSDBSCHEMATABLES.BRIDGE.getTableName(),ovsdbClient.getConnectionInfo().toString());
-
- for(OperationResult result : arg0){
- if(result.getError() != null){
- LOG.error("Error occured while fetching bridge operational data from ovsdb database : {}",result.getDetails());
- }else{
- for(Row<GenericTableSchema> row : result.getRows()){
- ReadWriteTransaction rwTransaction = db.newReadWriteTransaction();
-
- LOG.info("Row data {}",row.toString());
- Bridge bridge = ovsdbClient.getTypedRowWrapper(Bridge.class, row);
- final InstanceIdentifier<Node> nodePath = SouthboundMapper.createInstanceIdentifier(ovsdbClient);
- Optional<Node> node = Optional.absent();
- try{
- node = rwTransaction.read(LogicalDatastoreType.OPERATIONAL, nodePath).checkedGet();
- }catch (final ReadFailedException e) {
- LOG.info("Read Operational/DS for Node fail! {}", nodePath, e);
- }
- if(node.isPresent()){
- LOG.info("Node {} is present",node);
- NodeBuilder managedNodeBuilder = new NodeBuilder();
- NodeId manageNodeId = SouthboundMapper.createManagedNodeId(ovsdbClient.getConnectionInfo(), bridge.getUuid());
- managedNodeBuilder.setNodeId(manageNodeId);
-
- OvsdbManagedNodeAugmentationBuilder ovsdbManagedNodeBuilder = new OvsdbManagedNodeAugmentationBuilder();
- ovsdbManagedNodeBuilder.setBridgeName(bridge.getName());
- ovsdbManagedNodeBuilder.setBridgeUuid(new Uuid(bridge.getUuid().toString()));
- ovsdbManagedNodeBuilder.setManagedBy(new OvsdbNodeRef(nodePath));
- managedNodeBuilder.addAugmentation(OvsdbManagedNodeAugmentation.class, ovsdbManagedNodeBuilder.build());
-
- InstanceIdentifier<Node> managedNodePath = InstanceIdentifier
- .create(NetworkTopology.class)
- .child(Topology.class, new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
- .child(Node.class,new NodeKey(manageNodeId));
-
- LOG.info("Store managed node augmentation data {}",ovsdbManagedNodeBuilder.toString());
- rwTransaction.put(LogicalDatastoreType.OPERATIONAL, managedNodePath, managedNodeBuilder.build());
-
- //Update node with managed node reference
- NodeBuilder nodeBuilder = new NodeBuilder();
- nodeBuilder.setNodeId(SouthboundMapper.createNodeId(ovsdbClient.getConnectionInfo()));
-
- OvsdbNodeAugmentationBuilder ovsdbNodeBuilder = new OvsdbNodeAugmentationBuilder();
- List<OvsdbBridgeRef> managedNodes = new ArrayList<OvsdbBridgeRef>();
- managedNodes.add(new OvsdbBridgeRef(managedNodePath));
- ovsdbNodeBuilder.setManagedNodeEntry(managedNodes);
-
- nodeBuilder.addAugmentation(OvsdbNodeAugmentation.class, ovsdbNodeBuilder.build());
-
- LOG.info("Update node with managed node ref {}",ovsdbNodeBuilder.toString());
- rwTransaction.merge(LogicalDatastoreType.OPERATIONAL, nodePath, nodeBuilder.build());
-
- Futures.addCallback(rwTransaction.submit(),new FutureCallback<Void>(){
-
- @Override
- public void onFailure(Throwable arg0) {
- LOG.info("Write to Operational Data Store for managed node {} failed with exception {}!",nodePath,arg0 );
- }
-
- @Override
- public void onSuccess(Void arg0) {
- LOG.info("Managed node's operational data stored successfully to md-sal operational data store.");
- }
-
- });
- }else{
- LOG.info("Node is not present in the operational data store, skipping bridge operational data write to data store");
- }
-
- }
- }
- }
- }
-
- });
-
- }
-
- @Override
- public void fetchAndUpdateOperData(final OvsdbClient ovsdbClient, final DataBroker db) {
- final ReadWriteTransaction rwTransaction = db.newReadWriteTransaction();
- DatabaseSchema dbSchema = ovsdbClient.getDatabaseSchema(OvsdbSchemaContants.databaseName);
- TransactionBuilder transactionBuilder = ovsdbClient.transactBuilder(dbSchema);
- TableSchema<GenericTableSchema> childTableSchema = dbSchema.table(OVSDBSCHEMATABLES.BRIDGE.getTableName(), GenericTableSchema.class);
- transactionBuilder.add(Operations.op.select(childTableSchema)).build();
- ListenableFuture<List<OperationResult>> results = transactionBuilder.execute();
-
- Futures.addCallback(results, new FutureCallback<List<OperationResult>>(){
-
- @Override
- public void onFailure(Throwable arg0) {
- LOG.error("Read operation failure while reading {} database from ovsdb node "
- + "running on {}",OVSDBSCHEMATABLES.BRIDGE.getTableName(),ovsdbClient.getConnectionInfo().toString());
-
- }
-
- @Override
- public void onSuccess(List<OperationResult> arg0) {
- LOG.info("Successfully read operational data from {} database of ovsdb node "
- + "running on {}",OVSDBSCHEMATABLES.BRIDGE.getTableName(),ovsdbClient.getConnectionInfo().toString());
-
- for(OperationResult result : arg0){
- if(result.getError() != null){
- LOG.error("Error occured while fetching bridge operational data from ovsdb database : {}",result.getDetails());
- }else{
- for(Row<GenericTableSchema> row : result.getRows()){
- Bridge bridge = ovsdbClient.getTypedRowWrapper(Bridge.class, row);
- final InstanceIdentifier<Node> nodeIdent = SouthboundMapper.createInstanceIdentifier(ovsdbClient);
- Optional<Node> node = Optional.absent();
-
- try{
- node = rwTransaction.read(LogicalDatastoreType.OPERATIONAL, nodeIdent).checkedGet();
- }catch (final ReadFailedException e) {
- LOG.info("Read Operational/DS for Node fail! {}", nodeIdent, e);
- }
- if(node.isPresent()){
- LOG.info("Node {} is present",node);
- NodeBuilder managedNodeBuilder = new NodeBuilder();
- NodeId manageNodeId = SouthboundMapper.createManagedNodeId(ovsdbClient.getConnectionInfo(), bridge.getUuid());
- managedNodeBuilder.setNodeId(manageNodeId);
-
- OvsdbManagedNodeAugmentationBuilder ovsdbManagedNodeBuilder = new OvsdbManagedNodeAugmentationBuilder();
- ovsdbManagedNodeBuilder.setBridgeName(bridge.getName());
- ovsdbManagedNodeBuilder.setBridgeUuid(new Uuid(bridge.getUuid().toString()));
- ovsdbManagedNodeBuilder.setManagedBy(new OvsdbNodeRef(nodeIdent));
- managedNodeBuilder.addAugmentation(OvsdbManagedNodeAugmentation.class, ovsdbManagedNodeBuilder.build());
-
- InstanceIdentifier<Node> managedNodePath = InstanceIdentifier
- .create(NetworkTopology.class)
- .child(Topology.class, new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
- .child(Node.class,new NodeKey(manageNodeId));
-
- LOG.info("Store managed node augmentation data {}",ovsdbManagedNodeBuilder.toString());
- rwTransaction.merge(LogicalDatastoreType.OPERATIONAL, managedNodePath, managedNodeBuilder.build());
- Futures.addCallback(rwTransaction.submit(),new FutureCallback<Void>(){
-
- @Override
- public void onFailure(Throwable arg0) {
- LOG.info("Write to Operational Data Store for managed node {} failed with exception {}!",nodeIdent,arg0 );
- }
-
- @Override
- public void onSuccess(Void arg0) {
- LOG.info("Managed node operational data stored successfully to md-sal operational data store.");
- }
-
- });
- }
-
- }
- }
- }
- }
-
- });
-
- }
-
- @Override
- public void deleteOperData(OvsdbClient ovsdbClient, DataBroker db) {
- /* TODO Operational data of managed node need to be deleted in following two scenario
- * 1) When node is disconnected
- * 2) When any specific managed node will be deleted.
- * In case of 1), all the managed node data will get deleted with node data.
- * In case of 2), whenever user delete any bridge, ovsdb south bound plugin
- * should get the notification and it should update the data using fetchAndUpdateOperData
- */
-
- }
-}
\ No newline at end of file
return true;
}
- InstanceIdentifier<Node> toInstanceIndentifier() {
+ public InstanceIdentifier<Node> toInstanceIndentifier() {
return SouthboundMapper.createInstanceIdentifier(ipaddress,port);
}
}
import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
import org.opendaylight.ovsdb.lib.schema.TableSchema;
import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
+import org.opendaylight.ovsdb.southbound.transactions.md.OvsdbNodeCreateCommand;
+import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionInstance.class);
private OvsdbClient client;
private OvsdbClientKey key;
+ private TransactionInvoker txInvoker;
private MonitorCallBack callback;
- OvsdbConnectionInstance(OvsdbClientKey key,OvsdbClient client) {
+ OvsdbConnectionInstance(OvsdbClientKey key,OvsdbClient client,TransactionInvoker txInvoker) {
this.key = key;
this.client = client;
- this.callback = new OvsdbMonitorCallback();
+ this.txInvoker = txInvoker;
+ txInvoker.invoke(new OvsdbNodeCreateCommand(key, null,null));
+ registerCallBack();
+ }
+
+ private void registerCallBack() {
+ this.callback = new OvsdbMonitorCallback(key,txInvoker);
try {
List<String> databases = getDatabases().get();
if(databases != null) {
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.ovsdb.lib.OvsdbClient;
import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
import org.opendaylight.ovsdb.lib.impl.OvsdbConnectionService;
-import org.opendaylight.ovsdb.southbound.OvsdbDataCollectionOperation.OperationType;
+import org.opendaylight.ovsdb.southbound.transactions.md.OvsdbNodeRemoveCommand;
+import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.overlay.rev150105.IpPortLocator;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAttributes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbManagedNodeAugmentation;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoCloseable {
Map<OvsdbClientKey,OvsdbConnectionInstance> clients = new ConcurrentHashMap<OvsdbClientKey,OvsdbConnectionInstance>();
private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionManager.class);
- DataBroker db;
+ private DataBroker db;
+ private TransactionInvoker txInvoker;
- private OvsdbOperationalDataCollectionManager ovsdbOperDataCollectionManager;
-
- public OvsdbConnectionManager(DataBroker db) {
+ public OvsdbConnectionManager(DataBroker db,TransactionInvoker txInvoker) {
this.db = db;
- ovsdbOperDataCollectionManager = new OvsdbOperationalDataCollectionManagerImpl();
+ this.txInvoker = txInvoker;
}
@Override
LOG.info("OVSDB Connection from {}:{}",externalClient.getConnectionInfo().getRemoteAddress(),
externalClient.getConnectionInfo().getRemotePort());
OvsdbClientKey key = new OvsdbClientKey(externalClient);
- OvsdbConnectionInstance client = new OvsdbConnectionInstance(key,externalClient);
+ OvsdbConnectionInstance client = new OvsdbConnectionInstance(key,externalClient,txInvoker);
clients.put(key, client);
- WriteTransaction transaction = db.newWriteOnlyTransaction();
- transaction.put(LogicalDatastoreType.OPERATIONAL, key.toInstanceIndentifier(),
- SouthboundMapper.createNode(client));
-
- // Hook it to bridge operational data collector
- Futures.addCallback(transaction.submit(), new FutureCallback<Void>(){
-
- @Override
- public void onFailure(Throwable arg0) {
- LOG.info("Transaction failed while writing Node data to operational data store");
- }
-
- @Override
- public void onSuccess(Void arg0) {
- LOG.info("Node data is stored successfully to operational data store");
- ovsdbOperDataCollectionManager.enqueue(new OvsdbBridgeOperDataCollector(OperationType.FETCH_OVSDB_OPER_DATA,externalClient,db));
- }
-
- });
}
@Override
LOG.info("OVSDB Disconnect from {}:{}",client.getConnectionInfo().getRemoteAddress(),
client.getConnectionInfo().getRemotePort());
OvsdbClientKey key = new OvsdbClientKey(client);
- WriteTransaction transaction = db.newWriteOnlyTransaction();
- transaction.delete(LogicalDatastoreType.OPERATIONAL, key.toInstanceIndentifier());
- // TODO - Check the future and retry if needed
- transaction.submit();
+ txInvoker.invoke(new OvsdbNodeRemoveCommand(key,null,null));
clients.remove(key);
}
// TODO use transaction chains to handle ordering issues between disconnected and connected when writing to the operational store
InetAddress ip = SouthboundMapper.createInetAddress(ovsdbNode.getIp());
OvsdbClient client = OvsdbConnectionService.getService().connect(ip, ovsdbNode.getPort().getValue().intValue());
- OvsdbClientKey key = new OvsdbClientKey(client);
- OvsdbConnectionInstance instance = new OvsdbConnectionInstance(key,client);
- clients.put(key, instance);
connected(client); // For connections from the controller to the ovs instance, the library doesn't call this method for us
return client;
}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.ovsdb.southbound;
-
-import java.util.concurrent.Callable;
-
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.ovsdb.lib.OvsdbClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Main responsibility of the class is to define the interface that various
- * config operataion can extend to. It define common interface for data store
- * operations.
- *
- * @author Anil Vishnoi (avishnoi@brocade.com)
- *
- */
-public abstract class OvsdbDataCollectionOperation implements Callable<Object>{
- private static final Logger LOG = LoggerFactory.getLogger(OvsdbDataCollectionOperation.class);
-
- public enum OperationType {
- FETCH_OVSDB_OPER_DATA,
- REFERESH_OVSDB_OPER_DATA,
- DELETE_OVSDB_OPER_DATA
- }
-
- private OperationType operType = OperationType.FETCH_OVSDB_OPER_DATA;
- private OvsdbClient ovsdbClient = null;
- private DataBroker db;
-
-
- public OvsdbDataCollectionOperation(final OperationType operType, final OvsdbClient ovsdbClient, final DataBroker db){
- Preconditions.checkNotNull(ovsdbClient);
- this.ovsdbClient = ovsdbClient;
- this.operType = operType;
- this.db = db;
- }
- @Override
- public Object call(){
- if( operType == OperationType.FETCH_OVSDB_OPER_DATA ){
- LOG.debug("Fetch ovsdb operational data from {} and store it in the md-sal data store.",this.ovsdbClient);
- this.fetchAndStoreOperData(ovsdbClient, db);
- }else if( operType == OperationType.REFERESH_OVSDB_OPER_DATA ){
- LOG.debug("Fetch ovsdb operational data from {} and update the existing data in the md-sal data store.",this.ovsdbClient);
- this.fetchAndUpdateOperData(ovsdbClient, db);
- }else if( operType == OperationType.DELETE_OVSDB_OPER_DATA ){
- LOG.debug("Delete operational data fetched from {}",this.ovsdbClient);
- this.deleteOperData(ovsdbClient, db);
- }
- return operType;
- }
-
- public abstract void fetchAndStoreOperData(final OvsdbClient ovsdbClient, final DataBroker db);
-
- public abstract void fetchAndUpdateOperData(final OvsdbClient ovsdbClient, final DataBroker db);
-
- public abstract void deleteOperData( final OvsdbClient ovsdbClient, final DataBroker db);
-}
package org.opendaylight.ovsdb.southbound;
import org.opendaylight.ovsdb.lib.MonitorCallBack;
-import org.opendaylight.ovsdb.lib.message.TableUpdate;
import org.opendaylight.ovsdb.lib.message.TableUpdates;
-import org.opendaylight.ovsdb.lib.notation.Row;
-import org.opendaylight.ovsdb.lib.notation.UUID;
-import org.opendaylight.ovsdb.lib.schema.ColumnSchema;
import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
-import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
+import org.opendaylight.ovsdb.southbound.transactions.md.OvsdbOperationalCommandAggregator;
+import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OvsdbMonitorCallback implements MonitorCallBack {
private static final Logger LOG = LoggerFactory.getLogger(OvsdbMonitorCallback.class);
+ private TransactionInvoker txInvoker;
+ private OvsdbClientKey key;
+
+ OvsdbMonitorCallback(OvsdbClientKey key,TransactionInvoker txInvoker) {
+ this.txInvoker = txInvoker;
+ this.key = key;
+ }
+
@Override
public void update(TableUpdates result, DatabaseSchema dbSchema) {
LOG.debug("result: {} dbSchema: {}",result,dbSchema);
- GenericTableSchema bridge = dbSchema.table("Bridge", GenericTableSchema.class);
- ColumnSchema<GenericTableSchema, String> name = bridge.column("name", String.class);
- TableUpdate<GenericTableSchema> update = result.getUpdate(bridge);
- if (update != null) {
- for (UUID uuid : update.getRows().keySet()) {
- Row<GenericTableSchema> schemaRow = update.getNew(uuid);
- String bridgeName = schemaRow.getColumn(name).getData();
- LOG.trace("Bridge name: {}", bridgeName);
- }
- }
+ txInvoker.invoke(new OvsdbOperationalCommandAggregator(key, result, dbSchema));
}
@Override
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.ovsdb.southbound;
-
-/**
- *
- * @author Anil Vishnoi (avishnoi@brocade.com)
- *
- */
-public interface OvsdbOperationalDataCollectionManager {
-
- public void enqueue(final OvsdbDataCollectionOperation ovsdbDataStoreOper);
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.ovsdb.southbound;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class OvsdbOperationalDataCollectionManagerImpl implements
- OvsdbOperationalDataCollectionManager {
- private static final Logger LOG = LoggerFactory.getLogger(OvsdbOperationalDataCollectionManagerImpl.class);
-
- private final ExecutorService statDataStoreOperationServ;
-
- public OvsdbOperationalDataCollectionManagerImpl(){
- /*
- * Using single thread executor as of now. If going forward we figure out
- * that there is no order dependency between ovsdb operational data, we
- * can move to multi thread executor service.
- */
- ThreadFactory threadFact;
- threadFact = new ThreadFactoryBuilder().setNameFormat("ovsdb-oper-data-collection-manager-%d").build();
- statDataStoreOperationServ = Executors.newSingleThreadExecutor(threadFact);
- }
-
- @Override
- public void enqueue(OvsdbDataCollectionOperation ovsdbDataStoreOper) {
- LOG.info("Enqueued task {} for execution",ovsdbDataStoreOper);
- statDataStoreOperationServ.submit(ovsdbDataStoreOper);
- }
-}
\ No newline at end of file
nodeBuilder.addAugmentation(OvsdbNodeAugmentation.class, createOvsdbAugmentation(client));
return nodeBuilder.build();
}
+ public static Node createNode(OvsdbClientKey key) {
+ NodeBuilder nodeBuilder = new NodeBuilder();
+ nodeBuilder.setNodeId(createNodeId(key.getIp(),key.getPort()));
+ nodeBuilder.addAugmentation(OvsdbNodeAugmentation.class, createOvsdbAugmentation(key));
+ return nodeBuilder.build();
+ }
public static OvsdbNodeAugmentation createOvsdbAugmentation(OvsdbClient client) {
+ return createOvsdbAugmentation(new OvsdbClientKey(client));
+ }
+
+ public static OvsdbNodeAugmentation createOvsdbAugmentation(OvsdbClientKey key) {
OvsdbNodeAugmentationBuilder ovsdbNodeBuilder = new OvsdbNodeAugmentationBuilder();
- ovsdbNodeBuilder.setIp(createIpAddress(client.getConnectionInfo().getRemoteAddress()));
- ovsdbNodeBuilder.setPort(new PortNumber(client.getConnectionInfo().getRemotePort()));
+ ovsdbNodeBuilder.setIp(key.getIp());
+ ovsdbNodeBuilder.setPort(key.getPort());
return ovsdbNodeBuilder.build();
}
}
public static NodeId createManagedNodeId(OvsdbConnectionInfo connectionInfo, UUID managedNodeId) {
- return new NodeId(createNodeId(createIpAddress(connectionInfo.getRemoteAddress()),
- new PortNumber(connectionInfo.getRemotePort())).getValue()
- + "/"+SouthboundConstants.BRIDGE_URI_PREFIX+":"+managedNodeId.toString());
+ return createManagedNodeId(createIpAddress(connectionInfo.getRemoteAddress()),
+ new PortNumber(connectionInfo.getRemotePort()),
+ managedNodeId);
+ }
+
+ public static NodeId createManagedNodeId(OvsdbClientKey key, UUID managedModeId) {
+ return createManagedNodeId(key.getIp(),key.getPort(),managedModeId);
+ }
+
+ public static NodeId createManagedNodeId(IpAddress ip, PortNumber port, UUID managedModeId) {
+ return new NodeId(createNodeId(ip,port)
+ + "/"+SouthboundConstants.BRIDGE_URI_PREFIX+":"+managedModeId.toString());
}
public static NodeId createNodeId(IpAddress ip, PortNumber port) {
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.ovsdb.lib.OvsdbConnection;
import org.opendaylight.ovsdb.lib.impl.OvsdbConnectionService;
+import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
+import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvokerImpl;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
private DataBroker db;
private OvsdbConnectionManager cm;
private OvsdbNodeDataChangeListener ovsdbNodeListener;
-
+ private TransactionInvoker txInvoker;
@Override
public void onSessionInitiated(ProviderContext session) {
LOG.info("SouthboundProvider Session Initiated");
db = session.getSALService(DataBroker.class);
- cm = new OvsdbConnectionManager(db);
+ this.txInvoker = new TransactionInvokerImpl(db);
+ cm = new OvsdbConnectionManager(db,txInvoker);
ovsdbNodeListener = new OvsdbNodeDataChangeListener(db, cm);
initializeOvsdbTopology(LogicalDatastoreType.OPERATIONAL);
initializeOvsdbTopology(LogicalDatastoreType.CONFIGURATION);
--- /dev/null
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
+import org.opendaylight.ovsdb.schema.openvswitch.Bridge;
+import org.opendaylight.ovsdb.southbound.OvsdbClientKey;
+import org.opendaylight.ovsdb.southbound.SouthboundConstants;
+import org.opendaylight.ovsdb.southbound.SouthboundMapper;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.Uuid;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbManagedNodeAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbManagedNodeAugmentationBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentationBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeRef;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+public class OvsdbBridgeUpdateCommand implements TransactionCommand {
+ private static final Logger LOG = LoggerFactory.getLogger(OvsdbBridgeUpdateCommand.class);
+
+ private TableUpdates updates;
+ private DatabaseSchema dbSchema;
+
+ private OvsdbClientKey key;
+
+ public OvsdbBridgeUpdateCommand(OvsdbClientKey key,TableUpdates updates, DatabaseSchema dbSchema) {
+ this.updates = updates;
+ this.dbSchema = dbSchema;
+ this.key = key;
+ }
+ @Override
+ public void execute(ReadWriteTransaction transaction) {
+ List<TypedBaseTable<?>> updatedRows = TransactionUtils.extractRowsUpdated(Bridge.class, updates, dbSchema);
+ for(TypedBaseTable<?> updatedRow : updatedRows) {
+ if(updatedRow instanceof Bridge) {
+ Bridge bridge = (Bridge)updatedRow;
+ final InstanceIdentifier<Node> nodePath = key.toInstanceIndentifier();
+ Optional<Node> node = Optional.absent();
+ try{
+ node = transaction.read(LogicalDatastoreType.OPERATIONAL, nodePath).checkedGet();
+ }catch (final ReadFailedException e) {
+ LOG.debug("Read Operational/DS for Node fail! {}", nodePath, e);
+ }
+ if(node.isPresent()){
+ LOG.info("Node {} is present",node);
+ NodeBuilder managedNodeBuilder = new NodeBuilder();
+ NodeId manageNodeId = SouthboundMapper.createManagedNodeId(key, bridge.getUuid());
+ managedNodeBuilder.setNodeId(manageNodeId);
+
+ OvsdbManagedNodeAugmentationBuilder ovsdbManagedNodeBuilder = new OvsdbManagedNodeAugmentationBuilder();
+ ovsdbManagedNodeBuilder.setBridgeName(bridge.getName());
+ ovsdbManagedNodeBuilder.setBridgeUuid(new Uuid(bridge.getUuid().toString()));
+ ovsdbManagedNodeBuilder.setManagedBy(new OvsdbNodeRef(nodePath));
+ managedNodeBuilder.addAugmentation(OvsdbManagedNodeAugmentation.class, ovsdbManagedNodeBuilder.build());
+
+ InstanceIdentifier<Node> managedNodePath = InstanceIdentifier
+ .create(NetworkTopology.class)
+ .child(Topology.class, new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
+ .child(Node.class,new NodeKey(manageNodeId));
+
+ LOG.debug("Store managed node augmentation data {}",ovsdbManagedNodeBuilder.toString());
+ transaction.put(LogicalDatastoreType.OPERATIONAL, managedNodePath, managedNodeBuilder.build());
+
+ //Update node with managed node reference
+ NodeBuilder nodeBuilder = new NodeBuilder();
+ nodeBuilder.setNodeId(SouthboundMapper.createNodeId(key.getIp(),key.getPort()));
+
+ OvsdbNodeAugmentationBuilder ovsdbNodeBuilder = new OvsdbNodeAugmentationBuilder();
+ List<OvsdbBridgeRef> managedNodes = new ArrayList<OvsdbBridgeRef>();
+ managedNodes.add(new OvsdbBridgeRef(managedNodePath));
+ ovsdbNodeBuilder.setManagedNodeEntry(managedNodes);
+
+ nodeBuilder.addAugmentation(OvsdbNodeAugmentation.class, ovsdbNodeBuilder.build());
+
+ LOG.debug("Update node with managed node ref {}",ovsdbNodeBuilder.toString());
+ transaction.merge(LogicalDatastoreType.OPERATIONAL, nodePath, nodeBuilder.build());
+
+ }
+ }
+ }
+ }
+}
--- /dev/null
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.southbound.OvsdbClientKey;
+import org.opendaylight.ovsdb.southbound.SouthboundMapper;
+
+public class OvsdbNodeCreateCommand implements TransactionCommand {
+
+ private OvsdbClientKey key;
+
+ public OvsdbNodeCreateCommand(OvsdbClientKey key,TableUpdates updates,DatabaseSchema dbSchema) {
+ this.key = key;
+ }
+
+ @Override
+ public void execute(ReadWriteTransaction transaction) {
+ transaction.put(LogicalDatastoreType.OPERATIONAL, key.toInstanceIndentifier(),
+ SouthboundMapper.createNode(key));
+ }
+
+}
--- /dev/null
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.southbound.OvsdbClientKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+
+public class OvsdbNodeRemoveCommand implements TransactionCommand {
+ private static final Logger LOG = LoggerFactory.getLogger(OvsdbNodeRemoveCommand.class);
+
+ private OvsdbClientKey key;
+
+ public OvsdbNodeRemoveCommand(OvsdbClientKey key,TableUpdates updates,DatabaseSchema dbSchema) {
+ this.key = key;
+ }
+
+ @Override
+ public void execute(ReadWriteTransaction transaction) {
+ CheckedFuture<Optional<Node>, ReadFailedException> ovsdbNodeFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, key.toInstanceIndentifier());
+ Optional<Node> ovsdbNodeOptional;
+ try {
+ ovsdbNodeOptional = ovsdbNodeFuture.get();
+ if(ovsdbNodeOptional.isPresent()) {
+ Node ovsdbNode = ovsdbNodeOptional.get();
+ OvsdbNodeAugmentation ovsdbNodeAugmentation = ovsdbNode.getAugmentation(OvsdbNodeAugmentation.class);
+ for(OvsdbBridgeRef managedNode: ovsdbNodeAugmentation.getManagedNodeEntry()) {
+ transaction.delete(LogicalDatastoreType.OPERATIONAL, managedNode.getValue());
+ }
+ transaction.delete(LogicalDatastoreType.OPERATIONAL, key.toInstanceIndentifier());
+ }
+ } catch (Exception e) {
+ LOG.warn("Failure to delete ovsdbNode {}",e);
+ }
+ }
+
+}
--- /dev/null
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.southbound.OvsdbClientKey;
+
+public class OvsdbOperationalCommandAggregator implements TransactionCommand {
+
+
+ private List<TransactionCommand> commands = new ArrayList<TransactionCommand>();
+
+ public OvsdbOperationalCommandAggregator(OvsdbClientKey key,TableUpdates updates, DatabaseSchema dbSchema) {
+ commands.add(new OvsdbBridgeUpdateCommand(key, updates, dbSchema));
+ }
+
+ @Override
+ public void execute(ReadWriteTransaction transaction) {
+ for(TransactionCommand command: commands) {
+ command.execute(transaction);
+ }
+ }
+}
--- /dev/null
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+
+public interface TransactionCommand {
+
+ public void execute(ReadWriteTransaction transaction);
+
+}
--- /dev/null
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+public interface TransactionInvoker {
+
+ public void invoke(TransactionCommand command);
+
+}
--- /dev/null
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
+ private static final int QUEUE_SIZE = 10000;
+ private BindingTransactionChain chain;
+ private DataBroker db;
+ private BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<TransactionCommand>(QUEUE_SIZE);
+ private BlockingQueue<ReadWriteTransaction> successfulTransactionQueue = new LinkedBlockingQueue<ReadWriteTransaction>(QUEUE_SIZE);
+ private BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<AsyncTransaction<?, ?>>(QUEUE_SIZE);
+ private ExecutorService executor;
+ private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<ReadWriteTransaction,TransactionCommand>();
+ private List<ReadWriteTransaction> pendingTransactions = new ArrayList<ReadWriteTransaction>();
+
+ public TransactionInvokerImpl(DataBroker db) {
+ this.db = db;
+ this.chain = db.createTransactionChain(this);
+ ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
+ executor = Executors.newSingleThreadExecutor(threadFact);
+ executor.submit(this);
+ }
+
+ @Override
+ public void invoke(final TransactionCommand command) {
+ // TODO what do we do if queue is full?
+ inputQueue.offer(command);
+ }
+
+ @Override
+ public void onTransactionChainFailed(TransactionChain<?, ?> chain,
+ AsyncTransaction<?, ?> transaction, Throwable cause) {
+ failedTransactionQueue.offer(transaction);
+ }
+
+ @Override
+ public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+ // NO OP
+
+ }
+
+ @Override
+ public void run() {
+ while(true) {
+ forgetSuccessfulTransactions();
+ try {
+ List<TransactionCommand> commands = extractCommands();
+ for(TransactionCommand command: commands) {
+ final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+ recordPendingTransaction(command, transaction);
+ command.execute(transaction);
+ Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ successfulTransactionQueue.offer(transaction);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ // NOOP - handled by failure of transaction chain
+ }
+ });
+ }
+ } catch (Exception e) {
+
+ }
+ }
+ }
+
+ private List<TransactionCommand> extractResubmitCommands() {
+ AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
+ List<TransactionCommand> commands = new ArrayList<TransactionCommand>();
+ if(transaction != null) {
+ int index = pendingTransactions.lastIndexOf(transaction);
+ List<ReadWriteTransaction> transactions = pendingTransactions.subList(index, pendingTransactions.size()-1);
+ for(ReadWriteTransaction tx: transactions) {
+ commands.add(transactionToCommand.get(tx));
+ }
+ resetTransactionQueue();
+ }
+ return commands;
+ }
+
+ private void resetTransactionQueue() {
+ chain.close();
+ chain = db.createTransactionChain(this);
+ pendingTransactions = new ArrayList<ReadWriteTransaction>();
+ transactionToCommand = new HashMap<ReadWriteTransaction,TransactionCommand>();
+ failedTransactionQueue.clear();
+ successfulTransactionQueue.clear();
+ }
+
+ private void recordPendingTransaction(TransactionCommand command,
+ final ReadWriteTransaction transaction) {
+ transactionToCommand.put(transaction, command);
+ pendingTransactions.add(transaction);
+ }
+
+ private List<TransactionCommand> extractCommands() throws InterruptedException {
+ List<TransactionCommand> commands = extractResubmitCommands();
+ commands.addAll(extractCommandsFromQueue());
+ return commands;
+ }
+
+ private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
+ List<TransactionCommand> result = new ArrayList<TransactionCommand>();
+ TransactionCommand command = inputQueue.take();
+ while(command != null) {
+ result.add(command);
+ command = inputQueue.poll();
+ }
+ return result;
+ }
+
+ private void forgetSuccessfulTransactions() {
+ ReadWriteTransaction transaction = successfulTransactionQueue.poll();
+ while(transaction != null) {
+ pendingTransactions.remove(transaction);
+ transactionToCommand.remove(transaction);
+ transaction = successfulTransactionQueue.poll();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ // TODO Auto-generated method stub
+ }
+}
--- /dev/null
+package org.opendaylight.ovsdb.southbound.transactions.md;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.opendaylight.ovsdb.lib.message.TableUpdate;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
+import org.opendaylight.ovsdb.lib.notation.Row;
+import org.opendaylight.ovsdb.lib.notation.UUID;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
+import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
+import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
+
+import com.google.common.base.Preconditions;
+
+public class TransactionUtils {
+
+ public static List<TypedBaseTable<?>> extractRowsUpdated(Class<? extends TypedBaseTable<?>> klazz,TableUpdates updates,DatabaseSchema dbSchema) {
+ Preconditions.checkNotNull(klazz);
+ Preconditions.checkNotNull(updates);
+ Preconditions.checkNotNull(dbSchema);
+ List<TypedBaseTable<?>> result = new ArrayList<TypedBaseTable<?>>();
+
+ List<TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema>> rowUpdates = extractRowUpdates(klazz,updates,dbSchema);
+ for (TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema> rowUpdate : rowUpdates) {
+ if(rowUpdate != null) {
+ if(rowUpdate.getNew() != null) {
+ Row<GenericTableSchema> row = rowUpdate.getNew();
+ result.add(TyperUtils.getTypedRowWrapper(dbSchema,klazz,row));
+ }
+ }
+ }
+ return result;
+ }
+
+ public static List<TypedBaseTable<?>> extractRowsRemoved(Class<? extends TypedBaseTable<?>> klazz,TableUpdates updates,DatabaseSchema dbSchema) {
+ Preconditions.checkNotNull(klazz);
+ Preconditions.checkNotNull(updates);
+ Preconditions.checkNotNull(dbSchema);
+ List<TypedBaseTable<?>> result = new ArrayList<TypedBaseTable<?>>();
+
+ List<TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema>> rowUpdates = extractRowUpdates(klazz,updates,dbSchema);
+ for (TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema> rowUpdate : rowUpdates) {
+ if(rowUpdate != null) {
+ if(rowUpdate.getNew() == null && rowUpdate.getOld() != null) {
+ Row<GenericTableSchema> row = rowUpdate.getOld();
+ result.add(TyperUtils.getTypedRowWrapper(dbSchema,klazz,row));
+ }
+ }
+ }
+ return result;
+ }
+
+ public static List<TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema>> extractRowUpdates(Class<? extends TypedBaseTable<?>> klazz,TableUpdates updates,DatabaseSchema dbSchema) {
+ Preconditions.checkNotNull(klazz);
+ Preconditions.checkNotNull(updates);
+ Preconditions.checkNotNull(dbSchema);
+ List<TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema>> result = new ArrayList<TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema>>();
+ TableUpdate<GenericTableSchema> update = updates.getUpdate(TyperUtils.getTableSchema(dbSchema, klazz));
+ if(update != null) {
+ Map<UUID, TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema>> rows = update.getRows();
+ if(rows != null) {
+ for(TableUpdate<GenericTableSchema>.RowUpdate<GenericTableSchema> rowUpdate : rows.values()) {
+ if(rowUpdate != null) {
+ result.add(rowUpdate);
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+}
+