Cleaning and preparation before bug 6170
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / SimplifiedOperationalListener.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.text.ParseException;
14 import java.text.SimpleDateFormat;
15 import java.util.Collection;
16 import java.util.Date;
17 import java.util.List;
18 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
19 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
20 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
23 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
24 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
25 import org.opendaylight.openflowplugin.applications.frsync.util.ModificationUtil;
26 import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
27 import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
36 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Listens to operational new nodes and delegates add/remove/update/barrier to {@link SyncReactor}.
42  */
43 public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
44     private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
45
46     private final SyncReactor reactor;
47     private final FlowCapableNodeSnapshotDao operationalSnapshot;
48     private final FlowCapableNodeDao configDao;
49     private final ReconciliationRegistry reconciliationRegistry;
50
51     public SimplifiedOperationalListener(final SyncReactor reactor,
52                                          final FlowCapableNodeSnapshotDao operationalSnapshot,
53                                          final FlowCapableNodeDao configDao,
54                                          final ReconciliationRegistry reconciliationRegistry) {
55         this.reactor = reactor;
56         this.operationalSnapshot = operationalSnapshot;
57         this.configDao = configDao;
58         this.reconciliationRegistry = reconciliationRegistry;
59     }
60
61     @Override
62     public void onDataTreeChanged(Collection<DataTreeModification<Node>> modifications) {
63         // TODO return for clustered listener if not master for device
64         LOG.trace("Inventory Operational changes {}", modifications.size());
65         super.onDataTreeChanged(modifications);
66     }
67
68     /**
69      * This method behaves like this:
70      * <ul>
71      * <li>If node is added to operational store then reconciliation.</li>
72      * <li>Node is deleted from operational cache is removed.</li>
73      * <li>Skip this event otherwise.</li>
74      * </ul>
75      *
76      * @throws InterruptedException from syncup
77      */
78     protected Optional<ListenableFuture<Boolean>> processNodeModification(
79             DataTreeModification<Node> modification) throws InterruptedException {
80
81         updateCache(modification);
82         // TODO register cluster service if node added
83         if (isReconciliationNeeded(modification)) {
84             return reconciliation(modification);
85         }
86         return skipModification(modification);
87     }
88
89     /**
90      * Remove if delete. Update only if FlowCapableNode Augmentation modified.
91      *
92      * @param modification Datastore modification
93      */
94     private void updateCache(DataTreeModification<Node> modification) {
95         NodeId nodeId = ModificationUtil.nodeId(modification);
96         if (isDelete(modification) || isDeleteLogical(modification)) {
97             operationalSnapshot.updateCache(nodeId, Optional.absent());
98             // TODO unregister/close cluster service if node deleted
99             reconciliationRegistry.unregisterIfRegistered(nodeId);
100             return;
101         }
102         operationalSnapshot.updateCache(nodeId, Optional.fromNullable(ModificationUtil.flowCapableNodeAfter(modification)));
103     }
104
105     private Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
106         LOG.trace("Skipping Inventory Operational modification {}, before {}, after {}",
107                 ModificationUtil.nodeIdValue(modification),
108                 modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
109                 modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
110         return Optional.absent();
111     }
112
113     /**
114      * ModificationType.DELETE.
115      */
116     private boolean isDelete(DataTreeModification<Node> modification) {
117         if (ModificationType.DELETE == modification.getRootNode().getModificationType()) {
118             LOG.trace("Delete {} (physical)", ModificationUtil.nodeIdValue(modification));
119             return true;
120         }
121
122         return false;
123     }
124
125     /**
126      * All connectors disappeared from operational store (logical delete).
127      */
128     private boolean isDeleteLogical(DataTreeModification<Node> modification) {
129         final DataObjectModification<Node> rootNode = modification.getRootNode();
130         if (!safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter())) {
131             LOG.trace("Delete {} (logical)", ModificationUtil.nodeIdValue(modification));
132             return true;
133         }
134
135         return false;
136     }
137
138     private boolean isAdd(DataTreeModification<Node> modification) {
139         final DataObjectModification<Node> rootNode = modification.getRootNode();
140         final Node dataAfter = rootNode.getDataAfter();
141         final Node dataBefore = rootNode.getDataBefore();
142
143         final boolean nodeAppearedInOperational = dataBefore == null && dataAfter != null;
144         if (nodeAppearedInOperational) {
145             LOG.trace("Add {} (physical)", ModificationUtil.nodeIdValue(modification));
146         }
147         return nodeAppearedInOperational;
148     }
149
150     /**
151      * All connectors appeared in operational store (logical add).
152      */
153     private boolean isAddLogical(DataTreeModification<Node> modification) {
154         final DataObjectModification<Node> rootNode = modification.getRootNode();
155         if (safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter())) {
156             LOG.trace("Add {} (logical)", ModificationUtil.nodeIdValue(modification));
157             return true;
158         }
159
160         return false;
161     }
162
163     private boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
164         return isAdd(modification) || isAddLogical(modification) || isRegisteredAndConsistentForReconcile(modification);
165     }
166
167     private Optional<ListenableFuture<Boolean>> reconciliation(DataTreeModification<Node> modification) throws InterruptedException {
168         final NodeId nodeId = ModificationUtil.nodeId(modification);
169         final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
170
171         if (nodeConfiguration.isPresent()) {
172             LOG.debug("Reconciliation: {}", nodeId.getValue());
173             final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
174                     .child(Node.class, new NodeKey(ModificationUtil.nodeId(modification)))
175                     .augmentation(FlowCapableNode.class);
176             final FlowCapableNode fcNode = ModificationUtil.flowCapableNodeAfter(modification);
177             return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), fcNode, dsType()));
178         } else {
179             return skipModification(modification);
180         }
181     }
182
183     private boolean isRegisteredAndConsistentForReconcile(DataTreeModification<Node> modification) {
184         final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
185
186         if (!reconciliationRegistry.isRegistered(nodeId)) {
187             return false;
188         }
189
190         final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
191                 .getAugmentation(FlowCapableStatisticsGatheringStatus.class);
192
193         if (gatheringStatus == null) {
194             LOG.trace("Statistics gathering never started for: {}", nodeId.getValue());
195             return false;
196         }
197
198         final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
199
200         if (gatheringStatusEnd == null) {
201             LOG.trace("Statistics gathering is not over yet for: {}", nodeId.getValue());
202             return false;
203         }
204
205         if (!gatheringStatusEnd.isSucceeded()) {
206             LOG.debug("Statistics gathering was not successful for: {}", nodeId.getValue());
207             return false;
208         }
209
210         try {
211             Date timestampOfRegistration = reconciliationRegistry.getRegistration(nodeId);
212             final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(ReconciliationRegistry.DATE_AND_TIME_FORMAT);
213             Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
214             if (timestampOfStatistics.after(timestampOfRegistration)) {
215                 LOG.debug("Fresh operational present for: {} -> going retry!", nodeId.getValue());
216                 return true;
217             }
218         } catch (ParseException e) {
219             LOG.error("Timestamp parsing error {}", e);
220         }
221         LOG.debug("Fresh operational not present for: {}", nodeId.getValue());
222         return false;
223     }
224
225     private static boolean safeConnectorsEmpty(Node node) {
226         if (node == null) {
227             return true;
228         }
229
230         final List<NodeConnector> nodeConnectors = node.getNodeConnector();
231
232         return nodeConnectors == null || nodeConnectors.isEmpty();
233     }
234
235     @Override
236     public LogicalDatastoreType dsType() {
237         return LogicalDatastoreType.OPERATIONAL;
238     }
239     
240 }