2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.role;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.JdkFutureAdapters;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import io.netty.util.Timeout;
20 import io.netty.util.TimerTask;
22 import java.util.ArrayList;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.CheckForNull;
30 import javax.annotation.Nonnull;
31 import javax.annotation.Nullable;
33 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
34 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
35 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
37 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
38 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
39 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
40 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
41 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
42 import org.opendaylight.openflowplugin.api.OFConstants;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
49 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
50 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
51 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
52 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
59 import org.opendaylight.yangtools.yang.common.RpcResult;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
64 * Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
65 * On receipt of the ownership notification, makes an rpc call to SalRoleService.
67 * Hands over to StatisticsManager at the end.
69 public class RoleManagerImpl implements RoleManager, EntityOwnershipListener, ServiceChangeListener {
70 private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
72 private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
73 private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler;
74 private final DataBroker dataBroker;
75 private final EntityOwnershipService entityOwnershipService;
76 private final ConcurrentMap<NodeId, RoleContext> contexts = new ConcurrentHashMap<>();
77 private final ConcurrentMap<Entity, RoleContext> watchingEntities = new ConcurrentHashMap<>();
78 private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
79 private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
80 private List<RoleChangeListener> listeners = new ArrayList<>();
82 private final LifecycleConductor conductor;
/**
 * Creates the role manager and registers it as ownership listener for both the main
 * and the tx entity type.
 *
 * @param entityOwnershipService service used to register ownership listeners; must not be null
 * @param dataBroker broker used for datastore transactions; must not be null
 * @param lifecycleConductor conductor used to reach device contexts, timers and connections
 */
public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final LifecycleConductor lifecycleConductor) {
    this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
    this.dataBroker = Preconditions.checkNotNull(dataBroker);
    // Assign the conductor BEFORE registering: registerListener(...) hands out `this`, and the
    // ownership callbacks use the conductor, so it must already be set when the first one arrives.
    this.conductor = lifecycleConductor;
    this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
    this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.TX_ENTITY_TYPE, this));
    LOG.debug("Register OpenflowOwnershipListener to all entity ownership changes");
}
94 public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
95 deviceInitializationPhaseHandler = handler;
99 public void onDeviceContextLevelUp(@CheckForNull final NodeId nodeId) throws Exception {
100 final DeviceContext deviceContext = Preconditions.checkNotNull(conductor.getDeviceContext(nodeId));
101 final RoleContext roleContext = new RoleContextImpl(nodeId, entityOwnershipService, makeEntity(nodeId), makeTxEntity(nodeId), conductor);
102 roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
103 Verify.verify(contexts.putIfAbsent(nodeId, roleContext) == null, "Role context for master Node %s is still not closed.", nodeId);
104 makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, true);
105 /* First start to watch entity so we don't miss any notification, and then try to register in EOS */
106 watchingEntities.put(roleContext.getEntity(), roleContext);
107 notifyListenersRoleInitializationDone(roleContext.getNodeId(), roleContext.initialization());
108 deviceInitializationPhaseHandler.onDeviceContextLevelUp(nodeId);
/**
 * Shuts the manager down: closes both ownership-listener registrations and then walks all
 * known role contexts, dropping each from {@code watchingEntities} and {@code contexts}.
 * For a context whose tx candidate is still registered, the node is additionally removed
 * from the operational datastore.
 */
