Use ByteBuf.readRetainedSlice()
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / DeviceMastershipManager.java
1 /*
2  * Copyright (c) 2016, 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frm.impl;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.Iterables;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import com.google.common.util.concurrent.SettableFuture;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.util.List;
19 import java.util.Set;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.locks.ReentrantLock;
22 import org.checkerframework.checker.lock.qual.Holding;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.opendaylight.mdsal.binding.api.DataBroker;
25 import org.opendaylight.mdsal.binding.api.DataObjectModification;
26 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
27 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
28 import org.opendaylight.mdsal.binding.api.DataTreeModification;
29 import org.opendaylight.mdsal.binding.api.RpcProviderService;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
32 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeService;
33 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
34 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.frm.reconciliation.service.rev180227.ReconcileNodeOutputBuilder;
44 import org.opendaylight.yangtools.concepts.Registration;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.opendaylight.yangtools.yang.common.ErrorType;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * Manager for clustering service registrations of {@link DeviceMastership}.
54  */
55 public class DeviceMastershipManager implements DataTreeChangeListener<FlowCapableNode>, AutoCloseable,
56         MastershipChangeService {
57     private static final Logger LOG = LoggerFactory.getLogger(DeviceMastershipManager.class);
58     private static final InstanceIdentifier<FlowCapableNode> II_TO_FLOW_CAPABLE_NODE = InstanceIdentifier
59             .builder(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class).build();
60
61     private final ConcurrentHashMap<NodeId, DeviceMastership> deviceMasterships = new ConcurrentHashMap<>();
62     private final Set<InstanceIdentifier<FlowCapableNode>> activeNodes = ConcurrentHashMap.newKeySet();
63     private final ReentrantLock lock = new ReentrantLock();
64     private final FlowNodeReconciliation reconcliationAgent;
65     private final RpcProviderService rpcProviderService;
66
67     private Registration listenerRegistration;
68     private Registration mastershipChangeServiceRegistration;
69
70     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Non-final for mocking")
71     public DeviceMastershipManager(final FlowNodeReconciliation reconcliationAgent,
72                                    final DataBroker dataBroker,
73                                    final MastershipChangeServiceManager mastershipChangeServiceManager,
74                                    final RpcProviderService rpcProviderService) {
75         this.reconcliationAgent = reconcliationAgent;
76         this.rpcProviderService = rpcProviderService;
77         listenerRegistration = dataBroker.registerTreeChangeListener(
78             DataTreeIdentifier.of(LogicalDatastoreType.OPERATIONAL,
79                 InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)), this);
80         mastershipChangeServiceRegistration = mastershipChangeServiceManager.register(this);
81     }
82
83     public boolean isDeviceMastered(final NodeId nodeId) {
84         return deviceMasterships.get(nodeId) != null && deviceMasterships.get(nodeId).isDeviceMastered();
85     }
86
87     public boolean isNodeActive(final NodeId nodeId) {
88         return activeNodes.contains(InstanceIdentifier.create(Nodes.class)
89             .child(Node.class, new NodeKey(nodeId))
90             .augmentation(FlowCapableNode.class));
91     }
92
93     @VisibleForTesting
94     ConcurrentHashMap<NodeId, DeviceMastership> getDeviceMasterships() {
95         return deviceMasterships;
96     }
97
98     @Override
99     public void onDataTreeChanged(final List<DataTreeModification<FlowCapableNode>> changes) {
100         for (DataTreeModification<FlowCapableNode> change : changes) {
101             final InstanceIdentifier<FlowCapableNode> key = change.getRootPath().path();
102             final DataObjectModification<FlowCapableNode> mod = change.getRootNode();
103             final InstanceIdentifier<FlowCapableNode> nodeIdent = key.firstIdentifierOf(FlowCapableNode.class);
104
105             switch (mod.modificationType()) {
106                 case DELETE:
107                     if (mod.dataAfter() == null) {
108                         remove(key, mod.dataBefore(), nodeIdent);
109                     }
110                     break;
111                 case SUBTREE_MODIFIED:
112                     // NO-OP since we do not need to reconcile on Node-updated
113                     break;
114                 case WRITE:
115                     if (mod.dataBefore() == null) {
116                         add(key, mod.dataAfter(), nodeIdent);
117                     }
118                     break;
119                 default:
120                     throw new IllegalArgumentException("Unhandled modification type " + mod.modificationType());
121             }
122         }
123     }
124
125     public void remove(final InstanceIdentifier<FlowCapableNode> identifier, final FlowCapableNode del,
126             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
127         if (compareInstanceIdentifierTail(identifier, II_TO_FLOW_CAPABLE_NODE)) {
128             if (LOG.isDebugEnabled()) {
129                 LOG.debug("Node removed: {}", nodeIdent.firstKeyOf(Node.class).getId().getValue());
130             }
131
132             if (!nodeIdent.isWildcarded() && activeNodes.remove(nodeIdent)) {
133                 lock.lock();
134                 try {
135                     reconcliationAgent.flowNodeDisconnected(nodeIdent);
136                     setNodeOperationalStatus(nodeIdent, false);
137                 } finally {
138                     lock.unlock();
139                 }
140             }
141         }
142     }
143
144     public void add(final InstanceIdentifier<FlowCapableNode> identifier, final FlowCapableNode add,
145             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
146         if (compareInstanceIdentifierTail(identifier, II_TO_FLOW_CAPABLE_NODE)) {
147             if (LOG.isDebugEnabled()) {
148                 LOG.debug("Node added: {}", nodeIdent.firstKeyOf(Node.class).getId().getValue());
149             }
150
151             if (!nodeIdent.isWildcarded() && activeNodes.add(nodeIdent)) {
152                 lock.lock();
153                 try {
154                     setNodeOperationalStatus(nodeIdent, true);
155                 } finally {
156                     lock.unlock();
157                 }
158             }
159         }
160     }
161
162     @Override
163     public void close() throws Exception {
164         if (listenerRegistration != null) {
165             listenerRegistration.close();
166             listenerRegistration = null;
167         }
168         if (mastershipChangeServiceRegistration != null) {
169             mastershipChangeServiceRegistration.close();
170             mastershipChangeServiceRegistration = null;
171         }
172     }
173
174     private static boolean compareInstanceIdentifierTail(final InstanceIdentifier<?> identifier1,
175             final InstanceIdentifier<?> identifier2) {
176         return Iterables.getLast(identifier1.getPathArguments())
177                 .equals(Iterables.getLast(identifier2.getPathArguments()));
178     }
179
180     @Holding("lockObj")
181     private void setNodeOperationalStatus(final InstanceIdentifier<FlowCapableNode> nodeIid, final boolean status) {
182         final var nodeId = nodeIid.firstKeyOf(Node.class).getId();
183         if (nodeId == null) {
184             return;
185         }
186         final var mastership = deviceMasterships.get(nodeId);
187         if (mastership != null) {
188             mastership.setDeviceOperationalStatus(status);
189             LOG.debug("Operational status of device {} is set to {}", nodeId, status);
190         }
191     }
192
193     @Override
194     public void onBecomeOwner(@NonNull final DeviceInfo deviceInfo) {
195         LOG.debug("Mastership role notification received for device : {}", deviceInfo.getDatapathId());
196         final var membership = deviceMasterships.computeIfAbsent(deviceInfo.getNodeId(),
197             device -> new DeviceMastership(deviceInfo.getNodeId()));
198         membership.reconcile();
199         membership.registerReconcileNode(rpcProviderService, this::reconcileNode);
200     }
201
202     private ListenableFuture<RpcResult<ReconcileNodeOutput>> reconcileNode(final ReconcileNodeInput input) {
203         final var nodeId = input.requireNodeId();
204         LOG.debug("Triggering reconciliation for node: {}", nodeId);
205
206         final var nodeDpn = new NodeBuilder().setId(new NodeId("openflow:" + nodeId)).build();
207         final var connectedNode = InstanceIdentifier.builder(Nodes.class)
208                 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
209                 .build();
210         final var rpcResult = SettableFuture.<RpcResult<ReconcileNodeOutput>>create();
211         Futures.addCallback(reconcliationAgent.reconcileConfiguration(connectedNode), new FutureCallback<>() {
212             @Override
213             public void onSuccess(final Boolean result) {
214                 rpcResult.set(result
215                     ? RpcResultBuilder.success(new ReconcileNodeOutputBuilder().setResult(result).build()).build()
216                         : RpcResultBuilder.<ReconcileNodeOutput>failed()
217                             .withError(ErrorType.APPLICATION, "Error while triggering reconciliation")
218                             .build());
219             }
220
221             @Override
222             public void onFailure(final Throwable error) {
223                 LOG.error("initReconciliation failed", error);
224                 rpcResult.set(RpcResultBuilder.<ReconcileNodeOutput>failed()
225                         .withError(ErrorType.RPC, "Error while calling RPC").build());
226             }
227         }, MoreExecutors.directExecutor());
228
229         LOG.debug("Completing reconciliation for node: {}", nodeId);
230         return rpcResult;
231     }
232
233     @Override
234     public void onLoseOwnership(@NonNull final DeviceInfo deviceInfo) {
235         final var mastership = deviceMasterships.remove(deviceInfo.getNodeId());
236         if (mastership != null) {
237             mastership.deregisterReconcileNode();
238             mastership.close();
239             LOG.debug("Unregistered deviceMastership for device : {}", deviceInfo.getNodeId());
240         }
241     }
242 }