Optimized imports
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / role / RoleManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.role;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.JdkFutureAdapters;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import io.netty.util.TimerTask;
20 import java.util.ArrayList;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.CheckForNull;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
31 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
32 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
33 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
34 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
35 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
37 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
38 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
39 import org.opendaylight.openflowplugin.api.OFConstants;
40 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
43 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
48 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
49 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
50 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
51 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
58 import org.opendaylight.yangtools.yang.common.RpcResult;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 /**
63  * Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
64  * On receipt of the ownership notification, makes an rpc call to SalRoleService.
65  *
66  * Hands over to StatisticsManager at the end.
67  */
68 public class RoleManagerImpl implements RoleManager, EntityOwnershipListener, ServiceChangeListener {
69     private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
70
71     private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
72     private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler;
73     private final DataBroker dataBroker;
74     private final EntityOwnershipService entityOwnershipService;
75     private final ConcurrentMap<DeviceInfo, RoleContext> contexts = new ConcurrentHashMap<>();
76     private final ConcurrentMap<Entity, RoleContext> watchingEntities = new ConcurrentHashMap<>();
77     private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
78     private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
79     private List<RoleChangeListener> listeners = new ArrayList<>();
80
81     private final LifecycleConductor conductor;
82
83     public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final LifecycleConductor lifecycleConductor) {
84         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
85         this.dataBroker = Preconditions.checkNotNull(dataBroker);
86         this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
87         this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this));
88         this.conductor = lifecycleConductor;
89         LOG.debug("Register OpenflowOwnershipListener to all entity ownership changes");
90     }
91
92     @Override
93     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
94         deviceInitializationPhaseHandler = handler;
95     }
96
97     @Override
98     public void onDeviceContextLevelUp(@CheckForNull final DeviceInfo deviceInfo) throws Exception {
99         final DeviceContext deviceContext = Preconditions.checkNotNull(conductor.getDeviceContext(deviceInfo));
100         final RoleContext roleContext = new RoleContextImpl(deviceInfo, entityOwnershipService, makeEntity(deviceInfo.getNodeId()), makeTxEntity(deviceInfo.getNodeId()), conductor);
101         roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
102         Verify.verify(contexts.putIfAbsent(deviceInfo, roleContext) == null, "Role context for master Node %s is still not closed.", deviceInfo.getNodeId());
103         makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, true);
104         /* First start to watch entity so we don't miss any notification, and then try to register in EOS */
105         notifyListenersRoleInitializationDone(roleContext.getDeviceInfo(), roleContext.initialization());
106         watchingEntities.put(roleContext.getEntity(), roleContext);
107         deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceInfo);
108     }
109
110     @Override
111     public void close() {
112         LOG.debug("Close method on role manager was called.");
113         entityOwnershipListenerRegistration.close();
114         txEntityOwnershipListenerRegistration.close();
115         for (final Iterator<RoleContext> iterator = Iterators.consumingIterator(contexts.values().iterator()); iterator.hasNext();) {
116             // got here because last known role is LEADER and DS might need clearing up
117             final RoleContext roleContext = iterator.next();
118             watchingEntities.remove(roleContext.getEntity());
119             watchingEntities.remove(roleContext.getTxEntity());
120             contexts.remove(roleContext.getDeviceInfo());
121             if (roleContext.isTxCandidateRegistered()) {
122                 LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.");
123                 removeDeviceFromOperationalDS(roleContext.getDeviceInfo());
124             } else {
125                 roleContext.close();
126             }
127         }
128     }
129
130     @Override
131     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
132         LOG.trace("onDeviceContextLevelDown for node {}", deviceInfo.getNodeId());
133         final RoleContext roleContext = contexts.get(deviceInfo);
134         if (roleContext != null) {
135             LOG.debug("Found roleContext associated to deviceContext: {}, now trying close the roleContext", deviceInfo.getNodeId());
136             if (roleContext.isMainCandidateRegistered()) {
137                 roleContext.unregisterCandidate(roleContext.getEntity());
138             } else {
139                 contexts.remove(deviceInfo.getNodeId(), roleContext);
140                 roleContext.close();
141             }
142         }
143         deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo);
144     }
145
146     @VisibleForTesting
147     static Entity makeEntity(final NodeId nodeId) {
148         return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
149     }
150
151     @VisibleForTesting
152     static Entity makeTxEntity(final NodeId nodeId) {
153         return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
154     }
155
156     @Override
157     public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
158
159         Preconditions.checkArgument(ownershipChange != null);
160         final RoleContext roleContext = watchingEntities.get(ownershipChange.getEntity());
161
162         LOG.debug("Received EOS message: wasOwner:{} isOwner:{} hasOwner:{} inJeopardy:{} for entity type {} and node {}",
163                 ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner(), ownershipChange.inJeopardy(),
164                 ownershipChange.getEntity().getType(),
165                 roleContext != null ? roleContext.getDeviceInfo().getNodeId() : "-> no watching entity, disregarding notification <-");
166
167         if (roleContext != null) {
168             if (ownershipChange.getEntity().equals(roleContext.getEntity())) {
169                 changeOwnershipForMainEntity(ownershipChange, roleContext);
170             } else {
171                 changeOwnershipForTxEntity(ownershipChange, roleContext);
172             }
173         } else {
174             LOG.debug("OwnershipChange {}", ownershipChange);
175         }
176
177     }
178
179     @VisibleForTesting
180     void changeOwnershipForMainEntity(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
181
182         if (roleContext.isMainCandidateRegistered()) {
183             LOG.debug("Main-EntityOwnershipRegistration is active for entity type {} and node {}",
184                     ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
185             if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && !ownershipChange.inJeopardy()) {
186                 // SLAVE -> MASTER
187                 LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
188                 if (roleContext.registerCandidate(roleContext.getTxEntity())) {
189                     LOG.debug("Starting watching tx entity for node {}", roleContext.getDeviceInfo().getNodeId());
190                     watchingEntities.putIfAbsent(roleContext.getTxEntity(), roleContext);
191                 }
192             } else if ((ownershipChange.wasOwner() && !ownershipChange.isOwner()) || (ownershipChange.inJeopardy())) {
193                 // MASTER -> SLAVE
194                 LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
195                 conductor.addOneTimeListenerWhenServicesChangesDone(this, roleContext.getDeviceInfo());
196                 makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, false);
197             }
198         } else {
199             LOG.debug("Main-EntityOwnershipRegistration is not active for entity type {} and node {}",
200                     ownershipChange.getEntity(), roleContext.getDeviceInfo().getNodeId());
201             watchingEntities.remove(ownershipChange.getEntity(), roleContext);
202             if (roleContext.isTxCandidateRegistered()) {
203                 LOG.debug("tx candidate still registered for node {}, probably connection lost, trying to unregister tx candidate", roleContext.getDeviceInfo().getNodeId());
204                 roleContext.unregisterCandidate(roleContext.getTxEntity());
205                 if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && !ownershipChange.hasOwner()) {
206                     LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
207                     removeDeviceFromOperationalDS(roleContext.getDeviceInfo());
208                 }
209             } else {
210                 contexts.remove(roleContext.getDeviceInfo(), roleContext);
211                 roleContext.close();
212                 conductor.closeConnection(roleContext.getDeviceInfo());
213             }
214         }
215     }
216
217     @VisibleForTesting
218     void changeOwnershipForTxEntity(final EntityOwnershipChange ownershipChange,
219             @Nonnull final RoleContext roleContext) {
220
221         if (roleContext.isTxCandidateRegistered()) {
222             LOG.debug("Tx-EntityOwnershipRegistration is active for entity type {} and node {}",
223                     ownershipChange.getEntity().getType(),
224                     roleContext.getDeviceInfo().getNodeId());
225             if (ownershipChange.inJeopardy()) {
226                 LOG.warn("Getting 'inJeopardy' flag from EOS. Removing txCandidate and stopping watching txCandidate.");
227                 watchingEntities.remove(roleContext.getTxEntity());
228                 roleContext.unregisterCandidate(roleContext.getTxEntity());
229             } else {
230                 if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
231                     // SLAVE -> MASTER
232                     LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
233                     makeDeviceRoleChange(OfpRole.BECOMEMASTER, roleContext, false);
234                 } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
235                     // MASTER -> SLAVE
236                     LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
237                     LOG.warn("Tx-EntityOwnershipRegistration lost leadership entity type {} and node {}",
238                             ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
239                     watchingEntities.remove(roleContext.getTxEntity(), roleContext);
240                     watchingEntities.remove(roleContext.getEntity(), roleContext);
241                     roleContext.unregisterCandidate(roleContext.getEntity());
242                     roleContext.unregisterCandidate(roleContext.getTxEntity());
243                     if (!ownershipChange.hasOwner()) {
244                         LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
245                         removeDeviceFromOperationalDS(roleContext.getDeviceInfo());
246                     } else {
247                         final NodeId nodeId = roleContext.getDeviceInfo().getNodeId();
248                         contexts.remove(roleContext.getDeviceInfo(), roleContext);
249                         roleContext.close();
250                         conductor.closeConnection(roleContext.getDeviceInfo());
251                     }
252                 }
253             }
254         } else {
255             LOG.debug("Tx-EntityOwnershipRegistration is not active for entity {}", ownershipChange.getEntity().getType());
256             watchingEntities.remove(roleContext.getTxEntity(), roleContext);
257             contexts.remove(roleContext.getDeviceInfo(), roleContext);
258             roleContext.close();
259             conductor.closeConnection(roleContext.getDeviceInfo());
260         }
261     }
262
263     @VisibleForTesting
264     void makeDeviceRoleChange(final OfpRole role, final RoleContext roleContext, final Boolean init) {
265         final ListenableFuture<RpcResult<SetRoleOutput>> roleChangeFuture = sendRoleChangeToDevice(role, roleContext);
266         Futures.addCallback(roleChangeFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
267             @Override
268             public void onSuccess(@Nullable final RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
269                 LOG.info("Role {} successfully set on device {}", role, roleContext.getDeviceInfo().getNodeId());
270                 notifyListenersRoleChangeOnDevice(roleContext.getDeviceInfo(), true, role, init);
271             }
272
273             @Override
274             public void onFailure(@Nonnull final Throwable throwable) {
275                 LOG.warn("Unable to set role {} on device {}", role, roleContext.getDeviceInfo().getNodeId());
276                 notifyListenersRoleChangeOnDevice(roleContext.getDeviceInfo(), false, role, init);
277             }
278         });
279     }
280
281     @VisibleForTesting
282     ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole, final RoleContext roleContext) {
283         LOG.debug("Sending new role {} to device {}", newRole, roleContext.getDeviceInfo().getNodeId());
284         final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
285         final Short version = roleContext.getDeviceInfo().getVersion();
286         if (null == version) {
287             LOG.debug("Device version is null");
288             return Futures.immediateFuture(null);
289         }
290         if (version < OFConstants.OFP_VERSION_1_3) {
291             LOG.debug("Device version not support ROLE");
292             return Futures.immediateFuture(null);
293         } else {
294             final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
295                     .setNode(new NodeRef(DeviceStateUtil.createNodeInstanceIdentifier(roleContext.getDeviceInfo().getNodeId()))).build();
296             setRoleOutputFuture = roleContext.getSalRoleService().setRole(setRoleInput);
297             final TimerTask timerTask = timeout -> {
298                 if (!setRoleOutputFuture.isDone()) {
299                     LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, roleContext.getDeviceInfo().getNodeId());
300                     setRoleOutputFuture.cancel(true);
301                 }
302             };
303             conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
304         }
305         return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
306     }
307
308     @VisibleForTesting
309     CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
310
311         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
312         delWtx.delete(LogicalDatastoreType.OPERATIONAL, DeviceStateUtil.createNodeInstanceIdentifier(deviceInfo.getNodeId()));
313         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
314         Futures.addCallback(delFuture, new FutureCallback<Void>() {
315
316             @Override
317             public void onSuccess(final Void result) {
318                 LOG.debug("Delete Node {} was successful", deviceInfo);
319                 final RoleContext roleContext = contexts.remove(deviceInfo);
320                 if (roleContext != null) {
321                     roleContext.close();
322                 }
323             }
324
325             @Override
326             public void onFailure(@Nonnull final Throwable t) {
327                 LOG.warn("Delete Node {} failed. {}", deviceInfo, t);
328                 contexts.remove(deviceInfo);
329                 final RoleContext roleContext = contexts.remove(deviceInfo);
330                 if (roleContext != null) {
331                     roleContext.close();
332                 }
333             }
334         });
335         return delFuture;
336     }
337
338     @Override
339     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
340         deviceTerminationPhaseHandler = handler;
341     }
342
343     @Override
344     public void servicesChangeDone(final DeviceInfo deviceInfo, final boolean success) {
345         LOG.debug("Services stopping done for node {} as " + (success ? "successful" : "unsuccessful"), deviceInfo);
346         final RoleContext roleContext = contexts.get(deviceInfo);
347         if (null != roleContext) {
348             /* Services stopped or failure */
349             roleContext.unregisterCandidate(roleContext.getTxEntity());
350         }
351     }
352
353     @VisibleForTesting
354     RoleContext getRoleContext(final DeviceInfo deviceInfo){
355         return contexts.get(deviceInfo);
356     }
357
358     /**
359      * This method is only for testing
360      */
361     @VisibleForTesting
362     void setRoleContext(DeviceInfo deviceInfo, RoleContext roleContext){
363         if(!contexts.containsKey(deviceInfo)) {
364             contexts.put(deviceInfo, roleContext);
365         }
366     }
367
368     @Override
369     public void addRoleChangeListener(final RoleChangeListener roleChangeListener) {
370         this.listeners.add(roleChangeListener);
371     }
372
373     /**
374      * Invoked when initialization phase is done
375      * @param deviceInfo node identification
376      * @param success true if initialization done ok, false otherwise
377      */
378     @VisibleForTesting
379     void notifyListenersRoleInitializationDone(final DeviceInfo deviceInfo, final boolean success){
380         LOG.debug("Notifying registered listeners for role initialization done, no. of listeners {}", listeners.size());
381         for (final RoleChangeListener listener : listeners) {
382             listener.roleInitializationDone(deviceInfo, success);
383         }
384     }
385
386     /**
387      * Notifies registered listener on role change. Role is the new role on device
388      * If initialization phase is true, we may skip service starting
389      * @param deviceInfo
390      * @param success true if role change on device done ok, false otherwise
391      * @param role new role meant to be set on device
392      * @param initializationPhase if true, then skipp services start
393      */
394     @VisibleForTesting
395     void notifyListenersRoleChangeOnDevice(final DeviceInfo deviceInfo, final boolean success, final OfpRole role, final boolean initializationPhase){
396         LOG.debug("Notifying registered listeners for role change, no. of listeners {}", listeners.size());
397         for (final RoleChangeListener listener : listeners) {
398             listener.roleChangeOnDevice(deviceInfo, success, role, initializationPhase);
399         }
400     }
401
402     @Override
403     public <T extends OFPContext> T gainContext(DeviceInfo deviceInfo) {
404         return (T) contexts.get(deviceInfo);
405     }
406 }