2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.role;
10 import com.google.common.base.Function;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.util.concurrent.AsyncFunction;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.Semaphore;
23 import javax.annotation.CheckForNull;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
29 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
30 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
31 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
32 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
33 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
34 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
39 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
40 import org.opendaylight.openflowplugin.api.openflow.role.RoleChangeListener;
41 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
42 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
43 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
51 * On receipt of the ownership notification, makes an RPC call to SalRoleService.
53 * Hands over to StatisticsManager at the end.
55 public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
56 private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
58 private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
59 private final DataBroker dataBroker;
60 private final EntityOwnershipService entityOwnershipService;
61 private final ConcurrentMap<Entity, RoleContext> contexts = new ConcurrentHashMap<>();
62 private final ConcurrentMap<Entity, RoleContext> txContexts = new ConcurrentHashMap<>();
63 private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
64 private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
65 private final boolean switchFeaturesMandatory;
67 public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final boolean switchFeaturesMandatory) {
68 this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
69 this.dataBroker = Preconditions.checkNotNull(dataBroker);
70 this.switchFeaturesMandatory = switchFeaturesMandatory;
71 this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
72 this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this));
73 LOG.debug("Registering OpenflowOwnershipListener listening to all entity ownership changes");
77 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
78 deviceInitializationPhaseHandler = handler;
82 public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
83 LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
85 final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
86 makeEntity(deviceContext.getDeviceState().getNodeId()),
87 makeTxEntity(deviceContext.getDeviceState().getNodeId()));
88 // if the device context gets closed (mostly on connection close), we would need to cleanup
89 deviceContext.addDeviceContextClosedHandler(this);
90 final RoleContext previousContext = contexts.putIfAbsent(roleContext.getEntity(), roleContext);
91 Verify.verify(previousContext == null,
92 "RoleCtx for master Node {} is still not close.", deviceContext.getDeviceState().getNodeId());
94 roleContext.initialization();
97 void getRoleContextLevelUp(final DeviceContext deviceContext) {
98 LOG.debug("Created role context for node {}", deviceContext.getDeviceState().getNodeId());
99 LOG.debug("roleChangeFuture success for device:{}. Moving to StatisticsManager", deviceContext.getDeviceState().getNodeId());
101 deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
102 } catch (final Exception e) {
103 LOG.info("failed to complete levelUp on next handler for device {}",
104 deviceContext.getDeviceState().getNodeId());
105 deviceContext.close();
111 public void close() throws Exception {
112 entityOwnershipListenerRegistration.close();
113 txEntityOwnershipListenerRegistration.close();
114 for (final Map.Entry<Entity, RoleContext> roleContextEntry : contexts.entrySet()) {
115 // got here because last known role is LEADER and DS might need clearing up
116 final Entity entity = roleContextEntry.getKey();
117 final Optional<EntityOwnershipState> ownershipState = entityOwnershipService.getOwnershipState(entity);
118 final NodeId nodeId = roleContextEntry.getValue().getDeviceState().getNodeId();
119 if (ownershipState.isPresent()) {
120 if ((!ownershipState.get().hasOwner())) {
121 LOG.trace("Last role is LEADER and ownershipService returned hasOwner=false for node: {}; " +
122 "cleaning DS as being probably the last owner", nodeId);
123 removeDeviceFromOperDS(roleContextEntry.getValue());
125 // NOOP - there is another owner
126 LOG.debug("Last role is LEADER and ownershipService returned hasOwner=true for node: {}; " +
127 "leaving DS untouched", nodeId);
130 // TODO: is this safe? When could this happen?
131 LOG.warn("Last role is LEADER but ownershipService returned empty ownership info for node: {}; " +
132 "cleaning DS ANYWAY!", nodeId);
133 removeDeviceFromOperDS(roleContextEntry.getValue());
/**
 * Cleanup hook, registered on each device context in onDeviceContextLevelUp via
 * addDeviceContextClosedHandler, and invoked when that device context is closed.
 *
 * Looks up the role context by the node's main entity and queries the ownership service for the
 * entity's current state. If this node is the owner and the context is not yet tracked in
 * txContexts, the context is put into txContexts, its propagating role is set to BECOMEMASTER and
 * setupTxCandidate() is called. Otherwise the role context is removed from contexts (leader
 * elsewhere), or only a warning is logged when no state is returned.
 *
 * @param deviceContext the device context being closed
 */
140 public void onDeviceContextClosed(final DeviceContext deviceContext) {
141 final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
142 LOG.debug("onDeviceContextClosed for node {}", nodeId);
143 final Entity entity = makeEntity(nodeId);
144 final RoleContext roleContext = contexts.get(entity);
145 if (roleContext != null) {
146 LOG.debug("Found roleContext associated to deviceContext: {}, now closing the roleContext", nodeId);
147 final Optional<EntityOwnershipState> actState = entityOwnershipService.getOwnershipState(entity);
148 if (actState.isPresent()) {
149 if (actState.get().isOwner()) {
// Owner of the main entity: start the tx candidate unless its tx entity is already tracked.
150 if (!txContexts.containsKey(roleContext.getTxEntity())) {
// NOTE(review): containsKey followed by putIfAbsent is check-then-act; putIfAbsent's return value
// is ignored, so a concurrent insert would not stop setupTxCandidate() below — confirm acceptable.
152 txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext);
153 roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
154 roleContext.setupTxCandidate();
155 // we'd like to wait for registration response
157 } catch (final CandidateAlreadyRegisteredException e) {
// Presumably the not-owner branch (per its log text the leader is another node): forget this context.
162 LOG.debug("No DS commitment for device {} - LEADER is somewhere else", nodeId);
163 contexts.remove(entity, roleContext);
164 // TODO : is there a chance to have TxEntity ?
// The ownership service returned no state for the entity; only a warning is logged.
167 LOG.warn("EntityOwnershipService doesn't return state for entity: {} in close proces", entity);
173 private static Entity makeEntity(final NodeId nodeId) {
174 return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
177 private static Entity makeTxEntity(final NodeId nodeId) {
178 return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
182 public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
183 Preconditions.checkArgument(ownershipChange != null);
184 RoleContext roleCtxForClose = null;
186 final RoleContext roleContext = contexts.get(ownershipChange.getEntity());
187 if (roleContext != null) {
188 roleCtxForClose = roleContext;
189 changeForEntity(ownershipChange, roleContext);
193 final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity());
194 if (txRoleContext != null) {
195 roleCtxForClose = txRoleContext;
196 changeForTxEntity(ownershipChange, txRoleContext);
199 } catch (final InterruptedException e) {
200 LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity());
201 if (roleCtxForClose != null) {
202 roleCtxForClose.close();
206 LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
207 ownershipChange.getEntity(), ownershipChange);
/**
 * Handles an ownership change of a node's tx entity (the transition lock).
 *
 * Blocks on the context's tx-candidate guard semaphore (may throw InterruptedException), then:
 * - not owner -> owner: marks the tx lock owned, verifies the propagating role is BECOMEMASTER and,
 *   depending on the role context state, propagates SLAVE -> MASTER via onRoleChanged, sets the
 *   device-state role to BECOMEMASTER and (in one branch) collects node information through
 *   DeviceInitializationUtils.initializeNodeInformation;
 * - owner -> not owner: marks the tx lock released and removes the context from txContexts;
 * - anything else: NOOP.
 * The guard is released in the result callbacks; on failure the device context is closed.
 *
 * @param ownershipChange tx-entity ownership notification
 * @param roleContext role context registered under the tx entity in txContexts
 * @throws InterruptedException if interrupted while waiting for the tx-candidate guard
 */
210 private void changeForTxEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleContext)
211 throws InterruptedException {
212 LOG.info("Received TX-EntityOwnershipChange:{}", ownershipChange);
213 final Semaphore txCandidateGuard = roleContext.getTxCandidateGuard();
// NOTE(review): string concatenation builds this message even when TRACE is disabled.
214 LOG.trace("txCandidate lock queue: " + txCandidateGuard.getQueueLength());
// Blocks until the tx-candidate guard is available; released in the callbacks below
// (or right away for a device that is being closed).
215 txCandidateGuard.acquire();
217 ListenableFuture<Void> processingClosure;
218 final DeviceContext deviceContext = roleContext.getDeviceContext();
219 final NodeId nodeId = roleContext.getDeviceState().getNodeId();
// Device state already invalid while the context is still WORKING: the change arrived during closing.
221 if (!roleContext.getDeviceState().isValid()
222 && RoleContext.ROLE_CONTEXT_STATE.WORKING.equals(roleContext.getState())) {
223 LOG.debug("Node {} ownership changed during closing process", roleContext.getDeviceState().getNodeId());
225 txCandidateGuard.release();
229 if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
230 // SLAVE -> MASTER - acquired transition lock
231 LOG.debug("Acquired tx-lock for entity {}", ownershipChange.getEntity());
232 roleContext.setTxLockOwned(true);
233 final OfpRole role = roleContext.getDeviceState().getRole();
// NOTE(review): Guava Verify substitutes %s only, so the {} below is not filled in (the role is
// appended instead). The check is on the propagating role while the message prints the device-state role.
234 Verify.verify(OfpRole.BECOMEMASTER.equals(roleContext.getPropagatingRole()),
235 "Acquired tx-lock but current role = {}", role);
// Work depends on the role context state; the tear-down path does nothing (see TODO below).
237 switch (roleContext.getState()) {
239 processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER);
240 // activate stats - accomplished automatically by chaging role in deviceState
241 // collect initial dynamic data from device
242 processingClosure = Futures.transform(processingClosure, new AsyncFunction<Void, Void>() {
245 public ListenableFuture<Void> apply(@Nullable final Void aVoid) {
246 deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER);
247 return DeviceInitializationUtils.initializeNodeInformation(
248 deviceContext, switchFeaturesMandatory);
253 // activate txChainManager, activate rpcs
254 processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER);
255 // activate stats - accomplished automatically by chaging role in deviceState
256 processingClosure = Futures.transform(processingClosure, new Function<Void, Void>() {
259 public Void apply(@Nullable final Void aVoid) {
260 deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER);
267 //TODO: reconsider if there is really nothing to do when tearing down
268 processingClosure = Futures.immediateFuture(null);
271 } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
272 // MASTER -> SLAVE - released tx-lock
273 LOG.debug("Released tx-lock for entity {}", ownershipChange.getEntity());
274 roleContext.setTxLockOwned(false);
275 txContexts.remove(roleContext.getTxEntity(), roleContext);
276 processingClosure = Futures.immediateFuture(null);
278 LOG.debug("NOOP state transition for TxEntity {} ", roleContext.getTxEntity());
279 processingClosure = Futures.immediateFuture(null);
282 // handle result of executed steps
// Both callbacks release the guard acquired above.
283 Futures.addCallback(processingClosure, new FutureCallback<Void>()
287 public void onSuccess(@Nullable final Void aVoid) {
288 // propagating role must be BECOMEMASTER in order to run this processing
289 // removing it will disable redundand processing of BECOMEMASTER
290 roleContext.setPropagatingRole(null);
292 txCandidateGuard.release();
// Per-state completion: the init path promotes the context to WORKING and hands the device
// to the next initialization phase via getRoleContextLevelUp.
293 switch (roleContext.getState()) {
295 LOG.debug("init steps protected by tx-lock for node {} are done.", nodeId);
296 roleContext.promoteStateToWorking();
297 getRoleContextLevelUp(deviceContext);
300 LOG.debug("normal steps protected by tx-lock for node {} are done.", nodeId);
303 LOG.debug("teardown steps protected by tx-lock for node {} are done.", nodeId);
// Any failure in the chained steps terminates the whole device context.
309 public void onFailure(final Throwable throwable) {
310 LOG.warn("Unexpected error for Node {}, state={}, txLock={} -> terminating device context",
311 nodeId, roleContext.getState(), roleContext.isTxLockOwned(), throwable);
312 txCandidateGuard.release();
313 deviceContext.close();
320 private static Function<Void, Void> makeTxEntitySuspendCallback(final RoleContext roleChangeListener) {
321 return new Function<Void, Void>() {
323 public Void apply(final Void result) {
324 roleChangeListener.suspendTxCandidate();
330 private Function<Void, Void> makeTxEntitySetupCallback(final RoleContext roleContext) {
331 return new Function<Void, Void>() {
333 public Void apply(final Void result) {
334 final NodeId nodeId = roleContext.getDeviceState().getNodeId();
336 LOG.debug("Node {} is marked as LEADER", nodeId);
337 Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
338 "RoleCtx for TxEntity {} master Node {} is still not closed.", roleContext.getTxEntity(), nodeId);
339 roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
341 // try to register tx-candidate via ownership service
342 roleContext.setupTxCandidate();
343 } catch (final CandidateAlreadyRegisteredException e) {
344 LOG.warn("txCandidate registration failed {}", roleContext.getDeviceState().getNodeId(), e);
346 // withdraw context from map in order to have it as before
347 txContexts.remove(roleContext.getTxEntity(), roleContext);
348 // no more propagating any role - there is no txCandidate lock approaching
349 roleContext.setPropagatingRole(null);
350 roleContext.getDeviceContext().close();
/**
 * Handles an ownership change of a node's main entity.
 *
 * Blocks on the context's main-candidate guard semaphore (may throw InterruptedException).
 * While the device state is valid, wasOwner/isOwner are mapped to old/new OfpRole values and:
 * - owner -> not owner, another owner present: the role change is propagated; a WORKING context
 *   additionally suspends its tx candidate afterwards;
 * - not owner -> owner: the tx-candidate setup callback (makeTxEntitySetupCallback) is chained;
 * - not owner -> not owner, owner elsewhere: only a STARTING context propagates the slave role;
 * - otherwise NOOP.
 * On success the propagating role is set to the new role; on failure the device context is closed.
 * The guard is released in both callbacks. When the device state is no longer valid, the guard is
 * released and ownership bookkeeping for a former owner is cleaned up.
 *
 * @param ownershipChange main-entity ownership notification
 * @param roleContext role context registered under the main entity in contexts
 * @throws InterruptedException if interrupted while waiting for the main-candidate guard
 */
357 private void changeForEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleContext) throws InterruptedException {
358 final Semaphore mainCandidateGuard = roleContext.getMainCandidateGuard();
359 LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
360 mainCandidateGuard.acquire();
361 LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
363 if (roleContext.getDeviceState().isValid()) {
364 LOG.debug("RoleChange for entity {}", ownershipChange.getEntity());
365 final OfpRole newRole = ownershipChange.isOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
366 final OfpRole oldRole = ownershipChange.wasOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
368 // propagation start point
369 ListenableFuture<Void> rolePropagationFx = Futures.immediateFuture(null);
370 final Function<Void, Void> txProcessCallback;
// MASTER -> SLAVE while another node owns the entity.
372 if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
374 rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
375 if (RoleContext.ROLE_CONTEXT_STATE.WORKING.equals(roleContext.getState())) {
376 txProcessCallback = makeTxEntitySuspendCallback(roleContext);
378 txProcessCallback = null;
// SLAVE -> MASTER: only the tx-candidate setup is chained after the (immediate) start future.
380 } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.hasOwner()) {
382 txProcessCallback = makeTxEntitySetupCallback(roleContext);
// SLAVE -> SLAVE with an owner elsewhere: a STARTING context still propagates the slave role.
383 } else if (!ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
384 if (RoleContext.ROLE_CONTEXT_STATE.STARTING.equals(roleContext.getState())) {
385 rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
387 txProcessCallback = null;
389 LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
390 txProcessCallback = null;
393 if (txProcessCallback != null) {
394 rolePropagationFx = Futures.transform(rolePropagationFx, txProcessCallback);
398 Futures.addCallback(rolePropagationFx, new FutureCallback<Void>() {
400 public void onSuccess(@Nullable final Void aVoid) {
401 LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
402 ownershipChange.getEntity(), oldRole, newRole);
// NOTE(review): this also runs after makeTxEntitySetupCallback handled CandidateAlreadyRegisteredException
// and reset the propagating role to null, so the role is set back to newRole here — confirm intended.
403 roleContext.setPropagatingRole(newRole);
404 mainCandidateGuard.release();
408 public void onFailure(final Throwable throwable) {
// NOTE(review): throwable is not passed to the logger, so the failure cause is lost.
409 LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
410 ownershipChange.getEntity(), oldRole, newRole);
411 mainCandidateGuard.release();
412 roleContext.getDeviceContext().close();
// Presumably the invalid-device-state path (connection is closing): release the guard and clean up.
417 LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
418 mainCandidateGuard.release();
419 // expecting that this roleContext will get closed in a moment
420 // FIXME: reconsider location of following cleanup logic
// Former owner and nobody owns the entity now: remove the node from the operational DS.
421 if (!ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
422 unregistrationHelper(ownershipChange, roleContext);
// Former owner and another node owns the entity now: forget the context and suspend the tx candidate.
423 } else if (ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
424 contexts.remove(ownershipChange.getEntity(), roleContext);
425 roleContext.suspendTxCandidate();
427 LOG.info("Unexpected role change msg {} for entity {}", ownershipChange, ownershipChange.getEntity());
432 private CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperDS(
433 final RoleChangeListener roleChangeListener) {
434 Preconditions.checkArgument(roleChangeListener != null);
435 final DeviceState deviceState = roleChangeListener.getDeviceState();
436 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
437 delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier());
438 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
439 Futures.addCallback(delFuture, new FutureCallback<Void>() {
442 public void onSuccess(final Void result) {
443 LOG.debug("Delete Node {} was successful", deviceState.getNodeId());
447 public void onFailure(final Throwable t) {
448 LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
455 private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
456 LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
457 Futures.addCallback(removeDeviceFromOperDS(roleContext), new FutureCallback<Void>() {
459 public void onSuccess(@Nullable final Void aVoid) {
460 LOG.debug("Freeing roleContext slot for device: {}", roleContext.getDeviceState().getNodeId());
461 contexts.remove(ownershipChange.getEntity(), roleContext);
462 roleContext.suspendTxCandidate();
466 public void onFailure(final Throwable throwable) {
467 LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleContext.getDeviceState()
468 .getNodeId(), throwable.getMessage());
469 contexts.remove(ownershipChange.getEntity(), roleContext);
470 roleContext.suspendTxCandidate();