2e27d697411f722df0c0c7f3d6cf4c09cebd9825
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / DeviceMastershipManager.java
1 /**
2  * Copyright (c) 2016, 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frm.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Iterables;
14 import com.google.common.collect.Sets;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.Set;
18 import java.util.concurrent.ConcurrentHashMap;
19 import javax.annotation.Nonnull;
20 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
23 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
27 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
28 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
29 import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Manager for clustering service registrations of {@link DeviceMastership}.
47  */
48 public class DeviceMastershipManager
49         implements ClusteredDataTreeChangeListener<FlowCapableNode>, OpendaylightInventoryListener, AutoCloseable {
50     private static final Logger LOG = LoggerFactory.getLogger(DeviceMastershipManager.class);
51     private static final InstanceIdentifier<FlowCapableNode> II_TO_FLOW_CAPABLE_NODE = InstanceIdentifier
52             .builder(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class).build();
53
54     private final ClusterSingletonServiceProvider clusterSingletonService;
55     private final ListenerRegistration<?> notifListenerRegistration;
56     private final FlowNodeReconciliation reconcliationAgent;
57     private final DataBroker dataBroker;
58     private final ConcurrentHashMap<NodeId, DeviceMastership> deviceMasterships = new ConcurrentHashMap();
59     private final Object lockObj = new Object();
60     private ListenerRegistration<DeviceMastershipManager> listenerRegistration;
61     private Set<InstanceIdentifier<FlowCapableNode>> activeNodes = Collections.emptySet();
62
63     public DeviceMastershipManager(final ClusterSingletonServiceProvider clusterSingletonService,
64             final NotificationProviderService notificationService, final FlowNodeReconciliation reconcliationAgent,
65             final DataBroker dataBroker) {
66         this.clusterSingletonService = clusterSingletonService;
67         this.notifListenerRegistration = notificationService.registerNotificationListener(this);
68         this.reconcliationAgent = reconcliationAgent;
69         this.dataBroker = dataBroker;
70         registerNodeListener();
71     }
72
73     public boolean isDeviceMastered(final NodeId nodeId) {
74         return deviceMasterships.get(nodeId) != null && deviceMasterships.get(nodeId).isDeviceMastered();
75     }
76
77     public boolean isNodeActive(final NodeId nodeId) {
78         final InstanceIdentifier<FlowCapableNode> flowNodeIdentifier = InstanceIdentifier.create(Nodes.class)
79                 .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
80         return activeNodes.contains(flowNodeIdentifier);
81
82     }
83
84     @VisibleForTesting
85     ConcurrentHashMap<NodeId, DeviceMastership> getDeviceMasterships() {
86         return deviceMasterships;
87     }
88
89     /**
90      * Temporary solution before Mastership manager from plugin. Remove notification
91      * after update. Update node notification should be send only when mastership in
92      * plugin was granted.
93      *
94      * @param notification
95      *            received notification
96      */
97     @Override
98     public void onNodeUpdated(NodeUpdated notification) {
99         LOG.debug("NodeUpdate notification received : {}", notification);
100         DeviceMastership membership = deviceMasterships.computeIfAbsent(notification.getId(),
101             device -> new DeviceMastership(notification.getId()));
102         membership.reconcile();
103     }
104
105     @Override
106     public void onNodeConnectorUpdated(NodeConnectorUpdated notification) {
107         // Not published by plugin
108     }
109
110     @Override
111     public void onNodeRemoved(NodeRemoved notification) {
112         LOG.debug("NodeRemoved notification received : {}", notification);
113         NodeId nodeId = notification.getNodeRef().getValue().firstKeyOf(Node.class).getId();
114         final DeviceMastership mastership = deviceMasterships.remove(nodeId);
115         if (mastership != null) {
116             mastership.close();
117             LOG.info("Unregistered FRM cluster singleton service for service id : {}", nodeId.getValue());
118         }
119     }
120
121     @Override
122     public void onNodeConnectorRemoved(NodeConnectorRemoved notification) {
123         // Not published by plugin
124     }
125
126     @Override
127     public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<FlowCapableNode>> changes) {
128         Preconditions.checkNotNull(changes, "Changes may not be null!");
129
130         for (DataTreeModification<FlowCapableNode> change : changes) {
131             final InstanceIdentifier<FlowCapableNode> key = change.getRootPath().getRootIdentifier();
132             final DataObjectModification<FlowCapableNode> mod = change.getRootNode();
133             final InstanceIdentifier<FlowCapableNode> nodeIdent = key.firstIdentifierOf(FlowCapableNode.class);
134
135             switch (mod.getModificationType()) {
136                 case DELETE:
137                     if (mod.getDataAfter() == null) {
138                         remove(key, mod.getDataBefore(), nodeIdent);
139                     }
140                     break;
141                 case SUBTREE_MODIFIED:
142                     // NO-OP since we do not need to reconcile on Node-updated
143                     break;
144                 case WRITE:
145                     if (mod.getDataBefore() == null) {
146                         add(key, mod.getDataAfter(), nodeIdent);
147                     }
148                     break;
149                 default:
150                     throw new IllegalArgumentException("Unhandled modification type " + mod.getModificationType());
151             }
152         }
153     }
154
155     public void remove(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode del,
156             InstanceIdentifier<FlowCapableNode> nodeIdent) {
157         if (compareInstanceIdentifierTail(identifier, II_TO_FLOW_CAPABLE_NODE)) {
158             if (LOG.isDebugEnabled()) {
159                 LOG.debug("Node removed: {}", nodeIdent.firstKeyOf(Node.class).getId().getValue());
160             }
161
162             if (!nodeIdent.isWildcarded()) {
163                 if (activeNodes.contains(nodeIdent)) {
164                     synchronized (lockObj) {
165                         if (activeNodes.contains(nodeIdent)) {
166                             Set<InstanceIdentifier<FlowCapableNode>> set = Sets.newHashSet(activeNodes);
167                             set.remove(nodeIdent);
168                             activeNodes = Collections.unmodifiableSet(set);
169                             setNodeOperationalStatus(nodeIdent, false);
170                         }
171                     }
172                 }
173             }
174
175         }
176     }
177
178     public void add(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode add,
179             InstanceIdentifier<FlowCapableNode> nodeIdent) {
180         if (compareInstanceIdentifierTail(identifier, II_TO_FLOW_CAPABLE_NODE)) {
181             if (LOG.isDebugEnabled()) {
182                 LOG.debug("Node added: {}", nodeIdent.firstKeyOf(Node.class).getId().getValue());
183             }
184
185             if (!nodeIdent.isWildcarded()) {
186                 if (!activeNodes.contains(nodeIdent)) {
187                     synchronized (lockObj) {
188                         if (!activeNodes.contains(nodeIdent)) {
189                             Set<InstanceIdentifier<FlowCapableNode>> set = Sets.newHashSet(activeNodes);
190                             set.add(nodeIdent);
191                             activeNodes = Collections.unmodifiableSet(set);
192                             setNodeOperationalStatus(nodeIdent, true);
193                         }
194                     }
195                 }
196             }
197         }
198     }
199
200     @Override
201     public void close() {
202         if (listenerRegistration != null) {
203             listenerRegistration.close();
204             listenerRegistration = null;
205         }
206         if (notifListenerRegistration != null) {
207             notifListenerRegistration.close();
208         }
209     }
210
211     private boolean compareInstanceIdentifierTail(InstanceIdentifier<?> identifier1,
212             InstanceIdentifier<?> identifier2) {
213         return Iterables.getLast(identifier1.getPathArguments())
214                 .equals(Iterables.getLast(identifier2.getPathArguments()));
215     }
216
217     private void setNodeOperationalStatus(InstanceIdentifier<FlowCapableNode> nodeIid, boolean status) {
218         NodeId nodeId = nodeIid.firstKeyOf(Node.class).getId();
219         if (nodeId != null) {
220             if (deviceMasterships.containsKey(nodeId)) {
221                 deviceMasterships.get(nodeId).setDeviceOperationalStatus(status);
222                 LOG.debug("Operational status of device {} is set to {}", nodeId, status);
223             }
224         }
225     }
226
227     @SuppressWarnings("IllegalCatch")
228     private void registerNodeListener() {
229
230         final InstanceIdentifier<FlowCapableNode> flowNodeWildCardIdentifier = InstanceIdentifier.create(Nodes.class)
231                 .child(Node.class).augmentation(FlowCapableNode.class);
232
233         final DataTreeIdentifier<FlowCapableNode> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
234                 flowNodeWildCardIdentifier);
235
236         try {
237             SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(ForwardingRulesManagerImpl.STARTUP_LOOP_TICK,
238                     ForwardingRulesManagerImpl.STARTUP_LOOP_MAX_RETRIES);
239
240             listenerRegistration = looper.loopUntilNoException(
241                 () -> dataBroker.registerDataTreeChangeListener(treeId, DeviceMastershipManager.this));
242         } catch (Exception e) {
243             LOG.warn("Data listener registration failed: {}", e.getMessage());
244             LOG.debug("Data listener registration failed ", e);
245             throw new IllegalStateException("Node listener registration failed!", e);
246         }
247     }
248 }