// NOTE(review): this listing omits some lines of the method (closing braces and apparently
// at least one further statement after the last line shown), so what happens to contexts
// WITHOUT a registered tx candidate cannot be told from here.
112 public void close() {
113 LOG.debug("Close method on role manager was called.");
114 entityOwnershipListenerRegistration.close();
115 txEntityOwnershipListenerRegistration.close();
// Guava's consuming iterator removes each element from the backing collection as it is
// returned, so the explicit contexts.remove(...) further down looks redundant - TODO confirm.
116 for (final Iterator<RoleContext> iterator = Iterators.consumingIterator(contexts.values().iterator()); iterator.hasNext();) {
117 // got here because last known role is LEADER and DS might need clearing up
118 final RoleContext roleContext = iterator.next();
119 watchingEntities.remove(roleContext.getEntity());
120 watchingEntities.remove(roleContext.getTxEntity());
121 contexts.remove(roleContext.getNodeId());
122 if (roleContext.isTxCandidateRegistered()) {
// NOTE(review): the "{}" placeholder below has no matching argument, so the node id is
// never printed; roleContext.getNodeId() was presumably intended.
123 LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.");
124 removeDeviceFromOperationalDS(roleContext.getNodeId());
/**
 * Handles the tear-down of a device context: looks up the role context of the node behind
 * the primary connection, unregisters its main candidate when one is registered, and
 * finally delegates to the configured termination handler.
 *
 * @param deviceContext device context that is going down
 */
// NOTE(review): the lines that close/branch the conditionals are missing from this listing
// (and a statement appears to be missing after the contexts.remove call), so it is unclear
// from here whether the remove runs in an else branch or unconditionally.
132 public void onDeviceContextLevelDown(final DeviceContext deviceContext) {
133 final NodeId nodeId = deviceContext.getPrimaryConnectionContext().getNodeId();
134 LOG.trace("onDeviceContextLevelDown for node {}", nodeId);
135 final RoleContext roleContext = contexts.get(nodeId);
136 if (roleContext != null) {
137 LOG.debug("Found roleContext associated to deviceContext: {}, now trying close the roleContext", nodeId);
138 if (roleContext.isMainCandidateRegistered()) {
139 roleContext.unregisterCandidate(roleContext.getEntity());
// Two-argument remove: the mapping is only dropped if it still points at this very context.
141 contexts.remove(nodeId, roleContext);
// Hand over to the next termination handler.
145 deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceContext);
149 static Entity makeEntity(final NodeId nodeId) {
150 return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
154 static Entity makeTxEntity(final NodeId nodeId) {
155 return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
/**
 * Entry point for ownership notifications. Looks up the role context watching the changed
 * entity, logs the change, and hands it to {@code changeOwnershipForMainEntity} or
 * {@code changeOwnershipForTxEntity}.
 *
 * @param ownershipChange the received ownership change; a null value is rejected with
 *                        {@link IllegalArgumentException}
 */
// NOTE(review): the else/closing-brace lines are not part of this listing. Presumably the
// tx handler is the else branch of the entity comparison and the final debug log is the
// else branch of the null check - verify against the complete file.
159 public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
161 Preconditions.checkArgument(ownershipChange != null);
// Only entities registered in watchingEntities are handled; others yield a null context.
162 final RoleContext roleContext = watchingEntities.get(ownershipChange.getEntity());
164 LOG.debug("Received EOS message: wasOwner:{} isOwner:{} hasOwner:{} for entity type {} and node {}",
165 ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner(),
166 ownershipChange.getEntity().getType(),
167 roleContext != null ? roleContext.getNodeId() : "-> no watching entity, disregarding notification <-");
169 if (roleContext != null) {
// The context's main entity selects the main handler.
170 if (ownershipChange.getEntity().equals(roleContext.getEntity())) {
171 changeOwnershipForMainEntity(ownershipChange, roleContext);
173 changeOwnershipForTxEntity(ownershipChange, roleContext);
176 LOG.debug("OwnershipChange {}", ownershipChange);
/**
 * Reacts to an ownership change of a node's main entity.
 * <p>
 * While the main candidate is registered: gaining ownership registers the tx candidate
 * (and starts watching the tx entity when that registration reports true); losing
 * ownership requests a one-time services-change callback from the conductor and sends
 * BECOMESLAVE to the device.
 * <p>
 * The remaining statements stop watching the entity and clean up the tx candidate, the
 * operational datastore entry, the {@code contexts} mapping and the connection.
 *
 * @param ownershipChange the ownership change of the main entity
 * @param roleContext role context watching that entity
 */
// NOTE(review): several structural lines (closing braces, else keywords and apparently one
// statement between contexts.remove and closeConnection) are absent from this listing, so
// the exact branch each clean-up statement belongs to must be confirmed in the full file.
182 void changeOwnershipForMainEntity(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
184 if (roleContext.isMainCandidateRegistered()) {
185 LOG.debug("Main-EntityOwnershipRegistration is active for entity type {} and node {}",
186 ownershipChange.getEntity().getType(), roleContext.getNodeId());
187 if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
// Became owner of the main entity: next step is to compete for the tx entity.
189 LOG.debug("SLAVE to MASTER for node {}", roleContext.getNodeId());
190 if (roleContext.registerCandidate(roleContext.getTxEntity())) {
191 LOG.debug("Starting watching tx entity for node {}", roleContext.getNodeId());
192 watchingEntities.putIfAbsent(roleContext.getTxEntity(), roleContext);
194 } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
// Lost ownership: servicesChangeDone(...) of this class is the callback registered here;
// it unregisters the tx candidate once the services change has finished.
196 LOG.debug("MASTER to SLAVE for node {}", roleContext.getNodeId());
197 conductor.addOneTimeListenerWhenServicesChangesDone(this, roleContext.getNodeId());
198 makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, false);
// NOTE(review): the message says "entity type" but the whole entity is passed here, unlike
// the "is active" log above which passes getEntity().getType().
201 LOG.debug("Main-EntityOwnershipRegistration is not active for entity type {} and node {}",
202 ownershipChange.getEntity(), roleContext.getNodeId());
203 watchingEntities.remove(ownershipChange.getEntity(), roleContext);
204 if (roleContext.isTxCandidateRegistered()) {
205 LOG.debug("tx candidate still registered for node {}, probably connection lost, trying to unregister tx candidate", roleContext.getNodeId());
206 roleContext.unregisterCandidate(roleContext.getTxEntity());
// The datastore entry is only removed when this instance was the owner and nobody took over.
207 if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && !ownershipChange.hasOwner()) {
208 LOG.debug("Trying to remove from operational node: {}", roleContext.getNodeId());
209 removeDeviceFromOperationalDS(roleContext.getNodeId());
212 final NodeId nodeId = roleContext.getNodeId();
213 contexts.remove(nodeId, roleContext);
215 conductor.closeConnection(nodeId);
/**
 * Reacts to an ownership change of a node's tx entity.
 * <p>
 * While the tx candidate is registered: gaining ownership sends BECOMEMASTER to the device;
 * losing ownership stops watching both entities, unregisters both candidates, removes the
 * node from the operational datastore when no owner is left, drops the context and closes
 * the connection. The trailing statements handle the case logged as "not active": they stop
 * watching the tx entity, drop the context and close the connection.
 *
 * @param ownershipChange the ownership change of the tx entity
 * @param roleContext role context watching that entity
 */
// NOTE(review): closing braces, the else line and apparently one statement after each
// contexts.remove call are missing from this listing; branch membership of the statements
// below is inferred from the log messages and must be confirmed in the full file.
221 void changeOwnershipForTxEntity(final EntityOwnershipChange ownershipChange,
222 @Nonnull final RoleContext roleContext) {
224 if (roleContext.isTxCandidateRegistered()) {
225 LOG.debug("Tx-EntityOwnershipRegistration is active for entity type {} and node {}",
226 ownershipChange.getEntity().getType(),
227 roleContext.getNodeId());
228 if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
// Became owner of the tx entity: only now is the device asked to treat us as master.
230 LOG.debug("SLAVE to MASTER for node {}", roleContext.getNodeId());
231 makeDeviceRoleChange(OfpRole.BECOMEMASTER, roleContext,false);
232 } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
234 LOG.debug("MASTER to SLAVE for node {}", roleContext.getNodeId());
235 LOG.warn("Tx-EntityOwnershipRegistration lost leadership entity type {} and node {}",
236 ownershipChange.getEntity().getType(),roleContext.getNodeId());
// Stop watching first, then withdraw both candidates.
237 watchingEntities.remove(roleContext.getTxEntity(), roleContext);
238 watchingEntities.remove(roleContext.getEntity(), roleContext);
239 roleContext.unregisterCandidate(roleContext.getEntity());
240 roleContext.unregisterCandidate(roleContext.getTxEntity());
241 if (!ownershipChange.hasOwner()) {
242 LOG.debug("Trying to remove from operational node: {}", roleContext.getNodeId());
243 removeDeviceFromOperationalDS(roleContext.getNodeId());
245 final NodeId nodeId = roleContext.getNodeId();
246 contexts.remove(nodeId, roleContext);
248 conductor.closeConnection(nodeId);
252 LOG.debug("Tx-EntityOwnershipRegistration is not active for entity {}", ownershipChange.getEntity().getType());
253 watchingEntities.remove(roleContext.getTxEntity(), roleContext);
254 final NodeId nodeId = roleContext.getNodeId();
255 contexts.remove(nodeId, roleContext);
257 conductor.closeConnection(nodeId);
262 void makeDeviceRoleChange(final OfpRole role, final RoleContext roleContext, final Boolean init) {
263 final ListenableFuture<RpcResult<SetRoleOutput>> roleChangeFuture = sendRoleChangeToDevice(role, roleContext);
264 Futures.addCallback(roleChangeFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
266 public void onSuccess(@Nullable final RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
267 LOG.info("Role {} successfully set on device {}", role, roleContext.getNodeId());
268 notifyListenersRoleChangeOnDevice(roleContext.getNodeId(), true, role, init);
272 public void onFailure(@Nonnull final Throwable throwable) {
273 LOG.warn("Unable to set role {} on device {}", role, roleContext.getNodeId());
274 notifyListenersRoleChangeOnDevice(roleContext.getNodeId(), false, role, init);
280 ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole, final RoleContext roleContext) {
281 LOG.debug("Sending new role {} to device {}", newRole, roleContext.getNodeId());
282 final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
283 final Short version = conductor.gainVersionSafely(roleContext.getNodeId());
284 if (null == version) {
285 LOG.debug("Device version is null");
286 return Futures.immediateFuture(null);
288 if (version < OFConstants.OFP_VERSION_1_3) {
289 LOG.debug("Device version not support ROLE");
290 return Futures.immediateFuture(null);
292 final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
293 .setNode(new NodeRef(DeviceStateUtil.createNodeInstanceIdentifier(roleContext.getNodeId()))).build();
294 setRoleOutputFuture = roleContext.getSalRoleService().setRole(setRoleInput);
295 final TimerTask timerTask = new TimerTask() {
298 public void run(final Timeout timeout) throws Exception {
299 if (!setRoleOutputFuture.isDone()) {
300 LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, roleContext.getNodeId());
301 setRoleOutputFuture.cancel(true);
305 conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
307 return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
/**
 * Deletes the node from the operational datastore in a write-only transaction and, once
 * the commit finishes (successfully or not), removes the node's entry from {@code contexts}.
 *
 * @param nodeId id of the node to delete
 * @return presumably the future of the submitted delete transaction - the return statement
 *         is not part of this listing, TODO confirm
 */
// NOTE(review): the bodies of both "if (roleContext != null)" checks, the closing braces and
// the return statement are missing from this listing.
311 CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final NodeId nodeId) {
313 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
314 delWtx.delete(LogicalDatastoreType.OPERATIONAL, DeviceStateUtil.createNodeInstanceIdentifier(nodeId));
315 final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
316 Futures.addCallback(delFuture, new FutureCallback<Void>() {
319 public void onSuccess(final Void result) {
320 LOG.debug("Delete Node {} was successful", nodeId);
321 final RoleContext roleContext = contexts.remove(nodeId);
322 if (roleContext != null) {
328 public void onFailure(@Nonnull final Throwable t) {
// NOTE(review): with a Throwable as the last argument the logger presumably treats it as the
// exception to log rather than as the value for the second "{}" - confirm the intended output.
329 LOG.warn("Delete Node {} failed. {}", nodeId, t);
// NOTE(review): contexts.remove(nodeId) is called twice in a row. The first call already
// drops the mapping, so the second one normally returns null and the null check below
// never passes - unlike onSuccess, which removes only once. The first call looks unintended.
330 contexts.remove(nodeId);
331 final RoleContext roleContext = contexts.remove(nodeId);
332 if (roleContext != null) {
341 public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
342 deviceTerminationPhaseHandler = handler;
346 public void servicesChangeDone(final NodeId nodeId, final boolean success) {
347 LOG.debug("Services stopping done for node {} as " + (success ? "successful" : "unsuccessful"), nodeId);
348 final RoleContext roleContext = contexts.get(nodeId);
349 if (null != roleContext) {
350 /* Services stopped or failure */
351 roleContext.unregisterCandidate(roleContext.getTxEntity());
356 RoleContext getRoleContext(final NodeId nodeId){
357 return contexts.get(nodeId);
361 * This method is only for testing
364 void setRoleContext(NodeId nodeId, RoleContext roleContext){
365 if(!contexts.containsKey(nodeId)) {
366 contexts.put(nodeId, roleContext);
371 public void addRoleChangeListener(final RoleChangeListener roleChangeListener) {
372 this.listeners.add(roleChangeListener);
376 * Invoked when initialization phase is done
377 * @param nodeId node identification
378 * @param success true if initialization done ok, false otherwise
381 void notifyListenersRoleInitializationDone(final NodeId nodeId, final boolean success){
382 LOG.debug("Notifying registered listeners for role initialization done, no. of listeners {}", listeners.size());
383 for (final RoleChangeListener listener : listeners) {
384 listener.roleInitializationDone(nodeId, success);
389 * Notifies registered listener on role change. Role is the new role on device
390 * If initialization phase is true, we may skip service starting
391 * @param success true if role change on device done ok, false otherwise
392 * @param role new role meant to be set on device
393 * @param initializationPhase if true, then skipp services start
396 void notifyListenersRoleChangeOnDevice(final NodeId nodeId, final boolean success, final OfpRole role, final boolean initializationPhase){
397 LOG.debug("Notifying registered listeners for role change, no. of listeners {}", listeners.size());
398 for (final RoleChangeListener listener : listeners) {
399 listener.roleChangeOnDevice(nodeId, success, role, initializationPhase);