604806da9d414ad93c7238d8474136cea59056ed
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / role / RoleManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.role;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.JdkFutureAdapters;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import io.netty.util.TimerTask;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
34 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
35 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
37 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
38 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
39 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
40 import org.opendaylight.openflowplugin.api.OFConstants;
41 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
49 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
50 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
51 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
52 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
59 import org.opendaylight.yangtools.yang.common.RpcResult;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  * Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
65  * On receipt of the ownership notification, makes an rpc call to SalRoleService.
66  *
67  * Hands over to StatisticsManager at the end.
68  */
69 public class RoleManagerImpl implements RoleManager, EntityOwnershipListener, ServiceChangeListener {
70     private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
71
72     // Maximum limit of timeout retries when cleaning DS, to prevent infinite recursive loops
73     private static final int MAX_CLEAN_DS_RETRIES = 3;
74
75     private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
76     private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler;
77     private final DataBroker dataBroker;
78     private final EntityOwnershipService entityOwnershipService;
79     private final ConcurrentMap<DeviceInfo, RoleContext> contexts = new ConcurrentHashMap<>();
80     private final ConcurrentMap<Entity, RoleContext> watchingEntities = new ConcurrentHashMap<>();
81     private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
82     private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
83     private List<RoleChangeListener> listeners = new ArrayList<>();
84
85     private final LifecycleConductor conductor;
86
87     public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final LifecycleConductor lifecycleConductor) {
88         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
89         this.dataBroker = Preconditions.checkNotNull(dataBroker);
90         this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
91         this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this));
92         this.conductor = lifecycleConductor;
93         LOG.debug("Register OpenflowOwnershipListener to all entity ownership changes");
94     }
95
96     @Override
97     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
98         deviceInitializationPhaseHandler = handler;
99     }
100
101     @Override
102     public void onDeviceContextLevelUp(@CheckForNull final DeviceInfo deviceInfo) throws Exception {
103         final DeviceContext deviceContext = Preconditions.checkNotNull(conductor.getDeviceContext(deviceInfo));
104         final RoleContext roleContext = new RoleContextImpl(deviceInfo, entityOwnershipService, makeEntity(deviceInfo.getNodeId()), makeTxEntity(deviceInfo.getNodeId()), conductor);
105         roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
106         Verify.verify(contexts.putIfAbsent(deviceInfo, roleContext) == null, "Role context for master Node %s is still not closed.", deviceInfo.getNodeId());
107         makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, true);
108         /* First start to watch entity so we don't miss any notification, and then try to register in EOS */
109         watchingEntities.put(roleContext.getEntity(), roleContext);
110         notifyListenersRoleInitializationDone(roleContext.getDeviceInfo(), roleContext.initialization());
111         deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceInfo);
112     }
113
114     @Override
115     public void close() {
116         LOG.debug("Close method on role manager was called.");
117         entityOwnershipListenerRegistration.close();
118         txEntityOwnershipListenerRegistration.close();
119         for (final Iterator<RoleContext> iterator = Iterators.consumingIterator(contexts.values().iterator()); iterator.hasNext();) {
120             // got here because last known role is LEADER and DS might need clearing up
121             final RoleContext roleContext = iterator.next();
122             watchingEntities.remove(roleContext.getEntity());
123             watchingEntities.remove(roleContext.getTxEntity());
124             contexts.remove(roleContext.getDeviceInfo());
125             if (roleContext.isTxCandidateRegistered()) {
126                 LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.", roleContext.getDeviceInfo().getNodeId().getValue());
127                 removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
128             }
129             roleContext.close();
130         }
131     }
132
133     @Override
134     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
135         LOG.trace("onDeviceContextLevelDown for node {}", deviceInfo.getNodeId());
136         final RoleContext roleContext = contexts.remove(deviceInfo);
137         if (roleContext != null) {
138             LOG.debug("Found roleContext associated to deviceContext: {}, now trying close the roleContext", deviceInfo.getNodeId());
139             roleContext.setState(OFPContext.CONTEXT_STATE.TERMINATION);
140             roleContext.unregisterCandidate(roleContext.getEntity());
141             if (roleContext.isTxCandidateRegistered()) {
142                 LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.", deviceInfo.getNodeId().getValue());
143                 removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
144             }
145             roleContext.close();
146         }
147         deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo);
148     }
149
150     @VisibleForTesting
151     static Entity makeEntity(final NodeId nodeId) {
152         return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
153     }
154
155     @VisibleForTesting
156     static Entity makeTxEntity(final NodeId nodeId) {
157         return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
158     }
159
160     @Override
161     public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
162
163         Preconditions.checkArgument(ownershipChange != null);
164         final RoleContext roleContext = watchingEntities.get(ownershipChange.getEntity());
165
166         if (Objects.nonNull(roleContext) && !roleContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
167
168             LOG.debug("Received EOS message: wasOwner:{} isOwner:{} hasOwner:{} inJeopardy:{} for entity type {} and node {}",
169                     ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner(), ownershipChange.inJeopardy(),
170                     ownershipChange.getEntity().getType(),
171                     roleContext.getDeviceInfo().getNodeId());
172
173             if (ownershipChange.getEntity().equals(roleContext.getEntity())) {
174                 changeOwnershipForMainEntity(ownershipChange, roleContext);
175             } else {
176                 changeOwnershipForTxEntity(ownershipChange, roleContext);
177             }
178
179         } else {
180
181             LOG.debug("Role context for entity type {} is in state closing, disregarding ownership change notification.", ownershipChange.getEntity().getType());
182             watchingEntities.remove(ownershipChange.getEntity());
183
184         }
185
186     }
187
188     @VisibleForTesting
189     void changeOwnershipForMainEntity(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
190
191         if (roleContext.isMainCandidateRegistered()) {
192             LOG.debug("Main-EntityOwnershipRegistration is active for entity type {} and node {}",
193                     ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
194             if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && !ownershipChange.inJeopardy()) {
195                 // SLAVE -> MASTER
196                 LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
197                 if (roleContext.registerCandidate(roleContext.getTxEntity())) {
198                     LOG.debug("Starting watching tx entity for node {}", roleContext.getDeviceInfo().getNodeId());
199                     watchingEntities.putIfAbsent(roleContext.getTxEntity(), roleContext);
200                 }
201             } else if ((ownershipChange.wasOwner() && !ownershipChange.isOwner()) || (ownershipChange.inJeopardy())) {
202                 // MASTER -> SLAVE
203                 LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
204                 conductor.addOneTimeListenerWhenServicesChangesDone(this, roleContext.getDeviceInfo());
205                 makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, false);
206             }
207         } else {
208             LOG.debug("Main-EntityOwnershipRegistration is not active for entity type {} and node {}",
209                     ownershipChange.getEntity(), roleContext.getDeviceInfo().getNodeId());
210             watchingEntities.remove(ownershipChange.getEntity(), roleContext);
211             if (roleContext.isTxCandidateRegistered()) {
212                 LOG.debug("tx candidate still registered for node {}, probably connection lost, trying to unregister tx candidate", roleContext.getDeviceInfo().getNodeId());
213                 roleContext.unregisterCandidate(roleContext.getTxEntity());
214                 if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && !ownershipChange.hasOwner()) {
215                     LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
216                     removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
217                 }
218             } else {
219                 contexts.remove(roleContext.getDeviceInfo(), roleContext);
220                 roleContext.close();
221                 conductor.closeConnection(roleContext.getDeviceInfo());
222             }
223         }
224     }
225
226     @VisibleForTesting
227     void changeOwnershipForTxEntity(final EntityOwnershipChange ownershipChange,
228             @Nonnull final RoleContext roleContext) {
229
230         if (roleContext.isTxCandidateRegistered()) {
231             LOG.debug("Tx-EntityOwnershipRegistration is active for entity type {} and node {}",
232                     ownershipChange.getEntity().getType(),
233                     roleContext.getDeviceInfo().getNodeId());
234             if (ownershipChange.inJeopardy()) {
235                 LOG.warn("Getting 'inJeopardy' flag from EOS. Removing txCandidate and stopping watching txCandidate.");
236                 watchingEntities.remove(roleContext.getTxEntity());
237                 roleContext.unregisterCandidate(roleContext.getTxEntity());
238             } else {
239                 if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
240                     // SLAVE -> MASTER
241                     LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
242                     makeDeviceRoleChange(OfpRole.BECOMEMASTER, roleContext, false);
243                 } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
244                     // MASTER -> SLAVE
245                     LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
246                     LOG.warn("Tx-EntityOwnershipRegistration lost leadership entity type {} and node {}",
247                             ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
248                     watchingEntities.remove(roleContext.getTxEntity(), roleContext);
249                     watchingEntities.remove(roleContext.getEntity(), roleContext);
250                     roleContext.unregisterCandidate(roleContext.getEntity());
251                     roleContext.unregisterCandidate(roleContext.getTxEntity());
252                     if (!ownershipChange.hasOwner()) {
253                         LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
254                         removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
255                     } else {
256                         contexts.remove(roleContext.getDeviceInfo(), roleContext);
257                         roleContext.close();
258                         conductor.closeConnection(roleContext.getDeviceInfo());
259                     }
260                 }
261             }
262         } else {
263             LOG.debug("Tx-EntityOwnershipRegistration is not active for entity type {} and node {}", ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
264             watchingEntities.remove(roleContext.getTxEntity(), roleContext);
265             contexts.remove(roleContext.getDeviceInfo(), roleContext);
266             roleContext.close();
267             conductor.closeConnection(roleContext.getDeviceInfo());
268         }
269     }
270
271     @VisibleForTesting
272     void makeDeviceRoleChange(final OfpRole role, final RoleContext roleContext, final Boolean init) {
273         final ListenableFuture<RpcResult<SetRoleOutput>> roleChangeFuture = sendRoleChangeToDevice(role, roleContext);
274         Futures.addCallback(roleChangeFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
275             @Override
276             public void onSuccess(@Nullable final RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
277                 LOG.info("Role {} successfully set on device {}", role, roleContext.getDeviceInfo().getNodeId());
278                 if (!init) {
279                     notifyListenersRoleChangeOnDevice(roleContext.getDeviceInfo(), role);
280                 }
281             }
282
283             @Override
284             public void onFailure(@Nonnull final Throwable throwable) {
285                 LOG.warn("Unable to set role {} on device {}", role, roleContext.getDeviceInfo().getNodeId());
286                 conductor.closeConnection(roleContext.getDeviceInfo());
287             }
288         });
289     }
290
291     @VisibleForTesting
292     ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole, final RoleContext roleContext) {
293         LOG.debug("Sending new role {} to device {}", newRole, roleContext.getDeviceInfo().getNodeId());
294         final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
295         final Short version = roleContext.getDeviceInfo().getVersion();
296         if (null == version) {
297             LOG.debug("Device version is null");
298             return Futures.immediateFuture(null);
299         }
300         if (version < OFConstants.OFP_VERSION_1_3) {
301             LOG.debug("Device version not support ROLE");
302             return Futures.immediateFuture(null);
303         } else {
304             final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
305                     .setNode(new NodeRef(DeviceStateUtil.createNodeInstanceIdentifier(roleContext.getDeviceInfo().getNodeId()))).build();
306             setRoleOutputFuture = roleContext.getSalRoleService().setRole(setRoleInput);
307             final TimerTask timerTask = timeout -> {
308                 if (!setRoleOutputFuture.isDone()) {
309                     LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, roleContext.getDeviceInfo().getNodeId());
310                     setRoleOutputFuture.cancel(true);
311                 }
312             };
313             conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
314         }
315         return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
316     }
317
318     @VisibleForTesting
319     CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo, final int numRetries) {
320         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
321         delWtx.delete(LogicalDatastoreType.OPERATIONAL, DeviceStateUtil.createNodeInstanceIdentifier(deviceInfo.getNodeId()));
322         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
323
324         Futures.addCallback(delFuture, new FutureCallback<Void>() {
325             @Override
326             public void onSuccess(final Void result) {
327                 LOG.debug("Delete Node {} was successful", deviceInfo);
328                 final RoleContext roleContext = contexts.remove(deviceInfo);
329                 if (roleContext != null) {
330                     roleContext.close();
331                 }
332             }
333
334             @Override
335             public void onFailure(@Nonnull final Throwable t) {
336                 // If we have any retries left, we will try to clean the datastore again
337                 if (numRetries > 0) {
338                     // We "used" one retry here, so decrement it
339                     final int curRetries = numRetries - 1;
340                     LOG.debug("Delete node {} failed with exception {}. Trying again (retries left: {})", deviceInfo.getNodeId(), t, curRetries);
341                     // Recursive call to this method with "one less" retry
342                     removeDeviceFromOperationalDS(deviceInfo, curRetries);
343                     return;
344                 }
345
346                 // No retries left, so we will just close the role context, and ignore datastore cleanup
347                 LOG.warn("Delete node {} failed with exception {}. No retries left, aborting", deviceInfo.getNodeId(), t);
348                 final RoleContext roleContext = contexts.remove(deviceInfo);
349                 if (roleContext != null) {
350                     roleContext.close();
351                 }
352             }
353         });
354
355         return delFuture;
356     }
357
358     @Override
359     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
360         deviceTerminationPhaseHandler = handler;
361     }
362
363     @Override
364     public void servicesChangeDone(final DeviceInfo deviceInfo, final boolean success) {
365         LOG.debug("Services stopping done for node {} as " + (success ? "successful" : "unsuccessful"), deviceInfo);
366         final RoleContext roleContext = contexts.get(deviceInfo);
367         if (null != roleContext) {
368             /* Services stopped or failure */
369             roleContext.unregisterCandidate(roleContext.getTxEntity());
370         }
371     }
372
373     @VisibleForTesting
374     RoleContext getRoleContext(final DeviceInfo deviceInfo){
375         return contexts.get(deviceInfo);
376     }
377
378     /**
379      * This method is only for testing
380      */
381     @VisibleForTesting
382     void setRoleContext(DeviceInfo deviceInfo, RoleContext roleContext){
383         if(!contexts.containsKey(deviceInfo)) {
384             contexts.put(deviceInfo, roleContext);
385         }
386     }
387
388     @Override
389     public void addRoleChangeListener(final RoleChangeListener roleChangeListener) {
390         this.listeners.add(roleChangeListener);
391     }
392
393     /**
394      * Invoked when initialization phase is done
395      * @param deviceInfo node identification
396      * @param success true if initialization done ok, false otherwise
397      */
398     @VisibleForTesting
399     void notifyListenersRoleInitializationDone(final DeviceInfo deviceInfo, final boolean success){
400         LOG.debug("Notifying registered listeners for role initialization done, no. of listeners {}", listeners.size());
401         for (final RoleChangeListener listener : listeners) {
402             listener.roleInitializationDone(deviceInfo, success);
403         }
404     }
405
406     /**
407      * Notifies registered listener on role change. Role is the new role on device
408      * If initialization phase is true, we may skip service starting
409      * @param deviceInfo
410      * @param role new role meant to be set on device
411      */
412     @VisibleForTesting
413     void notifyListenersRoleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole role){
414         LOG.debug("Notifying registered listeners for role change, no. of listeners {}", listeners.size());
415         for (final RoleChangeListener listener : listeners) {
416             listener.roleChangeOnDevice(deviceInfo, role);
417         }
418     }
419
420     @Override
421     public <T extends OFPContext> T gainContext(DeviceInfo deviceInfo) {
422         return (T) contexts.get(deviceInfo);
423     }
424 }