2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.text.ParseException;
14 import java.text.SimpleDateFormat;
15 import java.util.Collection;
16 import java.util.Date;
17 import java.util.List;
18 import javax.annotation.Nonnull;
19 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
20 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
21 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
22 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
23 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
24 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
25 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
26 import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
27 import org.opendaylight.openflowplugin.applications.frsync.util.ModificationUtil;
28 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
29 import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
30 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
39 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
44 * Listens to operational changes and starts reconciliation through {@link SyncReactor} when necessary.
46 public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
48 private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
49 public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
50 private final SyncReactor reactor;
51 private final FlowCapableNodeSnapshotDao operationalSnapshot;
52 private final FlowCapableNodeDao configDao;
53 private final ReconciliationRegistry reconciliationRegistry;
54 private final DeviceMastershipManager deviceMastershipManager;
56 public SimplifiedOperationalListener(final SyncReactor reactor,
57 final FlowCapableNodeSnapshotDao operationalSnapshot,
58 final FlowCapableNodeDao configDao,
59 final ReconciliationRegistry reconciliationRegistry,
60 final DeviceMastershipManager deviceMastershipManager) {
61 this.reactor = reactor;
62 this.operationalSnapshot = operationalSnapshot;
63 this.configDao = configDao;
64 this.reconciliationRegistry = reconciliationRegistry;
65 this.deviceMastershipManager = deviceMastershipManager;
69 public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> modifications) {
70 super.onDataTreeChanged(modifications);
74 * Update cache, register for device mastership when device connected and start reconciliation if device
75 * is registered and actual modification is consistent.Skip the event otherwise.
76 * @throws InterruptedException from syncup
78 protected Optional<ListenableFuture<Boolean>> processNodeModification(
79 final DataTreeModification<Node> modification) throws InterruptedException {
80 final NodeId nodeId = ModificationUtil.nodeId(modification);
81 updateCache(modification);
83 if (isAdd(modification) || isAddLogical(modification)) {
84 deviceMastershipManager.onDeviceConnected(nodeId);
87 if (reconciliationRegistry.isRegistered(nodeId) && isConsistentForReconcile(modification)) {
88 return reconciliation(modification);
90 return skipModification(modification);
95 * Remove if delete. Update only if FlowCapableNode Augmentation modified.
96 * Unregister for device mastership.
97 * @param modification Datastore modification
99 private void updateCache(final DataTreeModification<Node> modification) {
100 NodeId nodeId = ModificationUtil.nodeId(modification);
101 if (isDelete(modification) || isDeleteLogical(modification)) {
102 operationalSnapshot.updateCache(nodeId, Optional.absent());
103 deviceMastershipManager.onDeviceDisconnected(nodeId);
106 operationalSnapshot.updateCache(nodeId, Optional.fromNullable(ModificationUtil.flowCapableNodeAfter(modification)));
109 private Optional<ListenableFuture<Boolean>> skipModification(final DataTreeModification<Node> modification) {
110 if (LOG.isTraceEnabled()) {
111 LOG.trace("Skipping operational modification: {}, before {}, after {}",
112 ModificationUtil.nodeIdValue(modification),
113 modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
114 modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
116 return Optional.absent();
120 * ModificationType.DELETE.
122 private boolean isDelete(final DataTreeModification<Node> modification) {
123 return ModificationType.DELETE == modification.getRootNode().getModificationType();
127 * All connectors disappeared from operational store (logical delete).
129 private boolean isDeleteLogical(final DataTreeModification<Node> modification) {
130 final DataObjectModification<Node> rootNode = modification.getRootNode();
131 return !safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter());
135 private boolean isAdd(final DataTreeModification<Node> modification) {
136 final DataObjectModification<Node> rootNode = modification.getRootNode();
137 return rootNode.getDataBefore() == null && rootNode.getDataAfter() != null;
141 * All connectors appeared in operational store (logical add).
143 private boolean isAddLogical(final DataTreeModification<Node> modification) {
144 final DataObjectModification<Node> rootNode = modification.getRootNode();
145 return safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter());
149 * If node is present in config DS diff between wanted configuration (in config DS) and actual device
150 * configuration (coming from operational) should be calculated and sent to device.
151 * @param modification from DS
152 * @return optional syncup future
153 * @throws InterruptedException from syncup
155 private Optional<ListenableFuture<Boolean>> reconciliation(final DataTreeModification<Node> modification)
156 throws InterruptedException {
157 final NodeId nodeId = ModificationUtil.nodeId(modification);
158 final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
160 if (nodeConfiguration.isPresent()) {
161 LOG.debug("Reconciliation {}: {}", dsType(), nodeId.getValue());
162 final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
163 .child(Node.class, new NodeKey(ModificationUtil.nodeId(modification)))
164 .augmentation(FlowCapableNode.class);
165 final FlowCapableNode fcOperationalNode = ModificationUtil.flowCapableNodeAfter(modification);
166 final SyncupEntry syncupEntry = new SyncupEntry(nodeConfiguration.get(), LogicalDatastoreType.CONFIGURATION,
167 fcOperationalNode, dsType());
168 return Optional.of(reactor.syncup(nodePath, syncupEntry));
170 LOG.debug("Config not present for reconciliation: {}", nodeId.getValue());
171 reconciliationRegistry.unregisterIfRegistered(nodeId);
172 return skipModification(modification);
176 private boolean isConsistentForReconcile(final DataTreeModification<Node> modification) {
177 final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
178 final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
179 .getAugmentation(FlowCapableStatisticsGatheringStatus.class);
181 if (gatheringStatus == null) {
182 LOG.trace("Statistics gathering never started: {}", nodeId.getValue());
186 final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
188 if (gatheringStatusEnd == null) {
189 LOG.trace("Statistics gathering is not over yet: {}", nodeId.getValue());
193 if (!gatheringStatusEnd.isSucceeded()) {
194 LOG.trace("Statistics gathering was not successful: {}", nodeId.getValue());
199 Date timestampOfRegistration = reconciliationRegistry.getRegistrationTimestamp(nodeId);
200 final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
201 Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
202 if (timestampOfStatistics.after(timestampOfRegistration)) {
203 LOG.debug("Fresh operational present: {}", nodeId.getValue());
206 } catch (ParseException e) {
207 LOG.warn("Timestamp parsing error {}", e);
209 LOG.debug("Fresh operational not present: {}", nodeId.getValue());
213 private static boolean safeConnectorsEmpty(final Node node) {
217 final List<NodeConnector> nodeConnectors = node.getNodeConnector();
218 return nodeConnectors == null || nodeConnectors.isEmpty();
222 public LogicalDatastoreType dsType() {
223 return LogicalDatastoreType.OPERATIONAL;