Switch to MD-SAL APIs
[openflowplugin.git] / applications / lldp-speaker / src / main / java / org / opendaylight / openflowplugin / applications / lldpspeaker / NodeConnectorInventoryEventTranslator.java
index 71b3b3e6ce7c6c909e238e7b6fb77c16397aaa76..0e5c97f4a61e49ce70185b527422a8a8e3f6e7be 100644 (file)
@@ -5,25 +5,27 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.applications.lldpspeaker;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.flow.capable.port.State;
+import java.util.Collection;
 import java.util.HashMap;
-import com.google.common.collect.ImmutableSet;
 import java.util.Map;
 import java.util.Set;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import javax.annotation.Nonnull;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortConfig;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortState;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.flow.capable.port.State;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
@@ -33,133 +35,151 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * NodeConnectorInventoryEventTranslator is listening for changes in inventory operational DOM tree
  * and update LLDPSpeaker and topology.
  */
-public class NodeConnectorInventoryEventTranslator implements DataChangeListener, AutoCloseable {
-    /**
-     *
-     */
-    private static final InstanceIdentifier<State> II_TO_STATE 
-        = InstanceIdentifier.builder(Nodes.class)
-            .child(Node.class)
-            .child(NodeConnector.class)
-            .augmentation(FlowCapableNodeConnector.class)
-            .child(State.class)
-            .build();
+public class NodeConnectorInventoryEventTranslator<T extends DataObject>
+        implements ClusteredDataTreeChangeListener<T>, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorInventoryEventTranslator.class);
+
+    private static final InstanceIdentifier<State> II_TO_STATE = InstanceIdentifier.builder(Nodes.class)
+            .child(Node.class).child(NodeConnector.class).augmentation(FlowCapableNodeConnector.class)
+            .child(State.class).build();
 
     private static final InstanceIdentifier<FlowCapableNodeConnector> II_TO_FLOW_CAPABLE_NODE_CONNECTOR
-        = InstanceIdentifier.builder(Nodes.class)
-            .child(Node.class)
-            .child(NodeConnector.class)
-            .augmentation(FlowCapableNodeConnector.class)
-            .build();
+            = InstanceIdentifier.builder(Nodes.class).child(Node.class).child(NodeConnector.class)
+            .augmentation(FlowCapableNodeConnector.class).build();
 
-    private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorInventoryEventTranslator.class);
+    private static final long STARTUP_LOOP_TICK = 500L;
+    private static final int STARTUP_LOOP_MAX_RETRIES = 8;
 
-    private final ListenerRegistration<DataChangeListener> dataChangeListenerRegistration;
-    private final ListenerRegistration<DataChangeListener> listenerOnPortStateRegistration;
+    private final ListenerRegistration<DataTreeChangeListener> listenerOnPortRegistration;
+    private final ListenerRegistration<DataTreeChangeListener> listenerOnPortStateRegistration;
     private final Set<NodeConnectorEventsObserver> observers;
-    private final Map<InstanceIdentifier<?>,FlowCapableNodeConnector> iiToDownFlowCapableNodeConnectors = new HashMap<>();
+    private final Map<InstanceIdentifier<?>, FlowCapableNodeConnector> iiToDownFlowCapableNodeConnectors
+            = new HashMap<>();
 
+    @SuppressWarnings("IllegalCatch")
     public NodeConnectorInventoryEventTranslator(DataBroker dataBroker, NodeConnectorEventsObserver... observers) {
         this.observers = ImmutableSet.copyOf(observers);
-        dataChangeListenerRegistration = dataBroker.registerDataChangeListener(
-                LogicalDatastoreType.OPERATIONAL,
-                II_TO_FLOW_CAPABLE_NODE_CONNECTOR,
-                this, AsyncDataBroker.DataChangeScope.BASE);
-        listenerOnPortStateRegistration = dataBroker.registerDataChangeListener(
-                LogicalDatastoreType.OPERATIONAL,
-                II_TO_STATE,
-                this, AsyncDataBroker.DataChangeScope.SUBTREE);
+        final DataTreeIdentifier dtiToNodeConnector = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
+                                                                                   II_TO_FLOW_CAPABLE_NODE_CONNECTOR);
+        final DataTreeIdentifier dtiToNodeConnectorState = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
+                                                                                   II_TO_STATE);
+        final SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
+        try {
+            listenerOnPortRegistration = looper.loopUntilNoException(() ->
+                    dataBroker.registerDataTreeChangeListener(dtiToNodeConnector,
+                            NodeConnectorInventoryEventTranslator.this));
+            listenerOnPortStateRegistration = looper.loopUntilNoException(() ->
+                    dataBroker.registerDataTreeChangeListener(dtiToNodeConnectorState,
+                            NodeConnectorInventoryEventTranslator.this));
+        } catch (Exception e) {
+            LOG.error("DataTreeChangeListeners registration failed: {}", e);
+            throw new IllegalStateException("NodeConnectorInventoryEventTranslator startup failed!", e);
+        }
+        LOG.info("NodeConnectorInventoryEventTranslator has started.");
     }
 
     @Override
     public void close() {
-        dataChangeListenerRegistration.close();
-        listenerOnPortStateRegistration.close();
+        if (listenerOnPortRegistration != null) {
+            listenerOnPortRegistration.close();
+        }
+        if (listenerOnPortStateRegistration != null) {
+            listenerOnPortStateRegistration.close();
+        }
     }
 
     @Override
