X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fforwardingrules-sync%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Ffrsync%2Fimpl%2FSimplifiedOperationalListener.java;h=424a98daebc3b7456b864abb5a89a013ea305ea7;hb=f43cdb15170c04272e95e8b280e2d7ceace613a2;hp=2563fa9dedac977487d8595e02dd4a673c9fcb89;hpb=3c0e7bb26944f8fc50e08c3610e948026633cda0;p=openflowplugin.git diff --git a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListener.java b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListener.java index 2563fa9ded..424a98daeb 100644 --- a/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListener.java +++ b/applications/forwardingrules-sync/src/main/java/org/opendaylight/openflowplugin/applications/frsync/impl/SimplifiedOperationalListener.java @@ -10,17 +10,26 @@ package org.opendaylight.openflowplugin.applications.frsync.impl; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.Collection; +import java.util.Date; import java.util.List; import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType; import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.openflowplugin.applications.frsync.SyncReactor; import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao; import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao; +import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager; +import org.opendaylight.openflowplugin.applications.frsync.util.ModificationUtil; +import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil; +import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry; +import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; @@ -31,180 +40,181 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Listens to operational new nodes and delegates add/remove/update/barrier to {@link SyncReactor}. + * Listens to operational changes and starts reconciliation through {@link SyncReactor} when necessary. */ public class SimplifiedOperationalListener extends AbstractFrmSyncListener { - private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class); - - protected final SyncReactor reactor; - protected final FlowCapableNodeSnapshotDao operationalSnapshot; - protected final FlowCapableNodeDao configDao; - public SimplifiedOperationalListener(SyncReactor reactor, FlowCapableNodeSnapshotDao operationalSnapshot, - FlowCapableNodeDao configDao) { + private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class); + public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; + private final SyncReactor reactor; + private final FlowCapableNodeSnapshotDao operationalSnapshot; + private final FlowCapableNodeDao configDao; + private final ReconciliationRegistry reconciliationRegistry; + private final DeviceMastershipManager deviceMastershipManager; + + public SimplifiedOperationalListener(final SyncReactor reactor, + final FlowCapableNodeSnapshotDao operationalSnapshot, + final FlowCapableNodeDao configDao, + final ReconciliationRegistry reconciliationRegistry, + final DeviceMastershipManager deviceMastershipManager) { this.reactor = reactor; this.operationalSnapshot = operationalSnapshot; this.configDao = configDao; + this.reconciliationRegistry = reconciliationRegistry; + this.deviceMastershipManager = deviceMastershipManager; } @Override - public void onDataTreeChanged(Collection> modifications) { - LOG.trace("Inventory Operational changes {}", modifications.size()); + public void onDataTreeChanged(final Collection> modifications) { super.onDataTreeChanged(modifications); } /** - * This method behaves like this: - *
    - *
  • If node is added to operational store then reconciliation.
  • - *
  • Node is deleted from operational cache is removed.
  • - *
  • Skip this event otherwise.
  • - *
- * + * Update cache, register for device masterhip when device connected and start reconciliation if device + * is registered and actual modification is consistent.Skip the event otherwise. * @throws InterruptedException from syncup */ protected Optional> processNodeModification( - DataTreeModification modification) throws ReadFailedException, InterruptedException { - + final DataTreeModification modification) throws InterruptedException { + final NodeId nodeId = ModificationUtil.nodeId(modification); updateCache(modification); - if (isReconciliationNeeded(modification)) { + + if (isAdd(modification) || isAddLogical(modification)) { + deviceMastershipManager.onDeviceConnected(nodeId); + } + + if (reconciliationRegistry.isRegistered(nodeId) && isConsistentForReconcile(modification)) { return reconciliation(modification); + } else { + return skipModification(modification); } - return skipModification(modification); } /** * Remove if delete. Update only if FlowCapableNode Augmentation modified. - * + * Unregister for device mastership. * @param modification Datastore modification - * @return true for cache update, false for cache remove */ - protected boolean updateCache(DataTreeModification modification) { + private void updateCache(final DataTreeModification modification) { + NodeId nodeId = ModificationUtil.nodeId(modification); if (isDelete(modification) || isDeleteLogical(modification)) { - operationalSnapshot.updateCache(nodeId(modification), Optional.absent()); - return false; + operationalSnapshot.updateCache(nodeId, Optional.absent()); + deviceMastershipManager.onDeviceDisconnected(nodeId); + return; } - operationalSnapshot.updateCache(nodeId(modification), Optional.fromNullable(flowCapableNodeAfter(modification))); - return true; + operationalSnapshot.updateCache(nodeId, Optional.fromNullable(ModificationUtil.flowCapableNodeAfter(modification))); } - private Optional> skipModification(DataTreeModification modification) { - LOG.trace("Skipping Inventory Operational modification {}, before {}, after {}", nodeIdValue(modification), - modification.getRootNode().getDataBefore() == null ? "null" : "nonnull", - modification.getRootNode().getDataAfter() == null ? "null" : "nonnull"); + private Optional> skipModification(final DataTreeModification modification) { + if (LOG.isTraceEnabled()) { + LOG.trace("Skipping operational modification: {}, before {}, after {}", + ModificationUtil.nodeIdValue(modification), + modification.getRootNode().getDataBefore() == null ? "null" : "nonnull", + modification.getRootNode().getDataAfter() == null ? "null" : "nonnull"); + } return Optional.absent(); } /** - * ModificationType.DELETE + * ModificationType.DELETE. */ - private boolean isDelete(DataTreeModification modification) { - if (ModificationType.DELETE == modification.getRootNode().getModificationType()) { - LOG.trace("Delete {} (physical)", nodeIdValue(modification)); - return true; - } - - return false; + private boolean isDelete(final DataTreeModification modification) { + return ModificationType.DELETE == modification.getRootNode().getModificationType(); } /** * All connectors disappeared from operational store (logical delete). */ - private boolean isDeleteLogical(DataTreeModification modification) { + private boolean isDeleteLogical(final DataTreeModification modification) { final DataObjectModification rootNode = modification.getRootNode(); - if (!safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter())) { - LOG.trace("Delete {} (logical)", nodeIdValue(modification)); - return true; - } + return !safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter()); - return false; } - private boolean isAdd(DataTreeModification modification) { + private boolean isAdd(final DataTreeModification modification) { final DataObjectModification rootNode = modification.getRootNode(); - final Node dataAfter = rootNode.getDataAfter(); - final Node dataBefore = rootNode.getDataBefore(); - - final boolean nodeAppearedInOperational = dataBefore == null && dataAfter != null; - if (nodeAppearedInOperational) { - LOG.trace("Add {} (physical)", nodeIdValue(modification)); - } - return nodeAppearedInOperational; + return rootNode.getDataBefore() == null && rootNode.getDataAfter() != null; } /** * All connectors appeared in operational store (logical add). */ - private boolean isAddLogical(DataTreeModification modification) { + private boolean isAddLogical(final DataTreeModification modification) { final DataObjectModification rootNode = modification.getRootNode(); - if (safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter())) { - LOG.trace("Add {} (logical)", nodeIdValue(modification)); - return true; - } - - return false; - } - - protected boolean isReconciliationNeeded(DataTreeModification modification) { - return isAdd(modification) || isAddLogical(modification); + return safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter()); } - private Optional> reconciliation(DataTreeModification modification) throws InterruptedException { - final NodeId nodeId = nodeId(modification); + /** + * If node is present in config DS diff between wanted configuration (in config DS) and actual device + * configuration (coming from operational) should be calculated and sent to device. + * @param modification from DS + * @return optional syncup future + * @throws InterruptedException from syncup + */ + private Optional> reconciliation(final DataTreeModification modification) + throws InterruptedException { + final NodeId nodeId = ModificationUtil.nodeId(modification); final Optional nodeConfiguration = configDao.loadByNodeId(nodeId); if (nodeConfiguration.isPresent()) { - LOG.debug("Reconciliation: {}", nodeId.getValue()); + LOG.debug("Reconciliation {}: {}", dsType(), nodeId.getValue()); final InstanceIdentifier nodePath = InstanceIdentifier.create(Nodes.class) - .child(Node.class, new NodeKey(nodeId(modification))).augmentation(FlowCapableNode.class); - return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), flowCapableNodeAfter(modification), dsType())); + .child(Node.class, new NodeKey(ModificationUtil.nodeId(modification))) + .augmentation(FlowCapableNode.class); + final FlowCapableNode fcOperationalNode = ModificationUtil.flowCapableNodeAfter(modification); + final SyncupEntry syncupEntry = new SyncupEntry(nodeConfiguration.get(), LogicalDatastoreType.CONFIGURATION, + fcOperationalNode, dsType()); + return Optional.of(reactor.syncup(nodePath, syncupEntry)); } else { + LOG.debug("Config not present for reconciliation: {}", nodeId.getValue()); + reconciliationRegistry.unregisterIfRegistered(nodeId); return skipModification(modification); } } - static FlowCapableNode flowCapableNodeAfter(DataTreeModification modification) { - final Node dataAfter = modification.getRootNode().getDataAfter(); - if (dataAfter == null) { - return null; - } - return dataAfter.getAugmentation(FlowCapableNode.class); - } + private boolean isConsistentForReconcile(final DataTreeModification modification) { + final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier()); + final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter() + .getAugmentation(FlowCapableStatisticsGatheringStatus.class); - static boolean safeConnectorsEmpty(Node node) { - if (node == null) { - return true; + if (gatheringStatus == null) { + LOG.trace("Statistics gathering never started: {}", nodeId.getValue()); + return false; } - final List nodeConnectors = node.getNodeConnector(); - - return nodeConnectors == null || nodeConnectors.isEmpty(); - } - - static String nodeIdValue(DataTreeModification modification) { - final NodeId nodeId = nodeId(modification); + final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd(); - if (nodeId == null) { - return null; + if (gatheringStatusEnd == null) { + LOG.trace("Statistics gathering is not over yet: {}", nodeId.getValue()); + return false; } - return nodeId.getValue(); - } - - static NodeId nodeId(DataTreeModification modification) { - final DataObjectModification rootNode = modification.getRootNode(); - final Node dataAfter = rootNode.getDataAfter(); - - if (dataAfter != null) { - return dataAfter.getId(); + if (!gatheringStatusEnd.isSucceeded()) { + LOG.trace("Statistics gathering was not successful: {}", nodeId.getValue()); + return false; } - final Node dataBefore = rootNode.getDataBefore(); - if (dataBefore != null) { - return dataBefore.getId(); + try { + Date timestampOfRegistration = reconciliationRegistry.getRegistrationTimestamp(nodeId); + final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT); + Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue()); + if (timestampOfStatistics.after(timestampOfRegistration)) { + LOG.debug("Fresh operational present: {}", nodeId.getValue()); + return true; + } + } catch (ParseException e) { + LOG.warn("Timestamp parsing error {}", e); } + LOG.debug("Fresh operational not present: {}", nodeId.getValue()); + return false; + } - return null; + private static boolean safeConnectorsEmpty(final Node node) { + if (node == null) { + return true; + } + final List nodeConnectors = node.getNodeConnector(); + return nodeConnectors == null || nodeConnectors.isEmpty(); } @Override