X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Finventory-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Finventory%2Fmanager%2FNodeChangeCommiter.java;h=57ec89307678ae5e28641671182650e8de9254b6;hp=674ae398d3194d06e8a097d28be01a5b8d7a1ec3;hb=d206d27042eef2185c875f85cf6eac61a1bd77c4;hpb=b5e7ec9a2ecf6225c6c9bbe5f63c1b0629a43998 diff --git a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/NodeChangeCommiter.java b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/NodeChangeCommiter.java index 674ae398d3..57ec893076 100644 --- a/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/NodeChangeCommiter.java +++ b/opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/NodeChangeCommiter.java @@ -7,15 +7,20 @@ */ package org.opendaylight.controller.md.inventory.manager; -import java.util.concurrent.Future; - -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; -import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated; @@ -29,125 +34,133 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.No import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Objects; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; - -public class NodeChangeCommiter implements OpendaylightInventoryListener { - - protected final static Logger LOG = LoggerFactory.getLogger(NodeChangeCommiter.class); +class NodeChangeCommiter implements OpendaylightInventoryListener { + private static final Logger LOG = LoggerFactory.getLogger(NodeChangeCommiter.class); private final FlowCapableInventoryProvider manager; public NodeChangeCommiter(final FlowCapableInventoryProvider manager) { - this.manager = manager; - } - - public FlowCapableInventoryProvider getManager() { - return this.manager; + this.manager = Preconditions.checkNotNull(manager); } @Override public synchronized void onNodeConnectorRemoved(final NodeConnectorRemoved connector) { - - final NodeConnectorRef ref = connector.getNodeConnectorRef(); - final DataModificationTransaction it = this.getManager().startChange(); - LOG.debug("removing node connector {} ", ref.getValue()); - it.removeOperationalData(ref.getValue()); - Future> commitResult = it.commit(); - listenOnTransactionState(it.getIdentifier(), commitResult, "nodeConnector removal", ref.getValue()); + LOG.debug("Node connector removed notification received."); + manager.enqueue(new InventoryOperation() { + @Override + public void applyOperation(final ReadWriteTransaction tx) { + final NodeConnectorRef ref = connector.getNodeConnectorRef(); + LOG.debug("removing node connector {} ", ref.getValue()); + tx.delete(LogicalDatastoreType.OPERATIONAL, ref.getValue()); + } + }); } @Override public synchronized void onNodeConnectorUpdated(final NodeConnectorUpdated connector) { - - final NodeConnectorRef ref = connector.getNodeConnectorRef(); - final FlowCapableNodeConnectorUpdated flowConnector = connector - .getAugmentation(FlowCapableNodeConnectorUpdated.class); - final DataModificationTransaction it = this.manager.startChange(); - final NodeConnectorBuilder data = new NodeConnectorBuilder(connector); - NodeConnectorId id = connector.getId(); - NodeConnectorKey nodeConnectorKey = new NodeConnectorKey(id); - data.setKey(nodeConnectorKey); - boolean notEquals = (!Objects.equal(flowConnector, null)); - if (notEquals) { - final FlowCapableNodeConnector augment = InventoryMapping.toInventoryAugment(flowConnector); - data.addAugmentation(FlowCapableNodeConnector.class, augment); - } - InstanceIdentifier value = ref.getValue(); - LOG.debug("updating node connector : {}.", value); - NodeConnector build = data.build(); - it.putOperationalData((value), build); - Future> commitResult = it.commit(); - listenOnTransactionState(it.getIdentifier(), commitResult, "nodeConnector update", ref.getValue()); + LOG.debug("Node connector updated notification received."); + manager.enqueue(new InventoryOperation() { + @Override + public void applyOperation(final ReadWriteTransaction tx) { + final NodeConnectorRef ref = connector.getNodeConnectorRef(); + final NodeConnectorBuilder data = new NodeConnectorBuilder(connector); + data.setKey(new NodeConnectorKey(connector.getId())); + + final FlowCapableNodeConnectorUpdated flowConnector = connector + .getAugmentation(FlowCapableNodeConnectorUpdated.class); + if (flowConnector != null) { + final FlowCapableNodeConnector augment = InventoryMapping.toInventoryAugment(flowConnector); + data.addAugmentation(FlowCapableNodeConnector.class, augment); + } + InstanceIdentifier value = (InstanceIdentifier) ref.getValue(); + LOG.debug("updating node connector : {}.", value); + NodeConnector build = data.build(); + tx.put(LogicalDatastoreType.OPERATIONAL, value, build); + } + }); } @Override public synchronized void onNodeRemoved(final NodeRemoved node) { - - final NodeRef ref = node.getNodeRef(); - final DataModificationTransaction it = this.manager.startChange(); - LOG.debug("removing node : {}", ref.getValue()); - it.removeOperationalData((ref.getValue())); - Future> commitResult = it.commit(); - listenOnTransactionState(it.getIdentifier(), commitResult, "node removal", ref.getValue()); + LOG.debug("Node removed notification received."); + manager.enqueue(new InventoryOperation() { + @Override + public void applyOperation(final ReadWriteTransaction tx) { + final NodeRef ref = node.getNodeRef(); + LOG.debug("removing node : {}", ref.getValue()); + tx.delete(LogicalDatastoreType.OPERATIONAL, ref.getValue()); + } + }); } @Override public synchronized void onNodeUpdated(final NodeUpdated node) { - - final NodeRef ref = node.getNodeRef(); - final FlowCapableNodeUpdated flowNode = node - . getAugmentation(FlowCapableNodeUpdated.class); - final DataModificationTransaction it = this.manager.startChange(); - final NodeBuilder nodeBuilder = new NodeBuilder(node); - nodeBuilder.setKey(new NodeKey(node.getId())); - boolean equals = Objects.equal(flowNode, null); - if (equals) { + final FlowCapableNodeUpdated flowNode = node.getAugmentation(FlowCapableNodeUpdated.class); + if (flowNode == null) { return; } - final FlowCapableNode augment = InventoryMapping.toInventoryAugment(flowNode); - nodeBuilder.addAugmentation(FlowCapableNode.class, augment); - InstanceIdentifier value = ref.getValue(); - InstanceIdentifierBuilder builder = ((InstanceIdentifier) value).builder(); - InstanceIdentifierBuilder augmentation = builder - . augmentation(FlowCapableNode.class); - final InstanceIdentifier path = augmentation.build(); - LOG.debug("updating node :{} ", path); - it.putOperationalData(path, augment); - - Future> commitResult = it.commit(); - listenOnTransactionState(it.getIdentifier(), commitResult, "node update", ref.getValue()); + LOG.debug("Node updated notification received."); + manager.enqueue(new InventoryOperation() { + @Override + public void applyOperation(ReadWriteTransaction tx) { + final NodeRef ref = node.getNodeRef(); + @SuppressWarnings("unchecked") + InstanceIdentifierBuilder builder = ((InstanceIdentifier) ref.getValue()).builder(); + InstanceIdentifierBuilder augmentation = builder.augmentation(FlowCapableNode.class); + final InstanceIdentifier path = augmentation.build(); + CheckedFuture readFuture = tx.read(LogicalDatastoreType.OPERATIONAL, path); + Futures.addCallback(readFuture, new FutureCallback>() { + @Override + public void onSuccess(Optional optional) { + enqueueWriteNodeDataTx(node, flowNode, path); + if (!optional.isPresent()) { + enqueuePutTable0Tx(ref); + } + } + + @Override + public void onFailure(Throwable throwable) { + LOG.debug(String.format("Can't retrieve node data for node %s. Writing node data with table0.", node)); + enqueueWriteNodeDataTx(node, flowNode, path); + enqueuePutTable0Tx(ref); + } + }); + } + }); } - /** - * @param txId transaction identificator - * @param future transaction result - * @param action performed by transaction - * @param nodeConnectorPath target value - */ - private static void listenOnTransactionState(final Object txId, Future> future, - final String action, final InstanceIdentifier nodeConnectorPath) { - Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallback>() { - + private void enqueueWriteNodeDataTx(final NodeUpdated node, final FlowCapableNodeUpdated flowNode, final InstanceIdentifier path) { + manager.enqueue(new InventoryOperation() { @Override - public void onFailure(Throwable t) { - LOG.error("Action {} [{}] failed for Tx:{}", action, nodeConnectorPath, txId, t); - + public void applyOperation(final ReadWriteTransaction tx) { + final NodeBuilder nodeBuilder = new NodeBuilder(node); + nodeBuilder.setKey(new NodeKey(node.getId())); + + final FlowCapableNode augment = InventoryMapping.toInventoryAugment(flowNode); + nodeBuilder.addAugmentation(FlowCapableNode.class, augment); + LOG.debug("updating node :{} ", path); + tx.put(LogicalDatastoreType.OPERATIONAL, path, augment); } + }); + } + private void enqueuePutTable0Tx(final NodeRef ref) { + manager.enqueue(new InventoryOperation() { @Override - public void onSuccess(RpcResult result) { - if(!result.isSuccessful()) { - LOG.error("Action {} [{}] failed for Tx:{}", action, nodeConnectorPath, txId); - } + public void applyOperation(ReadWriteTransaction tx) { + final TableKey tKey = new TableKey((short) 0); + final InstanceIdentifier tableIdentifier = + ((InstanceIdentifier) ref.getValue()).augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tKey)); + TableBuilder tableBuilder = new TableBuilder(); + Table table0 = tableBuilder.setId((short) 0).build(); + LOG.debug("writing table :{} ", tableIdentifier); + tx.put(LogicalDatastoreType.OPERATIONAL, tableIdentifier, table0, true); } }); }