/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.text.ParseException;
14 import java.text.SimpleDateFormat;
15 import java.util.Collection;
16 import java.util.Date;
17 import java.util.List;
18 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
19 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
20 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
23 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
24 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
25 import org.opendaylight.openflowplugin.applications.frsync.util.ModificationUtil;
26 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
27 import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
36 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
/**
 * Listens to operational new nodes and delegates add/remove/update/barrier to {@link SyncReactor}.
 */
public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
    private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);

    /** Reactor whose {@code syncup} is invoked when a node gets reconciled. */
    private final SyncReactor reactor;
    /** Cache of operational flow-capable node snapshots; refreshed or cleared for every processed modification. */
    private final FlowCapableNodeSnapshotDao operationalSnapshot;
    /** Source of the configured flow-capable node that is passed to the reactor during reconciliation. */
    private final FlowCapableNodeDao configDao;
    /** Registry consulted for reconciliation registrations and their registration timestamps. */
    private final ReconciliationRegistry reconciliationRegistry;
/**
 * Creates the listener and stores its collaborators.
 *
 * @param reactor                performs the syncup when a node is reconciled
 * @param operationalSnapshot    cache of operational flow-capable node snapshots
 * @param configDao              loads the configured flow-capable node by node id
 * @param reconciliationRegistry registry of nodes registered for reconciliation
 */
public SimplifiedOperationalListener(final SyncReactor reactor,
                                     final FlowCapableNodeSnapshotDao operationalSnapshot,
                                     final FlowCapableNodeDao configDao,
                                     final ReconciliationRegistry reconciliationRegistry) {
    this.reconciliationRegistry = reconciliationRegistry;
    this.configDao = configDao;
    this.operationalSnapshot = operationalSnapshot;
    this.reactor = reactor;
}
62 public void onDataTreeChanged(Collection<DataTreeModification<Node>> modifications) {
63 // TODO return for clustered listener if not master for device
64 LOG.trace("Inventory Operational changes {}", modifications.size());
65 super.onDataTreeChanged(modifications);
69 * This method behaves like this:
71 * <li>If node is added to operational store then reconciliation.</li>
72 * <li>Node is deleted from operational cache is removed.</li>
73 * <li>Skip this event otherwise.</li>
76 * @throws InterruptedException from syncup
78 protected Optional<ListenableFuture<Boolean>> processNodeModification(
79 DataTreeModification<Node> modification) throws InterruptedException {
81 updateCache(modification);
82 // TODO register cluster service if node added
83 if (isReconciliationNeeded(modification)) {
84 return reconciliation(modification);
86 return skipModification(modification);
90 * Remove if delete. Update only if FlowCapableNode Augmentation modified.
92 * @param modification Datastore modification
94 private void updateCache(DataTreeModification<Node> modification) {
95 NodeId nodeId = ModificationUtil.nodeId(modification);
96 if (isDelete(modification) || isDeleteLogical(modification)) {
97 operationalSnapshot.updateCache(nodeId, Optional.absent());
98 // TODO unregister/close cluster service if node deleted
99 reconciliationRegistry.unregisterIfRegistered(nodeId);
102 operationalSnapshot.updateCache(nodeId, Optional.fromNullable(ModificationUtil.flowCapableNodeAfter(modification)));
105 private Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
106 LOG.trace("Skipping Inventory Operational modification {}, before {}, after {}",
107 ModificationUtil.nodeIdValue(modification),
108 modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
109 modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
110 return Optional.absent();
114 * ModificationType.DELETE.
116 private boolean isDelete(DataTreeModification<Node> modification) {
117 if (ModificationType.DELETE == modification.getRootNode().getModificationType()) {
118 LOG.trace("Delete {} (physical)", ModificationUtil.nodeIdValue(modification));
126 * All connectors disappeared from operational store (logical delete).
128 private boolean isDeleteLogical(DataTreeModification<Node> modification) {
129 final DataObjectModification<Node> rootNode = modification.getRootNode();
130 if (!safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter())) {
131 LOG.trace("Delete {} (logical)", ModificationUtil.nodeIdValue(modification));
138 private boolean isAdd(DataTreeModification<Node> modification) {
139 final DataObjectModification<Node> rootNode = modification.getRootNode();
140 final Node dataAfter = rootNode.getDataAfter();
141 final Node dataBefore = rootNode.getDataBefore();
143 final boolean nodeAppearedInOperational = dataBefore == null && dataAfter != null;
144 if (nodeAppearedInOperational) {
145 LOG.trace("Add {} (physical)", ModificationUtil.nodeIdValue(modification));
147 return nodeAppearedInOperational;
151 * All connectors appeared in operational store (logical add).
153 private boolean isAddLogical(DataTreeModification<Node> modification) {
154 final DataObjectModification<Node> rootNode = modification.getRootNode();
155 if (safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter())) {
156 LOG.trace("Add {} (logical)", ModificationUtil.nodeIdValue(modification));
163 private boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
164 return isAdd(modification) || isAddLogical(modification) || isRegisteredAndConsistentForReconcile(modification);
167 private Optional<ListenableFuture<Boolean>> reconciliation(DataTreeModification<Node> modification) throws InterruptedException {
168 final NodeId nodeId = ModificationUtil.nodeId(modification);
169 final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
171 if (nodeConfiguration.isPresent()) {
172 LOG.debug("Reconciliation: {}", nodeId.getValue());
173 final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
174 .child(Node.class, new NodeKey(ModificationUtil.nodeId(modification)))
175 .augmentation(FlowCapableNode.class);
176 final FlowCapableNode fcNode = ModificationUtil.flowCapableNodeAfter(modification);
177 return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), fcNode, dsType()));
179 return skipModification(modification);
183 private boolean isRegisteredAndConsistentForReconcile(DataTreeModification<Node> modification) {
184 final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
186 if (!reconciliationRegistry.isRegistered(nodeId)) {
190 final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
191 .getAugmentation(FlowCapableStatisticsGatheringStatus.class);
193 if (gatheringStatus == null) {
194 LOG.trace("Statistics gathering never started for: {}", nodeId.getValue());
198 final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
200 if (gatheringStatusEnd == null) {
201 LOG.trace("Statistics gathering is not over yet for: {}", nodeId.getValue());
205 if (!gatheringStatusEnd.isSucceeded()) {
206 LOG.debug("Statistics gathering was not successful for: {}", nodeId.getValue());
211 Date timestampOfRegistration = reconciliationRegistry.getRegistration(nodeId);
212 final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(ReconciliationRegistry.DATE_AND_TIME_FORMAT);
213 Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
214 if (timestampOfStatistics.after(timestampOfRegistration)) {
215 LOG.debug("Fresh operational present for: {} -> going retry!", nodeId.getValue());
218 } catch (ParseException e) {
219 LOG.error("Timestamp parsing error {}", e);
221 LOG.debug("Fresh operational not present for: {}", nodeId.getValue());
225 private static boolean safeConnectorsEmpty(Node node) {
230 final List<NodeConnector> nodeConnectors = node.getNodeConnector();
232 return nodeConnectors == null || nodeConnectors.isEmpty();
236 public LogicalDatastoreType dsType() {
237 return LogicalDatastoreType.OPERATIONAL;