/*
 * Copyright (c) 2016, 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.applications.frm.impl;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.Iterables;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.locks.ReentrantLock;
22 import org.checkerframework.checker.lock.qual.Holding;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.opendaylight.mdsal.binding.api.DataBroker;
25 import org.opendaylight.mdsal.binding.api.DataObjectModification;
26 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
27 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
28 import org.opendaylight.mdsal.binding.api.DataTreeModification;
29 import org.opendaylight.mdsal.binding.api.RpcProviderService;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
32 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeService;
33 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
34 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutputBuilder;
44 import org.opendaylight.yangtools.concepts.Registration;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.common.ErrorType;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
/**
 * Manager for clustering service registrations of {@link DeviceMastership}.
 */
55 public class DeviceMastershipManager implements DataTreeChangeListener<FlowCapableNode>, AutoCloseable,
56 MastershipChangeService {
57 private static final Logger LOG = LoggerFactory.getLogger(DeviceMastershipManager.class);
58 private static final InstanceIdentifier<FlowCapableNode> II_TO_FLOW_CAPABLE_NODE = InstanceIdentifier
59 .builder(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class).build();
61 private final ConcurrentHashMap<NodeId, DeviceMastership> deviceMasterships = new ConcurrentHashMap<>();
62 private final Set<InstanceIdentifier<FlowCapableNode>> activeNodes = ConcurrentHashMap.newKeySet();
63 private final ReentrantLock lock = new ReentrantLock();
64 private final FlowNodeReconciliation reconcliationAgent;
65 private final RpcProviderService rpcProviderService;
67 private Registration listenerRegistration;
68 private Registration mastershipChangeServiceRegistration;
70 @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Non-final for mocking")
71 public DeviceMastershipManager(final FlowNodeReconciliation reconcliationAgent,
72 final DataBroker dataBroker,
73 final MastershipChangeServiceManager mastershipChangeServiceManager,
74 final RpcProviderService rpcProviderService) {
75 this.reconcliationAgent = reconcliationAgent;
76 this.rpcProviderService = rpcProviderService;
77 listenerRegistration = dataBroker.registerTreeChangeListener(
78 DataTreeIdentifier.of(LogicalDatastoreType.OPERATIONAL,
79 InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)), this);
80 mastershipChangeServiceRegistration = mastershipChangeServiceManager.register(this);
83 public boolean isDeviceMastered(final NodeId nodeId) {
84 return deviceMasterships.get(nodeId) != null && deviceMasterships.get(nodeId).isDeviceMastered();
87 public boolean isNodeActive(final NodeId nodeId) {
88 return activeNodes.contains(InstanceIdentifier.create(Nodes.class)
89 .child(Node.class, new NodeKey(nodeId))
90 .augmentation(FlowCapableNode.class));
94 ConcurrentHashMap<NodeId, DeviceMastership> getDeviceMasterships() {
95 return deviceMasterships;
/**
 * Walks a batch of {@link FlowCapableNode} data tree modifications and hands each one to
 * {@code add} or {@code remove}.
 *
 * <p>NOTE(review): this listing skips several lines inside the {@code switch}; only the
 * {@code SUBTREE_MODIFIED} label is visible. Which modification types guard the {@code remove} and
 * {@code add} branches, and what guards the final {@code throw}, cannot be confirmed from here.
 *
 * @param changes modifications to process, handled in iteration order
 */
99 public void onDataTreeChanged(final List<DataTreeModification<FlowCapableNode>> changes) {
100 for (DataTreeModification<FlowCapableNode> change : changes) {
// Path of the changed data and the modification describing what happened to it.
101 final InstanceIdentifier<FlowCapableNode> key = change.getRootPath().path();
102 final DataObjectModification<FlowCapableNode> mod = change.getRootNode();
103 final InstanceIdentifier<FlowCapableNode> nodeIdent = key.firstIdentifierOf(FlowCapableNode.class);
105 switch (mod.modificationType()) {
// A removal is only forwarded when no data is left after the change.
107 if (mod.dataAfter() == null) {
108 remove(key, mod.dataBefore(), nodeIdent);
111 case SUBTREE_MODIFIED:
112 // NO-OP since we do not need to reconcile on Node-updated
// An addition is only forwarded when there was no data before the change.
115 if (mod.dataBefore() == null) {
116 add(key, mod.dataAfter(), nodeIdent);
// Rejects a modification type that is not handled; the message includes the offending type.
120 throw new IllegalArgumentException("Unhandled modification type " + mod.modificationType());
/**
 * Handles the removal of a flow capable node.
 *
 * <p>Nothing happens unless the last path argument of {@code identifier} equals the last path argument of
 * {@code II_TO_FLOW_CAPABLE_NODE}. When {@code nodeIdent} is not wildcarded and was present in
 * {@code activeNodes}, the reconciliation agent is told the node disconnected and the node's operational
 * status is set to {@code false}.
 *
 * @param identifier path of the removed data
 * @param del data before the removal; not read by any line visible in this listing
 * @param nodeIdent path of the affected flow capable node
 */
125 public void remove(final InstanceIdentifier<FlowCapableNode> identifier, final FlowCapableNode del,
126 final InstanceIdentifier<FlowCapableNode> nodeIdent) {
127 if (compareInstanceIdentifierTail(identifier, II_TO_FLOW_CAPABLE_NODE)) {
// Guarded because the log argument is computed eagerly through a chain of calls.
128 if (LOG.isDebugEnabled()) {
129 LOG.debug("Node removed: {}", nodeIdent.firstKeyOf(Node.class).getId().getValue());
// Set.remove() returns true only if the node was tracked, so an untracked node triggers nothing below.
132 if (!nodeIdent.isWildcarded() && activeNodes.remove(nodeIdent)) {
// NOTE(review): lines are skipped in this listing between the check above and the calls below - confirm
// whether they run under the "lock" field.
135 reconcliationAgent.flowNodeDisconnected(nodeIdent);
136 setNodeOperationalStatus(nodeIdent, false);
/**
 * Handles the appearance of a flow capable node.
 *
 * <p>Nothing happens unless the last path argument of {@code identifier} equals the last path argument of
 * {@code II_TO_FLOW_CAPABLE_NODE}. When {@code nodeIdent} is not wildcarded and was not yet present in
 * {@code activeNodes}, the node's operational status is set to {@code true}.
 *
 * @param identifier path of the added data
 * @param add data after the change; not read by any line visible in this listing
 * @param nodeIdent path of the affected flow capable node
 */
144 public void add(final InstanceIdentifier<FlowCapableNode> identifier, final FlowCapableNode add,
145 final InstanceIdentifier<FlowCapableNode> nodeIdent) {
146 if (compareInstanceIdentifierTail(identifier, II_TO_FLOW_CAPABLE_NODE)) {
// Guarded because the log argument is computed eagerly through a chain of calls.
147 if (LOG.isDebugEnabled()) {
148 LOG.debug("Node added: {}", nodeIdent.firstKeyOf(Node.class).getId().getValue());
// Set.add() returns true only for a node that was not already tracked, so a repeated add is ignored.
151 if (!nodeIdent.isWildcarded() && activeNodes.add(nodeIdent)) {
// NOTE(review): lines are skipped in this listing between the check above and the call below - confirm
// whether it runs under the "lock" field.
154 setNodeOperationalStatus(nodeIdent, true);
163 public void close() throws Exception {
164 if (listenerRegistration != null) {
165 listenerRegistration.close();
166 listenerRegistration = null;
168 if (mastershipChangeServiceRegistration != null) {
169 mastershipChangeServiceRegistration.close();
170 mastershipChangeServiceRegistration = null;
174 private static boolean compareInstanceIdentifierTail(final InstanceIdentifier<?> identifier1,
175 final InstanceIdentifier<?> identifier2) {
176 return Iterables.getLast(identifier1.getPathArguments())
177 .equals(Iterables.getLast(identifier2.getPathArguments()));
/**
 * Pushes an operational status into the {@link DeviceMastership} tracked for a node, if there is one.
 *
 * @param nodeIid path from which the {@code Node} key, and from it the node id, is read
 * @param status value handed to {@code setDeviceOperationalStatus}
 */
181 private void setNodeOperationalStatus(final InstanceIdentifier<FlowCapableNode> nodeIid, final boolean status) {
182 final var nodeId = nodeIid.firstKeyOf(Node.class).getId();
// NOTE(review): the body of this null check is not visible in this listing; presumably it bails out before
// the map lookup below - confirm.
183 if (nodeId == null) {
186 final var mastership = deviceMasterships.get(nodeId);
// A node without a tracked mastership gets no status update.
187 if (mastership != null) {
188 mastership.setDeviceOperationalStatus(status);
189 LOG.debug("Operational status of device {} is set to {}", nodeId, status);
194 public void onBecomeOwner(@NonNull final DeviceInfo deviceInfo) {
195 LOG.debug("Mastership role notification received for device : {}", deviceInfo.getDatapathId());
196 final var membership = deviceMasterships.computeIfAbsent(deviceInfo.getNodeId(),
197 device -> new DeviceMastership(deviceInfo.getNodeId()));
198 membership.reconcile();
199 membership.registerReconcileNode(rpcProviderService, this::reconcileNode);
/**
 * Asks the reconciliation agent to reconcile the configuration of the node named in the RPC input and
 * exposes the outcome through a settable future of {@code RpcResult<ReconcileNodeOutput>}.
 *
 * <p>NOTE(review): several lines of this method are skipped in this listing (the end of the identifier
 * builder chain, the start of the {@code onSuccess} body and the return statement), so the exact success
 * condition and the returned value cannot be confirmed from here.
 *
 * @param input RPC input; its node id is required
 */
202 private ListenableFuture<RpcResult<ReconcileNodeOutput>> reconcileNode(final ReconcileNodeInput input) {
203 final var nodeId = input.requireNodeId();
204 LOG.debug("Triggering reconciliation for node: {}", nodeId);
// The inventory node id is the literal prefix "openflow:" followed by the requested id.
206 final var nodeDpn = new NodeBuilder().setId(new NodeId("openflow:" + nodeId)).build();
207 final var connectedNode = InstanceIdentifier.builder(Nodes.class)
208 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
210 final var rpcResult = SettableFuture.<RpcResult<ReconcileNodeOutput>>create();
211 Futures.addCallback(reconcliationAgent.reconcileConfiguration(connectedNode), new FutureCallback<>() {
213 public void onSuccess(final Boolean result) {
// Two outcomes are built here: a successful result carrying "result", or an APPLICATION-type failure.
215 ? RpcResultBuilder.success(new ReconcileNodeOutputBuilder().setResult(result).build()).build()
216 : RpcResultBuilder.<ReconcileNodeOutput>failed()
217 .withError(ErrorType.APPLICATION, "Error while triggering reconciliation")
// A failed reconciliation future is logged and reported to the caller as an RPC-type error.
222 public void onFailure(final Throwable error) {
223 LOG.error("initReconciliation failed", error);
224 rpcResult.set(RpcResultBuilder.<ReconcileNodeOutput>failed()
225 .withError(ErrorType.RPC, "Error while calling RPC").build());
// Direct executor: the callback runs inline on the thread that completes the reconciliation future.
227 }, MoreExecutors.directExecutor());
// This is logged once the callback is attached, which need not be after reconciliation has finished.
229 LOG.debug("Completing reconciliation for node: {}", nodeId);
/**
 * Reacts to this instance losing ownership of a device: stops tracking the device's
 * {@link DeviceMastership} and, if one was tracked, deregisters its reconcile-node registration.
 *
 * @param deviceInfo description of the device whose ownership was lost
 */
234 public void onLoseOwnership(@NonNull final DeviceInfo deviceInfo) {
// remove() looks the entry up and evicts it in a single step.
235 final var mastership = deviceMasterships.remove(deviceInfo.getNodeId());
236 if (mastership != null) {
237 mastership.deregisterReconcileNode();
// NOTE(review): one line is skipped here in this listing, so it is unclear whether the log below sits
// inside the null check and whether further cleanup happens - confirm.
239 LOG.debug("Unregistered deviceMastership for device : {}", deviceInfo.getNodeId());