Merge "Bug-4957 Cluster Role change fix"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / role / RoleManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.role;
9
10 import com.google.common.base.Function;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.util.concurrent.AsyncFunction;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import java.util.Map;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.Semaphore;
23 import javax.annotation.CheckForNull;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
29 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
30 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
31 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
32 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
33 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
34 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
39 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
40 import org.opendaylight.openflowplugin.api.openflow.role.RoleChangeListener;
41 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
42 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
43 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
51  * On receipt of the ownership notification, makes an rpc call to SalRoleService.
52  *
53  * Hands over to StatisticsManager at the end.
54  */
55 public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
56     private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
57
58     private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
59     private final DataBroker dataBroker;
60     private final EntityOwnershipService entityOwnershipService;
61     private final ConcurrentMap<Entity, RoleContext> contexts = new ConcurrentHashMap<>();
62     private final ConcurrentMap<Entity, RoleContext> txContexts = new ConcurrentHashMap<>();
63     private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
64     private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
65     private final boolean switchFeaturesMandatory;
66
67     public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final boolean switchFeaturesMandatory) {
68         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
69         this.dataBroker = Preconditions.checkNotNull(dataBroker);
70         this.switchFeaturesMandatory = switchFeaturesMandatory;
71         this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
72         this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this));
73         LOG.debug("Registering OpenflowOwnershipListener listening to all entity ownership changes");
74     }
75
76     @Override
77     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
78         deviceInitializationPhaseHandler = handler;
79     }
80
81     @Override
82     public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
83         LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
84
85         final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
86                 makeEntity(deviceContext.getDeviceState().getNodeId()),
87                 makeTxEntity(deviceContext.getDeviceState().getNodeId()));
88         // if the device context gets closed (mostly on connection close), we would need to cleanup
89         deviceContext.addDeviceContextClosedHandler(this);
90         final RoleContext previousContext = contexts.putIfAbsent(roleContext.getEntity(), roleContext);
91         Verify.verify(previousContext == null,
92                 "RoleCtx for master Node {} is still not close.", deviceContext.getDeviceState().getNodeId());
93
94         roleContext.initialization();
95     }
96
97     void getRoleContextLevelUp(final DeviceContext deviceContext) {
98         LOG.debug("Created role context for node {}", deviceContext.getDeviceState().getNodeId());
99         LOG.debug("roleChangeFuture success for device:{}. Moving to StatisticsManager", deviceContext.getDeviceState().getNodeId());
100         try {
101             deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
102         } catch (final Exception e) {
103             LOG.info("failed to complete levelUp on next handler for device {}",
104                     deviceContext.getDeviceState().getNodeId());
105             deviceContext.close();
106             return;
107         }
108     }
109
110     @Override
111     public void close() throws Exception {
112         entityOwnershipListenerRegistration.close();
113         txEntityOwnershipListenerRegistration.close();
114         for (final Map.Entry<Entity, RoleContext> roleContextEntry : contexts.entrySet()) {
115             // got here because last known role is LEADER and DS might need clearing up
116             final Entity entity = roleContextEntry.getKey();
117             final Optional<EntityOwnershipState> ownershipState = entityOwnershipService.getOwnershipState(entity);
118             final NodeId nodeId = roleContextEntry.getValue().getDeviceState().getNodeId();
119             if (ownershipState.isPresent()) {
120                 if ((!ownershipState.get().hasOwner())) {
121                     LOG.trace("Last role is LEADER and ownershipService returned hasOwner=false for node: {}; " +
122                             "cleaning DS as being probably the last owner", nodeId);
123                     removeDeviceFromOperDS(roleContextEntry.getValue());
124                 } else {
125                     // NOOP - there is another owner
126                     LOG.debug("Last role is LEADER and ownershipService returned hasOwner=true for node: {}; " +
127                             "leaving DS untouched", nodeId);
128                 }
129             } else {
130                 // TODO: is this safe? When could this happen?
131                 LOG.warn("Last role is LEADER but ownershipService returned empty ownership info for node: {}; " +
132                         "cleaning DS ANYWAY!", nodeId);
133                 removeDeviceFromOperDS(roleContextEntry.getValue());
134             }
135         }
136         contexts.clear();
137     }
138
139     @Override
140     public void onDeviceContextClosed(final DeviceContext deviceContext) {
141         final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
142         LOG.debug("onDeviceContextClosed for node {}", nodeId);
143         final Entity entity = makeEntity(nodeId);
144         final RoleContext roleContext = contexts.get(entity);
145         if (roleContext != null) {
146             LOG.debug("Found roleContext associated to deviceContext: {}, now closing the roleContext", nodeId);
147             final Optional<EntityOwnershipState> actState = entityOwnershipService.getOwnershipState(entity);
148             if (actState.isPresent()) {
149                 if (actState.get().isOwner()) {
150                     if (!txContexts.containsKey(roleContext.getTxEntity())) {
151                         try {
152                             txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext);
153                             roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
154                             roleContext.setupTxCandidate();
155                             // we'd like to wait for registration response
156                             return;
157                         } catch (final CandidateAlreadyRegisteredException e) {
158                             // NOOP
159                         }
160                     }
161                 } else {
162                     LOG.debug("No DS commitment for device {} - LEADER is somewhere else", nodeId);
163                     contexts.remove(entity, roleContext);
164                     // TODO : is there a chance to have TxEntity ?
165                 }
166             } else {
167                 LOG.warn("EntityOwnershipService doesn't return state for entity: {} in close proces", entity);
168             }
169             roleContext.close();
170         }
171     }
172
173     private static Entity makeEntity(final NodeId nodeId) {
174         return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
175     }
176
177     private static Entity makeTxEntity(final NodeId nodeId) {
178         return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
179     }
180
181     @Override
182     public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
183         Preconditions.checkArgument(ownershipChange != null);
184         RoleContext roleCtxForClose = null;
185         try {
186             final RoleContext roleContext = contexts.get(ownershipChange.getEntity());
187             if (roleContext != null) {
188                 roleCtxForClose = roleContext;
189                 changeForEntity(ownershipChange, roleContext);
190                 return;
191             }
192
193             final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity());
194             if (txRoleContext != null) {
195                 roleCtxForClose = txRoleContext;
196                 changeForTxEntity(ownershipChange, txRoleContext);
197                 return;
198             }
199         } catch (final InterruptedException e) {
200             LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity());
201             if (roleCtxForClose != null) {
202                 roleCtxForClose.close();
203             }
204         }
205
206         LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
207                 ownershipChange.getEntity(), ownershipChange);
208     }
209
210     private void changeForTxEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleContext)
211             throws InterruptedException {
212         LOG.info("Received TX-EntityOwnershipChange:{}", ownershipChange);
213         final Semaphore txCandidateGuard = roleContext.getTxCandidateGuard();
214         LOG.trace("txCandidate lock queue: " + txCandidateGuard.getQueueLength());
215         txCandidateGuard.acquire();
216
217         ListenableFuture<Void> processingClosure;
218         final DeviceContext deviceContext = roleContext.getDeviceContext();
219         final NodeId nodeId = roleContext.getDeviceState().getNodeId();
220
221         if (!roleContext.getDeviceState().isValid()
222                 && RoleContext.ROLE_CONTEXT_STATE.WORKING.equals(roleContext.getState())) {
223             LOG.debug("Node {} ownership changed during closing process", roleContext.getDeviceState().getNodeId());
224             roleContext.close();
225             txCandidateGuard.release();
226             return;
227         }
228
229         if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
230             // SLAVE -> MASTER - acquired transition lock
231             LOG.debug("Acquired tx-lock for entity {}", ownershipChange.getEntity());
232             roleContext.setTxLockOwned(true);
233             final OfpRole role = roleContext.getDeviceState().getRole();
234             Verify.verify(OfpRole.BECOMEMASTER.equals(roleContext.getPropagatingRole()),
235                     "Acquired tx-lock but current role = {}", role);
236
237             switch (roleContext.getState()) {
238                 case STARTING:
239                     processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER);
240                     // activate stats - accomplished automatically by chaging role in deviceState
241                     // collect initial dynamic data from device
242                     processingClosure = Futures.transform(processingClosure, new AsyncFunction<Void, Void>() {
243                         @Nullable
244                         @Override
245                         public ListenableFuture<Void> apply(@Nullable final Void aVoid) {
246                             deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER);
247                             return DeviceInitializationUtils.initializeNodeInformation(
248                                     deviceContext, switchFeaturesMandatory);
249                         }
250                     });
251                     break;
252                 case WORKING:
253                     // activate txChainManager, activate rpcs
254                     processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER);
255                     // activate stats - accomplished automatically by chaging role in deviceState
256                     processingClosure = Futures.transform(processingClosure, new Function<Void, Void>() {
257                         @Nullable
258                         @Override
259                         public Void apply(@Nullable final Void aVoid) {
260                             deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER);
261                             return null;
262                         }
263                     });
264                     break;
265                 //case TEARING_DOWN:
266                 default:
267                     //TODO: reconsider if there is really nothing to do when tearing down
268                     processingClosure = Futures.immediateFuture(null);
269                     break;
270             }
271         } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
272             // MASTER -> SLAVE - released tx-lock
273             LOG.debug("Released tx-lock for entity {}", ownershipChange.getEntity());
274             roleContext.setTxLockOwned(false);
275             txContexts.remove(roleContext.getTxEntity(), roleContext);
276             processingClosure = Futures.immediateFuture(null);
277         } else {
278             LOG.debug("NOOP state transition for TxEntity {} ", roleContext.getTxEntity());
279             processingClosure = Futures.immediateFuture(null);
280         }
281
282         // handle result of executed steps
283         Futures.addCallback(processingClosure, new FutureCallback<Void>()
284
285                 {
286                     @Override
287                     public void onSuccess(@Nullable final Void aVoid) {
288                         // propagating role must be BECOMEMASTER in order to run this processing
289                         // removing it will disable redundand processing of BECOMEMASTER
290                         roleContext.setPropagatingRole(null);
291
292                         txCandidateGuard.release();
293                         switch (roleContext.getState()) {
294                             case STARTING:
295                                 LOG.debug("init steps protected by tx-lock for node {} are done.", nodeId);
296                                 roleContext.promoteStateToWorking();
297                                 getRoleContextLevelUp(deviceContext);
298                                 break;
299                             case WORKING:
300                                 LOG.debug("normal steps protected by tx-lock for node {} are done.", nodeId);
301                                 break;
302                             case TEARING_DOWN:
303                                 LOG.debug("teardown steps protected by tx-lock for node {} are done.", nodeId);
304                                 break;
305                         }
306                     }
307
308                     @Override
309                     public void onFailure(final Throwable throwable) {
310                         LOG.warn("Unexpected error for Node {}, state={}, txLock={} -> terminating device context",
311                                 nodeId, roleContext.getState(), roleContext.isTxLockOwned(), throwable);
312                         txCandidateGuard.release();
313                         deviceContext.close();
314                     }
315                 }
316
317         );
318     }
319
320     private static Function<Void, Void> makeTxEntitySuspendCallback(final RoleContext roleChangeListener) {
321         return new Function<Void, Void>() {
322             @Override
323             public Void apply(final Void result) {
324                 roleChangeListener.suspendTxCandidate();
325                 return null;
326             }
327         };
328     }
329
330     private Function<Void, Void> makeTxEntitySetupCallback(final RoleContext roleContext) {
331         return new Function<Void, Void>() {
332             @Override
333             public Void apply(final Void result) {
334                 final NodeId nodeId = roleContext.getDeviceState().getNodeId();
335                 try {
336                     LOG.debug("Node {} is marked as LEADER", nodeId);
337                     Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
338                             "RoleCtx for TxEntity {} master Node {} is still not closed.", roleContext.getTxEntity(), nodeId);
339                     roleContext.setPropagatingRole(OfpRole.BECOMEMASTER);
340
341                     // try to register tx-candidate via ownership service
342                     roleContext.setupTxCandidate();
343                 } catch (final CandidateAlreadyRegisteredException e) {
344                     LOG.warn("txCandidate registration failed {}", roleContext.getDeviceState().getNodeId(), e);
345                     // --- CLEAN UP ---
346                     // withdraw context from map in order to have it as before
347                     txContexts.remove(roleContext.getTxEntity(), roleContext);
348                     // no more propagating any role - there is no txCandidate lock approaching
349                     roleContext.setPropagatingRole(null);
350                     roleContext.getDeviceContext().close();
351                 }
352                 return null;
353             }
354         };
355     }
356
357     private void changeForEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleContext) throws InterruptedException {
358         final Semaphore mainCandidateGuard = roleContext.getMainCandidateGuard();
359         LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
360         mainCandidateGuard.acquire();
361         LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
362
363         if (roleContext.getDeviceState().isValid()) {
364             LOG.debug("RoleChange for entity {}", ownershipChange.getEntity());
365             final OfpRole newRole = ownershipChange.isOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
366             final OfpRole oldRole = ownershipChange.wasOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
367
368             // propagation start point
369             ListenableFuture<Void> rolePropagationFx = Futures.immediateFuture(null);
370             final Function<Void, Void> txProcessCallback;
371
372             if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
373                 // MASTER -> SLAVE
374                 rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
375                 if (RoleContext.ROLE_CONTEXT_STATE.WORKING.equals(roleContext.getState())) {
376                     txProcessCallback = makeTxEntitySuspendCallback(roleContext);
377                 } else {
378                     txProcessCallback = null;
379                 }
380             } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.hasOwner()) {
381                 // SLAVE -> MASTER
382                 txProcessCallback = makeTxEntitySetupCallback(roleContext);
383             } else if (!ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
384                 if (RoleContext.ROLE_CONTEXT_STATE.STARTING.equals(roleContext.getState())) {
385                     rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
386                 }
387                 txProcessCallback = null;
388             } else {
389                 LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
390                 txProcessCallback = null;
391             }
392
393             if (txProcessCallback != null) {
394                 rolePropagationFx = Futures.transform(rolePropagationFx, txProcessCallback);
395             }
396
397             // catching result
398             Futures.addCallback(rolePropagationFx, new FutureCallback<Void>() {
399                 @Override
400                 public void onSuccess(@Nullable final Void aVoid) {
401                     LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
402                             ownershipChange.getEntity(), oldRole, newRole);
403                     roleContext.setPropagatingRole(newRole);
404                     mainCandidateGuard.release();
405                 }
406
407                 @Override
408                 public void onFailure(final Throwable throwable) {
409                     LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
410                             ownershipChange.getEntity(), oldRole, newRole);
411                     mainCandidateGuard.release();
412                     roleContext.getDeviceContext().close();
413                 }
414             });
415
416         } else {
417             LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
418             mainCandidateGuard.release();
419             // expecting that this roleContext will get closed in a moment
420             // FIXME: reconsider location of following cleanup logic
421             if (!ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
422                 unregistrationHelper(ownershipChange, roleContext);
423             } else if (ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
424                 contexts.remove(ownershipChange.getEntity(), roleContext);
425                 roleContext.suspendTxCandidate();
426             } else {
427                 LOG.info("Unexpected role change msg {} for entity {}", ownershipChange, ownershipChange.getEntity());
428             }
429         }
430     }
431
432     private CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperDS(
433             final RoleChangeListener roleChangeListener) {
434         Preconditions.checkArgument(roleChangeListener != null);
435         final DeviceState deviceState = roleChangeListener.getDeviceState();
436         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
437         delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier());
438         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
439         Futures.addCallback(delFuture, new FutureCallback<Void>() {
440
441             @Override
442             public void onSuccess(final Void result) {
443                 LOG.debug("Delete Node {} was successful", deviceState.getNodeId());
444             }
445
446             @Override
447             public void onFailure(final Throwable t) {
448                 LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
449             }
450         });
451         return delFuture;
452     }
453
454
455     private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
456         LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
457         Futures.addCallback(removeDeviceFromOperDS(roleContext), new FutureCallback<Void>() {
458             @Override
459             public void onSuccess(@Nullable final Void aVoid) {
460                 LOG.debug("Freeing roleContext slot for device: {}", roleContext.getDeviceState().getNodeId());
461                 contexts.remove(ownershipChange.getEntity(), roleContext);
462                 roleContext.suspendTxCandidate();
463             }
464
465             @Override
466             public void onFailure(final Throwable throwable) {
467                 LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleContext.getDeviceState()
468                         .getNodeId(), throwable.getMessage());
469                 contexts.remove(ownershipChange.getEntity(), roleContext);
470                 roleContext.suspendTxCandidate();
471             }
472         });
473     }
474 }