Switch to MD-SAL APIs
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SimplifiedOperationalListener.java
index 738a652e5e0e08741a0f47d37d850223e235f573..b1aa44c6e4e4c62bf8f2d93e0121115910be2179 100644 (file)
@@ -1,25 +1,35 @@
-/**
+/*
  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowplugin.applications.frsync.impl;
 
-import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Collection;
+import java.util.Date;
 import java.util.List;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import java.util.Objects;
+import java.util.Optional;
+import javax.annotation.Nonnull;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
+import org.opendaylight.openflowplugin.applications.frsync.util.ModificationUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
+import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
@@ -30,180 +40,182 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Listens to operational new nodes and delegates add/remove/update/barrier to {@link SyncReactor}.
+ * Listens to operational changes and starts reconciliation through {@link SyncReactor} when necessary.
  */
 public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
-    private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
 
+    private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
+    public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
     private final SyncReactor reactor;
     private final FlowCapableNodeSnapshotDao operationalSnapshot;
     private final FlowCapableNodeDao configDao;
-
-    public SimplifiedOperationalListener(SyncReactor reactor, FlowCapableNodeSnapshotDao operationalSnapshot,
-                                         FlowCapableNodeDao configDao) {
+    private final ReconciliationRegistry reconciliationRegistry;
+    private final DeviceMastershipManager deviceMastershipManager;
+
+    public SimplifiedOperationalListener(final SyncReactor reactor,
+                                         final FlowCapableNodeSnapshotDao operationalSnapshot,
+                                         final FlowCapableNodeDao configDao,
+                                         final ReconciliationRegistry reconciliationRegistry,
+                                         final DeviceMastershipManager deviceMastershipManager) {
         this.reactor = reactor;
         this.operationalSnapshot = operationalSnapshot;
         this.configDao = configDao;
+        this.reconciliationRegistry = reconciliationRegistry;
+        this.deviceMastershipManager = deviceMastershipManager;
     }
 
     @Override
-    public void onDataTreeChanged(Collection<DataTreeModification<Node>> modifications) {
-        LOG.trace("Inventory Operational changes {}", modifications.size());
+    public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> modifications) {
         super.onDataTreeChanged(modifications);
     }
 
     /**
-     * This method behaves like this:
-     * <ul>
-     * <li>If node is added to operational store then reconciliation.</li>
-     * <li>Node is deleted from operational cache is removed.</li>
-     * <li>Skip this event otherwise.</li>
-     * </ul>
-     *
-     * @throws InterruptedException from syncup
+     * Update cache, register for device mastership when device connected and start reconciliation if device
+     * is registered and actual modification is consistent.Skip the event otherwise.
      */
+    @Override
     protected Optional<ListenableFuture<Boolean>> processNodeModification(
-            DataTreeModification<Node> modification) throws InterruptedException {
-
-        updateCache(modification);
-        if (isReconciliationNeeded(modification)) {
-            return reconciliation(modification);
+            final DataTreeModification<Node> modification) {
+        Optional<ListenableFuture<Boolean>> result;
+        final NodeId nodeId = ModificationUtil.nodeId(modification);
+        final DataObjectModification<Node> nodeModification = modification.getRootNode();
+
+        if (isDelete(nodeModification) || isDeleteLogical(nodeModification)) {
+            operationalSnapshot.updateCache(nodeId, Optional.empty());
+            deviceMastershipManager.onDeviceDisconnected(nodeId);
+            result = skipModification(modification);
+        } else {
+            operationalSnapshot.updateCache(nodeId, Optional.ofNullable(
+                    ModificationUtil.flowCapableNodeAfter(modification)));
+
+            final boolean isAdd = isAdd(nodeModification) || isAddLogical(nodeModification);
+
+            if (isAdd) {
+                deviceMastershipManager.onDeviceConnected(nodeId);
+            }
+
+            // if node is registered for reconcile we need consistent data from operational DS (skip partial
+            // collections) but we can accept first modification since all statistics are intentionally collected in
+            // one step on startup
+            if (reconciliationRegistry.isRegistered(nodeId) && (isAdd || isConsistentForReconcile(modification))) {
+                result = reconciliation(modification);
+            } else {
+                result = skipModification(modification);
+            }
         }
-        return skipModification(modification);
+        return result;
     }
 
-    /**
-     * Remove if delete. Update only if FlowCapableNode Augmentation modified.
-     *
-     * @param modification Datastore modification
-     * @return true for cache update, false for cache remove
-     */
-    protected boolean updateCache(DataTreeModification<Node> modification) {
-        if (isDelete(modification) || isDeleteLogical(modification)) {
-            operationalSnapshot.updateCache(nodeId(modification), Optional.<FlowCapableNode>absent());
-            return false;
+    private Optional<ListenableFuture<Boolean>> skipModification(final DataTreeModification<Node> modification) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Skipping operational modification: {}, before {}, after {}",
+                    ModificationUtil.nodeIdValue(modification),
+                    modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
+                    modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
         }
-        operationalSnapshot.updateCache(nodeId(modification), Optional.fromNullable(flowCapableNodeAfter(modification)));
-        return true;
+        return Optional.empty();
     }
 
-    private Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
-        LOG.trace("Skipping Inventory Operational modification {}, before {}, after {}", nodeIdValue(modification),
-                modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
-                modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
-        return Optional.absent();
-    }
-
-    /**
-     * ModificationType.DELETE.
-     */
-    private boolean isDelete(DataTreeModification<Node> modification) {
-        if (ModificationType.DELETE == modification.getRootNode().getModificationType()) {
-            LOG.trace("Delete {} (physical)", nodeIdValue(modification));
-            return true;
-        }
-
-        return false;
+    private boolean isDelete(final DataObjectModification<Node> nodeModification) {
+        return Objects.nonNull(nodeModification.getDataBefore()) && Objects.isNull(nodeModification.getDataAfter());
     }
 
     /**
      * All connectors disappeared from operational store (logical delete).
      */
-    private boolean isDeleteLogical(DataTreeModification<Node> modification) {
-        final DataObjectModification<Node> rootNode = modification.getRootNode();
-        if (!safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter())) {
-            LOG.trace("Delete {} (logical)", nodeIdValue(modification));
-            return true;
-        }
+    private boolean isDeleteLogical(final DataObjectModification<Node> nodeModification) {
+        return !safeConnectorsEmpty(nodeModification.getDataBefore())
+                && safeConnectorsEmpty(nodeModification.getDataAfter());
 
-        return false;
     }
 
-    private boolean isAdd(DataTreeModification<Node> modification) {
-        final DataObjectModification<Node> rootNode = modification.getRootNode();
-        final Node dataAfter = rootNode.getDataAfter();
-        final Node dataBefore = rootNode.getDataBefore();
-
-        final boolean nodeAppearedInOperational = dataBefore == null && dataAfter != null;
-        if (nodeAppearedInOperational) {
-            LOG.trace("Add {} (physical)", nodeIdValue(modification));
-        }
-        return nodeAppearedInOperational;
+    private boolean isAdd(final DataObjectModification<Node> nodeModification) {
+        return Objects.isNull(nodeModification.getDataBefore()) && Objects.nonNull(nodeModification.getDataAfter());
     }
 
     /**
      * All connectors appeared in operational store (logical add).
      */
-    private boolean isAddLogical(DataTreeModification<Node> modification) {
-        final DataObjectModification<Node> rootNode = modification.getRootNode();
-        if (safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter())) {
-            LOG.trace("Add {} (logical)", nodeIdValue(modification));
-            return true;
-        }
-
-        return false;
+    private boolean isAddLogical(final DataObjectModification<Node> nodeModification) {
+        return safeConnectorsEmpty(nodeModification.getDataBefore())
+                && !safeConnectorsEmpty(nodeModification.getDataAfter());
     }
 
-    protected boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
-        return isAdd(modification) || isAddLogical(modification);
-    }
-
-    private Optional<ListenableFuture<Boolean>> reconciliation(DataTreeModification<Node> modification) throws InterruptedException {
-        final NodeId nodeId = nodeId(modification);
+    /**
+     * If node is present in config DS diff between wanted configuration (in config DS) and actual device
+     * configuration (coming from operational) should be calculated and sent to device.
+     * @param modification from DS
+     * @return optional syncup future
+     */
+    private Optional<ListenableFuture<Boolean>> reconciliation(final DataTreeModification<Node> modification) {
+        final NodeId nodeId = ModificationUtil.nodeId(modification);
         final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
 
         if (nodeConfiguration.isPresent()) {
-            LOG.debug("Reconciliation: {}", nodeId.getValue());
+            LOG.debug("Reconciliation {}: {}", dsType(), nodeId.getValue());
             final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
-                    .child(Node.class, new NodeKey(nodeId(modification))).augmentation(FlowCapableNode.class);
-            return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), flowCapableNodeAfter(modification), dsType()));
+                    .child(Node.class, new NodeKey(ModificationUtil.nodeId(modification)))
+                    .augmentation(FlowCapableNode.class);
+            final FlowCapableNode fcOperationalNode = ModificationUtil.flowCapableNodeAfter(modification);
+            final SyncupEntry syncupEntry = new SyncupEntry(nodeConfiguration.get(), LogicalDatastoreType.CONFIGURATION,
+                                                            fcOperationalNode, dsType());
+            return Optional.of(reactor.syncup(nodePath, syncupEntry));
         } else {
+            LOG.debug("Config not present for reconciliation: {}", nodeId.getValue());
+            reconciliationRegistry.unregisterIfRegistered(nodeId);
             return skipModification(modification);
         }
     }
 
-    private static FlowCapableNode flowCapableNodeAfter(DataTreeModification<Node> modification) {
-        final Node dataAfter = modification.getRootNode().getDataAfter();
-        if (dataAfter == null) {
-            return null;
-        }
-        return dataAfter.getAugmentation(FlowCapableNode.class);
-    }
+    /**
+     * Check if modification is consistent for reconciliation. We need fresh data, which means that current statistics
+     * were collected after registration for reconcile and whole bunch of statistics was collected successfully.
+     * @param modification from DS
+     * @return status of modification
+     */
+    private boolean isConsistentForReconcile(final DataTreeModification<Node> modification) {
+        final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
+        final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
+                .augmentation(FlowCapableStatisticsGatheringStatus.class);
 
-    private static boolean safeConnectorsEmpty(Node node) {
-        if (node == null) {
-            return true;
+        if (gatheringStatus == null) {
+            LOG.trace("Statistics gathering never started: {}", nodeId.getValue());
+            return false;
         }
 
-        final List<NodeConnector> nodeConnectors = node.getNodeConnector();
-
-        return nodeConnectors == null || nodeConnectors.isEmpty();
-    }
-
-    private static String nodeIdValue(DataTreeModification<Node> modification) {
-        final NodeId nodeId = nodeId(modification);
+        final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
 
-        if (nodeId == null) {
-            return null;
+        if (gatheringStatusEnd == null) {
+            LOG.trace("Statistics gathering is not over yet: {}", nodeId.getValue());
+            return false;
         }
 
-        return nodeId.getValue();
-    }
-
-    static NodeId nodeId(DataTreeModification<Node> modification) {
-        final DataObjectModification<Node> rootNode = modification.getRootNode();
-        final Node dataAfter = rootNode.getDataAfter();
-
-        if (dataAfter != null) {
-            return dataAfter.getId();
+        if (!gatheringStatusEnd.isSucceeded()) {
+            LOG.trace("Statistics gathering was not successful: {}", nodeId.getValue());
+            return false;
         }
 
-        final Node dataBefore = rootNode.getDataBefore();
-        if (dataBefore != null) {
-            return dataBefore.getId();
+        try {
+            Date timestampOfRegistration = reconciliationRegistry.getRegistrationTimestamp(nodeId);
+            final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
+            Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
+            if (timestampOfStatistics.after(timestampOfRegistration)) {
+                LOG.debug("Fresh operational present: {}", nodeId.getValue());
+                return true;
+            }
+        } catch (ParseException e) {
+            LOG.warn("Timestamp parsing error {}", e);
         }
+        LOG.debug("Fresh operational not present: {}", nodeId.getValue());
+        return false;
+    }
 
-        return null;
+    private static boolean safeConnectorsEmpty(final Node node) {
+        if (node == null) {
+            return true;
+        }
+        final List<NodeConnector> nodeConnectors = node.getNodeConnector();
+        return nodeConnectors == null || nodeConnectors.isEmpty();
     }
 
     @Override