-/**
+/*
* Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.openflowplugin.applications.frsync.impl;
-import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
-import java.util.List;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import java.util.Map;
+import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
* Listens to operational changes and starts reconciliation through {@link SyncReactor} when necessary.
*/
public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
+
private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
-
private final SyncReactor reactor;
private final FlowCapableNodeSnapshotDao operationalSnapshot;
private final FlowCapableNodeDao configDao;
}
@Override
- public void onDataTreeChanged(Collection<DataTreeModification<Node>> modifications) {
- LOG.trace("Operational changes: {}", modifications.size());
+ public void onDataTreeChanged(@NonNull final Collection<DataTreeModification<Node>> modifications) {
super.onDataTreeChanged(modifications);
}
/**
- * Update cache, register for device masterhip when device connected and start reconciliation if device
+ * Update cache, register for device mastership when device connected and start reconciliation if device
* is registered and actual modification is consistent.Skip the event otherwise.
- * @throws InterruptedException from syncup
*/
+ @Override
protected Optional<ListenableFuture<Boolean>> processNodeModification(
- DataTreeModification<Node> modification) throws InterruptedException {
+ final DataTreeModification<Node> modification) {
+ Optional<ListenableFuture<Boolean>> result;
final NodeId nodeId = ModificationUtil.nodeId(modification);
- updateCache(modification);
+ final DataObjectModification<Node> nodeModification = modification.getRootNode();
- if (isAdd(modification) || isAddLogical(modification)) {
- deviceMastershipManager.onDeviceConnected(nodeId);
- }
-
- if (reconciliationRegistry.isRegistered(nodeId) && isConsistentForReconcile(modification)) {
- return reconciliation(modification);
+ if (isDelete(nodeModification) || isDeleteLogical(nodeModification)) {
+ operationalSnapshot.updateCache(nodeId, Optional.empty());
+ deviceMastershipManager.onDeviceDisconnected(nodeId);
+ result = skipModification(modification);
} else {
- return skipModification(modification);
- }
- }
+ operationalSnapshot.updateCache(nodeId, Optional.ofNullable(
+ ModificationUtil.flowCapableNodeAfter(modification)));
- /**
- * Remove if delete. Update only if FlowCapableNode Augmentation modified.
- * Unregister for device mastership.
- * @param modification Datastore modification
- */
- private void updateCache(DataTreeModification<Node> modification) {
- NodeId nodeId = ModificationUtil.nodeId(modification);
- if (isDelete(modification) || isDeleteLogical(modification)) {
- operationalSnapshot.updateCache(nodeId, Optional.absent());
- deviceMastershipManager.onDeviceDisconnected(nodeId);
- return;
- }
- operationalSnapshot.updateCache(nodeId, Optional.fromNullable(ModificationUtil.flowCapableNodeAfter(modification)));
- }
+ final boolean isAdd = isAdd(nodeModification) || isAddLogical(nodeModification);
+
+ if (isAdd) {
+ deviceMastershipManager.onDeviceConnected(nodeId);
+ }
- private Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
- LOG.trace("Skipping operational modification: {}, before {}, after {}",
- ModificationUtil.nodeIdValue(modification),
- modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
- modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
- return Optional.absent();
+ // if node is registered for reconcile we need consistent data from operational DS (skip partial
+ // collections) but we can accept first modification since all statistics are intentionally collected in
+ // one step on startup
+ if (reconciliationRegistry.isRegistered(nodeId) && (isAdd || isConsistentForReconcile(modification))) {
+ result = reconciliation(modification);
+ } else {
+ result = skipModification(modification);
+ }
+ }
+ return result;
}
- /**
- * ModificationType.DELETE.
- */
- private boolean isDelete(DataTreeModification<Node> modification) {
- if (ModificationType.DELETE == modification.getRootNode().getModificationType()) {
- LOG.trace("Delete {} (physical)", ModificationUtil.nodeIdValue(modification));
- return true;
+ private static Optional<ListenableFuture<Boolean>> skipModification(final DataTreeModification<Node> modification) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping operational modification: {}, before {}, after {}",
+ ModificationUtil.nodeIdValue(modification),
+ modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
+ modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
}
+ return Optional.empty();
+ }
- return false;
+ private static boolean isDelete(final DataObjectModification<Node> nodeModification) {
+ return nodeModification.getDataBefore() != null && nodeModification.getDataAfter() == null;
}
/**
* All connectors disappeared from operational store (logical delete).
*/
- private boolean isDeleteLogical(DataTreeModification<Node> modification) {
- final DataObjectModification<Node> rootNode = modification.getRootNode();
- if (!safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter())) {
- LOG.trace("Delete {} (logical)", ModificationUtil.nodeIdValue(modification));
- return true;
- }
+ private static boolean isDeleteLogical(final DataObjectModification<Node> nodeModification) {
+ return !safeConnectorsEmpty(nodeModification.getDataBefore())
+ && safeConnectorsEmpty(nodeModification.getDataAfter());
- return false;
}
- private boolean isAdd(DataTreeModification<Node> modification) {
- final DataObjectModification<Node> rootNode = modification.getRootNode();
- final Node dataAfter = rootNode.getDataAfter();
- final Node dataBefore = rootNode.getDataBefore();
-
- final boolean nodeAppearedInOperational = dataBefore == null && dataAfter != null;
- if (nodeAppearedInOperational) {
- LOG.trace("Add {} (physical)", ModificationUtil.nodeIdValue(modification));
- }
- return nodeAppearedInOperational;
+ private static boolean isAdd(final DataObjectModification<Node> nodeModification) {
+ return nodeModification.getDataBefore() == null && nodeModification.getDataAfter() != null;
}
/**
* All connectors appeared in operational store (logical add).
*/
- private boolean isAddLogical(DataTreeModification<Node> modification) {
- final DataObjectModification<Node> rootNode = modification.getRootNode();
- if (safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter())) {
- LOG.trace("Add {} (logical)", ModificationUtil.nodeIdValue(modification));
- return true;
- }
-
- return false;
+ private static boolean isAddLogical(final DataObjectModification<Node> nodeModification) {
+ return safeConnectorsEmpty(nodeModification.getDataBefore())
+ && !safeConnectorsEmpty(nodeModification.getDataAfter());
}
/**
* configuration (coming from operational) should be calculated and sent to device.
* @param modification from DS
* @return optional syncup future
- * @throws InterruptedException from syncup
*/
- private Optional<ListenableFuture<Boolean>> reconciliation(DataTreeModification<Node> modification) throws InterruptedException {
+ private Optional<ListenableFuture<Boolean>> reconciliation(final DataTreeModification<Node> modification) {
final NodeId nodeId = ModificationUtil.nodeId(modification);
final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
if (nodeConfiguration.isPresent()) {
- LOG.debug("Reconciliation: {}", nodeId.getValue());
+ LOG.debug("Reconciliation {}: {}", dsType(), nodeId.getValue());
final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(ModificationUtil.nodeId(modification)))
.augmentation(FlowCapableNode.class);
return Optional.of(reactor.syncup(nodePath, syncupEntry));
} else {
LOG.debug("Config not present for reconciliation: {}", nodeId.getValue());
+ reconciliationRegistry.unregisterIfRegistered(nodeId);
return skipModification(modification);
}
}
- private boolean isConsistentForReconcile(DataTreeModification<Node> modification) {
+ /**
+ * Check if modification is consistent for reconciliation. We need fresh data, which means that current statistics
+ * were collected after registration for reconcile and whole bunch of statistics was collected successfully.
+ * @param modification from DS
+ * @return status of modification
+ */
+ private boolean isConsistentForReconcile(final DataTreeModification<Node> modification) {
final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
- .getAugmentation(FlowCapableStatisticsGatheringStatus.class);
+ .augmentation(FlowCapableStatisticsGatheringStatus.class);
if (gatheringStatus == null) {
LOG.trace("Statistics gathering never started: {}", nodeId.getValue());
return true;
}
} catch (ParseException e) {
- LOG.error("Timestamp parsing error {}", e);
+ LOG.warn("Timestamp parsing error", e);
}
LOG.debug("Fresh operational not present: {}", nodeId.getValue());
return false;
}
- private static boolean safeConnectorsEmpty(Node node) {
+ private static boolean safeConnectorsEmpty(final Node node) {
if (node == null) {
return true;
}
- final List<NodeConnector> nodeConnectors = node.getNodeConnector();
+ final Map<NodeConnectorKey, NodeConnector> nodeConnectors = node.getNodeConnector();
return nodeConnectors == null || nodeConnectors.isEmpty();
}
public LogicalDatastoreType dsType() {
return LogicalDatastoreType.OPERATIONAL;
}
-
}