Switch to MD-SAL APIs
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SimplifiedOperationalListener.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frsync.impl;
9
10 import com.google.common.util.concurrent.ListenableFuture;
11 import java.text.ParseException;
12 import java.text.SimpleDateFormat;
13 import java.util.Collection;
14 import java.util.Date;
15 import java.util.List;
16 import java.util.Objects;
17 import java.util.Optional;
18 import javax.annotation.Nonnull;
19 import org.opendaylight.mdsal.binding.api.DataObjectModification;
20 import org.opendaylight.mdsal.binding.api.DataTreeModification;
21 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
22 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
23 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
24 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
25 import org.opendaylight.openflowplugin.applications.frsync.impl.clustering.DeviceMastershipManager;
26 import org.opendaylight.openflowplugin.applications.frsync.util.ModificationUtil;
27 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
28 import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
29 import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Listens to operational changes and starts reconciliation through {@link SyncReactor} when necessary.
44  */
45 public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
46
47     private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
48     public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
49     private final SyncReactor reactor;
50     private final FlowCapableNodeSnapshotDao operationalSnapshot;
51     private final FlowCapableNodeDao configDao;
52     private final ReconciliationRegistry reconciliationRegistry;
53     private final DeviceMastershipManager deviceMastershipManager;
54
55     public SimplifiedOperationalListener(final SyncReactor reactor,
56                                          final FlowCapableNodeSnapshotDao operationalSnapshot,
57                                          final FlowCapableNodeDao configDao,
58                                          final ReconciliationRegistry reconciliationRegistry,
59                                          final DeviceMastershipManager deviceMastershipManager) {
60         this.reactor = reactor;
61         this.operationalSnapshot = operationalSnapshot;
62         this.configDao = configDao;
63         this.reconciliationRegistry = reconciliationRegistry;
64         this.deviceMastershipManager = deviceMastershipManager;
65     }
66
67     @Override
68     public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> modifications) {
69         super.onDataTreeChanged(modifications);
70     }
71
72     /**
73      * Update cache, register for device mastership when device connected and start reconciliation if device
74      * is registered and actual modification is consistent.Skip the event otherwise.
75      */
76     @Override
77     protected Optional<ListenableFuture<Boolean>> processNodeModification(
78             final DataTreeModification<Node> modification) {
79         Optional<ListenableFuture<Boolean>> result;
80         final NodeId nodeId = ModificationUtil.nodeId(modification);
81         final DataObjectModification<Node> nodeModification = modification.getRootNode();
82
83         if (isDelete(nodeModification) || isDeleteLogical(nodeModification)) {
84             operationalSnapshot.updateCache(nodeId, Optional.empty());
85             deviceMastershipManager.onDeviceDisconnected(nodeId);
86             result = skipModification(modification);
87         } else {
88             operationalSnapshot.updateCache(nodeId, Optional.ofNullable(
89                     ModificationUtil.flowCapableNodeAfter(modification)));
90
91             final boolean isAdd = isAdd(nodeModification) || isAddLogical(nodeModification);
92
93             if (isAdd) {
94                 deviceMastershipManager.onDeviceConnected(nodeId);
95             }
96
97             // if node is registered for reconcile we need consistent data from operational DS (skip partial
98             // collections) but we can accept first modification since all statistics are intentionally collected in
99             // one step on startup
100             if (reconciliationRegistry.isRegistered(nodeId) && (isAdd || isConsistentForReconcile(modification))) {
101                 result = reconciliation(modification);
102             } else {
103                 result = skipModification(modification);
104             }
105         }
106         return result;
107     }
108
109     private Optional<ListenableFuture<Boolean>> skipModification(final DataTreeModification<Node> modification) {
110         if (LOG.isTraceEnabled()) {
111             LOG.trace("Skipping operational modification: {}, before {}, after {}",
112                     ModificationUtil.nodeIdValue(modification),
113                     modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
114                     modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
115         }
116         return Optional.empty();
117     }
118
119     private boolean isDelete(final DataObjectModification<Node> nodeModification) {
120         return Objects.nonNull(nodeModification.getDataBefore()) && Objects.isNull(nodeModification.getDataAfter());
121     }
122
123     /**
124      * All connectors disappeared from operational store (logical delete).
125      */
126     private boolean isDeleteLogical(final DataObjectModification<Node> nodeModification) {
127         return !safeConnectorsEmpty(nodeModification.getDataBefore())
128                 && safeConnectorsEmpty(nodeModification.getDataAfter());
129
130     }
131
132     private boolean isAdd(final DataObjectModification<Node> nodeModification) {
133         return Objects.isNull(nodeModification.getDataBefore()) && Objects.nonNull(nodeModification.getDataAfter());
134     }
135
136     /**
137      * All connectors appeared in operational store (logical add).
138      */
139     private boolean isAddLogical(final DataObjectModification<Node> nodeModification) {
140         return safeConnectorsEmpty(nodeModification.getDataBefore())
141                 && !safeConnectorsEmpty(nodeModification.getDataAfter());
142     }
143
144     /**
145      * If node is present in config DS diff between wanted configuration (in config DS) and actual device
146      * configuration (coming from operational) should be calculated and sent to device.
147      * @param modification from DS
148      * @return optional syncup future
149      */
150     private Optional<ListenableFuture<Boolean>> reconciliation(final DataTreeModification<Node> modification) {
151         final NodeId nodeId = ModificationUtil.nodeId(modification);
152         final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
153
154         if (nodeConfiguration.isPresent()) {
155             LOG.debug("Reconciliation {}: {}", dsType(), nodeId.getValue());
156             final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
157                     .child(Node.class, new NodeKey(ModificationUtil.nodeId(modification)))
158                     .augmentation(FlowCapableNode.class);
159             final FlowCapableNode fcOperationalNode = ModificationUtil.flowCapableNodeAfter(modification);
160             final SyncupEntry syncupEntry = new SyncupEntry(nodeConfiguration.get(), LogicalDatastoreType.CONFIGURATION,
161                                                             fcOperationalNode, dsType());
162             return Optional.of(reactor.syncup(nodePath, syncupEntry));
163         } else {
164             LOG.debug("Config not present for reconciliation: {}", nodeId.getValue());
165             reconciliationRegistry.unregisterIfRegistered(nodeId);
166             return skipModification(modification);
167         }
168     }
169
170     /**
171      * Check if modification is consistent for reconciliation. We need fresh data, which means that current statistics
172      * were collected after registration for reconcile and whole bunch of statistics was collected successfully.
173      * @param modification from DS
174      * @return status of modification
175      */
176     private boolean isConsistentForReconcile(final DataTreeModification<Node> modification) {
177         final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
178         final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
179                 .augmentation(FlowCapableStatisticsGatheringStatus.class);
180
181         if (gatheringStatus == null) {
182             LOG.trace("Statistics gathering never started: {}", nodeId.getValue());
183             return false;
184         }
185
186         final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
187
188         if (gatheringStatusEnd == null) {
189             LOG.trace("Statistics gathering is not over yet: {}", nodeId.getValue());
190             return false;
191         }
192
193         if (!gatheringStatusEnd.isSucceeded()) {
194             LOG.trace("Statistics gathering was not successful: {}", nodeId.getValue());
195             return false;
196         }
197
198         try {
199             Date timestampOfRegistration = reconciliationRegistry.getRegistrationTimestamp(nodeId);
200             final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_AND_TIME_FORMAT);
201             Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
202             if (timestampOfStatistics.after(timestampOfRegistration)) {
203                 LOG.debug("Fresh operational present: {}", nodeId.getValue());
204                 return true;
205             }
206         } catch (ParseException e) {
207             LOG.warn("Timestamp parsing error {}", e);
208         }
209         LOG.debug("Fresh operational not present: {}", nodeId.getValue());
210         return false;
211     }
212
213     private static boolean safeConnectorsEmpty(final Node node) {
214         if (node == null) {
215             return true;
216         }
217         final List<NodeConnector> nodeConnectors = node.getNodeConnector();
218         return nodeConnectors == null || nodeConnectors.isEmpty();
219     }
220
221     @Override
222     public LogicalDatastoreType dsType() {
223         return LogicalDatastoreType.OPERATIONAL;
224     }
225 }