*/
package org.opendaylight.openflowplugin.applications.topology.manager;
+import java.util.concurrent.Callable;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public abstract class DataChangeListenerImpl implements DataChangeListener, AutoCloseable {
+public abstract class DataChangeListenerImpl<T extends DataObject> implements DataTreeChangeListener<T>, AutoCloseable {
- private final static Logger LOG = LoggerFactory.getLogger(DataChangeListenerImpl.class);
- protected final ListenerRegistration<DataChangeListener> dataChangeListenerRegistration;
+ private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerImpl.class);
+ private static final long STARTUP_LOOP_TICK = 500L;
+ private static final int STARTUP_LOOP_MAX_RETRIES = 8;
+ protected final ListenerRegistration<DataTreeChangeListener> dataChangeListenerRegistration;
protected OperationProcessor operationProcessor;
/**
*/
protected static final InstanceIdentifier<Topology> II_TO_TOPOLOGY =
InstanceIdentifier
- .builder(NetworkTopology.class)
- .child(Topology.class, new TopologyKey(new TopologyId(FlowCapableTopologyProvider.TOPOLOGY_ID)))
- .build();
+ .create(NetworkTopology.class)
+ .child(Topology.class, new TopologyKey(new TopologyId(FlowCapableTopologyProvider.TOPOLOGY_ID)));
-
- /**
- *
- */
- public DataChangeListenerImpl(final OperationProcessor operationProcessor, final DataBroker dataBroker,
- final InstanceIdentifier<?> ii) {
- dataChangeListenerRegistration = dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, ii,
- this, AsyncDataBroker.DataChangeScope.BASE);
+ public DataChangeListenerImpl(final OperationProcessor operationProcessor,
+ final DataBroker dataBroker,
+ final InstanceIdentifier<T> ii) {
+ final DataTreeIdentifier<T> identifier = new DataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, ii);
+ final SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
+ try {
+ dataChangeListenerRegistration = looper.loopUntilNoException(new Callable<ListenerRegistration<DataTreeChangeListener>>() {
+ @Override
+ public ListenerRegistration<DataTreeChangeListener> call() throws Exception {
+ return dataBroker.registerDataTreeChangeListener(identifier, DataChangeListenerImpl.this);
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Data listener registration failed!");
+ throw new IllegalStateException("TopologyManager startup fail! TM bundle needs restart.", e);
+ }
this.operationProcessor = operationProcessor;
}
dataChangeListenerRegistration.close();
}
- protected <T extends DataObject> void sendToTransactionChain(final T node,
- final InstanceIdentifier<T> iiToTopologyNode) {
+ protected <T extends DataObject> void sendToTransactionChain(final T node, final InstanceIdentifier<T> iiToTopologyNode) {
operationProcessor.enqueueOperation(new TopologyOperation() {
-
@Override
public void applyOperation(ReadWriteTransaction transaction) {
transaction.merge(LogicalDatastoreType.OPERATIONAL, iiToTopologyNode, node, true);
nodeKeyInTopology).build();
}
- protected NodeId provideTopologyNodeId(InstanceIdentifier<?> iiToNodeInInventory) {
+ protected NodeId provideTopologyNodeId(InstanceIdentifier<T> iiToNodeInInventory) {
final NodeKey inventoryNodeKey = iiToNodeInInventory.firstKeyOf(Node.class, NodeKey.class);
if (inventoryNodeKey != null) {
return new NodeId(inventoryNodeKey.getId().getValue());