-    public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
-        LOG.trace("Node connectors in inventory changed: {} created, {} updated, {} removed",
-                change.getCreatedData().size(), change.getUpdatedData().size(), change.getRemovedPaths().size());
-
-        // Iterate over created node connectors
-        for (Map.Entry<InstanceIdentifier<?>, DataObject> entry : change.getCreatedData().entrySet()) {
-            InstanceIdentifier<NodeConnector> nodeConnectorInstanceId =
-                    entry.getKey().firstIdentifierOf(NodeConnector.class);
-            if (compareIITail(entry.getKey(),II_TO_FLOW_CAPABLE_NODE_CONNECTOR)) {
-                FlowCapableNodeConnector flowConnector = (FlowCapableNodeConnector) entry.getValue();
-                if (!isPortDown(flowConnector)) {
-                    notifyNodeConnectorAppeared(nodeConnectorInstanceId, flowConnector);
-                } else {
-                    iiToDownFlowCapableNodeConnectors.put(nodeConnectorInstanceId, flowConnector);
-                }
+    public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<T>> modifications) {
+        for (DataTreeModification modification : modifications) {
+            LOG.trace("Node connectors in inventory changed -> {}", modification.getRootNode().getModificationType());
+            switch (modification.getRootNode().getModificationType()) {
+                case WRITE:
+                    processAddedConnector(modification);
+                    break;
+                case SUBTREE_MODIFIED:
+                    processUpdatedConnector(modification);
+                    break;
+                case DELETE:
+                    processRemovedConnector(modification);
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            "Unhandled modification type: {}" + modification.getRootNode().getModificationType());
             }
         }
+    }
 
