2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.role;
10 import com.google.common.base.Function;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Throwables;
14 import com.google.common.base.Verify;
15 import com.google.common.collect.Iterators;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import java.util.Iterator;
21 import java.util.Map.Entry;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.Semaphore;
25 import javax.annotation.CheckForNull;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
30 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
31 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
32 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
33 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
34 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
35 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
37 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
38 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
41 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
42 import org.opendaylight.openflowplugin.api.openflow.role.RoleChangeListener;
43 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
44 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
51 * Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
52 * On receipt of the ownership notification, makes an rpc call to SalRoleService.
54 * Hands over to StatisticsManager at the end.
56 public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
57 private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
// Handler invoked at the end of onDeviceContextLevelUp; assigned through setDeviceInitializationPhaseHandler.
59 private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
// Source of the write-only transaction used to delete a node from the operational datastore.
60 private final DataBroker dataBroker;
// Used to register this instance as listener, to build role contexts and to query ownership state.
61 private final EntityOwnershipService entityOwnershipService;
// Role contexts keyed by the main entity of their node (see makeEntity).
62 private final ConcurrentMap<Entity, RoleContext> contexts = new ConcurrentHashMap<>();
// Role contexts keyed by the tx entity of their node (see makeTxEntity).
63 private final ConcurrentMap<Entity, RoleContext> txContexts = new ConcurrentHashMap<>();
// Listener registrations obtained in the constructor (main entity type, then tx entity type); both are closed in close().
64 private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
65 private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
// NOTE(review): assigned in the constructor but not read anywhere in the code visible here - confirm it is still needed.
66 private final boolean switchFeaturesMandatory;
68 public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final boolean switchFeaturesMandatory) {
69 this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
70 this.dataBroker = Preconditions.checkNotNull(dataBroker);
71 this.switchFeaturesMandatory = switchFeaturesMandatory;
72 this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
73 this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this));
74 LOG.debug("Registering OpenflowOwnershipListener listening to all entity ownership changes");
78 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
79 deviceInitializationPhaseHandler = handler;
83 public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
84 LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
85 final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
86 makeEntity(deviceContext.getDeviceState().getNodeId()),
87 makeTxEntity(deviceContext.getDeviceState().getNodeId()));
89 Verify.verify(contexts.putIfAbsent(roleContext.getEntity(), roleContext) == null, "RoleCtx for master Node {} is still not closed.", deviceContext.getDeviceState().getNodeId());
90 Verify.verify(!txContexts.containsKey(roleContext.getTxEntity()),
91 "RoleCtx for master Node {} is still not closed. TxEntity was not unregistered yet.", deviceContext.getDeviceState().getNodeId());
93 // if the device context gets closed (mostly on connection close), we would need to cleanup
94 deviceContext.addDeviceContextClosedHandler(this);
95 roleContext.initialization();
96 deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
100 public void close() {
101 entityOwnershipListenerRegistration.close();
102 txEntityOwnershipListenerRegistration.close();
103 for (final Iterator<Entry<Entity, RoleContext>> iterator = Iterators.consumingIterator(contexts.entrySet()
104 .iterator()); iterator.hasNext();) {
105 // got here because last known role is LEADER and DS might need clearing up
106 final Entry<Entity, RoleContext> entry = iterator.next();
107 final RoleContext roleCtx = entry.getValue();
108 final NodeId nodeId = roleCtx.getDeviceState().getNodeId();
109 if (OfpRole.BECOMEMASTER.equals(roleCtx.getDeviceState().getRole())) {
110 LOG.trace("Last role is LEADER and ownershipService returned hasOwner=false for node: {}; "
111 + "cleaning DS as being probably the last owner", nodeId);
112 removeDeviceFromOperDS(roleCtx);
114 // NOOP - there is another owner
115 LOG.debug("Last role is LEADER and ownershipService returned hasOwner=true for node: {}; "
116 + "leaving DS untouched", nodeId);
118 roleCtx.suspendTxCandidate();
119 txContexts.remove(roleCtx.getTxEntity(), roleCtx);
// Reacts to a closed device context: looks up the role context stored under the node's main entity
// and asks the ownership service for the current state of that entity.
// NOTE(review): this listing shows a `catch` without its `try` and no `else` / closing-brace lines,
// so short lines appear to be missing here - confirm the exact branch structure against the full
// file before changing this method.
125 public void onDeviceContextClosed(final DeviceContext deviceContext) {
126 final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
127 LOG.debug("onDeviceContextClosed for node {}", nodeId);
128 final Entity entity = makeEntity(nodeId);
129 final RoleContext roleContext = contexts.get(entity);
// nothing is done when no role context is stored for this node
130 if (roleContext != null) {
131 LOG.debug("Found roleContext associated to deviceContext: {}, now closing the roleContext", nodeId);
132 final Optional<EntityOwnershipState> actState = entityOwnershipService.getOwnershipState(entity);
133 if (actState.isPresent()) {
// this instance currently owns the main entity
134 if (actState.get().isOwner()) {
// only when no role context is stored under the tx entity yet
135 if (!txContexts.containsKey(roleContext.getTxEntity())) {
// the putIfAbsent result is not inspected; the tx candidate is registered right afterwards
137 txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext);
138 roleContext.setupTxCandidate();
139 // we'd like to wait for registration response
141 } catch (final CandidateAlreadyRegisteredException e) {
// presumably the not-owner branch - TODO confirm against the full file
146 LOG.debug("No DS commitment for device {} - LEADER is somewhere else", nodeId);
// two-argument remove: the mapping is dropped only if it still points at this role context
147 contexts.remove(entity, roleContext);
148 // TODO : is there a chance to have TxEntity ?
// presumably the branch for an absent ownership state - TODO confirm; note the typo "proces" in the log text
151 LOG.warn("EntityOwnershipService doesn't return state for entity: {} in close proces", entity);
157 private static Entity makeEntity(final NodeId nodeId) {
158 return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
161 private static Entity makeTxEntity(final NodeId nodeId) {
162 return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
166 public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
167 Preconditions.checkArgument(ownershipChange != null);
168 RoleContext roleContext = null;
170 roleContext = contexts.get(ownershipChange.getEntity());
171 if (roleContext != null) {
172 changeForEntity(ownershipChange, roleContext);
176 roleContext = txContexts.get(ownershipChange.getEntity());
177 if (roleContext != null) {
178 changeForTxEntity(ownershipChange, roleContext);
181 } catch (final Exception e) {
182 LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity(), e);
183 if (roleContext != null) {
184 roleContext.getDeviceContext().close();
188 LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
189 ownershipChange.getEntity(), ownershipChange);
// Processes an ownership change of a tx entity while holding the role context's tx-candidate semaphore.
// NOTE(review): this listing lacks short lines (`else`, closing braces, return statements of the
// anonymous classes) - confirm the exact structure against the full file before changing this method.
192 private void changeForTxEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleContext)
193 throws InterruptedException {
194 LOG.info("Received TX-EntityOwnershipChange:{}", ownershipChange);
195 final Semaphore txCandidateGuard = roleContext.getTxCandidateGuard();
// the message is built by string concatenation, so it is assembled even when trace logging is off
196 LOG.trace("txCandidate lock queue: " + txCandidateGuard.getQueueLength());
// blocks until a permit is available; the permit is given back in the future callbacks at the end
// NOTE(review): if a call between this acquire and addCallback throws synchronously, no release
// is visible on that path - confirm
197 txCandidateGuard.acquire();
199 ListenableFuture<Void> processingClosure;
200 final DeviceContext deviceContext = roleContext.getDeviceContext();
201 final NodeId nodeId = roleContext.getDeviceState().getNodeId();
203 if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
204 // SLAVE -> MASTER - acquired transition lock
205 LOG.debug("Acquired tx-lock for entity {}", ownershipChange.getEntity());
207 // activate txChainManager, activate rpcs
208 if (roleContext.getDeviceState().isValid()) {
209 processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER);
211 // We are not able to send anything to device, but we need to handle closing state clearly
213 processingClosure = Futures.immediateFuture(null);
215 // activate stats - accomplished automatically by changing role in deviceState
// chained step: once the previous future succeeds, the device state role is set to BECOMEMASTER
216 processingClosure = Futures.transform(processingClosure, new Function<Void, Void>() {
219 public Void apply(@Nullable final Void aVoid) {
220 deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER);
224 } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
225 // MASTER -> SLAVE - released tx-lock
226 LOG.debug("Released tx-lock for entity {}", ownershipChange.getEntity());
// two-argument remove: the mapping is dropped only if it still points at this role context
227 txContexts.remove(roleContext.getTxEntity(), roleContext);
228 processingClosure = Futures.immediateFuture(null);
// presumably the fall-through branch for every other wasOwner/isOwner combination - TODO confirm
230 LOG.debug("NOOP state transition for TxEntity {} ", roleContext.getTxEntity());
231 processingClosure = Futures.immediateFuture(null);
234 // handle result of executed steps
235 Futures.addCallback(processingClosure, new FutureCallback<Void>()
239 public void onSuccess(@Nullable final Void aVoid) {
240 // propagating role must be BECOMEMASTER in order to run this processing
241 // removing it will disable redundant processing of BECOMEMASTER
242 txCandidateGuard.release();
246 public void onFailure(final Throwable throwable) {
247 LOG.warn("Unexpected error for Node {} -> terminating device context", nodeId, throwable);
// the permit is released before the device context is closed
248 txCandidateGuard.release();
249 deviceContext.close();
256 private static Function<Void, Void> makeTxEntitySuspendCallback(final RoleContext roleChangeListener) {
257 return new Function<Void, Void>() {
259 public Void apply(final Void result) {
260 roleChangeListener.suspendTxCandidate();
266 private Function<Void, Void> makeTxEntitySetupCallback(final RoleContext roleContext) {
267 return new Function<Void, Void>() {
269 public Void apply(final Void result) {
270 final NodeId nodeId = roleContext.getDeviceState().getNodeId();
272 LOG.debug("Node {} is marked as LEADER", nodeId);
273 Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
274 "RoleCtx for TxEntity {} master Node {} is still not closed.", roleContext.getTxEntity(), nodeId);
275 // try to register tx-candidate via ownership service
276 roleContext.setupTxCandidate();
277 } catch (final CandidateAlreadyRegisteredException e) {
278 LOG.warn("txCandidate registration failed {}", roleContext.getDeviceState().getNodeId(), e);
280 // withdraw context from map in order to have it as before
281 txContexts.remove(roleContext.getTxEntity(), roleContext);
282 // no more propagating any role - there is no txCandidate lock approaching
283 Throwables.propagate(e);
// Processes an ownership change of a main entity while holding the role context's main-candidate semaphore.
// NOTE(review): this listing lacks short lines (`else`, closing braces) - confirm the exact structure
// against the full file before changing this method.
290 private void changeForEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleContext) throws InterruptedException {
291 final Semaphore mainCandidateGuard = roleContext.getMainCandidateGuard();
// the message is built by string concatenation, so it is assembled even when trace logging is off
292 LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
// blocks until a permit is available
// NOTE(review): if a call between this acquire and a release below throws synchronously, no release
// is visible on that path - confirm
293 mainCandidateGuard.acquire();
294 LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
296 if (roleContext.getDeviceState().isValid()) {
297 LOG.debug("RoleChange for entity {}", ownershipChange.getEntity());
// both roles are derived purely from the isOwner / wasOwner flags of the notification
298 final OfpRole newRole = ownershipChange.isOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
299 final OfpRole oldRole = ownershipChange.wasOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
301 // propagation start point
302 ListenableFuture<Void> rolePropagationFx = Futures.immediateFuture(null);
303 final Function<Void, Void> txProcessCallback;
// lost ownership while another owner exists: propagate the role change, then suspend the tx candidate
305 if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
307 rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
308 txProcessCallback = makeTxEntitySuspendCallback(roleContext);
// gained ownership: no role propagation at this point, only the tx candidate setup step is chained
309 } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.hasOwner()) {
311 txProcessCallback = makeTxEntitySetupCallback(roleContext);
313 LOG.debug("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
314 txProcessCallback = null;
317 if (txProcessCallback != null) {
318 rolePropagationFx = Futures.transform(rolePropagationFx, txProcessCallback);
// both callbacks give the permit back; the failure callback additionally closes the device context
322 Futures.addCallback(rolePropagationFx, new FutureCallback<Void>() {
324 public void onSuccess(@Nullable final Void aVoid) {
325 LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
326 ownershipChange.getEntity(), oldRole, newRole);
327 mainCandidateGuard.release();
331 public void onFailure(final Throwable throwable) {
332 LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
333 ownershipChange.getEntity(), oldRole, newRole, throwable);
334 mainCandidateGuard.release();
335 roleContext.getDeviceContext().close();
// presumably the branch for a device state that is no longer valid - TODO confirm;
// here the permit is released before the cleanup below runs
340 LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
341 mainCandidateGuard.release();
342 // expecting that this roleContext will get closed in a moment
343 // FIXME: reconsider location of following cleanup logic
// was owner and nobody owns the entity now: delete the node from the datastore via unregistrationHelper
344 if (!ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
345 unregistrationHelper(ownershipChange, roleContext);
// was owner and another owner exists: only drop the context and suspend the tx candidate
346 } else if (ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
347 contexts.remove(ownershipChange.getEntity(), roleContext);
348 roleContext.suspendTxCandidate();
350 LOG.info("Unexpected role change msg {} for entity {}", ownershipChange, ownershipChange.getEntity());
355 private CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperDS(
356 final RoleChangeListener roleChangeListener) {
357 Preconditions.checkArgument(roleChangeListener != null);
358 final DeviceState deviceState = roleChangeListener.getDeviceState();
359 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
360 delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier());
361 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
362 Futures.addCallback(delFuture, new FutureCallback<Void>() {
365 public void onSuccess(final Void result) {
366 LOG.debug("Delete Node {} was successful", deviceState.getNodeId());
370 public void onFailure(final Throwable t) {
371 LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
378 private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
379 LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
380 Futures.addCallback(removeDeviceFromOperDS(roleContext), new FutureCallback<Void>() {
382 public void onSuccess(@Nullable final Void aVoid) {
383 LOG.debug("Freeing roleContext slot for device: {}", roleContext.getDeviceState().getNodeId());
384 contexts.remove(ownershipChange.getEntity(), roleContext);
385 roleContext.suspendTxCandidate();
389 public void onFailure(final Throwable throwable) {
390 LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleContext.getDeviceState()
391 .getNodeId(), throwable.getMessage());
392 contexts.remove(ownershipChange.getEntity(), roleContext);
393 roleContext.suspendTxCandidate();