54d312e1e4426d3e816b93c45b1b90a0bb47294f
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / role / RoleManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.role;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.JdkFutureAdapters;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import io.netty.util.TimerTask;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.CheckForNull;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
31 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
32 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
33 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
34 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
35 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
37 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
38 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
39 import org.opendaylight.openflowplugin.api.OFConstants;
40 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
48 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
49 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
50 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
51 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
58 import org.opendaylight.yangtools.yang.common.RpcResult;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 /**
63  * Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
64  * On receipt of the ownership notification, makes an rpc call to SalRoleService.
65  *
66  * Hands over to StatisticsManager at the end.
67  */
68 public class RoleManagerImpl implements RoleManager, EntityOwnershipListener, ServiceChangeListener {
69     private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
70
71     // Maximum limit of timeout retries when cleaning DS, to prevent infinite recursive loops
72     private static final int MAX_CLEAN_DS_RETRIES = 3;
73
74     private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
75     private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler;
76     private final DataBroker dataBroker;
77     private final EntityOwnershipService entityOwnershipService;
78     private final ConcurrentMap<DeviceInfo, RoleContext> contexts = new ConcurrentHashMap<>();
79     private final ConcurrentMap<Entity, RoleContext> watchingEntities = new ConcurrentHashMap<>();
80     private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
81     private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
82     private List<RoleChangeListener> listeners = new ArrayList<>();
83
84     private final LifecycleConductor conductor;
85
86     public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final LifecycleConductor lifecycleConductor) {
87         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
88         this.dataBroker = Preconditions.checkNotNull(dataBroker);
89         this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
90         this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this));
91         this.conductor = lifecycleConductor;
92         LOG.debug("Register OpenflowOwnershipListener to all entity ownership changes");
93     }
94
95     @Override
96     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
97         deviceInitializationPhaseHandler = handler;
98     }
99
100     @Override
101     public void onDeviceContextLevelUp(@CheckForNull final DeviceInfo deviceInfo) throws Exception {
102         final DeviceContext deviceContext = Preconditions.checkNotNull(conductor.getDeviceContext(deviceInfo));
103         final RoleContext roleContext = new RoleContextImpl(deviceInfo, entityOwnershipService, makeEntity(deviceInfo.getNodeId()), makeTxEntity(deviceInfo.getNodeId()), conductor);
104         roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
105         Verify.verify(contexts.putIfAbsent(deviceInfo, roleContext) == null, "Role context for master Node %s is still not closed.", deviceInfo.getNodeId());
106         makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, true);
107         /* First start to watch entity so we don't miss any notification, and then try to register in EOS */
108         watchingEntities.put(roleContext.getEntity(), roleContext);
109         notifyListenersRoleInitializationDone(roleContext.getDeviceInfo(), roleContext.initialization());
110         deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceInfo);
111     }
112
113     @Override
114     public void close() {
115         LOG.debug("Close method on role manager was called.");
116         entityOwnershipListenerRegistration.close();
117         txEntityOwnershipListenerRegistration.close();
118         for (final Iterator<RoleContext> iterator = Iterators.consumingIterator(contexts.values().iterator()); iterator.hasNext();) {
119             // got here because last known role is LEADER and DS might need clearing up
120             final RoleContext roleContext = iterator.next();
121             watchingEntities.remove(roleContext.getEntity());
122             watchingEntities.remove(roleContext.getTxEntity());
123             contexts.remove(roleContext.getDeviceInfo());
124             if (roleContext.isTxCandidateRegistered()) {
125                 LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.");
126                 removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
127             } else {
128                 roleContext.close();
129             }
130         }
131     }
132
133     @Override
134     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
135         LOG.trace("onDeviceContextLevelDown for node {}", deviceInfo.getNodeId());
136         final RoleContext roleContext = contexts.get(deviceInfo);
137         if (roleContext != null) {
138             LOG.debug("Found roleContext associated to deviceContext: {}, now trying close the roleContext", deviceInfo.getNodeId());
139             if (roleContext.isMainCandidateRegistered()) {
140                 roleContext.unregisterCandidate(roleContext.getEntity());
141             } else {
142                 contexts.remove(deviceInfo.getNodeId(), roleContext);
143                 roleContext.close();
144             }
145         }
146         deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo);
147     }
148
149     @VisibleForTesting
150     static Entity makeEntity(final NodeId nodeId) {
151         return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
152     }
153
154     @VisibleForTesting
155     static Entity makeTxEntity(final NodeId nodeId) {
156         return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
157     }
158
159     @Override
160     public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
161
162         Preconditions.checkArgument(ownershipChange != null);
163         final RoleContext roleContext = watchingEntities.get(ownershipChange.getEntity());
164
165         LOG.debug("Received EOS message: wasOwner:{} isOwner:{} hasOwner:{} inJeopardy:{} for entity type {} and node {}",
166                 ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner(), ownershipChange.inJeopardy(),
167                 ownershipChange.getEntity().getType(),
168                 roleContext != null ? roleContext.getDeviceInfo().getNodeId() : "-> no watching entity, disregarding notification <-");
169
170         if (roleContext != null) {
171             if (ownershipChange.getEntity().equals(roleContext.getEntity())) {
172                 changeOwnershipForMainEntity(ownershipChange, roleContext);
173             } else {
174                 changeOwnershipForTxEntity(ownershipChange, roleContext);
175             }
176         } else {
177             LOG.debug("OwnershipChange {}", ownershipChange);
178         }
179
180     }
181
182     @VisibleForTesting
183     void changeOwnershipForMainEntity(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
184
185         if (roleContext.isMainCandidateRegistered()) {
186             LOG.debug("Main-EntityOwnershipRegistration is active for entity type {} and node {}",
187                     ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
188             if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && !ownershipChange.inJeopardy()) {
189                 // SLAVE -> MASTER
190                 LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
191                 if (roleContext.registerCandidate(roleContext.getTxEntity())) {
192                     LOG.debug("Starting watching tx entity for node {}", roleContext.getDeviceInfo().getNodeId());
193                     watchingEntities.putIfAbsent(roleContext.getTxEntity(), roleContext);
194                 }
195             } else if ((ownershipChange.wasOwner() && !ownershipChange.isOwner()) || (ownershipChange.inJeopardy())) {
196                 // MASTER -> SLAVE
197                 LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
198                 conductor.addOneTimeListenerWhenServicesChangesDone(this, roleContext.getDeviceInfo());
199                 makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, false);
200             }
201         } else {
202             LOG.debug("Main-EntityOwnershipRegistration is not active for entity type {} and node {}",
203                     ownershipChange.getEntity(), roleContext.getDeviceInfo().getNodeId());
204             watchingEntities.remove(ownershipChange.getEntity(), roleContext);
205             if (roleContext.isTxCandidateRegistered()) {
206                 LOG.debug("tx candidate still registered for node {}, probably connection lost, trying to unregister tx candidate", roleContext.getDeviceInfo().getNodeId());
207                 roleContext.unregisterCandidate(roleContext.getTxEntity());
208                 if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && !ownershipChange.hasOwner()) {
209                     LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
210                     removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
211                 }
212             } else {
213                 contexts.remove(roleContext.getDeviceInfo(), roleContext);
214                 roleContext.close();
215                 conductor.closeConnection(roleContext.getDeviceInfo());
216             }
217         }
218     }
219
220     @VisibleForTesting
221     void changeOwnershipForTxEntity(final EntityOwnershipChange ownershipChange,
222             @Nonnull final RoleContext roleContext) {
223
224         if (roleContext.isTxCandidateRegistered()) {
225             LOG.debug("Tx-EntityOwnershipRegistration is active for entity type {} and node {}",
226                     ownershipChange.getEntity().getType(),
227                     roleContext.getDeviceInfo().getNodeId());
228             if (ownershipChange.inJeopardy()) {
229                 LOG.warn("Getting 'inJeopardy' flag from EOS. Removing txCandidate and stopping watching txCandidate.");
230                 watchingEntities.remove(roleContext.getTxEntity());
231                 roleContext.unregisterCandidate(roleContext.getTxEntity());
232             } else {
233                 if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
234                     // SLAVE -> MASTER
235                     LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
236                     makeDeviceRoleChange(OfpRole.BECOMEMASTER, roleContext, false);
237                 } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
238                     // MASTER -> SLAVE
239                     LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
240                     LOG.warn("Tx-EntityOwnershipRegistration lost leadership entity type {} and node {}",
241                             ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
242                     watchingEntities.remove(roleContext.getTxEntity(), roleContext);
243                     watchingEntities.remove(roleContext.getEntity(), roleContext);
244                     roleContext.unregisterCandidate(roleContext.getEntity());
245                     roleContext.unregisterCandidate(roleContext.getTxEntity());
246                     if (!ownershipChange.hasOwner()) {
247                         LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
248                         removeDeviceFromOperationalDS(roleContext.getDeviceInfo(), MAX_CLEAN_DS_RETRIES);
249                     } else {
250                         contexts.remove(roleContext.getDeviceInfo(), roleContext);
251                         roleContext.close();
252                         conductor.closeConnection(roleContext.getDeviceInfo());
253                     }
254                 }
255             }
256         } else {
257             LOG.debug("Tx-EntityOwnershipRegistration is not active for entity type {} and node {}", ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
258             watchingEntities.remove(roleContext.getTxEntity(), roleContext);
259             contexts.remove(roleContext.getDeviceInfo(), roleContext);
260             roleContext.close();
261             conductor.closeConnection(roleContext.getDeviceInfo());
262         }
263     }
264
265     @VisibleForTesting
266     void makeDeviceRoleChange(final OfpRole role, final RoleContext roleContext, final Boolean init) {
267         final ListenableFuture<RpcResult<SetRoleOutput>> roleChangeFuture = sendRoleChangeToDevice(role, roleContext);
268         Futures.addCallback(roleChangeFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
269             @Override
270             public void onSuccess(@Nullable final RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
271                 LOG.info("Role {} successfully set on device {}", role, roleContext.getDeviceInfo().getNodeId());
272                 if (!init) {
273                     notifyListenersRoleChangeOnDevice(roleContext.getDeviceInfo(), role);
274                 }
275             }
276
277             @Override
278             public void onFailure(@Nonnull final Throwable throwable) {
279                 LOG.warn("Unable to set role {} on device {}", role, roleContext.getDeviceInfo().getNodeId());
280                 conductor.closeConnection(roleContext.getDeviceInfo());
281             }
282         });
283     }
284
285     @VisibleForTesting
286     ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole, final RoleContext roleContext) {
287         LOG.debug("Sending new role {} to device {}", newRole, roleContext.getDeviceInfo().getNodeId());
288         final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
289         final Short version = roleContext.getDeviceInfo().getVersion();
290         if (null == version) {
291             LOG.debug("Device version is null");
292             return Futures.immediateFuture(null);
293         }
294         if (version < OFConstants.OFP_VERSION_1_3) {
295             LOG.debug("Device version not support ROLE");
296             return Futures.immediateFuture(null);
297         } else {
298             final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
299                     .setNode(new NodeRef(DeviceStateUtil.createNodeInstanceIdentifier(roleContext.getDeviceInfo().getNodeId()))).build();
300             setRoleOutputFuture = roleContext.getSalRoleService().setRole(setRoleInput);
301             final TimerTask timerTask = timeout -> {
302                 if (!setRoleOutputFuture.isDone()) {
303                     LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, roleContext.getDeviceInfo().getNodeId());
304                     setRoleOutputFuture.cancel(true);
305                 }
306             };
307             conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
308         }
309         return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
310     }
311
312     @VisibleForTesting
313     CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo, final int numRetries) {
314         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
315         delWtx.delete(LogicalDatastoreType.OPERATIONAL, DeviceStateUtil.createNodeInstanceIdentifier(deviceInfo.getNodeId()));
316         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
317
318         Futures.addCallback(delFuture, new FutureCallback<Void>() {
319             @Override
320             public void onSuccess(final Void result) {
321                 LOG.debug("Delete Node {} was successful", deviceInfo);
322                 final RoleContext roleContext = contexts.remove(deviceInfo);
323                 if (roleContext != null) {
324                     roleContext.close();
325                 }
326             }
327
328             @Override
329             public void onFailure(@Nonnull final Throwable t) {
330                 // If we have any retries left, we will try to clean the datastore again
331                 if (numRetries > 0) {
332                     // We "used" one retry here, so decrement it
333                     final int curRetries = numRetries - 1;
334                     LOG.debug("Delete node {} failed with exception {}. Trying again (retries left: {})", deviceInfo.getNodeId(), t, curRetries);
335                     // Recursive call to this method with "one less" retry
336                     removeDeviceFromOperationalDS(deviceInfo, curRetries);
337                     return;
338                 }
339
340                 // No retries left, so we will just close the role context, and ignore datastore cleanup
341                 LOG.warn("Delete node {} failed with exception {}. No retries left, aborting", deviceInfo.getNodeId(), t);
342                 final RoleContext roleContext = contexts.remove(deviceInfo);
343                 if (roleContext != null) {
344                     roleContext.close();
345                 }
346             }
347         });
348
349         return delFuture;
350     }
351
352     @Override
353     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
354         deviceTerminationPhaseHandler = handler;
355     }
356
357     @Override
358     public void servicesChangeDone(final DeviceInfo deviceInfo, final boolean success) {
359         LOG.debug("Services stopping done for node {} as " + (success ? "successful" : "unsuccessful"), deviceInfo);
360         final RoleContext roleContext = contexts.get(deviceInfo);
361         if (null != roleContext) {
362             /* Services stopped or failure */
363             roleContext.unregisterCandidate(roleContext.getTxEntity());
364         }
365     }
366
367     @VisibleForTesting
368     RoleContext getRoleContext(final DeviceInfo deviceInfo){
369         return contexts.get(deviceInfo);
370     }
371
372     /**
373      * This method is only for testing
374      */
375     @VisibleForTesting
376     void setRoleContext(DeviceInfo deviceInfo, RoleContext roleContext){
377         if(!contexts.containsKey(deviceInfo)) {
378             contexts.put(deviceInfo, roleContext);
379         }
380     }
381
382     @Override
383     public void addRoleChangeListener(final RoleChangeListener roleChangeListener) {
384         this.listeners.add(roleChangeListener);
385     }
386
387     /**
388      * Invoked when initialization phase is done
389      * @param deviceInfo node identification
390      * @param success true if initialization done ok, false otherwise
391      */
392     @VisibleForTesting
393     void notifyListenersRoleInitializationDone(final DeviceInfo deviceInfo, final boolean success){
394         LOG.debug("Notifying registered listeners for role initialization done, no. of listeners {}", listeners.size());
395         for (final RoleChangeListener listener : listeners) {
396             listener.roleInitializationDone(deviceInfo, success);
397         }
398     }
399
400     /**
401      * Notifies registered listener on role change. Role is the new role on device
402      * If initialization phase is true, we may skip service starting
403      * @param deviceInfo
404      * @param role new role meant to be set on device
405      */
406     @VisibleForTesting
407     void notifyListenersRoleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole role){
408         LOG.debug("Notifying registered listeners for role change, no. of listeners {}", listeners.size());
409         for (final RoleChangeListener listener : listeners) {
410             listener.roleChangeOnDevice(deviceInfo, role);
411         }
412     }
413
414     @Override
415     public <T extends OFPContext> T gainContext(DeviceInfo deviceInfo) {
416         return (T) contexts.get(deviceInfo);
417     }
418 }