Merge "Bug-4957: Double candidate, clean up"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / role / RoleManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.role;
9
10 import com.google.common.base.Function;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Throwables;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.Semaphore;
25 import javax.annotation.CheckForNull;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
30 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
31 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
32 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
33 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
34 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
35 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
37 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
38 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
39 import org.opendaylight.openflowplugin.api.OFConstants;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
42 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
43 import org.opendaylight.openflowplugin.api.openflow.role.RoleChangeListener;
44 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
45 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
46 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
54  * On receipt of the ownership notification, makes an rpc call to SalRoleSevice.
55  *
56  * Hands over to StatisticsManager at the end.
57  */
58 public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
59     private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
60
61     private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
62     private final DataBroker dataBroker;
63     private final EntityOwnershipService entityOwnershipService;
64     private final ConcurrentMap<Entity, RoleContext> contexts = new ConcurrentHashMap<>();
65     private final ConcurrentMap<Entity, RoleContext> txContexts = new ConcurrentHashMap<>();
66     private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
67     private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
68     private final boolean switchFeaturesMandatory;
69
70     public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final boolean switchFeaturesMandatory) {
71         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
72         this.dataBroker = Preconditions.checkNotNull(dataBroker);
73         this.switchFeaturesMandatory = switchFeaturesMandatory;
74         this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
75         this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this));
76         LOG.debug("Registering OpenflowOwnershipListener listening to all entity ownership changes");
77     }
78
79     @Override
80     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
81         deviceInitializationPhaseHandler = handler;
82     }
83
84     @Override
85     public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
86         LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
87         if (deviceContext.getDeviceState().getFeatures().getVersion() < OFConstants.OFP_VERSION_1_3) {
88             // Roles are not supported before OF1.3, so move forward.
89             deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
90             return;
91         }
92
93         final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
94                 makeEntity(deviceContext.getDeviceState().getNodeId()),
95                 makeTxEntity(deviceContext.getDeviceState().getNodeId()));
96         // if the device context gets closed (mostly on connection close), we would need to cleanup
97         deviceContext.addDeviceContextClosedHandler(this);
98         final RoleContext previousContext = contexts.putIfAbsent(roleContext.getEntity(), roleContext);
99         Verify.verify(previousContext == null,
100                 "RoleCtx for master Node {} is still not close.", deviceContext.getDeviceState().getNodeId());
101
102         roleContext.initialization();
103         @Deprecated
104         final ListenableFuture<OfpRole> roleChangeFuture = SettableFuture.<OfpRole> create();
105
106         final ListenableFuture<Void> txFreeFuture = Futures.transform(roleChangeFuture, new AsyncFunction<OfpRole, Void>() {
107             @Override
108             public ListenableFuture<Void> apply(final OfpRole input) throws Exception {
109                 final ListenableFuture<Void> nextFuture;
110                 if (OfpRole.BECOMEMASTER.equals(input)) {
111                     LOG.debug("Node {} has marked as LEADER", deviceContext.getDeviceState().getNodeId());
112                     Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
113                             "RoleCtx for TxEntity {} master Node {} is still not close.",
114                             roleContext.getTxEntity(), deviceContext.getDeviceState().getNodeId());
115 //                    nextFuture = roleContext.setupTxCandidate();
116                             nextFuture = Futures.immediateFuture(null);
117                 } else {
118                     LOG.debug("Node {} was marked as FOLLOWER", deviceContext.getDeviceState().getNodeId());
119                     nextFuture = Futures.immediateFuture(null);
120                 }
121                 return nextFuture;
122             }
123         });
124
125         final ListenableFuture<Void> initDeviceFuture = Futures.transform(txFreeFuture, new AsyncFunction<Void, Void>() {
126             @Override
127             public ListenableFuture<Void> apply(final Void input) throws Exception {
128                 LOG.debug("Node {} will be initialized", deviceContext.getDeviceState().getNodeId());
129                 return DeviceInitializationUtils.initializeNodeInformation(deviceContext, switchFeaturesMandatory);
130             }
131         });
132
133         Futures.addCallback(initDeviceFuture, new FutureCallback<Void>() {
134             @Override
135             public void onSuccess(final Void result) {
136                 LOG.debug("Initialization Node {} is done.", deviceContext.getDeviceState().getNodeId());
137                 try {
138                     getRoleContextLevelUp(deviceContext);
139                 } catch (final Exception e) {
140                     deviceContext.close();
141                 }
142             }
143
144             @Override
145             public void onFailure(final Throwable t) {
146                 LOG.warn("Unexpected error for Node {} initialization", deviceContext.getDeviceState().getNodeId(), t);
147                 deviceContext.close();
148             }
149         });
150     }
151
152     void getRoleContextLevelUp(final DeviceContext deviceContext) throws Exception {
153         LOG.debug("Created role context for node {}", deviceContext.getDeviceState().getNodeId());
154         LOG.debug("roleChangeFuture success for device:{}. Moving to StatisticsManager", deviceContext.getDeviceState().getNodeId());
155         deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
156     }
157
158     @Override
159     public void close() throws Exception {
160         entityOwnershipListenerRegistration.close();
161         txEntityOwnershipListenerRegistration.close();
162         for (final Map.Entry<Entity, RoleContext> roleContextEntry : contexts.entrySet()) {
163             // got here because last known role is LEADER and DS might need clearing up
164             final Entity entity = roleContextEntry.getKey();
165             final Optional<EntityOwnershipState> ownershipState = entityOwnershipService.getOwnershipState(entity);
166             final NodeId nodeId = roleContextEntry.getValue().getDeviceState().getNodeId();
167             if (ownershipState.isPresent()) {
168                 if ((!ownershipState.get().hasOwner())) {
169                     LOG.trace("Last role is LEADER and ownershipService returned hasOwner=false for node: {}; " +
170                             "cleaning DS as being probably the last owner", nodeId);
171                     removeDeviceFromOperDS(roleContextEntry.getValue());
172                 } else {
173                     // NOOP - there is another owner
174                     LOG.debug("Last role is LEADER and ownershipService returned hasOwner=true for node: {}; " +
175                             "leaving DS untouched", nodeId);
176                 }
177             } else {
178                 // TODO: is this safe? When could this happen?
179                 LOG.warn("Last role is LEADER but ownershipService returned empty ownership info for node: {}; " +
180                         "cleaning DS ANYWAY!", nodeId);
181                 removeDeviceFromOperDS(roleContextEntry.getValue());
182             }
183         }
184         contexts.clear();
185     }
186
187     @Override
188     public void onDeviceContextClosed(final DeviceContext deviceContext) {
189         final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
190         LOG.debug("onDeviceContextClosed for node {}", nodeId);
191         final Entity entity = makeEntity(nodeId);
192         final RoleContext roleContext = contexts.get(entity);
193         if (roleContext != null) {
194             LOG.debug("Found roleContext associated to deviceContext: {}, now closing the roleContext", nodeId);
195             final Optional<EntityOwnershipState> actState = entityOwnershipService.getOwnershipState(entity);
196             if (actState.isPresent()) {
197                 if (!actState.get().isOwner()) {
198                     LOG.debug("No DS commitment for device {} - LEADER is somewhere else", nodeId);
199                     contexts.remove(entity, roleContext);
200                 }
201             } else {
202                 LOG.warn("EntityOwnershipService doesn't return state for entity: {} in close proces", entity);
203             }
204             roleContext.close();
205         }
206     }
207
208     private static Entity makeEntity(final NodeId nodeId) {
209         return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
210     }
211
212     private static Entity makeTxEntity(final NodeId nodeId) {
213         return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
214     }
215
216     @Override
217     public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
218         Preconditions.checkArgument(ownershipChange != null);
219         try {
220             final RoleContext roleContext = contexts.get(ownershipChange.getEntity());
221             if (roleContext != null) {
222                 changeForEntity(ownershipChange, roleContext);
223                 return;
224             }
225
226             final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity());
227             if (txRoleContext != null) {
228                 changeForTxEntity(ownershipChange, txRoleContext);
229                 return;
230             }
231         } catch (final InterruptedException e) {
232             LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity());
233             // FIXME: consider forcibly closing this connection
234         }
235
236         LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
237                 ownershipChange.getEntity(), ownershipChange);
238     }
239
240     private void changeForTxEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleChangeListener roleTxChangeListener) throws InterruptedException {
241         LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
242         final Semaphore txCandidateGuard = roleTxChangeListener.getTxCandidateGuard();
243         LOG.trace("txCandidate lock queue: " + txCandidateGuard.getQueueLength());
244         txCandidateGuard.acquire();
245
246         if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
247             // MASTER -> SLAVE - left transition lock
248             txContexts.remove(roleTxChangeListener.getTxEntity(), roleTxChangeListener);
249             txCandidateGuard.release();
250         } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
251             // SLAVE -> MASTER - acquired transition lock
252             LOG.debug("TxRoleChange for entity {}", ownershipChange.getEntity());
253             final OfpRole role = roleTxChangeListener.getDeviceState().getRole();
254             Verify.verify(OfpRole.BECOMEMASTER.equals(role),
255                     "Acquired txCandidate lock but current role = {}", role);
256
257         } else {
258             LOG.debug("NOOP state transition for TxEntity {} ", roleTxChangeListener.getTxEntity());
259             txCandidateGuard.release();
260         }
261     }
262
263     private static Function<Void, Void> makeTxEntitySuspendCallback(final RoleContext roleChangeListener) {
264         return new Function<Void, Void>() {
265             @Override
266             public Void apply(final Void result) {
267                 roleChangeListener.suspendTxCandidate();
268                 return null;
269             }
270         };
271     }
272
273     private static Function<Void, Void> makeTxEntitySetupCallback(final RoleContext roleChangeListener) {
274         return new Function<Void, Void>() {
275             @Override
276             public Void apply(final Void result) {
277                 try {
278                     roleChangeListener.setupTxCandidate();
279                 } catch (final CandidateAlreadyRegisteredException e) {
280                     LOG.debug("txCandidate registration failed");
281                     Throwables.propagate(e);
282                 }
283                 return null;
284             }
285         };
286     }
287
288     private void changeForEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleChangeListener) throws InterruptedException {
289         final Semaphore mainCandidateGuard = roleChangeListener.getMainCandidateGuard();
290         LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
291         mainCandidateGuard.acquire();
292         //FIXME : check again implementation for double candidate scenario
293         LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
294
295         if (roleChangeListener.getDeviceState().isValid()) {
296             LOG.debug("RoleChange for entity {}", ownershipChange.getEntity());
297             final OfpRole newRole = ownershipChange.isOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
298             final OfpRole oldRole = ownershipChange.wasOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
299             // send even if they are same. we do the check for duplicates in SalRoleService and maintain a lastKnownRole
300             ListenableFuture<Void> rolePropagatedFx = roleChangeListener.onRoleChanged(oldRole, newRole);
301             final Function<Void, Void> txProcessCallback;
302             if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
303                 // MASTER -> SLAVE
304                 txProcessCallback = makeTxEntitySuspendCallback(roleChangeListener);
305             } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.isOwner()) {
306                 // FIXME : make different code path deviceContext.onClusterRoleChange(newRole); has to call from onTxRoleChange (for master)
307                 // SLAVE -> MASTER
308                 txProcessCallback = makeTxEntitySetupCallback(roleChangeListener);
309             } else {
310                 LOG.trace("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
311                 txProcessCallback = null;
312             }
313
314             if (txProcessCallback != null) {
315                 rolePropagatedFx = Futures.transform(rolePropagatedFx, txProcessCallback);
316             }
317
318             Futures.addCallback(rolePropagatedFx, new FutureCallback<Void>() {
319                         @Override
320                         public void onSuccess(@Nullable final Void aVoid) {
321                             LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
322                                     ownershipChange.getEntity(), oldRole, newRole);
323                             mainCandidateGuard.release();
324                         }
325
326                         @Override
327                         public void onFailure(final Throwable throwable) {
328                             LOG.warn("Main candidate role propagation failed for entity: {}, {} -> {}",
329                                     ownershipChange.getEntity(), oldRole, newRole);
330                             mainCandidateGuard.release();
331                             // FIXME: here we shall disconnect probably - in order to avoid inconsistent state
332                         }
333                     }
334             );
335
336         } else {
337             LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
338             mainCandidateGuard.release();
339             // expecting that this roleContext will get closed in a moment
340             // FIXME: reconsider location of following cleanup logic
341             if (!ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
342                 unregistrationHelper(ownershipChange, roleChangeListener);
343             } else if (ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
344                 contexts.remove(ownershipChange.getEntity(), roleChangeListener);
345                 roleChangeListener.suspendTxCandidate();
346             } else {
347                 LOG.info("Unexpected role change msg {} for entity {}", ownershipChange, ownershipChange.getEntity());
348             }
349         }
350     }
351
352     private CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperDS(
353             final RoleChangeListener roleChangeListener) {
354         Preconditions.checkArgument(roleChangeListener != null);
355         final DeviceState deviceState = roleChangeListener.getDeviceState();
356         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
357         delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier());
358         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
359         Futures.addCallback(delFuture, new FutureCallback<Void>() {
360
361             @Override
362             public void onSuccess(final Void result) {
363                 LOG.debug("Delete Node {} was successful", deviceState.getNodeId());
364             }
365
366             @Override
367             public void onFailure(final Throwable t) {
368                 LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
369             }
370         });
371         return delFuture;
372     }
373
374     private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleChangeListener roleChangeListener) {
375         LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
376         Futures.addCallback(removeDeviceFromOperDS(roleChangeListener), new FutureCallback<Void>() {
377             @Override
378             public void onSuccess(@Nullable final Void aVoid) {
379                 LOG.debug("Freeing roleContext slot for device: {}", roleChangeListener.getDeviceState().getNodeId());
380                 contexts.remove(ownershipChange.getEntity(), roleChangeListener);
381                 ((RoleContext) roleChangeListener).suspendTxCandidate();
382             }
383
384             @Override
385             public void onFailure(final Throwable throwable) {
386                 LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleChangeListener.getDeviceState()
387                         .getNodeId(), throwable.getMessage());
388                 contexts.remove(ownershipChange.getEntity(), roleChangeListener);
389                 ((RoleContext) roleChangeListener).suspendTxCandidate();
390             }
391         });
392     }
393 }