import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.l2switch.loopremover.util.InstanceIdentifierUtils;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatusAwareNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.l2switch.loopremover.rev140714.StpStatusAwareNodeConnectorBuilder;
public class TopologyLinkDataChangeHandler implements DataChangeListener {
private static final Logger _logger = LoggerFactory.getLogger(TopologyLinkDataChangeHandler.class);
private static final String DEFAULT_TOPOLOGY_ID = "flow:1";
+ private static final long DEFAULT_GRAPH_REFRESH_DELAY = 1000;
private final ScheduledExecutorService topologyDataChangeEventProcessor = Executors.newScheduledThreadPool(1);
private final NetworkGraphService networkGraphService;
private boolean networkGraphRefreshScheduled = false;
- private final long DEFAULT_GRAPH_REFRESH_DELAY = 10;
+ private boolean threadReschedule = false;
+ private long graphRefreshDelay;
+ private String topologyId;
private final DataBroker dataBroker;
- boolean doneOnce = false;
- /**
- * Uses default delay to refresh topology graph if this constructor is used.
- *
- * @param dataBroker
- * @param networkGraphService
- */
public TopologyLinkDataChangeHandler(DataBroker dataBroker, NetworkGraphService networkGraphService) {
Preconditions.checkNotNull(dataBroker, "dataBroker should not be null.");
Preconditions.checkNotNull(networkGraphService, "networkGraphService should not be null.");
this.networkGraphService = networkGraphService;
}
+ public void setGraphRefreshDelay(long graphRefreshDelay) {
+ if (graphRefreshDelay < 0) {
+ this.graphRefreshDelay = DEFAULT_GRAPH_REFRESH_DELAY;
+ }
+ else this.graphRefreshDelay = graphRefreshDelay;
+ }
+
+ public void setTopologyId(String topologyId) {
+ if (topologyId == null || topologyId.isEmpty()) {
+ this.topologyId = DEFAULT_TOPOLOGY_ID;
+ }
+ else this.topologyId = topologyId;
+ }
+
/**
* Registers as a data listener to receive changes done to
* {@link org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link}
* under {@link org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology}
* operation data root.
*/
-
public ListenerRegistration<DataChangeListener> registerAsDataChangeListener() {
InstanceIdentifier<Link> linkInstance = InstanceIdentifier.builder(NetworkTopology.class)
- .child(Topology.class, new TopologyKey(new TopologyId(DEFAULT_TOPOLOGY_ID))).child(Link.class).toInstance();
+ .child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Link.class).toInstance();
return dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, linkInstance, this, AsyncDataBroker.DataChangeScope.BASE);
}
+ /**
+ * Handler for onDataChanged events and schedules the building of the network graph.
+ * @param dataChangeEvent The data change event to process.
+ */
@Override
public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> dataChangeEvent) {
if(dataChangeEvent == null) {
if(!networkGraphRefreshScheduled) {
synchronized(this) {
if(!networkGraphRefreshScheduled) {
- topologyDataChangeEventProcessor.schedule(new TopologyDataChangeEventProcessor(), DEFAULT_GRAPH_REFRESH_DELAY, TimeUnit.SECONDS);
+ topologyDataChangeEventProcessor.schedule(new TopologyDataChangeEventProcessor(), graphRefreshDelay, TimeUnit.MILLISECONDS);
networkGraphRefreshScheduled = true;
_logger.debug("Scheduled Graph for refresh.");
}
}
} else {
_logger.debug("Already scheduled for network graph refresh.");
+ threadReschedule = true;
}
}
@Override
public void run() {
+ if (threadReschedule) {
+ topologyDataChangeEventProcessor.schedule(this, graphRefreshDelay, TimeUnit.MILLISECONDS);
+ threadReschedule = false;
+ return;
+ }
_logger.debug("In network graph refresh thread.");
networkGraphRefreshScheduled = false;
networkGraphService.clear();
return;
}
networkGraphService.addLinks(links);
- ReadWriteTransaction readWriteTransaction = dataBroker.newReadWriteTransaction();
+ final ReadWriteTransaction readWriteTransaction = dataBroker.newReadWriteTransaction();
updateNodeConnectorStatus(readWriteTransaction);
- readWriteTransaction.submit();
+ final CheckedFuture writeTxResultFuture = readWriteTransaction.submit();
+ Futures.addCallback(writeTxResultFuture, new FutureCallback() {
+ @Override
+ public void onSuccess(Object o) {
+ _logger.debug("TopologyLinkDataChangeHandler write successful for tx :{}", readWriteTransaction.getIdentifier());
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ _logger.error("TopologyLinkDataChangeHandler write transaction {} failed", readWriteTransaction.getIdentifier(), throwable.getCause());
+ }
+ });
_logger.debug("Done with network graph refresh thread.");
}
private List<Link> getLinksFromTopology() {
- InstanceIdentifier<Topology> topologyInstanceIdentifier = InstanceIdentifierUtils.generateTopologyInstanceIdentifier(DEFAULT_TOPOLOGY_ID);
+ InstanceIdentifier<Topology> topologyInstanceIdentifier = InstanceIdentifierUtils.generateTopologyInstanceIdentifier(topologyId);
Topology topology = null;
ReadOnlyTransaction readOnlyTransaction = dataBroker.newReadOnlyTransaction();
try {
return null;
}
List<Link> internalLinks = new ArrayList<>();
- for(Link link:links) {
+ for(Link link : links) {
if(!(link.getLinkId().getValue().contains("host"))) {
internalLinks.add(link);
}
if(sameStatusPresent(nc.getAugmentation(StpStatusAwareNodeConnector.class), stpStatusAwareNodeConnector.getStatus())) {
return;
}
- nodeConnectorBuilder = new NodeConnectorBuilder(nc)
- .setKey(nc.getKey())
- .addAugmentation(StpStatusAwareNodeConnector.class, stpStatusAwareNodeConnector);
- readWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, (InstanceIdentifier<NodeConnector>) nodeConnectorRef.getValue(), nodeConnectorBuilder.build());
- _logger.debug("Updated node connector in operational {}", nodeConnectorRef);
- } else {
- NodeConnectorKey nodeConnectorKey = InstanceIdentifierUtils.getNodeConnectorKey(nodeConnectorRef.getValue());
- nodeConnectorBuilder = new NodeConnectorBuilder()
- .setKey(nodeConnectorKey)
- .setId(nodeConnectorKey.getId())
- .addAugmentation(StpStatusAwareNodeConnector.class, stpStatusAwareNodeConnector);
- nc = nodeConnectorBuilder.build();
- checkIfExistAndUpdateNode(readWriteTransaction, nodeConnectorRef, nc);
+ //build instance id for StpStatusAwareNodeConnector
+ InstanceIdentifier<StpStatusAwareNodeConnector> stpStatusAwareNcInstanceId =
+ ((InstanceIdentifier<NodeConnector>) nodeConnectorRef.getValue())
+ .augmentation(StpStatusAwareNodeConnector.class);
+ //update StpStatusAwareNodeConnector in operational store
+ readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, stpStatusAwareNcInstanceId, stpStatusAwareNodeConnector);
+ _logger.debug("Merged Stp Status aware node connector in operational {} with status {}", stpStatusAwareNcInstanceId, stpStatusAwareNodeConnector);
+ } else {
+ _logger.error("Unable to update Stp Status node connector {} note present in operational store", nodeConnectorRef.getValue());
}
}
return true;
}
-
- /**
- * @param readWriteTransaction
- * @param nodeConnectorRef
- * @param nc
- */
- private void checkIfExistAndUpdateNode(ReadWriteTransaction readWriteTransaction, NodeConnectorRef nodeConnectorRef, NodeConnector nc) {
- Node node = null;
- InstanceIdentifier<Node> nodeInstanceIdentifier = InstanceIdentifierUtils.generateNodeInstanceIdentifier(nodeConnectorRef);
- try {
- Optional<Node> dataObjectOptional = readWriteTransaction.read(LogicalDatastoreType.OPERATIONAL, nodeInstanceIdentifier).get();
- if(dataObjectOptional.isPresent())
- node = (Node) dataObjectOptional.get();
- } catch(Exception e) {
- _logger.error("Error reading node {}", nodeInstanceIdentifier);
- readWriteTransaction.submit();
- throw new RuntimeException("Error reading from operational store, node : " + nodeInstanceIdentifier, e);
- }
- if(node != null) {
- List<NodeConnector> nodeConnectors = node.getNodeConnector();
- if(nodeConnectors == null) {
- nodeConnectors = new ArrayList<>();
- }
- nodeConnectors.add(nc);
- NodeBuilder nodeBuilder = new NodeBuilder(node)
- .setNodeConnector(nodeConnectors);
- node = nodeBuilder.build();
- readWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, nodeInstanceIdentifier, node);
- _logger.debug("Updated node {} in operational store with node id {}", node, nodeInstanceIdentifier);
- } else {
- NodeKey nodeKey = nodeConnectorRef.getValue().firstKeyOf(Node.class, NodeKey.class);
- List<NodeConnector> nodeConnectors = new ArrayList<>();
- nodeConnectors.add(nc);
- NodeBuilder nodeBuilder = new NodeBuilder()
- .setKey(nodeKey)
- .setId(nodeKey.getId())
- .setNodeConnector(nodeConnectors);
- node = nodeBuilder.build();
-
- checkIfExistsAndUpdateNodes(readWriteTransaction, nodeConnectorRef, node);
- }
- }
-
- /**
- * @param readWriteTransaction
- * @param nodeConnectorRef
- * @param node
- */
- private void checkIfExistsAndUpdateNodes(ReadWriteTransaction readWriteTransaction, NodeConnectorRef nodeConnectorRef, Node node) {
-
- List<Node> nodesList = null;
-
- Nodes nodes = null;
- InstanceIdentifier<Nodes> nodesInstanceIdentifier = nodeConnectorRef.getValue().firstIdentifierOf(Nodes.class);
- try {
- Optional<Nodes> dataObjectOptional = readWriteTransaction.read(LogicalDatastoreType.OPERATIONAL, nodesInstanceIdentifier).get();
- if(dataObjectOptional.isPresent())
- nodes = (Nodes) dataObjectOptional.get();
- } catch(Exception e) {
- _logger.error("Error reading nodes {}", nodesInstanceIdentifier);
- readWriteTransaction.submit();
- throw new RuntimeException("Error reading from operational store, nodes : " + nodesInstanceIdentifier, e);
- }
- if(nodes != null) {
- nodesList = nodes.getNode();
- }
- if(nodesList == null) {
- nodesList = new ArrayList<>();
- }
- nodesList.add(node);
- NodesBuilder nodesBuilder = new NodesBuilder()
- .setNode(nodesList);
- nodes = nodesBuilder.build();
- readWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, nodesInstanceIdentifier, nodes);
- _logger.debug("Updated nodes {} in operational store with nodes id {}", nodes, nodesInstanceIdentifier);
- }
}
}