X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Ftopology-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Ftopology%2Fmanager%2FFlowCapableTopologyProvider.java;h=83942007b8f9a2503db44d32adf31014509e5ddc;hb=4016b1ab46a0df2c1e46f2907bdc27f273988d92;hp=558210d182371ae14b2c02d3cfa5e0d689fdf6cd;hpb=afd8072373d6685281b4497b1c5a5bf464529b8f;p=openflowplugin.git diff --git a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProvider.java b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProvider.java index 558210d182..83942007b8 100644 --- a/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProvider.java +++ b/applications/topology-manager/src/main/java/org/opendaylight/openflowplugin/applications/topology/manager/FlowCapableTopologyProvider.java @@ -7,14 +7,12 @@ */ package org.opendaylight.openflowplugin.applications.topology.manager; -import java.util.concurrent.ExecutionException; - +import com.google.common.base.Optional; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; @@ -23,94 +21,82 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.NotificationListener; -import org.osgi.framework.BundleContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FlowCapableTopologyProvider extends AbstractBindingAwareProvider implements AutoCloseable { - private final static Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyProvider.class); - private ListenerRegistration listenerRegistration; - private Thread thread; - private TerminationPointChangeListenerImpl terminationPointChangeListener; - private NodeChangeListenerImpl nodeChangeListener; +public class FlowCapableTopologyProvider implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyProvider.class); + private static final String TOPOLOGY_PROVIDER = "topology-provider"; static final String TOPOLOGY_ID = "flow:1"; + + private final DataBroker dataBroker; + private final NotificationProviderService notificationService; + private final OperationProcessor processor; + private TransactionChainManager transactionChainManager; + private ListenerRegistration listenerRegistration; + + public FlowCapableTopologyProvider(DataBroker dataBroker, NotificationProviderService notificationService, + OperationProcessor processor) { + this.dataBroker = dataBroker; + this.notificationService = notificationService; + this.processor = processor; + } + /** * Gets called on start of a bundle. - * - * @param session */ - @Override - public synchronized void onSessionInitiated(final ProviderContext session) { - final DataBroker dataBroker = session.getSALService(DataBroker.class); - final NotificationProviderService notificationService = session.getSALService(NotificationProviderService.class); - + public void start() { final TopologyKey key = new TopologyKey(new TopologyId(TOPOLOGY_ID)); final InstanceIdentifier path = InstanceIdentifier .create(NetworkTopology.class) .child(Topology.class, key); - final OperationProcessor processor = new OperationProcessor(dataBroker); final FlowCapableTopologyExporter listener = new FlowCapableTopologyExporter(processor, path); this.listenerRegistration = notificationService.registerNotificationListener(listener); - this.terminationPointChangeListener = new TerminationPointChangeListenerImpl(dataBroker, processor); - nodeChangeListener = new NodeChangeListenerImpl(dataBroker, processor); + this.transactionChainManager = new TransactionChainManager(dataBroker, TOPOLOGY_PROVIDER); + this.transactionChainManager.activateTransactionManager(); + this.transactionChainManager.initialSubmitWriteTransaction(); - final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction(); - tx.put(LogicalDatastoreType.OPERATIONAL, path, new TopologyBuilder().setKey(key).build(), true); - try { - tx.submit().get(); - } catch (InterruptedException | ExecutionException e) { - LOG.warn("Initial topology export failed, continuing anyway", e); + if(!isFlowTopologyExist(path)){ + transactionChainManager.writeToTransaction( + LogicalDatastoreType.OPERATIONAL, + path, + new TopologyBuilder().setKey(key).build(), + true); + transactionChainManager.submitTransaction(); } - thread = new Thread(processor); - thread.setDaemon(true); - thread.setName("FlowCapableTopologyExporter-" + TOPOLOGY_ID); - thread.start(); + LOG.info("FlowCapableTopologyProvider started"); } @Override - public synchronized void close() throws InterruptedException { + public void close() { LOG.info("FlowCapableTopologyProvider stopped."); + this.transactionChainManager.close(); if (this.listenerRegistration != null) { try { this.listenerRegistration.close(); } catch (Exception e) { - LOG.error("Failed to close listener registration", e); + LOG.warn("Failed to close listener registration: {}", e.getMessage()); + LOG.debug("Failed to close listener registration.. ", e); } listenerRegistration = null; } - unregisterListener(terminationPointChangeListener); - unregisterListener(nodeChangeListener); - if (thread != null) { - thread.interrupt(); - thread.join(); - thread = null; - } } - private void unregisterListener(final AutoCloseable listenerToClose) { - if (listenerToClose != null) { - try { - listenerToClose.close(); - } catch (Exception e) { - LOG.error("Failed to close listener registration", e); - } - } - } - - /** - * Gets called during stop bundle - * - * @param context The execution context of the bundle being stopped. - */ - @Override - public void stopImpl(final BundleContext context) { + private boolean isFlowTopologyExist(final InstanceIdentifier path) { try { - this.close(); - } catch (InterruptedException e) { - LOG.error("Failed to stop provider", e); + Optional ofTopology = this.transactionChainManager + .readFromTransaction(LogicalDatastoreType.OPERATIONAL, path) + .checkedGet(); + LOG.debug("OpenFlow topology exist in the operational data store at {}",path); + if(ofTopology.isPresent()){ + return true; + } + } catch (ReadFailedException e) { + LOG.warn("OpenFlow topology read operation failed!", e); } + return false; } }