-        // Iterate over updated node connectors (port down state may change)
-        for (Map.Entry<InstanceIdentifier<?>, DataObject> entry : change.getUpdatedData().entrySet()) {
-            InstanceIdentifier<NodeConnector> nodeConnectorInstanceId =
-                    entry.getKey().firstIdentifierOf(NodeConnector.class);
-            if (compareIITail(entry.getKey(),II_TO_FLOW_CAPABLE_NODE_CONNECTOR)) {
-                FlowCapableNodeConnector flowConnector = (FlowCapableNodeConnector) entry.getValue();
-                if (isPortDown(flowConnector)) {
-                    notifyNodeConnectorDisappeared(nodeConnectorInstanceId);
-                } else {
-                    notifyNodeConnectorAppeared(nodeConnectorInstanceId, flowConnector);
-                }
-            } else if (compareIITail(entry.getKey(),II_TO_STATE)) {
-                FlowCapableNodeConnector flowNodeConnector = iiToDownFlowCapableNodeConnectors.get(nodeConnectorInstanceId);
-                if (flowNodeConnector != null) {
-                    State state = (State)entry.getValue();
-                    if (!state.isLinkDown()) {
-                        FlowCapableNodeConnectorBuilder flowCapableNodeConnectorBuilder = new FlowCapableNodeConnectorBuilder(flowNodeConnector);
-                        flowCapableNodeConnectorBuilder.setState(state);
-                        notifyNodeConnectorAppeared(nodeConnectorInstanceId, flowCapableNodeConnectorBuilder.build());
-                        iiToDownFlowCapableNodeConnectors.remove(nodeConnectorInstanceId);
-                    }
-                }
+    private void processAddedConnector(final DataTreeModification<T> modification) {
+        final InstanceIdentifier<T> identifier = modification.getRootPath().getRootIdentifier();
+        InstanceIdentifier<NodeConnector> nodeConnectorInstanceId = identifier.firstIdentifierOf(NodeConnector.class);
+        if (compareIITail(identifier, II_TO_FLOW_CAPABLE_NODE_CONNECTOR)) {
+            FlowCapableNodeConnector flowConnector = (FlowCapableNodeConnector) modification.getRootNode()
+                    .getDataAfter();
+            if (!isPortDown(flowConnector)) {
+                notifyNodeConnectorAppeared(nodeConnectorInstanceId, flowConnector);
+            } else {
+                iiToDownFlowCapableNodeConnectors.put(nodeConnectorInstanceId, flowConnector);
             }
         }
+    }
 
-        // Iterate over removed node connectors
-        for (InstanceIdentifier<?> removed : change.getRemovedPaths()) {
-            if (compareIITail(removed,II_TO_FLOW_CAPABLE_NODE_CONNECTOR)) {
-                InstanceIdentifier<NodeConnector> nodeConnectorInstanceId = removed.firstIdentifierOf(NodeConnector.class);
+    private void processUpdatedConnector(final DataTreeModification<T> modification) {
+        final InstanceIdentifier<T> identifier = modification.getRootPath().getRootIdentifier();
+        InstanceIdentifier<NodeConnector> nodeConnectorInstanceId = identifier.firstIdentifierOf(NodeConnector.class);
+        if (compareIITail(identifier, II_TO_FLOW_CAPABLE_NODE_CONNECTOR)) {
+            FlowCapableNodeConnector flowConnector = (FlowCapableNodeConnector) modification.getRootNode()
+                    .getDataAfter();
+            if (isPortDown(flowConnector)) {
                 notifyNodeConnectorDisappeared(nodeConnectorInstanceId);
+            } else {
+                notifyNodeConnectorAppeared(nodeConnectorInstanceId, flowConnector);
+            }
+        } else if (compareIITail(identifier, II_TO_STATE)) {
+            FlowCapableNodeConnector flowNodeConnector = iiToDownFlowCapableNodeConnectors.get(nodeConnectorInstanceId);
+            if (flowNodeConnector != null) {
+                State state = (State) modification.getRootNode().getDataAfter();
+                if (!state.isLinkDown()) {
+                    FlowCapableNodeConnectorBuilder flowCapableNodeConnectorBuilder
+                            = new FlowCapableNodeConnectorBuilder(flowNodeConnector);
+                    flowCapableNodeConnectorBuilder.setState(state);
+                    notifyNodeConnectorAppeared(nodeConnectorInstanceId, flowCapableNodeConnectorBuilder.build());
+                    iiToDownFlowCapableNodeConnectors.remove(nodeConnectorInstanceId);
+                }
             }
         }
     }
 
-    /**
-     * @param key
-     * @param iiToFlowCapableNodeConnector
-     * @return
-     */
-    private boolean compareIITail(InstanceIdentifier<?> ii1,
-            InstanceIdentifier<?> ii2) {
+    private void processRemovedConnector(final DataTreeModification<T> modification) {
+        final InstanceIdentifier<T> identifier = modification.getRootPath().getRootIdentifier();
+        if (compareIITail(identifier, II_TO_FLOW_CAPABLE_NODE_CONNECTOR)) {
+            InstanceIdentifier<NodeConnector> nodeConnectorInstanceId = identifier
+                    .firstIdentifierOf(NodeConnector.class);
+            notifyNodeConnectorDisappeared(nodeConnectorInstanceId);
+        }
+    }
+
+    private boolean compareIITail(final InstanceIdentifier<?> ii1, final InstanceIdentifier<?> ii2) {
         return Iterables.getLast(ii1.getPathArguments()).equals(Iterables.getLast(ii2.getPathArguments()));
     }
 
-    private static boolean isPortDown(FlowCapableNodeConnector flowCapableNodeConnector) {
+    private static boolean isPortDown(final FlowCapableNodeConnector flowCapableNodeConnector) {
         PortState portState = flowCapableNodeConnector.getState();
         PortConfig portConfig = flowCapableNodeConnector.getConfiguration();
-        return portState != null && portState.isLinkDown() ||
-                portConfig != null && portConfig.isPORTDOWN();
+        return portState != null && portState.isLinkDown() || portConfig != null && portConfig.isPORTDOWN();
     }
 
-    private void notifyNodeConnectorAppeared(InstanceIdentifier<NodeConnector> nodeConnectorInstanceId,
-                                             FlowCapableNodeConnector flowConnector) {
+    private void notifyNodeConnectorAppeared(final InstanceIdentifier<NodeConnector> nodeConnectorInstanceId,
+                                             final FlowCapableNodeConnector flowConnector) {
         for (NodeConnectorEventsObserver observer : observers) {
             observer.nodeConnectorAdded(nodeConnectorInstanceId, flowConnector);
         }
     }
 
-    private void notifyNodeConnectorDisappeared(InstanceIdentifier<NodeConnector> nodeConnectorInstanceId) {
+    private void notifyNodeConnectorDisappeared(final InstanceIdentifier<NodeConnector> nodeConnectorInstanceId) {
         for (NodeConnectorEventsObserver observer : observers) {
             observer.nodeConnectorRemoved(nodeConnectorInstanceId);
         }