OPNFLWPLUG-986: Administrative Reconciliation for multiple/all Nodes
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / DeviceMastershipManager.java
1 /**
2  * Copyright (c) 2016, 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frm.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Iterables;
14 import com.google.common.collect.Sets;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.Set;
18 import java.util.concurrent.ConcurrentHashMap;
19 import javax.annotation.Nonnull;
20 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
23 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
27 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
28 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
29 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
30 import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Manager for clustering service registrations of {@link DeviceMastership}.
48  */
49 public class DeviceMastershipManager
50         implements ClusteredDataTreeChangeListener<FlowCapableNode>, OpendaylightInventoryListener, AutoCloseable {
51     private static final Logger LOG = LoggerFactory.getLogger(DeviceMastershipManager.class);
52     private static final InstanceIdentifier<FlowCapableNode> II_TO_FLOW_CAPABLE_NODE = InstanceIdentifier
53             .builder(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class).build();
54
55     private final ClusterSingletonServiceProvider clusterSingletonService;
56     private final ListenerRegistration<?> notifListenerRegistration;
57     private final FlowNodeReconciliation reconcliationAgent;
58     private final DataBroker dataBroker;
59     private final ConcurrentHashMap<NodeId, DeviceMastership> deviceMasterships = new ConcurrentHashMap();
60     private final Object lockObj = new Object();
61     private ListenerRegistration<DeviceMastershipManager> listenerRegistration;
62     private Set<InstanceIdentifier<FlowCapableNode>> activeNodes = Collections.emptySet();
63     private RoutedRpcRegistration routedRpcReg;
64
65     public DeviceMastershipManager(final ClusterSingletonServiceProvider clusterSingletonService,
66                                    final NotificationProviderService notificationService,
67                                    final FlowNodeReconciliation reconcliationAgent,
68                                    final DataBroker dataBroker) {
69         this.clusterSingletonService = clusterSingletonService;
70         this.notifListenerRegistration = notificationService.registerNotificationListener(this);
71         this.reconcliationAgent = reconcliationAgent;
72         this.dataBroker = dataBroker;
73         registerNodeListener();
74     }
75
76     public boolean isDeviceMastered(final NodeId nodeId) {
77         return deviceMasterships.get(nodeId) != null && deviceMasterships.get(nodeId).isDeviceMastered();
78     }
79
80     public boolean isNodeActive(final NodeId nodeId) {
81         final InstanceIdentifier<FlowCapableNode> flowNodeIdentifier = InstanceIdentifier.create(Nodes.class)
82                 .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
83         return activeNodes.contains(flowNodeIdentifier);
84
85     }
86
87     @VisibleForTesting
88     ConcurrentHashMap<NodeId, DeviceMastership> getDeviceMasterships() {
89         return deviceMasterships;
90     }
91
92     /**
93      * Temporary solution before Mastership manager from plugin. Remove notification
94      * after update. Update node notification should be send only when mastership in
95      * plugin was granted.
96      *
97      * @param notification
98      *            received notification
99      */
100     @Override
101     public void onNodeUpdated(NodeUpdated notification) {
102         LOG.debug("NodeUpdate notification received : {}", notification);
103         DeviceMastership membership = deviceMasterships.computeIfAbsent(notification.getId(),
104             device -> new DeviceMastership(notification.getId(), routedRpcReg));
105         membership.reconcile();
106         membership.registerReconciliationRpc();
107     }
108
109     @Override
110     public void onNodeConnectorUpdated(NodeConnectorUpdated notification) {
111         // Not published by plugin
112     }
113
114     @Override
115     public void onNodeRemoved(NodeRemoved notification) {
116         LOG.debug("NodeRemoved notification received : {}", notification);
117         NodeId nodeId = notification.getNodeRef().getValue().firstKeyOf(Node.class).getId();
118         final DeviceMastership mastership = deviceMasterships.remove(nodeId);
119         if (mastership != null) {
120             mastership.deregisterReconciliationRpc();
121             mastership.close();
122             LOG.info("Unregistered FRM cluster singleton service for service id : {}", nodeId.getValue());
123         }
124     }
125
126     @Override
127     public void onNodeConnectorRemoved(NodeConnectorRemoved notification) {
128         // Not published by plugin
129     }
130
131     @Override
132     public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<FlowCapableNode>> changes) {
133         Preconditions.checkNotNull(changes, "Changes may not be null!");
134
135         for (DataTreeModification<FlowCapableNode> change : changes) {
136             final InstanceIdentifier<FlowCapableNode> key = change.getRootPath().getRootIdentifier();
137             final DataObjectModification<FlowCapableNode> mod = change.getRootNode();
138             final InstanceIdentifier<FlowCapableNode> nodeIdent = key.firstIdentifierOf(FlowCapableNode.class);
139
140             switch (mod.getModificationType()) {
141                 case DELETE:
142                     if (mod.getDataAfter() == null) {
143                         remove(key, mod.getDataBefore(), nodeIdent);
144                     }
145                     break;
146                 case SUBTREE_MODIFIED:
147                     // NO-OP since we do not need to reconcile on Node-updated
148                     break;
149                 case WRITE:
150                     if (mod.getDataBefore() == null) {
151                         add(key, mod.getDataAfter(), nodeIdent);
152                     }
153                     break;
154                 default:
155                     throw new IllegalArgumentException("Unhandled modification type " + mod.getModificationType());
156             }
157         }
158     }
159
160     public void remove(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode del,
161             InstanceIdentifier<FlowCapableNode> nodeIdent) {
162         if (compareInstanceIdentifierTail(identifier, II_TO_FLOW_CAPABLE_NODE)) {
163             if (LOG.isDebugEnabled()) {
164                 LOG.debug("Node removed: {}", nodeIdent.firstKeyOf(Node.class).getId().getValue());
165             }
166
167             if (!nodeIdent.isWildcarded() && activeNodes.contains(nodeIdent)) {
168                 synchronized (lockObj) {
169                     if (activeNodes.contains(nodeIdent)) {
170                         Set<InstanceIdentifier<FlowCapableNode>> set = Sets.newHashSet(activeNodes);
171                         set.remove(nodeIdent);
172                         activeNodes = Collections.unmodifiableSet(set);
173                         setNodeOperationalStatus(nodeIdent, false);
174                     }
175                 }
176             }
177         }
178     }
179
180     public void add(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode add,
181             InstanceIdentifier<FlowCapableNode> nodeIdent) {
182         if (compareInstanceIdentifierTail(identifier, II_TO_FLOW_CAPABLE_NODE)) {
183             if (LOG.isDebugEnabled()) {
184                 LOG.debug("Node added: {}", nodeIdent.firstKeyOf(Node.class).getId().getValue());
185             }
186
187             if (!nodeIdent.isWildcarded() && !activeNodes.contains(nodeIdent)) {
188                 synchronized (lockObj) {
189                     if (!activeNodes.contains(nodeIdent)) {
190                         Set<InstanceIdentifier<FlowCapableNode>> set = Sets.newHashSet(activeNodes);
191                         set.add(nodeIdent);
192                         activeNodes = Collections.unmodifiableSet(set);
193                         setNodeOperationalStatus(nodeIdent, true);
194                     }
195                 }
196             }
197         }
198     }
199
200     @Override
201     public void close() {
202         if (listenerRegistration != null) {
203             listenerRegistration.close();
204             listenerRegistration = null;
205         }
206         if (notifListenerRegistration != null) {
207             notifListenerRegistration.close();
208         }
209     }
210
211     private boolean compareInstanceIdentifierTail(InstanceIdentifier<?> identifier1,
212             InstanceIdentifier<?> identifier2) {
213         return Iterables.getLast(identifier1.getPathArguments())
214                 .equals(Iterables.getLast(identifier2.getPathArguments()));
215     }
216
217     private void setNodeOperationalStatus(InstanceIdentifier<FlowCapableNode> nodeIid, boolean status) {
218         NodeId nodeId = nodeIid.firstKeyOf(Node.class).getId();
219         if (nodeId != null && deviceMasterships.containsKey(nodeId)) {
220             deviceMasterships.get(nodeId).setDeviceOperationalStatus(status);
221             LOG.debug("Operational status of device {} is set to {}", nodeId, status);
222         }
223     }
224
225     public void setRoutedRpcReg(RoutedRpcRegistration routedRpcReg) {
226         this.routedRpcReg = routedRpcReg;
227     }
228
229     @SuppressWarnings("IllegalCatch")
230     private void registerNodeListener() {
231
232         final InstanceIdentifier<FlowCapableNode> flowNodeWildCardIdentifier = InstanceIdentifier.create(Nodes.class)
233                 .child(Node.class).augmentation(FlowCapableNode.class);
234
235         final DataTreeIdentifier<FlowCapableNode> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
236                 flowNodeWildCardIdentifier);
237
238         try {
239             SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(ForwardingRulesManagerImpl.STARTUP_LOOP_TICK,
240                     ForwardingRulesManagerImpl.STARTUP_LOOP_MAX_RETRIES);
241
242             listenerRegistration = looper.loopUntilNoException(
243                 () -> dataBroker.registerDataTreeChangeListener(treeId, DeviceMastershipManager.this));
244         } catch (Exception e) {
245             LOG.warn("Data listener registration failed: {}", e.getMessage());
246             LOG.debug("Data listener registration failed ", e);
247             throw new IllegalStateException("Node listener registration failed!", e);
248         }
249     }
250 }