Merge "DeviceState changes"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / role / RoleManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.role;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterators;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.JdkFutureAdapters;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import io.netty.util.TimerTask;
20
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.CheckForNull;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
35 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
37 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
38 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
39 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
40 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
41 import org.opendaylight.openflowplugin.api.OFConstants;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
49 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
50 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
51 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
52 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
59 import org.opendaylight.yangtools.yang.common.RpcResult;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  * Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
65  * On receipt of the ownership notification, makes an rpc call to SalRoleService.
66  *
67  * Hands over to StatisticsManager at the end.
68  */
69 public class RoleManagerImpl implements RoleManager, EntityOwnershipListener, ServiceChangeListener {
70     private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
71
72     private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
73     private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler;
74     private final DataBroker dataBroker;
75     private final EntityOwnershipService entityOwnershipService;
76     private final ConcurrentMap<NodeId, RoleContext> contexts = new ConcurrentHashMap<>();
77     private final ConcurrentMap<Entity, RoleContext> watchingEntities = new ConcurrentHashMap<>();
78     private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
79     private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
80     private List<RoleChangeListener> listeners = new ArrayList<>();
81
82     private final LifecycleConductor conductor;
83
84     public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final LifecycleConductor lifecycleConductor) {
85         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
86         this.dataBroker = Preconditions.checkNotNull(dataBroker);
87         this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
88         this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this));
89         this.conductor = lifecycleConductor;
90         LOG.debug("Register OpenflowOwnershipListener to all entity ownership changes");
91     }
92
93     @Override
94     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
95         deviceInitializationPhaseHandler = handler;
96     }
97
98     @Override
99     public void onDeviceContextLevelUp(@CheckForNull final DeviceInfo deviceInfo) throws Exception {
100         final DeviceContext deviceContext = Preconditions.checkNotNull(conductor.getDeviceContext(deviceInfo));
101         final RoleContext roleContext = new RoleContextImpl(deviceInfo, entityOwnershipService, makeEntity(deviceInfo.getNodeId()), makeTxEntity(deviceInfo.getNodeId()), conductor);
102         roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
103         Verify.verify(contexts.putIfAbsent(deviceInfo.getNodeId(), roleContext) == null, "Role context for master Node %s is still not closed.", deviceInfo.getNodeId());
104         makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, true);
105         /* First start to watch entity so we don't miss any notification, and then try to register in EOS */
106         notifyListenersRoleInitializationDone(roleContext.getDeviceInfo(), roleContext.initialization());
107         watchingEntities.put(roleContext.getEntity(), roleContext);
108         deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceInfo);
109     }
110
111     @Override
112     public void close() {
113         LOG.debug("Close method on role manager was called.");
114         entityOwnershipListenerRegistration.close();
115         txEntityOwnershipListenerRegistration.close();
116         for (final Iterator<RoleContext> iterator = Iterators.consumingIterator(contexts.values().iterator()); iterator.hasNext();) {
117             // got here because last known role is LEADER and DS might need clearing up
118             final RoleContext roleContext = iterator.next();
119             watchingEntities.remove(roleContext.getEntity());
120             watchingEntities.remove(roleContext.getTxEntity());
121             contexts.remove(roleContext.getDeviceInfo().getNodeId());
122             if (roleContext.isTxCandidateRegistered()) {
123                 LOG.info("Node {} was holder txEntity, so trying to remove device from operational DS.");
124                 removeDeviceFromOperationalDS(roleContext.getDeviceInfo().getNodeId());
125             } else {
126                 roleContext.close();
127             }
128         }
129     }
130
131     @Override
132     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
133         LOG.trace("onDeviceContextLevelDown for node {}", deviceInfo.getNodeId());
134         final RoleContext roleContext = contexts.get(deviceInfo.getNodeId());
135         if (roleContext != null) {
136             LOG.debug("Found roleContext associated to deviceContext: {}, now trying close the roleContext", deviceInfo.getNodeId());
137             if (roleContext.isMainCandidateRegistered()) {
138                 roleContext.unregisterCandidate(roleContext.getEntity());
139             } else {
140                 contexts.remove(deviceInfo.getNodeId(), roleContext);
141                 roleContext.close();
142             }
143         }
144         deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo);
145     }
146
147     @VisibleForTesting
148     static Entity makeEntity(final NodeId nodeId) {
149         return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
150     }
151
152     @VisibleForTesting
153     static Entity makeTxEntity(final NodeId nodeId) {
154         return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
155     }
156
157     @Override
158     public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
159
160         Preconditions.checkArgument(ownershipChange != null);
161         final RoleContext roleContext = watchingEntities.get(ownershipChange.getEntity());
162
163         LOG.debug("Received EOS message: wasOwner:{} isOwner:{} hasOwner:{} inJeopardy:{} for entity type {} and node {}",
164                 ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner(), ownershipChange.inJeopardy(),
165                 ownershipChange.getEntity().getType(),
166                 roleContext != null ? roleContext.getDeviceInfo().getNodeId() : "-> no watching entity, disregarding notification <-");
167
168         if (roleContext != null) {
169             if (ownershipChange.getEntity().equals(roleContext.getEntity())) {
170                 changeOwnershipForMainEntity(ownershipChange, roleContext);
171             } else {
172                 changeOwnershipForTxEntity(ownershipChange, roleContext);
173             }
174         } else {
175             LOG.debug("OwnershipChange {}", ownershipChange);
176         }
177
178     }
179
180     @VisibleForTesting
181     void changeOwnershipForMainEntity(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
182
183         if (roleContext.isMainCandidateRegistered()) {
184             LOG.debug("Main-EntityOwnershipRegistration is active for entity type {} and node {}",
185                     ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
186             if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && !ownershipChange.inJeopardy()) {
187                 // SLAVE -> MASTER
188                 LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
189                 if (roleContext.registerCandidate(roleContext.getTxEntity())) {
190                     LOG.debug("Starting watching tx entity for node {}", roleContext.getDeviceInfo().getNodeId());
191                     watchingEntities.putIfAbsent(roleContext.getTxEntity(), roleContext);
192                 }
193             } else if ((ownershipChange.wasOwner() && !ownershipChange.isOwner()) || (ownershipChange.inJeopardy())) {
194                 // MASTER -> SLAVE
195                 LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
196                 conductor.addOneTimeListenerWhenServicesChangesDone(this, roleContext.getDeviceInfo());
197                 makeDeviceRoleChange(OfpRole.BECOMESLAVE, roleContext, false);
198             }
199         } else {
200             LOG.debug("Main-EntityOwnershipRegistration is not active for entity type {} and node {}",
201                     ownershipChange.getEntity(), roleContext.getDeviceInfo().getNodeId());
202             watchingEntities.remove(ownershipChange.getEntity(), roleContext);
203             if (roleContext.isTxCandidateRegistered()) {
204                 LOG.debug("tx candidate still registered for node {}, probably connection lost, trying to unregister tx candidate", roleContext.getDeviceInfo().getNodeId());
205                 roleContext.unregisterCandidate(roleContext.getTxEntity());
206                 if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && !ownershipChange.hasOwner()) {
207                     LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
208                     removeDeviceFromOperationalDS(roleContext.getDeviceInfo().getNodeId());
209                 }
210             } else {
211                 final NodeId nodeId = roleContext.getDeviceInfo().getNodeId();
212                 contexts.remove(nodeId, roleContext);
213                 roleContext.close();
214                 conductor.closeConnection(roleContext.getDeviceInfo());
215             }
216         }
217     }
218
219     @VisibleForTesting
220     void changeOwnershipForTxEntity(final EntityOwnershipChange ownershipChange,
221             @Nonnull final RoleContext roleContext) {
222
223         if (roleContext.isTxCandidateRegistered()) {
224             LOG.debug("Tx-EntityOwnershipRegistration is active for entity type {} and node {}",
225                     ownershipChange.getEntity().getType(),
226                     roleContext.getDeviceInfo().getNodeId());
227             if (ownershipChange.inJeopardy()) {
228                 LOG.warn("Getting 'inJeopardy' flag from EOS. Removing txCandidate and stopping watching txCandidate.");
229                 watchingEntities.remove(roleContext.getTxEntity());
230                 roleContext.unregisterCandidate(roleContext.getTxEntity());
231             } else {
232                 if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
233                     // SLAVE -> MASTER
234                     LOG.debug("SLAVE to MASTER for node {}", roleContext.getDeviceInfo().getNodeId());
235                     makeDeviceRoleChange(OfpRole.BECOMEMASTER, roleContext, false);
236                 } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
237                     // MASTER -> SLAVE
238                     LOG.debug("MASTER to SLAVE for node {}", roleContext.getDeviceInfo().getNodeId());
239                     LOG.warn("Tx-EntityOwnershipRegistration lost leadership entity type {} and node {}",
240                             ownershipChange.getEntity().getType(), roleContext.getDeviceInfo().getNodeId());
241                     watchingEntities.remove(roleContext.getTxEntity(), roleContext);
242                     watchingEntities.remove(roleContext.getEntity(), roleContext);
243                     roleContext.unregisterCandidate(roleContext.getEntity());
244                     roleContext.unregisterCandidate(roleContext.getTxEntity());
245                     if (!ownershipChange.hasOwner()) {
246                         LOG.debug("Trying to remove from operational node: {}", roleContext.getDeviceInfo().getNodeId());
247                         removeDeviceFromOperationalDS(roleContext.getDeviceInfo().getNodeId());
248                     } else {
249                         final NodeId nodeId = roleContext.getDeviceInfo().getNodeId();
250                         contexts.remove(nodeId, roleContext);
251                         roleContext.close();
252                         conductor.closeConnection(roleContext.getDeviceInfo());
253                     }
254                 }
255             }
256         } else {
257             LOG.debug("Tx-EntityOwnershipRegistration is not active for entity {}", ownershipChange.getEntity().getType());
258             watchingEntities.remove(roleContext.getTxEntity(), roleContext);
259             final NodeId nodeId = roleContext.getDeviceInfo().getNodeId();
260             contexts.remove(nodeId, roleContext);
261             roleContext.close();
262             conductor.closeConnection(roleContext.getDeviceInfo());
263         }
264     }
265
266     @VisibleForTesting
267     void makeDeviceRoleChange(final OfpRole role, final RoleContext roleContext, final Boolean init) {
268         final ListenableFuture<RpcResult<SetRoleOutput>> roleChangeFuture = sendRoleChangeToDevice(role, roleContext);
269         Futures.addCallback(roleChangeFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
270             @Override
271             public void onSuccess(@Nullable final RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
272                 LOG.info("Role {} successfully set on device {}", role, roleContext.getDeviceInfo().getNodeId());
273                 notifyListenersRoleChangeOnDevice(roleContext.getDeviceInfo(), true, role, init);
274             }
275
276             @Override
277             public void onFailure(@Nonnull final Throwable throwable) {
278                 LOG.warn("Unable to set role {} on device {}", role, roleContext.getDeviceInfo().getNodeId());
279                 notifyListenersRoleChangeOnDevice(roleContext.getDeviceInfo(), false, role, init);
280             }
281         });
282     }
283
284     @VisibleForTesting
285     ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole, final RoleContext roleContext) {
286         LOG.debug("Sending new role {} to device {}", newRole, roleContext.getDeviceInfo().getNodeId());
287         final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
288         final Short version = conductor.gainVersionSafely(roleContext.getDeviceInfo());
289         if (null == version) {
290             LOG.debug("Device version is null");
291             return Futures.immediateFuture(null);
292         }
293         if (version < OFConstants.OFP_VERSION_1_3) {
294             LOG.debug("Device version not support ROLE");
295             return Futures.immediateFuture(null);
296         } else {
297             final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
298                     .setNode(new NodeRef(DeviceStateUtil.createNodeInstanceIdentifier(roleContext.getDeviceInfo().getNodeId()))).build();
299             setRoleOutputFuture = roleContext.getSalRoleService().setRole(setRoleInput);
300             final TimerTask timerTask = timeout -> {
301                 if (!setRoleOutputFuture.isDone()) {
302                     LOG.warn("New role {} was not propagated to device {} during 10 sec", newRole, roleContext.getDeviceInfo().getNodeId());
303                     setRoleOutputFuture.cancel(true);
304                 }
305             };
306             conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS);
307         }
308         return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
309     }
310
311     @VisibleForTesting
312     CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final NodeId nodeId) {
313
314         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
315         delWtx.delete(LogicalDatastoreType.OPERATIONAL, DeviceStateUtil.createNodeInstanceIdentifier(nodeId));
316         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
317         Futures.addCallback(delFuture, new FutureCallback<Void>() {
318
319             @Override
320             public void onSuccess(final Void result) {
321                 LOG.debug("Delete Node {} was successful", nodeId);
322                 final RoleContext roleContext = contexts.remove(nodeId);
323                 if (roleContext != null) {
324                     roleContext.close();
325                 }
326             }
327
328             @Override
329             public void onFailure(@Nonnull final Throwable t) {
330                 LOG.warn("Delete Node {} failed. {}", nodeId, t);
331                 contexts.remove(nodeId);
332                 final RoleContext roleContext = contexts.remove(nodeId);
333                 if (roleContext != null) {
334                     roleContext.close();
335                 }
336             }
337         });
338         return delFuture;
339     }
340
341     @Override
342     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
343         deviceTerminationPhaseHandler = handler;
344     }
345
346     @Override
347     public void servicesChangeDone(final DeviceInfo deviceInfo, final boolean success) {
348         LOG.debug("Services stopping done for node {} as " + (success ? "successful" : "unsuccessful"), deviceInfo);
349         final RoleContext roleContext = contexts.get(deviceInfo.getNodeId());
350         if (null != roleContext) {
351             /* Services stopped or failure */
352             roleContext.unregisterCandidate(roleContext.getTxEntity());
353         }
354     }
355
356     @VisibleForTesting
357     RoleContext getRoleContext(final NodeId nodeId){
358         return contexts.get(nodeId);
359     }
360
361     /**
362      * This method is only for testing
363      */
364     @VisibleForTesting
365     void setRoleContext(NodeId nodeId, RoleContext roleContext){
366         if(!contexts.containsKey(nodeId)) {
367             contexts.put(nodeId, roleContext);
368         }
369     }
370
371     @Override
372     public void addRoleChangeListener(final RoleChangeListener roleChangeListener) {
373         this.listeners.add(roleChangeListener);
374     }
375
376     /**
377      * Invoked when initialization phase is done
378      * @param deviceInfo node identification
379      * @param success true if initialization done ok, false otherwise
380      */
381     @VisibleForTesting
382     void notifyListenersRoleInitializationDone(final DeviceInfo deviceInfo, final boolean success){
383         LOG.debug("Notifying registered listeners for role initialization done, no. of listeners {}", listeners.size());
384         for (final RoleChangeListener listener : listeners) {
385             listener.roleInitializationDone(deviceInfo, success);
386         }
387     }
388
389     /**
390      * Notifies registered listener on role change. Role is the new role on device
391      * If initialization phase is true, we may skip service starting
392      * @param deviceInfo
393      * @param success true if role change on device done ok, false otherwise
394      * @param role new role meant to be set on device
395      * @param initializationPhase if true, then skipp services start
396      */
397     @VisibleForTesting
398     void notifyListenersRoleChangeOnDevice(final DeviceInfo deviceInfo, final boolean success, final OfpRole role, final boolean initializationPhase){
399         LOG.debug("Notifying registered listeners for role change, no. of listeners {}", listeners.size());
400         for (final RoleChangeListener listener : listeners) {
401             listener.roleChangeOnDevice(deviceInfo, success, role, initializationPhase);
402         }
403     }
404
405 }