import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.Collection;
+import java.util.Date;
import java.util.List;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
+import org.opendaylight.openflowplugin.applications.frsync.util.ModificationUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
+import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
import org.slf4j.LoggerFactory;
/**
- * Listens to operational new nodes and delegates add/remove/update/barrier to {@link SyncReactor}.
+ * Listens to operational changes and starts reconciliation through {@link SyncReactor} when necessary.
*/
public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
-
- protected final SyncReactor reactor;
- protected final FlowCapableNodeSnapshotDao operationalSnapshot;
- protected final FlowCapableNodeDao configDao;
-
- public SimplifiedOperationalListener(SyncReactor reactor, FlowCapableNodeSnapshotDao operationalSnapshot,
- FlowCapableNodeDao configDao) {
+ public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
+
+ private final SyncReactor reactor;
+ private final FlowCapableNodeSnapshotDao operationalSnapshot;
+ private final FlowCapableNodeDao configDao;
+ private final ReconciliationRegistry reconciliationRegistry;
+ private final DeviceMastershipManager deviceMastershipManager;
+
+ public SimplifiedOperationalListener(final SyncReactor reactor,
+ final FlowCapableNodeSnapshotDao operationalSnapshot,
+ final FlowCapableNodeDao configDao,
+ final ReconciliationRegistry reconciliationRegistry,
+ final DeviceMastershipManager deviceMastershipManager) {
this.reactor = reactor;
this.operationalSnapshot = operationalSnapshot;
this.configDao = configDao;
+ this.reconciliationRegistry = reconciliationRegistry;
+ this.deviceMastershipManager = deviceMastershipManager;
}
@Override
public void onDataTreeChanged(Collection<DataTreeModification<Node>> modifications) {
- LOG.trace("Inventory Operational changes {}", modifications.size());
+ LOG.trace("Operational changes: {}", modifications.size());
super.onDataTreeChanged(modifications);
}
/**
- * This method behaves like this:
- * <ul>
- * <li>If node is added to operational store then reconciliation.</li>
- * <li>Node is deleted from operational cache is removed.</li>
- * <li>Skip this event otherwise.</li>
- * </ul>
- *
+ * Update cache, register for device masterhip when device connected and start reconciliation if device
+ * is registered and actual modification is consistent.Skip the event otherwise.
* @throws InterruptedException from syncup
*/
protected Optional<ListenableFuture<Boolean>> processNodeModification(
- DataTreeModification<Node> modification) throws ReadFailedException, InterruptedException {
-
+ DataTreeModification<Node> modification) throws InterruptedException {
+ final NodeId nodeId = ModificationUtil.nodeId(modification);
updateCache(modification);
- if (isReconciliationNeeded(modification)) {
+
+ if (isAdd(modification) || isAddLogical(modification)) {
+ deviceMastershipManager.onDeviceConnected(nodeId);
+ }
+
+ if (reconciliationRegistry.isRegistered(nodeId) && isConsistentForReconcile(modification)) {
return reconciliation(modification);
+ } else {
+ return skipModification(modification);
}
- return skipModification(modification);
}
/**
* Remove if delete. Update only if FlowCapableNode Augmentation modified.
- *
+ * Unregister for device mastership.
* @param modification Datastore modification
- * @return true for cache update, false for cache remove
*/
- protected boolean updateCache(DataTreeModification<Node> modification) {
+ private void updateCache(DataTreeModification<Node> modification) {
+ NodeId nodeId = ModificationUtil.nodeId(modification);
if (isDelete(modification) || isDeleteLogical(modification)) {
- operationalSnapshot.updateCache(nodeId(modification), Optional.<FlowCapableNode>absent());
- return false;
+ operationalSnapshot.updateCache(nodeId, Optional.absent());
+ deviceMastershipManager.onDeviceDisconnected(nodeId);
+ return;
}
- operationalSnapshot.updateCache(nodeId(modification), Optional.fromNullable(flowCapableNodeAfter(modification)));
- return true;
+ operationalSnapshot.updateCache(nodeId, Optional.fromNullable(ModificationUtil.flowCapableNodeAfter(modification)));
}
private Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
- LOG.trace("Skipping Inventory Operational modification {}, before {}, after {}", nodeIdValue(modification),
+ LOG.trace("Skipping operational modification: {}, before {}, after {}",
+ ModificationUtil.nodeIdValue(modification),
modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
return Optional.absent();
}
/**
- * ModificationType.DELETE
+ * ModificationType.DELETE.
*/
private boolean isDelete(DataTreeModification<Node> modification) {
if (ModificationType.DELETE == modification.getRootNode().getModificationType()) {
- LOG.trace("Delete {} (physical)", nodeIdValue(modification));
+ LOG.trace("Delete {} (physical)", ModificationUtil.nodeIdValue(modification));
return true;
}
private boolean isDeleteLogical(DataTreeModification<Node> modification) {
final DataObjectModification<Node> rootNode = modification.getRootNode();
if (!safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter())) {
- LOG.trace("Delete {} (logical)", nodeIdValue(modification));
+ LOG.trace("Delete {} (logical)", ModificationUtil.nodeIdValue(modification));
return true;
}
final boolean nodeAppearedInOperational = dataBefore == null && dataAfter != null;
if (nodeAppearedInOperational) {
- LOG.trace("Add {} (physical)", nodeIdValue(modification));
+ LOG.trace("Add {} (physical)", ModificationUtil.nodeIdValue(modification));
}
return nodeAppearedInOperational;
}
private boolean isAddLogical(DataTreeModification<Node> modification) {
final DataObjectModification<Node> rootNode = modification.getRootNode();
if (safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter())) {
- LOG.trace("Add {} (logical)", nodeIdValue(modification));
+ LOG.trace("Add {} (logical)", ModificationUtil.nodeIdValue(modification));
return true;
}
return false;
}
- protected boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
- return isAdd(modification) || isAddLogical(modification);
- }
-
+ /**
+ * If node is present in config DS diff between wanted configuration (in config DS) and actual device
+ * configuration (coming from operational) should be calculated and sent to device.
+ * @param modification from DS
+ * @return optional syncup future
+ * @throws InterruptedException from syncup
+ */
private Optional<ListenableFuture<Boolean>> reconciliation(DataTreeModification<Node> modification) throws InterruptedException {
- final NodeId nodeId = nodeId(modification);
+ final NodeId nodeId = ModificationUtil.nodeId(modification);
final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
if (nodeConfiguration.isPresent()) {
LOG.debug("Reconciliation: {}", nodeId.getValue());
final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, new NodeKey(nodeId(modification))).augmentation(FlowCapableNode.class);
- return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), flowCapableNodeAfter(modification), dsType()));
+ .child(Node.class, new NodeKey(ModificationUtil.nodeId(modification)))
+ .augmentation(FlowCapableNode.class);
+ final FlowCapableNode fcOperationalNode = ModificationUtil.flowCapableNodeAfter(modification);
+ final SyncupEntry syncupEntry = new SyncupEntry(nodeConfiguration.get(), LogicalDatastoreType.CONFIGURATION,
+ fcOperationalNode, dsType());
+ return Optional.of(reactor.syncup(nodePath, syncupEntry));
} else {
+ LOG.debug("Config not present for reconciliation: {}", nodeId.getValue());
return skipModification(modification);
}
}
- static FlowCapableNode flowCapableNodeAfter(DataTreeModification<Node> modification) {
- final Node dataAfter = modification.getRootNode().getDataAfter();
- if (dataAfter == null) {
- return null;
- }
- return dataAfter.getAugmentation(FlowCapableNode.class);
- }
+ private boolean isConsistentForReconcile(DataTreeModification<Node> modification) {
+ final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
+ final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
+ .getAugmentation(FlowCapableStatisticsGatheringStatus.class);
- static boolean safeConnectorsEmpty(Node node) {
- if (node == null) {
- return true;
+ if (gatheringStatus == null) {
+ LOG.trace("Statistics gathering never started: {}", nodeId.getValue());
+ return false;
}
- final List<NodeConnector> nodeConnectors = node.getNodeConnector();
-
- return nodeConnectors == null || nodeConnectors.isEmpty();
- }
+ final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
- static String nodeIdValue(DataTreeModification<Node> modification) {
- final NodeId nodeId = nodeId(modification);
-
- if (nodeId == null) {
- return null;
+ if (gatheringStatusEnd == null) {
+ LOG.trace("Statistics gathering is not over yet: {}", nodeId.getValue());
+ return false;
}
- return nodeId.getValue();
- }
-
- static NodeId nodeId(DataTreeModification<Node> modification) {
- final DataObjectModification<Node> rootNode = modification.getRootNode();
- final Node dataAfter = rootNode.getDataAfter();
-
- if (dataAfter != null) {
- return dataAfter.getId();
+ if (!gatheringStatusEnd.isSucceeded()) {
+ LOG.trace("Statistics gathering was not successful: {}", nodeId.getValue());
+ return false;
}
- final Node dataBefore = rootNode.getDataBefore();
- if (dataBefore != null) {
- return dataBefore.getId();
+ try {
+ Date timestampOfRegistration = reconciliationRegistry.getRegistrationTimestamp(nodeId);
+ final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
+ Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
+ if (timestampOfStatistics.after(timestampOfRegistration)) {
+ LOG.debug("Fresh operational present: {}", nodeId.getValue());
+ return true;
+ }
+ } catch (ParseException e) {
+ LOG.error("Timestamp parsing error {}", e);
}
+ LOG.debug("Fresh operational not present: {}", nodeId.getValue());
+ return false;
+ }
- return null;
+ private static boolean safeConnectorsEmpty(Node node) {
+ if (node == null) {
+ return true;
+ }
+ final List<NodeConnector> nodeConnectors = node.getNodeConnector();
+ return nodeConnectors == null || nodeConnectors.isEmpty();
}
@Override
public LogicalDatastoreType dsType() {
return LogicalDatastoreType.OPERATIONAL;
}
+
}