Bug 948 : Inventory and topology manager shifted to new DataStore API 47/9747/9
authorMartin Bobak <mbobak@cisco.com>
Wed, 6 Aug 2014 05:22:08 +0000 (07:22 +0200)
committerMartin Bobak <mbobak@cisco.com>
Thu, 14 Aug 2014 14:18:24 +0000 (16:18 +0200)
Change-Id: I1a3f2fb47742da0622a096bd6305e7a848a59a36
Signed-off-by: Martin Bobak <mbobak@cisco.com>
opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/FlowCapableInventoryProvider.java
opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryActivator.java
opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/InventoryOperation.java
opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/NodeChangeCommiter.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyProvider.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/TopologyOperation.java

index 9724d31f9ae6cd3ab8eaef0c23212bf626cfca84..ff3984a548eeb3a7d6ed2f93da04b2f1c801802b 100644 (file)
@@ -7,21 +7,20 @@
  */
 package org.opendaylight.controller.md.inventory.manager;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
-
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 class FlowCapableInventoryProvider implements AutoCloseable, Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
     private static final int QUEUE_DEPTH = 500;
@@ -29,12 +28,13 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable {
 
     private final BlockingQueue<InventoryOperation> queue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
     private final NotificationProviderService notificationService;
-    private final DataProviderService dataService;
+
+    private final DataBroker dataBroker;
     private ListenerRegistration<?> listenerRegistration;
     private Thread thread;
 
-    FlowCapableInventoryProvider(final DataProviderService dataService, final NotificationProviderService notificationService) {
-        this.dataService = Preconditions.checkNotNull(dataService);
+    FlowCapableInventoryProvider(final DataBroker dataBroker, final NotificationProviderService notificationService) {
+        this.dataBroker = Preconditions.checkNotNull(dataBroker);
         this.notificationService = Preconditions.checkNotNull(notificationService);
     }
 
@@ -82,10 +82,10 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable {
     @Override
     public void run() {
         try {
-            for (;;) {
+            for (; ; ) {
                 InventoryOperation op = queue.take();
 
-                final DataModificationTransaction tx = dataService.beginTransaction();
+                final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
                 LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
 
                 int ops = 0;
@@ -102,14 +102,17 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable {
 
                 LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
 
-                try {
-                    final RpcResult<TransactionStatus> result = tx.commit().get();
-                    if(!result.isSuccessful()) {
-                        LOG.error("Transaction {} failed", tx.getIdentifier());
+                final CheckedFuture<Void, TransactionCommitFailedException> result = tx.submit();
+                Futures.addCallback(result, new FutureCallback<Object>() {
+                    @Override
+                    public void onSuccess(Object o) {
+                    }
+
+                    @Override
+                    public void onFailure(Throwable throwable) {
+                        LOG.error("Transaction {} failed.", tx.getIdentifier(), throwable);
                     }
-                } catch (ExecutionException e) {
-                    LOG.warn("Failed to commit inventory change", e.getCause());
-                }
+                });
             }
         } catch (InterruptedException e) {
             LOG.info("Processing interrupted, terminating", e);
index 5bcae367e3e98a6270e01e270adbd2f231256fa9..991611aebc4492c63bd8ece7f7eb1f10289b89da 100644 (file)
@@ -7,10 +7,10 @@
  */
 package org.opendaylight.controller.md.inventory.manager;
 
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -21,11 +21,11 @@ public class InventoryActivator extends AbstractBindingAwareProvider {
 
     @Override
     public void onSessionInitiated(final ProviderContext session) {
-        DataProviderService salDataService = session.getSALService(DataProviderService.class);
+        DataBroker dataBroker = session.getSALService(DataBroker.class);
         NotificationProviderService salNotifiService =
                 session.getSALService(NotificationProviderService.class);
 
-        provider = new FlowCapableInventoryProvider(salDataService, salNotifiService);
+        provider = new FlowCapableInventoryProvider(dataBroker, salNotifiService);
         provider.start();
     }
 
index 3be5fcf643fec84694a5ba0ae4eacc1e557a5c01..cfc95799839a40cac8ef195e79d8320c1e42e91e 100644 (file)
@@ -7,10 +7,10 @@
  */
 package org.opendaylight.controller.md.inventory.manager;
 
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 
 interface InventoryOperation {
 
-    void applyOperation(DataModificationTransaction tx);
+    void applyOperation(ReadWriteTransaction tx);
 
 }
index 3db3c93fcce11ccf46569514b901ef3c9c99e60d..1b031990ab9f6808aed11c265e0672614559f4c2 100644 (file)
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.controller.md.inventory.manager;
 
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated;
@@ -30,8 +32,6 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdenti
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 class NodeChangeCommiter implements OpendaylightInventoryListener {
     private static final Logger LOG = LoggerFactory.getLogger(NodeChangeCommiter.class);
 
@@ -43,21 +43,23 @@ class NodeChangeCommiter implements OpendaylightInventoryListener {
 
     @Override
     public synchronized void onNodeConnectorRemoved(final NodeConnectorRemoved connector) {
+        LOG.debug("Node connector removed notification received.");
         manager.enqueue(new InventoryOperation() {
             @Override
-            public void applyOperation(final DataModificationTransaction tx) {
+            public void applyOperation(final ReadWriteTransaction tx) {
                 final NodeConnectorRef ref = connector.getNodeConnectorRef();
                 LOG.debug("removing node connector {} ", ref.getValue());
-                tx.removeOperationalData(ref.getValue());
+                tx.delete(LogicalDatastoreType.OPERATIONAL, ref.getValue());
             }
         });
     }
 
     @Override
     public synchronized void onNodeConnectorUpdated(final NodeConnectorUpdated connector) {
+        LOG.debug("Node connector updated notification received.");
         manager.enqueue(new InventoryOperation() {
             @Override
-            public void applyOperation(final DataModificationTransaction tx) {
+            public void applyOperation(final ReadWriteTransaction tx) {
                 final NodeConnectorRef ref = connector.getNodeConnectorRef();
                 final NodeConnectorBuilder data = new NodeConnectorBuilder(connector);
                 data.setKey(new NodeConnectorKey(connector.getId()));
@@ -68,22 +70,23 @@ class NodeChangeCommiter implements OpendaylightInventoryListener {
                     final FlowCapableNodeConnector augment = InventoryMapping.toInventoryAugment(flowConnector);
                     data.addAugmentation(FlowCapableNodeConnector.class, augment);
                 }
-                InstanceIdentifier<? extends Object> value = ref.getValue();
+                InstanceIdentifier<NodeConnector> value = (InstanceIdentifier<NodeConnector>) ref.getValue();
                 LOG.debug("updating node connector : {}.", value);
                 NodeConnector build = data.build();
-                tx.putOperationalData(value, build);
+                tx.put(LogicalDatastoreType.OPERATIONAL, value, build);
             }
         });
     }
 
     @Override
     public synchronized void onNodeRemoved(final NodeRemoved node) {
+        LOG.debug("Node removed notification received.");
         manager.enqueue(new InventoryOperation() {
             @Override
-            public void applyOperation(final DataModificationTransaction tx) {
+            public void applyOperation(final ReadWriteTransaction tx) {
                 final NodeRef ref = node.getNodeRef();
                 LOG.debug("removing node : {}", ref.getValue());
-                tx.removeOperationalData((ref.getValue()));
+                tx.delete(LogicalDatastoreType.OPERATIONAL, ref.getValue());
             }
         });
     }
@@ -94,10 +97,10 @@ class NodeChangeCommiter implements OpendaylightInventoryListener {
         if (flowNode == null) {
             return;
         }
-
+        LOG.debug("Node updated notification received.");
         manager.enqueue(new InventoryOperation() {
             @Override
-            public void applyOperation(final DataModificationTransaction tx) {
+            public void applyOperation(final ReadWriteTransaction tx) {
                 final NodeRef ref = node.getNodeRef();
                 final NodeBuilder nodeBuilder = new NodeBuilder(node);
                 nodeBuilder.setKey(new NodeKey(node.getId()));
@@ -110,7 +113,7 @@ class NodeChangeCommiter implements OpendaylightInventoryListener {
                 InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder.augmentation(FlowCapableNode.class);
                 final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
                 LOG.debug("updating node :{} ", path);
-                tx.putOperationalData(path, augment);
+                tx.put(LogicalDatastoreType.OPERATIONAL, path, augment);
             }
         });
     }
index 6dbfd7225b81fe7651881e21f954dee73c729a7f..d7ce9485c63ec92b9fdb48a576246faa0f5ad7e2 100644 (file)
@@ -15,8 +15,14 @@ import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMap
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
 
-import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryListener;
@@ -41,10 +47,12 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener {
+
+    private final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
     private final InstanceIdentifier<Topology> topology;
     private final OperationProcessor processor;
 
@@ -55,13 +63,21 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
 
     @Override
     public void onNodeRemoved(final NodeRemoved notification) {
+
+        final NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId());
+        final InstanceIdentifier<Node> nodeInstance = toNodeIdentifier(notification.getNodeRef());
+
         processor.enqueueOperation(new TopologyOperation() {
             @Override
-            public void applyOperation(final DataModificationTransaction transaction) {
-                NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId());
-                InstanceIdentifier<Node> nodeInstance = toNodeIdentifier(notification.getNodeRef());
-                transaction.removeOperationalData(nodeInstance);
-                removeAffectedLinks(transaction, nodeId);
+            public void applyOperation(final ReadWriteTransaction transaction) {
+                removeAffectedLinks(nodeId);
+            }
+        });
+
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(ReadWriteTransaction transaction) {
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
             }
         });
     }
@@ -72,10 +88,10 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         if (fcnu != null) {
             processor.enqueueOperation(new TopologyOperation() {
                 @Override
-                public void applyOperation(final DataModificationTransaction transaction) {
-                    Node node = toTopologyNode(toTopologyNodeId(notification.getId()), notification.getNodeRef());
-                    InstanceIdentifier<Node> path = getNodePath(toTopologyNodeId(notification.getId()));
-                    transaction.putOperationalData(path, node);
+                public void applyOperation(final ReadWriteTransaction transaction) {
+                    final Node node = toTopologyNode(toTopologyNodeId(notification.getId()), notification.getNodeRef());
+                    final InstanceIdentifier<Node> path = getNodePath(toTopologyNodeId(notification.getId()));
+                    transaction.put(LogicalDatastoreType.OPERATIONAL, path, node);
                 }
             });
         }
@@ -83,15 +99,22 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
 
     @Override
     public void onNodeConnectorRemoved(final NodeConnectorRemoved notification) {
+
+        final InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(notification
+                .getNodeConnectorRef());
+
         processor.enqueueOperation(new TopologyOperation() {
             @Override
-            public void applyOperation(final DataModificationTransaction transaction) {
-                InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(notification
-                        .getNodeConnectorRef());
-                TpId tpId = toTerminationPointId(getNodeConnectorKey(notification.getNodeConnectorRef()).getId());
+            public void applyOperation(final ReadWriteTransaction transaction) {
+                final TpId tpId = toTerminationPointId(getNodeConnectorKey(notification.getNodeConnectorRef()).getId());
+                removeAffectedLinks(tpId);
+            }
+        });
 
-                transaction.removeOperationalData(tpInstance);
-                removeAffectedLinks(transaction, tpId);
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(ReadWriteTransaction transaction) {
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
             }
         });
     }
@@ -102,16 +125,15 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         if (fcncu != null) {
             processor.enqueueOperation(new TopologyOperation() {
                 @Override
-                public void applyOperation(final DataModificationTransaction transaction) {
-                    NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeConnectorRef()).getId());
+                public void applyOperation(final ReadWriteTransaction transaction) {
+                    final NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeConnectorRef()).getId());
                     TerminationPoint point = toTerminationPoint(toTerminationPointId(notification.getId()),
                             notification.getNodeConnectorRef());
-                    InstanceIdentifier<TerminationPoint> path = tpPath(nodeId, point.getKey().getTpId());
-
-                    transaction.putOperationalData(path, point);
+                    final InstanceIdentifier<TerminationPoint> path = tpPath(nodeId, point.getKey().getTpId());
+                    transaction.put(LogicalDatastoreType.OPERATIONAL, path, point);
                     if ((fcncu.getState() != null && fcncu.getState().isLinkDown())
                             || (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) {
-                        removeAffectedLinks(transaction, point.getTpId());
+                        removeAffectedLinks(point.getTpId());
                     }
                 }
             });
@@ -122,10 +144,10 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
     public void onLinkDiscovered(final LinkDiscovered notification) {
         processor.enqueueOperation(new TopologyOperation() {
             @Override
-            public void applyOperation(final DataModificationTransaction transaction) {
-                Link link = toTopologyLink(notification);
-                InstanceIdentifier<Link> path = linkPath(link);
-                transaction.putOperationalData(path, link);
+            public void applyOperation(final ReadWriteTransaction transaction) {
+                final Link link = toTopologyLink(notification);
+                final InstanceIdentifier<Link> path = linkPath(link);
+                transaction.put(LogicalDatastoreType.OPERATIONAL, path, link);
             }
         });
     }
@@ -139,8 +161,8 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
     public void onLinkRemoved(final LinkRemoved notification) {
         processor.enqueueOperation(new TopologyOperation() {
             @Override
-            public void applyOperation(final DataModificationTransaction transaction) {
-                transaction.removeOperationalData(linkPath(toTopologyLink(notification)));
+            public void applyOperation(final ReadWriteTransaction transaction) {
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
             }
         });
     }
@@ -162,28 +184,58 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         return tpPath(toTopologyNodeId(invNodeKey.getId()), toTerminationPointId(invNodeConnectorKey.getId()));
     }
 
-    private void removeAffectedLinks(final DataModificationTransaction transaction, final NodeId id) {
-        TypeSafeDataReader reader = TypeSafeDataReader.forReader(transaction);
-        Topology topologyData = reader.readOperationalData(topology);
-        if (topologyData != null) {
-            for (Link link : topologyData.getLink()) {
-                if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) {
-                    transaction.removeOperationalData(linkPath(link));
-                }
+    private void removeAffectedLinks(final NodeId id) {
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(final ReadWriteTransaction transaction) {
+                CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
+                Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
+                    @Override
+                    public void onSuccess(Optional<Topology> topologyOptional) {
+                        if (topologyOptional.isPresent()) {
+                            Topology topologyData = topologyOptional.get();
+                            for (Link link : topologyData.getLink()) {
+                                if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) {
+                                    transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
+                                }
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable throwable) {
+                        LOG.error("Error reading topology data for topology {}", topology, throwable);
+                    }
+                });
             }
-        }
+        });
     }
 
-    private void removeAffectedLinks(final DataModificationTransaction transaction, final TpId id) {
-        TypeSafeDataReader reader = TypeSafeDataReader.forReader(transaction);
-        Topology topologyData = reader.readOperationalData(topology);
-        if (topologyData != null) {
-            for (Link link : topologyData.getLink()) {
-                if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) {
-                    transaction.removeOperationalData(linkPath(link));
-                }
+    private void removeAffectedLinks(final TpId id) {
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(final ReadWriteTransaction transaction) {
+                CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
+                Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
+                    @Override
+                    public void onSuccess(Optional<Topology> topologyOptional) {
+                        if (topologyOptional.isPresent()) {
+                            Topology topologyData = topologyOptional.get();
+                            for (Link link : topologyData.getLink()) {
+                                if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) {
+                                    transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
+                                }
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable throwable) {
+                        LOG.error("Error reading topology data for topology {}", topology, throwable);
+                    }
+                });
             }
-        }
+        });
     }
 
     private InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
index a87971bc6bc0e1d6a294c30a08dfb73cf81484be..0a3b9f6a6b7c53c193cd3fd523504d4e729df180 100644 (file)
@@ -8,12 +8,12 @@
 package org.opendaylight.md.controller.topology.manager;
 
 import java.util.concurrent.ExecutionException;
-
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
@@ -38,7 +38,7 @@ public class FlowCapableTopologyProvider extends AbstractBindingAwareProvider im
      */
     @Override
     public synchronized void onSessionInitiated(final ProviderContext session) {
-        final DataProviderService dataService = session.getSALService(DataProviderService.class);
+        final DataBroker dataBroker = session.getSALService(DataBroker.class);
         final NotificationProviderService notificationService = session.getSALService(NotificationProviderService.class);
 
         final String name = "flow:1";
@@ -48,14 +48,14 @@ public class FlowCapableTopologyProvider extends AbstractBindingAwareProvider im
                 .child(Topology.class, key)
                 .build();
 
-        final OperationProcessor processor = new OperationProcessor(dataService);
+        final OperationProcessor processor = new OperationProcessor(dataBroker);
         final FlowCapableTopologyExporter listener = new FlowCapableTopologyExporter(processor, path);
         this.listenerRegistration = notificationService.registerNotificationListener(listener);
 
-        final DataModificationTransaction tx = dataService.beginTransaction();
-        tx.putOperationalData(path, new TopologyBuilder().setKey(key).build());
+        final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
+        tx.put(LogicalDatastoreType.OPERATIONAL, path, new TopologyBuilder().setKey(key).build());
         try {
-            tx.commit().get();
+            tx.submit().get();
         } catch (InterruptedException | ExecutionException e) {
             LOG.warn("Initial topology export failed, continuing anyway", e);
         }
@@ -87,8 +87,7 @@ public class FlowCapableTopologyProvider extends AbstractBindingAwareProvider im
     /**
      * Gets called during stop bundle
      *
-     * @param context
-     *            The execution context of the bundle being stopped.
+     * @param context The execution context of the bundle being stopped.
      */
     @Override
     public void stopImpl(final BundleContext context) {
index d60c88032dbcc7015fc064a791ca9a16921d7332..3800413eb1b9964d00207f526177862eb69c5885 100644 (file)
@@ -7,30 +7,27 @@
  */
 package org.opendaylight.md.controller.topology.manager;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
-
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
-import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 final class OperationProcessor implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
     private static final int MAX_TRANSACTION_OPERATIONS = 100;
     private static final int OPERATION_QUEUE_DEPTH = 500;
 
     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
-    // FIXME: Flow capable topology exporter should use transaction chaining API
-    private final DataProviderService dataService;
+    private final DataBroker dataBroker;
 
-    OperationProcessor(final DataProviderService dataService) {
-        this.dataService = Preconditions.checkNotNull(dataService);
+    OperationProcessor(final DataBroker dataBroker) {
+        this.dataBroker = Preconditions.checkNotNull(dataBroker);
     }
 
     void enqueueOperation(final TopologyOperation task) {
@@ -44,11 +41,11 @@ final class OperationProcessor implements Runnable {
     @Override
     public void run() {
         try {
-            for (;;) {
+            for (; ; ) {
                 TopologyOperation op = queue.take();
 
                 LOG.debug("New operations available, starting transaction");
-                final DataModificationTransaction tx = dataService.beginTransaction();
+                final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
 
                 int ops = 0;
                 do {
@@ -64,14 +61,18 @@ final class OperationProcessor implements Runnable {
 
                 LOG.debug("Processed {} operations, submitting transaction", ops);
 
-                try {
-                    final RpcResult<TransactionStatus> s = tx.commit().get();
-                    if (!s.isSuccessful()) {
-                        LOG.error("Topology export failed for Tx:{}", tx.getIdentifier());
+                final CheckedFuture txResultFuture = tx.submit();
+                Futures.addCallback(txResultFuture, new FutureCallback() {
+                    @Override
+                    public void onSuccess(Object o) {
+                        LOG.debug("Topology export successful for tx :{}", tx.getIdentifier());
+                    }
+
+                    @Override
+                    public void onFailure(Throwable throwable) {
+                        LOG.error("Topology export transaction {} failed", tx.getIdentifier(), throwable.getCause());
                     }
-                } catch (ExecutionException e) {
-                    LOG.error("Topology export transaction {} failed", tx.getIdentifier(), e.getCause());
-                }
+                });
             }
         } catch (InterruptedException e) {
             LOG.info("Interrupted processing, terminating", e);
index 29d06beade946a1a0bf341917a3741097b5ce980..bbb8a74b03ce4a8701c301bd74fafcd14437465e 100644 (file)
@@ -7,7 +7,7 @@
  */
 package org.opendaylight.md.controller.topology.manager;
 
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 
 /**
  * Internal interface for submitted operations. Implementations of this
@@ -19,5 +19,5 @@ interface TopologyOperation {
      *
      * @param transaction Datastore transaction
      */
-    void applyOperation(DataModificationTransaction transaction);
+    void applyOperation(ReadWriteTransaction transaction);
 }
\ No newline at end of file