2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.text.ParseException;
14 import java.text.SimpleDateFormat;
15 import java.util.Collection;
16 import java.util.Date;
17 import java.util.List;
18 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
19 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
20 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
23 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
24 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
25 import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
26 import org.opendaylight.openflowplugin.applications.frsync.util.ModificationUtil;
27 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
28 import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
29 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
43 * Listens to operational new nodes and delegates add/remove/update/barrier to {@link SyncReactor}.
45 public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
46 private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
47 public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
49 private final SyncReactor reactor;
50 private final FlowCapableNodeSnapshotDao operationalSnapshot;
51 private final FlowCapableNodeDao configDao;
52 private final ReconciliationRegistry reconciliationRegistry;
53 private final DeviceMastershipManager deviceMastershipManager;
/**
 * Creates the listener with all of its collaborators.
 *
 * @param reactor                 executes the syncup during reconciliation
 * @param operationalSnapshot     operational cache updated on each modification
 * @param configDao               loads the configured node by node id
 * @param reconciliationRegistry  registry queried for reconciliation registration and its timestamp
 * @param deviceMastershipManager receiver of device connected/disconnected notifications
 */
public SimplifiedOperationalListener(final SyncReactor reactor,
                                     final FlowCapableNodeSnapshotDao operationalSnapshot,
                                     final FlowCapableNodeDao configDao,
                                     final ReconciliationRegistry reconciliationRegistry,
                                     final DeviceMastershipManager deviceMastershipManager) {
    this.deviceMastershipManager = deviceMastershipManager;
    this.reconciliationRegistry = reconciliationRegistry;
    this.configDao = configDao;
    this.operationalSnapshot = operationalSnapshot;
    this.reactor = reactor;
}
68 public void onDataTreeChanged(Collection<DataTreeModification<Node>> modifications) {
69 LOG.trace("Operational changes: {}", modifications.size());
70 super.onDataTreeChanged(modifications);
74 * This method behaves like this:
76 * <li>If node is added to operational store then reconciliation.</li>
77 * <li>Node is deleted from operational cache is removed.</li>
78 * <li>Skip this event otherwise.</li>
81 * @throws InterruptedException from syncup
83 protected Optional<ListenableFuture<Boolean>> processNodeModification(
84 DataTreeModification<Node> modification) throws InterruptedException {
85 final NodeId nodeId = ModificationUtil.nodeId(modification);
86 updateCache(modification);
88 if (isAdd(modification) || isAddLogical(modification)) {
89 deviceMastershipManager.onDeviceConnected(nodeId);
92 if (isRegisteredAndConsistentForReconcile(modification)) {
93 return reconciliation(modification);
95 return skipModification(modification);
100 * Remove if delete. Update only if FlowCapableNode Augmentation modified.
102 * @param modification Datastore modification
104 private void updateCache(DataTreeModification<Node> modification) {
105 NodeId nodeId = ModificationUtil.nodeId(modification);
106 if (isDelete(modification) || isDeleteLogical(modification)) {
107 operationalSnapshot.updateCache(nodeId, Optional.absent());
108 deviceMastershipManager.onDeviceDisconnected(nodeId);
111 operationalSnapshot.updateCache(nodeId, Optional.fromNullable(ModificationUtil.flowCapableNodeAfter(modification)));
114 private Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
115 LOG.trace("Skipping operational modification: {}, before {}, after {}",
116 ModificationUtil.nodeIdValue(modification),
117 modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
118 modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
119 return Optional.absent();
123 * ModificationType.DELETE.
125 private boolean isDelete(DataTreeModification<Node> modification) {
126 if (ModificationType.DELETE == modification.getRootNode().getModificationType()) {
127 LOG.trace("Delete {} (physical)", ModificationUtil.nodeIdValue(modification));
135 * All connectors disappeared from operational store (logical delete).
137 private boolean isDeleteLogical(DataTreeModification<Node> modification) {
138 final DataObjectModification<Node> rootNode = modification.getRootNode();
139 if (!safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter())) {
140 LOG.trace("Delete {} (logical)", ModificationUtil.nodeIdValue(modification));
147 private boolean isAdd(DataTreeModification<Node> modification) {
148 final DataObjectModification<Node> rootNode = modification.getRootNode();
149 final Node dataAfter = rootNode.getDataAfter();
150 final Node dataBefore = rootNode.getDataBefore();
152 final boolean nodeAppearedInOperational = dataBefore == null && dataAfter != null;
153 if (nodeAppearedInOperational) {
154 LOG.trace("Add {} (physical)", ModificationUtil.nodeIdValue(modification));
156 return nodeAppearedInOperational;
160 * All connectors appeared in operational store (logical add).
162 private boolean isAddLogical(DataTreeModification<Node> modification) {
163 final DataObjectModification<Node> rootNode = modification.getRootNode();
164 if (safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter())) {
165 LOG.trace("Add {} (logical)", ModificationUtil.nodeIdValue(modification));
172 private Optional<ListenableFuture<Boolean>> reconciliation(DataTreeModification<Node> modification) throws InterruptedException {
173 final NodeId nodeId = ModificationUtil.nodeId(modification);
174 final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
176 if (nodeConfiguration.isPresent()) {
177 LOG.debug("Reconciliation: {}", nodeId.getValue());
178 final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
179 .child(Node.class, new NodeKey(ModificationUtil.nodeId(modification)))
180 .augmentation(FlowCapableNode.class);
181 final FlowCapableNode fcOperationalNode = ModificationUtil.flowCapableNodeAfter(modification);
182 final SyncupEntry syncupEntry = new SyncupEntry(nodeConfiguration.get(), LogicalDatastoreType.CONFIGURATION,
183 fcOperationalNode, dsType());
184 return Optional.of(reactor.syncup(nodePath, syncupEntry));
186 LOG.debug("Config not present for reconciliation: {}", nodeId.getValue());
187 return skipModification(modification);
191 private boolean isRegisteredAndConsistentForReconcile(DataTreeModification<Node> modification) {
192 final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
194 if (!reconciliationRegistry.isRegistered(nodeId)) {
198 final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
199 .getAugmentation(FlowCapableStatisticsGatheringStatus.class);
201 if (gatheringStatus == null) {
202 LOG.trace("Statistics gathering never started: {}", nodeId.getValue());
206 final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
208 if (gatheringStatusEnd == null) {
209 LOG.trace("Statistics gathering is not over yet: {}", nodeId.getValue());
213 if (!gatheringStatusEnd.isSucceeded()) {
214 LOG.trace("Statistics gathering was not successful: {}", nodeId.getValue());
219 Date timestampOfRegistration = reconciliationRegistry.getRegistrationTimestamp(nodeId);
220 final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
221 Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
222 if (timestampOfStatistics.after(timestampOfRegistration)) {
223 LOG.debug("Fresh operational present: {}", nodeId.getValue());
226 } catch (ParseException e) {
227 LOG.error("Timestamp parsing error {}", e);
229 LOG.debug("Fresh operational not present: {}", nodeId.getValue());
233 private static boolean safeConnectorsEmpty(Node node) {
238 final List<NodeConnector> nodeConnectors = node.getNodeConnector();
240 return nodeConnectors == null || nodeConnectors.isEmpty();
244 public LogicalDatastoreType dsType() {
245 return LogicalDatastoreType.OPERATIONAL;