2 * Copyright (c) 2016, 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.frm.impl;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Iterables;
14 import com.google.common.collect.Sets;
15 import java.util.Collection;
16 import java.util.Collections;
18 import java.util.concurrent.ConcurrentHashMap;
19 import javax.annotation.Nonnull;
20 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
23 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
27 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
28 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
29 import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * Manager for clustering service registrations of {@link DeviceMastership}.
48 public class DeviceMastershipManager
49 implements ClusteredDataTreeChangeListener<FlowCapableNode>, OpendaylightInventoryListener, AutoCloseable {
50 private static final Logger LOG = LoggerFactory.getLogger(DeviceMastershipManager.class);
51 private static final InstanceIdentifier<FlowCapableNode> II_TO_FLOW_CAPABLE_NODE = InstanceIdentifier
52 .builder(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class).build();
54 private final ClusterSingletonServiceProvider clusterSingletonService;
55 private final ListenerRegistration<?> notifListenerRegistration;
56 private final FlowNodeReconciliation reconcliationAgent;
57 private final DataBroker dataBroker;
58 private final ConcurrentHashMap<NodeId, DeviceMastership> deviceMasterships = new ConcurrentHashMap();
59 private final Object lockObj = new Object();
60 private ListenerRegistration<DeviceMastershipManager> listenerRegistration;
61 private Set<InstanceIdentifier<FlowCapableNode>> activeNodes = Collections.emptySet();
63 public DeviceMastershipManager(final ClusterSingletonServiceProvider clusterSingletonService,
64 final NotificationProviderService notificationService, final FlowNodeReconciliation reconcliationAgent,
65 final DataBroker dataBroker) {
66 this.clusterSingletonService = clusterSingletonService;
67 this.notifListenerRegistration = notificationService.registerNotificationListener(this);
68 this.reconcliationAgent = reconcliationAgent;
69 this.dataBroker = dataBroker;
70 registerNodeListener();
73 public boolean isDeviceMastered(final NodeId nodeId) {
74 return deviceMasterships.get(nodeId) != null && deviceMasterships.get(nodeId).isDeviceMastered();
77 public boolean isNodeActive(final NodeId nodeId) {
78 final InstanceIdentifier<FlowCapableNode> flowNodeIdentifier = InstanceIdentifier.create(Nodes.class)
79 .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
80 return activeNodes.contains(flowNodeIdentifier);
85 ConcurrentHashMap<NodeId, DeviceMastership> getDeviceMasterships() {
86 return deviceMasterships;
90 * Temporary solution before Mastership manager from plugin. Remove notification
91 * after update. Update node notification should be send only when mastership in
95 * received notification
98 public void onNodeUpdated(NodeUpdated notification) {
99 LOG.debug("NodeUpdate notification received : {}", notification);
100 DeviceMastership membership = deviceMasterships.computeIfAbsent(notification.getId(),
101 device -> new DeviceMastership(notification.getId()));
102 membership.reconcile();
106 public void onNodeConnectorUpdated(NodeConnectorUpdated notification) {
107 // Not published by plugin
111 public void onNodeRemoved(NodeRemoved notification) {
112 LOG.debug("NodeRemoved notification received : {}", notification);
113 NodeId nodeId = notification.getNodeRef().getValue().firstKeyOf(Node.class).getId();
114 final DeviceMastership mastership = deviceMasterships.remove(nodeId);
115 if (mastership != null) {
117 LOG.info("Unregistered FRM cluster singleton service for service id : {}", nodeId.getValue());
122 public void onNodeConnectorRemoved(NodeConnectorRemoved notification) {
123 // Not published by plugin
127 public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<FlowCapableNode>> changes) {
128 Preconditions.checkNotNull(changes, "Changes may not be null!");
130 for (DataTreeModification<FlowCapableNode> change : changes) {
131 final InstanceIdentifier<FlowCapableNode> key = change.getRootPath().getRootIdentifier();
132 final DataObjectModification<FlowCapableNode> mod = change.getRootNode();
133 final InstanceIdentifier<FlowCapableNode> nodeIdent = key.firstIdentifierOf(FlowCapableNode.class);
135 switch (mod.getModificationType()) {
137 if (mod.getDataAfter() == null) {
138 remove(key, mod.getDataBefore(), nodeIdent);
141 case SUBTREE_MODIFIED:
142 // NO-OP since we do not need to reconcile on Node-updated
145 if (mod.getDataBefore() == null) {
146 add(key, mod.getDataAfter(), nodeIdent);
150 throw new IllegalArgumentException("Unhandled modification type " + mod.getModificationType());
155 public void remove(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode del,
156 InstanceIdentifier<FlowCapableNode> nodeIdent) {
157 if (compareInstanceIdentifierTail(identifier, II_TO_FLOW_CAPABLE_NODE)) {
158 if (LOG.isDebugEnabled()) {
159 LOG.debug("Node removed: {}", nodeIdent.firstKeyOf(Node.class).getId().getValue());
162 if (!nodeIdent.isWildcarded()) {
163 if (activeNodes.contains(nodeIdent)) {
164 synchronized (lockObj) {
165 if (activeNodes.contains(nodeIdent)) {
166 Set<InstanceIdentifier<FlowCapableNode>> set = Sets.newHashSet(activeNodes);
167 set.remove(nodeIdent);
168 activeNodes = Collections.unmodifiableSet(set);
169 setNodeOperationalStatus(nodeIdent, false);
178 public void add(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode add,
179 InstanceIdentifier<FlowCapableNode> nodeIdent) {
180 if (compareInstanceIdentifierTail(identifier, II_TO_FLOW_CAPABLE_NODE)) {
181 if (LOG.isDebugEnabled()) {
182 LOG.debug("Node added: {}", nodeIdent.firstKeyOf(Node.class).getId().getValue());
185 if (!nodeIdent.isWildcarded()) {
186 if (!activeNodes.contains(nodeIdent)) {
187 synchronized (lockObj) {
188 if (!activeNodes.contains(nodeIdent)) {
189 Set<InstanceIdentifier<FlowCapableNode>> set = Sets.newHashSet(activeNodes);
191 activeNodes = Collections.unmodifiableSet(set);
192 setNodeOperationalStatus(nodeIdent, true);
201 public void close() {
202 if (listenerRegistration != null) {
203 listenerRegistration.close();
204 listenerRegistration = null;
206 if (notifListenerRegistration != null) {
207 notifListenerRegistration.close();
211 private boolean compareInstanceIdentifierTail(InstanceIdentifier<?> identifier1,
212 InstanceIdentifier<?> identifier2) {
213 return Iterables.getLast(identifier1.getPathArguments())
214 .equals(Iterables.getLast(identifier2.getPathArguments()));
217 private void setNodeOperationalStatus(InstanceIdentifier<FlowCapableNode> nodeIid, boolean status) {
218 NodeId nodeId = nodeIid.firstKeyOf(Node.class).getId();
219 if (nodeId != null) {
220 if (deviceMasterships.containsKey(nodeId)) {
221 deviceMasterships.get(nodeId).setDeviceOperationalStatus(status);
222 LOG.debug("Operational status of device {} is set to {}", nodeId, status);
227 @SuppressWarnings("IllegalCatch")
228 private void registerNodeListener() {
230 final InstanceIdentifier<FlowCapableNode> flowNodeWildCardIdentifier = InstanceIdentifier.create(Nodes.class)
231 .child(Node.class).augmentation(FlowCapableNode.class);
233 final DataTreeIdentifier<FlowCapableNode> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
234 flowNodeWildCardIdentifier);
237 SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(ForwardingRulesManagerImpl.STARTUP_LOOP_TICK,
238 ForwardingRulesManagerImpl.STARTUP_LOOP_MAX_RETRIES);
240 listenerRegistration = looper.loopUntilNoException(
241 () -> dataBroker.registerDataTreeChangeListener(treeId, DeviceMastershipManager.this));
242 } catch (Exception e) {
243 LOG.warn("Data listener registration failed: {}", e.getMessage());
244 LOG.debug("Data listener registration failed ", e);
245 throw new IllegalStateException("Node listener registration failed!", e);