2 * Copyright (c) 2016, 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.frm.impl;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Iterables;
14 import com.google.common.collect.Sets;
15 import java.util.Collection;
16 import java.util.Collections;
18 import java.util.concurrent.ConcurrentHashMap;
19 import javax.annotation.Nonnull;
20 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
23 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
27 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
28 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
29 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
30 import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Manager for clustering service registrations of {@link DeviceMastership}.
49 public class DeviceMastershipManager
50 implements ClusteredDataTreeChangeListener<FlowCapableNode>, OpendaylightInventoryListener, AutoCloseable {
51 private static final Logger LOG = LoggerFactory.getLogger(DeviceMastershipManager.class);
52 private static final InstanceIdentifier<FlowCapableNode> II_TO_FLOW_CAPABLE_NODE = InstanceIdentifier
53 .builder(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class).build();
55 private final ClusterSingletonServiceProvider clusterSingletonService;
56 private final ListenerRegistration<?> notifListenerRegistration;
57 private final FlowNodeReconciliation reconcliationAgent;
58 private final DataBroker dataBroker;
59 private final ConcurrentHashMap<NodeId, DeviceMastership> deviceMasterships = new ConcurrentHashMap();
60 private final Object lockObj = new Object();
61 private ListenerRegistration<DeviceMastershipManager> listenerRegistration;
62 private Set<InstanceIdentifier<FlowCapableNode>> activeNodes = Collections.emptySet();
63 private RoutedRpcRegistration routedRpcReg;
/**
 * Creates the manager, registers it as a notification listener and as a data tree change
 * listener for flow capable nodes.
 *
 * @param clusterSingletonService cluster singleton service provider kept by this manager
 * @param notificationService service this instance is registered with as notification listener
 * @param reconcliationAgent reconciliation agent kept by this manager
 * @param dataBroker broker used to register the data tree change listener
 * @throws IllegalStateException if the data tree change listener cannot be registered
 */
public DeviceMastershipManager(final ClusterSingletonServiceProvider clusterSingletonService,
        final NotificationProviderService notificationService,
        final FlowNodeReconciliation reconcliationAgent,
        final DataBroker dataBroker) {
    // Assign plain state first so that it is in place before 'this' is handed out to listeners.
    this.clusterSingletonService = clusterSingletonService;
    this.reconcliationAgent = reconcliationAgent;
    this.dataBroker = dataBroker;
    this.notifListenerRegistration = notificationService.registerNotificationListener(this);
    try {
        registerNodeListener();
    } catch (RuntimeException e) {
        // Construction failed: do not leave the notification listener registered.
        if (notifListenerRegistration != null) {
            notifListenerRegistration.close();
        }
        throw e;
    }
}
76 public boolean isDeviceMastered(final NodeId nodeId) {
77 return deviceMasterships.get(nodeId) != null && deviceMasterships.get(nodeId).isDeviceMastered();
80 public boolean isNodeActive(final NodeId nodeId) {
81 final InstanceIdentifier<FlowCapableNode> flowNodeIdentifier = InstanceIdentifier.create(Nodes.class)
82 .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
83 return activeNodes.contains(flowNodeIdentifier);
88 ConcurrentHashMap<NodeId, DeviceMastership> getDeviceMasterships() {
89 return deviceMasterships;
93 * Temporary solution before Mastership manager from plugin. Remove notification
94 * after update. Update node notification should be send only when mastership in
98 * received notification
101 public void onNodeUpdated(NodeUpdated notification) {
102 LOG.debug("NodeUpdate notification received : {}", notification);
103 DeviceMastership membership = deviceMasterships.computeIfAbsent(notification.getId(),
104 device -> new DeviceMastership(notification.getId(), routedRpcReg));
105 membership.reconcile();
106 membership.registerReconciliationRpc();
110 public void onNodeConnectorUpdated(NodeConnectorUpdated notification) {
111 // Not published by plugin
115 public void onNodeRemoved(NodeRemoved notification) {
116 LOG.debug("NodeRemoved notification received : {}", notification);
117 NodeId nodeId = notification.getNodeRef().getValue().firstKeyOf(Node.class).getId();
118 final DeviceMastership mastership = deviceMasterships.remove(nodeId);
119 if (mastership != null) {
120 mastership.deregisterReconciliationRpc();
122 LOG.info("Unregistered FRM cluster singleton service for service id : {}", nodeId.getValue());
127 public void onNodeConnectorRemoved(NodeConnectorRemoved notification) {
128 // Not published by plugin
132 public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<FlowCapableNode>> changes) {
133 Preconditions.checkNotNull(changes, "Changes may not be null!");
135 for (DataTreeModification<FlowCapableNode> change : changes) {
136 final InstanceIdentifier<FlowCapableNode> key = change.getRootPath().getRootIdentifier();
137 final DataObjectModification<FlowCapableNode> mod = change.getRootNode();
138 final InstanceIdentifier<FlowCapableNode> nodeIdent = key.firstIdentifierOf(FlowCapableNode.class);
140 switch (mod.getModificationType()) {
142 if (mod.getDataAfter() == null) {
143 remove(key, mod.getDataBefore(), nodeIdent);
146 case SUBTREE_MODIFIED:
147 // NO-OP since we do not need to reconcile on Node-updated
150 if (mod.getDataBefore() == null) {
151 add(key, mod.getDataAfter(), nodeIdent);
155 throw new IllegalArgumentException("Unhandled modification type " + mod.getModificationType());
160 public void remove(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode del,
161 InstanceIdentifier<FlowCapableNode> nodeIdent) {
162 if (compareInstanceIdentifierTail(identifier, II_TO_FLOW_CAPABLE_NODE)) {
163 if (LOG.isDebugEnabled()) {
164 LOG.debug("Node removed: {}", nodeIdent.firstKeyOf(Node.class).getId().getValue());
167 if (!nodeIdent.isWildcarded() && activeNodes.contains(nodeIdent)) {
168 synchronized (lockObj) {
169 if (activeNodes.contains(nodeIdent)) {
170 Set<InstanceIdentifier<FlowCapableNode>> set = Sets.newHashSet(activeNodes);
171 set.remove(nodeIdent);
172 activeNodes = Collections.unmodifiableSet(set);
173 setNodeOperationalStatus(nodeIdent, false);
180 public void add(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode add,
181 InstanceIdentifier<FlowCapableNode> nodeIdent) {
182 if (compareInstanceIdentifierTail(identifier, II_TO_FLOW_CAPABLE_NODE)) {
183 if (LOG.isDebugEnabled()) {
184 LOG.debug("Node added: {}", nodeIdent.firstKeyOf(Node.class).getId().getValue());
187 if (!nodeIdent.isWildcarded() && !activeNodes.contains(nodeIdent)) {
188 synchronized (lockObj) {
189 if (!activeNodes.contains(nodeIdent)) {
190 Set<InstanceIdentifier<FlowCapableNode>> set = Sets.newHashSet(activeNodes);
192 activeNodes = Collections.unmodifiableSet(set);
193 setNodeOperationalStatus(nodeIdent, true);
201 public void close() {
202 if (listenerRegistration != null) {
203 listenerRegistration.close();
204 listenerRegistration = null;
206 if (notifListenerRegistration != null) {
207 notifListenerRegistration.close();
211 private boolean compareInstanceIdentifierTail(InstanceIdentifier<?> identifier1,
212 InstanceIdentifier<?> identifier2) {
213 return Iterables.getLast(identifier1.getPathArguments())
214 .equals(Iterables.getLast(identifier2.getPathArguments()));
217 private void setNodeOperationalStatus(InstanceIdentifier<FlowCapableNode> nodeIid, boolean status) {
218 NodeId nodeId = nodeIid.firstKeyOf(Node.class).getId();
219 if (nodeId != null && deviceMasterships.containsKey(nodeId)) {
220 deviceMasterships.get(nodeId).setDeviceOperationalStatus(status);
221 LOG.debug("Operational status of device {} is set to {}", nodeId, status);
225 public void setRoutedRpcReg(RoutedRpcRegistration routedRpcReg) {
226 this.routedRpcReg = routedRpcReg;
229 @SuppressWarnings("IllegalCatch")
230 private void registerNodeListener() {
232 final InstanceIdentifier<FlowCapableNode> flowNodeWildCardIdentifier = InstanceIdentifier.create(Nodes.class)
233 .child(Node.class).augmentation(FlowCapableNode.class);
235 final DataTreeIdentifier<FlowCapableNode> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
236 flowNodeWildCardIdentifier);
239 SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(ForwardingRulesManagerImpl.STARTUP_LOOP_TICK,
240 ForwardingRulesManagerImpl.STARTUP_LOOP_MAX_RETRIES);
242 listenerRegistration = looper.loopUntilNoException(
243 () -> dataBroker.registerDataTreeChangeListener(treeId, DeviceMastershipManager.this));
244 } catch (Exception e) {
245 LOG.warn("Data listener registration failed: {}", e.getMessage());
246 LOG.debug("Data listener registration failed ", e);
247 throw new IllegalStateException("Node listener registration failed!", e);