b365c797b7eadb1b298bd5e951bd82c02c04e9f9
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / role / RoleManagerImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.role;
9
10 import com.google.common.base.Function;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Throwables;
14 import com.google.common.base.Verify;
15 import com.google.common.collect.Iterators;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import java.util.Iterator;
21 import java.util.Map.Entry;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.Semaphore;
25 import javax.annotation.CheckForNull;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
30 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
31 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
32 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
33 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
34 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
35 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
36 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
37 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
38 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
41 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
42 import org.opendaylight.openflowplugin.api.openflow.role.RoleChangeListener;
43 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
44 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /**
51  * Gets invoked from RpcManagerInitial, registers a candidate with EntityOwnershipService.
52  * On receipt of the ownership notification, makes an rpc call to SalRoleService.
53  *
54  * Hands over to StatisticsManager at the end.
55  */
56 public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
57     private static final Logger LOG = LoggerFactory.getLogger(RoleManagerImpl.class);
58
59     private DeviceInitializationPhaseHandler deviceInitializationPhaseHandler;
60     private final DataBroker dataBroker;
61     private final EntityOwnershipService entityOwnershipService;
62     private final ConcurrentMap<Entity, RoleContext> contexts = new ConcurrentHashMap<>();
63     private final ConcurrentMap<Entity, RoleContext> txContexts = new ConcurrentHashMap<>();
64     private final EntityOwnershipListenerRegistration entityOwnershipListenerRegistration;
65     private final EntityOwnershipListenerRegistration txEntityOwnershipListenerRegistration;
66     private final boolean switchFeaturesMandatory;
67
68     public RoleManagerImpl(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker, final boolean switchFeaturesMandatory) {
69         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
70         this.dataBroker = Preconditions.checkNotNull(dataBroker);
71         this.switchFeaturesMandatory = switchFeaturesMandatory;
72         this.entityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(RoleManager.ENTITY_TYPE, this));
73         this.txEntityOwnershipListenerRegistration = Preconditions.checkNotNull(entityOwnershipService.registerListener(TX_ENTITY_TYPE, this));
74         LOG.debug("Registering OpenflowOwnershipListener listening to all entity ownership changes");
75     }
76
77     @Override
78     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
79         deviceInitializationPhaseHandler = handler;
80     }
81
82     @Override
83     public void onDeviceContextLevelUp(@CheckForNull final DeviceContext deviceContext) throws Exception {
84         LOG.debug("RoleManager called for device:{}", deviceContext.getPrimaryConnectionContext().getNodeId());
85         final RoleContext roleContext = new RoleContextImpl(deviceContext, entityOwnershipService,
86                 makeEntity(deviceContext.getDeviceState().getNodeId()),
87                 makeTxEntity(deviceContext.getDeviceState().getNodeId()));
88
89         Verify.verify(contexts.putIfAbsent(roleContext.getEntity(), roleContext) == null, "RoleCtx for master Node {} is still not closed.", deviceContext.getDeviceState().getNodeId());
90         Verify.verify(!txContexts.containsKey(roleContext.getTxEntity()),
91                 "RoleCtx for master Node {} is still not closed. TxEntity was not unregistered yet.", deviceContext.getDeviceState().getNodeId());
92
93         // if the device context gets closed (mostly on connection close), we would need to cleanup
94         deviceContext.addDeviceContextClosedHandler(this);
95         roleContext.initialization();
96         deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
97     }
98
99     @Override
100     public void close() {
101         entityOwnershipListenerRegistration.close();
102         txEntityOwnershipListenerRegistration.close();
103         for (final Iterator<Entry<Entity, RoleContext>> iterator = Iterators.consumingIterator(contexts.entrySet()
104                 .iterator()); iterator.hasNext();) {
105             // got here because last known role is LEADER and DS might need clearing up
106             final Entry<Entity, RoleContext> entry = iterator.next();
107             final RoleContext roleCtx = entry.getValue();
108             final NodeId nodeId = roleCtx.getDeviceState().getNodeId();
109             if (OfpRole.BECOMEMASTER.equals(roleCtx.getDeviceState().getRole())) {
110                 LOG.trace("Last role is LEADER and ownershipService returned hasOwner=false for node: {}; "
111                         + "cleaning DS as being probably the last owner", nodeId);
112                 removeDeviceFromOperDS(roleCtx);
113             } else {
114                 // NOOP - there is another owner
115                 LOG.debug("Last role is LEADER and ownershipService returned hasOwner=true for node: {}; "
116                         + "leaving DS untouched", nodeId);
117             }
118             roleCtx.suspendTxCandidate();
119             txContexts.remove(roleCtx.getTxEntity(), roleCtx);
120             roleCtx.close();
121         }
122     }
123
124     @Override
125     public void onDeviceContextClosed(final DeviceContext deviceContext) {
126         final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
127         LOG.debug("onDeviceContextClosed for node {}", nodeId);
128         final Entity entity = makeEntity(nodeId);
129         final RoleContext roleContext = contexts.get(entity);
130         if (roleContext != null) {
131             LOG.debug("Found roleContext associated to deviceContext: {}, now closing the roleContext", nodeId);
132             final Optional<EntityOwnershipState> actState = entityOwnershipService.getOwnershipState(entity);
133             if (actState.isPresent()) {
134                 if (actState.get().isOwner()) {
135                     if (!txContexts.containsKey(roleContext.getTxEntity())) {
136                         try {
137                             txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext);
138                             roleContext.setupTxCandidate();
139                             // we'd like to wait for registration response
140                             return;
141                         } catch (final CandidateAlreadyRegisteredException e) {
142                             // NOOP
143                         }
144                     }
145                 } else {
146                     LOG.debug("No DS commitment for device {} - LEADER is somewhere else", nodeId);
147                     contexts.remove(entity, roleContext);
148                     // TODO : is there a chance to have TxEntity ?
149                 }
150             } else {
151                 LOG.warn("EntityOwnershipService doesn't return state for entity: {} in close proces", entity);
152             }
153             roleContext.close();
154         }
155     }
156
157     private static Entity makeEntity(final NodeId nodeId) {
158         return new Entity(RoleManager.ENTITY_TYPE, nodeId.getValue());
159     }
160
161     private static Entity makeTxEntity(final NodeId nodeId) {
162         return new Entity(RoleManager.TX_ENTITY_TYPE, nodeId.getValue());
163     }
164
165     @Override
166     public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
167         Preconditions.checkArgument(ownershipChange != null);
168         RoleContext roleContext = null;
169         try {
170             roleContext = contexts.get(ownershipChange.getEntity());
171             if (roleContext != null) {
172                 changeForEntity(ownershipChange, roleContext);
173                 return;
174             }
175
176             roleContext = txContexts.get(ownershipChange.getEntity());
177             if (roleContext != null) {
178                 changeForTxEntity(ownershipChange, roleContext);
179                 return;
180             }
181         } catch (final Exception e) {
182             LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity(), e);
183             if (roleContext != null) {
184                 roleContext.getDeviceContext().close();
185             }
186         }
187
188         LOG.debug("We are not able to find Entity {} ownershipChange {} - disregarding ownership notification",
189                 ownershipChange.getEntity(), ownershipChange);
190     }
191
192     private void changeForTxEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleContext)
193             throws InterruptedException {
194         LOG.info("Received TX-EntityOwnershipChange:{}", ownershipChange);
195         final Semaphore txCandidateGuard = roleContext.getTxCandidateGuard();
196         LOG.trace("txCandidate lock queue: " + txCandidateGuard.getQueueLength());
197         txCandidateGuard.acquire();
198
199         ListenableFuture<Void> processingClosure;
200         final DeviceContext deviceContext = roleContext.getDeviceContext();
201         final NodeId nodeId = roleContext.getDeviceState().getNodeId();
202
203         if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
204             // SLAVE -> MASTER - acquired transition lock
205             LOG.debug("Acquired tx-lock for entity {}", ownershipChange.getEntity());
206
207             // activate txChainManager, activate rpcs
208             if (roleContext.getDeviceState().isValid()) {
209                 processingClosure = roleContext.onRoleChanged(OfpRole.BECOMESLAVE, OfpRole.BECOMEMASTER);
210             } else {
211                 // We are not able to send anything to device, but we need to handle closing state clearly
212                 roleContext.close();
213                 processingClosure = Futures.immediateFuture(null);
214             }
215             // activate stats - accomplished automatically by changing role in deviceState
216             processingClosure = Futures.transform(processingClosure, new Function<Void, Void>() {
217                 @Nullable
218                 @Override
219                 public Void apply(@Nullable final Void aVoid) {
220                     deviceContext.getDeviceState().setRole(OfpRole.BECOMEMASTER);
221                     return null;
222                 }
223             });
224         } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
225             // MASTER -> SLAVE - released tx-lock
226             LOG.debug("Released tx-lock for entity {}", ownershipChange.getEntity());
227             txContexts.remove(roleContext.getTxEntity(), roleContext);
228             processingClosure = Futures.immediateFuture(null);
229         } else {
230             LOG.debug("NOOP state transition for TxEntity {} ", roleContext.getTxEntity());
231             processingClosure = Futures.immediateFuture(null);
232         }
233
234         // handle result of executed steps
235         Futures.addCallback(processingClosure, new FutureCallback<Void>()
236
237                 {
238                     @Override
239                     public void onSuccess(@Nullable final Void aVoid) {
240                         // propagating role must be BECOMEMASTER in order to run this processing
241                         // removing it will disable redundant processing of BECOMEMASTER
242                         txCandidateGuard.release();
243                     }
244
245                     @Override
246                     public void onFailure(final Throwable throwable) {
247                         LOG.warn("Unexpected error for Node {} -> terminating device context", nodeId, throwable);
248                         txCandidateGuard.release();
249                         deviceContext.close();
250                     }
251                 }
252
253         );
254     }
255
256     private static Function<Void, Void> makeTxEntitySuspendCallback(final RoleContext roleChangeListener) {
257         return new Function<Void, Void>() {
258             @Override
259             public Void apply(final Void result) {
260                 roleChangeListener.suspendTxCandidate();
261                 return null;
262             }
263         };
264     }
265
266     private Function<Void, Void> makeTxEntitySetupCallback(final RoleContext roleContext) {
267         return new Function<Void, Void>() {
268             @Override
269             public Void apply(final Void result) {
270                 final NodeId nodeId = roleContext.getDeviceState().getNodeId();
271                 try {
272                     LOG.debug("Node {} is marked as LEADER", nodeId);
273                     Verify.verify(txContexts.putIfAbsent(roleContext.getTxEntity(), roleContext) == null,
274                             "RoleCtx for TxEntity {} master Node {} is still not closed.", roleContext.getTxEntity(), nodeId);
275                     // try to register tx-candidate via ownership service
276                     roleContext.setupTxCandidate();
277                 } catch (final CandidateAlreadyRegisteredException e) {
278                     LOG.warn("txCandidate registration failed {}", roleContext.getDeviceState().getNodeId(), e);
279                     // --- CLEAN UP ---
280                     // withdraw context from map in order to have it as before
281                     txContexts.remove(roleContext.getTxEntity(), roleContext);
282                     // no more propagating any role - there is no txCandidate lock approaching
283                     Throwables.propagate(e);
284                 }
285                 return null;
286             }
287         };
288     }
289
290     private void changeForEntity(final EntityOwnershipChange ownershipChange, @Nonnull final RoleContext roleContext) throws InterruptedException {
291         final Semaphore mainCandidateGuard = roleContext.getMainCandidateGuard();
292         LOG.trace("mainCandidate lock queue: " + mainCandidateGuard.getQueueLength());
293         mainCandidateGuard.acquire();
294         LOG.info("Received EntityOwnershipChange:{}", ownershipChange);
295
296         if (roleContext.getDeviceState().isValid()) {
297             LOG.debug("RoleChange for entity {}", ownershipChange.getEntity());
298             final OfpRole newRole = ownershipChange.isOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
299             final OfpRole oldRole = ownershipChange.wasOwner() ? OfpRole.BECOMEMASTER : OfpRole.BECOMESLAVE;
300
301             // propagation start point
302             ListenableFuture<Void> rolePropagationFx = Futures.immediateFuture(null);
303             final Function<Void, Void> txProcessCallback;
304
305             if (ownershipChange.wasOwner() && !ownershipChange.isOwner() && ownershipChange.hasOwner()) {
306                 // MASTER -> SLAVE
307                 rolePropagationFx = roleContext.onRoleChanged(oldRole, newRole);
308                 txProcessCallback = makeTxEntitySuspendCallback(roleContext);
309             } else if (!ownershipChange.wasOwner() && ownershipChange.isOwner() && ownershipChange.hasOwner()) {
310                 // SLAVE -> MASTER
311                 txProcessCallback = makeTxEntitySetupCallback(roleContext);
312             } else {
313                 LOG.debug("Main candidate role change case not covered: {} -> {} .. NOOP", oldRole, newRole);
314                 txProcessCallback = null;
315             }
316
317             if (txProcessCallback != null) {
318                 rolePropagationFx = Futures.transform(rolePropagationFx, txProcessCallback);
319             }
320
321             // catching result
322             Futures.addCallback(rolePropagationFx, new FutureCallback<Void>() {
323                 @Override
324                 public void onSuccess(@Nullable final Void aVoid) {
325                     LOG.debug("Role of main candidate successfully propagated: {}, {} -> {}",
326                             ownershipChange.getEntity(), oldRole, newRole);
327                     mainCandidateGuard.release();
328                 }
329
330                 @Override
331                 public void onFailure(final Throwable throwable) {
332                     LOG.warn("Main candidate role propagation FAILED for entity: {}, {} -> {}",
333                             ownershipChange.getEntity(), oldRole, newRole, throwable);
334                     mainCandidateGuard.release();
335                     roleContext.getDeviceContext().close();
336                 }
337             });
338
339         } else {
340             LOG.debug("We are closing connection for entity {}", ownershipChange.getEntity());
341             mainCandidateGuard.release();
342             // expecting that this roleContext will get closed in a moment
343             // FIXME: reconsider location of following cleanup logic
344             if (!ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
345                 unregistrationHelper(ownershipChange, roleContext);
346             } else if (ownershipChange.hasOwner() && !ownershipChange.isOwner() && ownershipChange.wasOwner()) {
347                 contexts.remove(ownershipChange.getEntity(), roleContext);
348                 roleContext.suspendTxCandidate();
349             } else {
350                 LOG.info("Unexpected role change msg {} for entity {}", ownershipChange, ownershipChange.getEntity());
351             }
352         }
353     }
354
355     private CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperDS(
356             final RoleChangeListener roleChangeListener) {
357         Preconditions.checkArgument(roleChangeListener != null);
358         final DeviceState deviceState = roleChangeListener.getDeviceState();
359         final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
360         delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier());
361         final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
362         Futures.addCallback(delFuture, new FutureCallback<Void>() {
363
364             @Override
365             public void onSuccess(final Void result) {
366                 LOG.debug("Delete Node {} was successful", deviceState.getNodeId());
367             }
368
369             @Override
370             public void onFailure(final Throwable t) {
371                 LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
372             }
373         });
374         return delFuture;
375     }
376
377
378     private void unregistrationHelper(final EntityOwnershipChange ownershipChange, final RoleContext roleContext) {
379         LOG.info("Initiate removal from operational. Possibly the last node to be disconnected for :{}. ", ownershipChange);
380         Futures.addCallback(removeDeviceFromOperDS(roleContext), new FutureCallback<Void>() {
381             @Override
382             public void onSuccess(@Nullable final Void aVoid) {
383                 LOG.debug("Freeing roleContext slot for device: {}", roleContext.getDeviceState().getNodeId());
384                 contexts.remove(ownershipChange.getEntity(), roleContext);
385                 roleContext.suspendTxCandidate();
386             }
387
388             @Override
389             public void onFailure(final Throwable throwable) {
390                 LOG.warn("NOT freeing roleContext slot for device: {}, {}", roleContext.getDeviceState()
391                         .getNodeId(), throwable.getMessage());
392                 contexts.remove(ownershipChange.getEntity(), roleContext);
393                 roleContext.suspendTxCandidate();
394             }
395         });
396     }
397 }