2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.role;
10 import com.google.common.base.Function;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Throwables;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.Semaphore;
25 import javax.annotation.CheckForNull;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
30 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
31 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
32 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
33 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
34 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
35 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
37 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
38 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
39 import org.opendaylight.openflowplugin.api.OFConstants;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
42 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
43 import org.opendaylight.openflowplugin.api.openflow.role.RoleChangeListener;
44 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
45 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
46 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
/**
 * Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
 * On receipt of the ownership notification, makes an rpc call to SalRoleService.
 *
 * Hands over to StatisticsManager at the end.
 */
58 public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
59 private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
// Next handler in the device initialization chain; assigned through
// setDeviceInitializationPhaseHandler and invoked when this manager is done with a device.
61 private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
// Used to open the write-only transaction that deletes a node from the operational datastore.
62 private final DataBroker dataBroker;
// Source of ownership state queries and of the ownership-change notifications this class listens to.
63 private final EntityOwnershipService entityOwnershipService;
// Role contexts keyed by their main entity (RoleContext.getEntity()).
64 private final ConcurrentMap<Entity, RoleContext> contexts = new ConcurrentHashMap<>();
// Role contexts keyed by their tx entity (RoleContext.getTxEntity()).
65 private final ConcurrentMap<Entity, RoleContext> txContexts = new ConcurrentHashMap<>();
// Listener registration for RoleManager.ENTITY_TYPE; closed in close().
66 private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
// Listener registration for TX_ENTITY_TYPE; closed in close().
67 private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
// Passed through to DeviceInitializationUtils.initializeNodeInformation.
68 private final boolean switchFeaturesMandatory;
70 public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final boolean switchFeaturesMandatory) {
71 this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
72 this.dataBroker = Preconditions.checkNotNull(dataBroker);
73 this.switchFeaturesMandatory = switchFeaturesMandatory;
74 this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
75 this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this));
76 LOG.debug("Registering OpenflowOwnershipListener listening to all entity ownership changes");
80 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
81 deviceInitializationPhaseHandler = handler;
85 public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
86 LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
87 if (deviceContext.getDeviceState().getFeatures().getVersion() < OFConstants.OFP_VERSION_1_3) {
88 // Roles are not supported before OF1.3, so move forward.
89 deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
93 final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
94 makeEntity(deviceContext.getDeviceState().getNodeId()),
95 makeTxEntity(deviceContext.getDeviceState().getNodeId()));
96 // if the device context gets closed (mostly on connection close), we would need to cleanup
97 deviceContext.addDeviceContextClosedHandler(this);
98 final RoleContext previousContext = contexts.putIfAbsent(roleContext.getEntity(), roleContext);
99 Verify.verify(previousContext == null,
100 "RoleCtx for master Node {} is still not close.", deviceContext.getDeviceState().getNodeId());
102 roleContext.initialization();
104 final ListenableFuture<OfpRole> roleChangeFuture = SettableFuture.<OfpRole> create();
106 final ListenableFuture<Void> txFreeFuture = Futures.transform(roleChangeFuture, new AsyncFunction<OfpRole, Void>() {
108 public ListenableFuture<Void> apply(final OfpRole input) throws Exception {
109 final ListenableFuture<Void> nextFuture;
110 if (OfpRole.BECOMEMASTER.equals(input)) {
111 LOG.debug("Node {} has marked as LEADER", deviceContext.getDeviceState().getNodeId());
112 Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
113 "RoleCtx for TxEntity {} master Node {} is still not close.",
114 roleContext.getTxEntity(), deviceContext.getDeviceState().getNodeId());
115 // nextFuture = roleContext.setupTxCandidate();
116 nextFuture = Futures.immediateFuture(null);
118 LOG.debug("Node {} was marked as FOLLOWER", deviceContext.getDeviceState().getNodeId());
119 nextFuture = Futures.immediateFuture(null);
125 final ListenableFuture<Void> initDeviceFuture = Futures.transform(txFreeFuture, new AsyncFunction<Void, Void>() {
127 public ListenableFuture<Void> apply(final Void input) throws Exception {
128 LOG.debug("Node {} will be initialized", deviceContext.getDeviceState().getNodeId());
129 return DeviceInitializationUtils.initializeNodeInformation(deviceContext, switchFeaturesMandatory);
133 Futures.addCallback(initDeviceFuture, new FutureCallback<Void>() {
135 public void onSuccess(final Void result) {
136 LOG.debug("Initialization Node {} is done.", deviceContext.getDeviceState().getNodeId());
138 getRoleContextLevelUp(deviceContext);
139 } catch (final Exception e) {
140 deviceContext.close();
145 public void onFailure(final Throwable t) {
146 LOG.warn("Unexpected error for Node {} initialization", deviceContext.getDeviceState().getNodeId(), t);
147 deviceContext.close();
152 void getRoleContextLevelUp(final DeviceContext deviceContext) throws Exception {
153 LOG.debug("Created role context for node {}", deviceContext.getDeviceState().getNodeId());
154 LOG.debug("roleChangeFuture success for device:{}. Moving to StatisticsManager", deviceContext.getDeviceState().getNodeId());
155 deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
// Shuts the manager down: closes both ownership listener registrations, then walks every
// tracked role context and decides whether its node must be removed from the operational
// datastore.
159 public void close() throws Exception {
160 entityOwnershipListenerRegistration.close();
161 txEntityOwnershipListenerRegistration.close();
162 for (final Map.Entry<Entity, RoleContext> roleContextEntry : contexts.entrySet()) {
163 // got here because last known role is LEADER and DS might need clearing up
164 final Entity entity = roleContextEntry.getKey();
// Ask the ownership service for the current state of this entity.
165 final Optional<EntityOwnershipState> ownershipState = entityOwnershipService.getOwnershipState(entity);
166 final NodeId nodeId = roleContextEntry.getValue().getDeviceState().getNodeId();
167 if (ownershipState.isPresent()) {
// No owner reported for the entity: delete the node from the operational datastore.
168 if ((!ownershipState.get().hasOwner())) {
169 LOG.trace("Last role is LEADER and ownershipService returned hasOwner=false for node: {}; " +
170 "cleaning DS as being probably the last owner", nodeId);
171 removeDeviceFromOperDS(roleContextEntry.getValue());
173 // NOOP - there is another owner
174 LOG.debug("Last role is LEADER and ownershipService returned hasOwner=true for node: {}; " +
175 "leaving DS untouched", nodeId);
// No ownership state returned at all: the node is deleted from the operational datastore as well.
178 // TODO: is this safe? When could this happen?
179 LOG.warn("Last role is LEADER but ownershipService returned empty ownership info for node: {}; " +
180 "cleaning DS ANYWAY!", nodeId);
181 removeDeviceFromOperDS(roleContextEntry.getValue());
// NOTE(review): the future returned by removeDeviceFromOperDS is not awaited here, so the
// deletes are only submitted, not confirmed, by the time the loop moves on.
// Callback for a closed device context: looks up the role context registered under the
// device's main entity and, when one exists, consults the ownership service about it.
188 public void onDeviceContextClosed(final DeviceContext deviceContext) {
189 final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
190 LOG.debug("onDeviceContextClosed for node {}", nodeId);
// The main entity is derived from the node id, matching the key used in contexts.
191 final Entity entity = makeEntity(nodeId);
192 final RoleContext roleContext = contexts.get(entity);
193 if (roleContext != null) {
194 LOG.debug("Found roleContext associated to deviceContext: {}, now closing the roleContext", nodeId);
195 final Optional<EntityOwnershipState> actState = entityOwnershipService.getOwnershipState(entity);
196 if (actState.isPresent()) {
// This instance is not the owner: drop the mapping (only if it still points to this context).
197 if (!actState.get().isOwner()) {
198 LOG.debug("No DS commitment for device {} - LEADER is somewhere else", nodeId);
199 contexts.remove(entity, roleContext);
// Reached when the ownership service returned no state for the entity.
202 LOG.warn("EntityOwnershipService doesn't return state for entity: {} in close proces", entity);
208 private static Entity makeEntity(final NodeId nodeId) {
209 return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
212 private static Entity makeTxEntity(final NodeId nodeId) {
213 return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
217 public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
218 Preconditions.checkArgument(ownershipChange != null);
220 final RoleContext roleContext = contexts.get(ownershipChange.getEntity());
221 if (roleContext != null) {
222 changeForEntity(ownershipChange, roleContext);
226 final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity());
227 if (txRoleContext != null) {
228 changeForTxEntity(ownershipChange, txRoleContext);
231 } catch (final InterruptedException e) {
232 LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity());
233 // FIXME: consider forcibly closing this connection
236 LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
237 ownershipChange.getEntity(), ownershipChange);
240 private void changeForTxEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleChangeListener roleTxChangeListener) throws InterruptedException {
241 LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
242 final Semaphore txCandidateGuard = roleTxChangeListener.getTxCandidateGuard();
243 LOG.trace("txCandidate lock queue: " + txCandidateGuard.getQueueLength());
244 txCandidateGuard.acquire();
246 if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
247 // MASTER -> SLAVE - left transition lock
248 txContexts.remove(roleTxChangeListener.getTxEntity(), roleTxChangeListener);
249 txCandidateGuard.release();
250 } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
251 // SLAVE -> MASTER - acquired transition lock
252 LOG.debug("TxRoleChange for entity {}", ownershipChange.getEntity());
253 final OfpRole role = roleTxChangeListener.getDeviceState().getRole();
254 Verify.verify(OfpRole.BECOMEMASTER.equals(role),
255 "Acquired txCandidate lock but current role = {}", role);
258 LOG.debug("NOOP state transition for TxEntity {} ", roleTxChangeListener.getTxEntity());
259 txCandidateGuard.release();
263 private static Function<Void, Void> makeTxEntitySuspendCallback(final RoleContext roleChangeListener) {
264 return new Function<Void, Void>() {
266 public Void apply(final Void result) {
267 roleChangeListener.suspendTxCandidate();
273 private static Function<Void, Void> makeTxEntitySetupCallback(final RoleContext roleChangeListener) {
274 return new Function<Void, Void>() {
276 public Void apply(final Void result) {
278 roleChangeListener.setupTxCandidate();
279 } catch (final CandidateAlreadyRegisteredException e) {
280 LOG.debug("txCandidate registration failed");
281 Throwables.propagate(e);
288 private void changeForEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleChangeListener) throws InterruptedException {
289 final Semaphore mainCandidateGuard = roleChangeListener.getMainCandidateGuard();
290 LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
291 mainCandidateGuard.acquire();
292 //FIXME : check again implementation for double candidate scenario
293 LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
295 if (roleChangeListener.getDeviceState().isValid()) {
296 LOG.debug("RoleChange for entity {}", ownershipChange.getEntity());
297 final OfpRole newRole = ownershipChange.isOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
298 final OfpRole oldRole = ownershipChange.wasOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
299 // send even if they are same. we do the check for duplicates in SalRoleService and maintain a lastKnownRole
300 ListenableFuture<Void> rolePropagatedFx = roleChangeListener.onRoleChanged(oldRole, newRole);
301 final Function<Void, Void> txProcessCallback;
302 if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
304 txProcessCallback = makeTxEntitySuspendCallback(roleChangeListener);
305 } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.isOwner()) {
306 // FIXME : make different code path deviceContext.onClusterRoleChange(newRole); has to call from onTxRoleChange (for master)
308 txProcessCallback = makeTxEntitySetupCallback(roleChangeListener);
310 LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
311 txProcessCallback = null;
314 if (txProcessCallback != null) {
315 rolePropagatedFx = Futures.transform(rolePropagatedFx, txProcessCallback);
318 Futures.addCallback(rolePropagatedFx, new FutureCallback<Void>() {
320 public void onSuccess(@Nullable final Void aVoid) {
321 LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
322 ownershipChange.getEntity(), oldRole, newRole);
323 mainCandidateGuard.release();
327 public void onFailure(final Throwable throwable) {
328 LOG.warn("Main candidate role propagation failed for entity: {}, {} -> {}",
329 ownershipChange.getEntity(), oldRole, newRole);
330 mainCandidateGuard.release();
331 // FIXME: here we shall disconnect probably - in order to avoid inconsistent state
337 LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
338 mainCandidateGuard.release();
339 // expecting that this roleContext will get closed in a moment
340 // FIXME: reconsider location of following cleanup logic
341 if (!ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
342 unregistrationHelper(ownershipChange, roleChangeListener);
343 } else if (ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
344 contexts.remove(ownershipChange.getEntity(), roleChangeListener);
345 roleChangeListener.suspendTxCandidate();
347 LOG.info("Unexpected role change msg {} for entity {}", ownershipChange, ownershipChange.getEntity());
352 private CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperDS(
353 final RoleChangeListener roleChangeListener) {
354 Preconditions.checkArgument(roleChangeListener != null);
355 final DeviceState deviceState = roleChangeListener.getDeviceState();
356 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
357 delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier());
358 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
359 Futures.addCallback(delFuture, new FutureCallback<Void>() {
362 public void onSuccess(final Void result) {
363 LOG.debug("Delete Node {} was successful", deviceState.getNodeId());
367 public void onFailure(final Throwable t) {
368 LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
374 private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleChangeListener roleChangeListener) {
375 LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
376 Futures.addCallback(removeDeviceFromOperDS(roleChangeListener), new FutureCallback<Void>() {
378 public void onSuccess(@Nullable final Void aVoid) {
379 LOG.debug("Freeing roleContext slot for device: {}", roleChangeListener.getDeviceState().getNodeId());
380 contexts.remove(ownershipChange.getEntity(), roleChangeListener);
381 ((RoleContext) roleChangeListener).suspendTxCandidate();
385 public void onFailure(final Throwable throwable) {
386 LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleChangeListener.getDeviceState()
387 .getNodeId(), throwable.getMessage());
388 contexts.remove(ownershipChange.getEntity(), roleChangeListener);
389 ((RoleContext) roleChangeListener).suspendTxCandidate();