9e66ef9682e7f35f3646bae2c23c28702dbed061
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / role / OfEntityManager.java
1 /**
2  * Copyright (c) 2013, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core.role;
10
11 import java.math.BigInteger;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.concurrent.TimeUnit;
14 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
15 import com.google.common.base.Optional;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
20 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
21 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
22 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
24 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
25 import org.opendaylight.openflowplugin.api.openflow.md.ModelDrivenSwitch;
26 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
27 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
28 import org.opendaylight.yangtools.yang.common.QName;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
30 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
31 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.common.config.impl.rev140326.OfpRole;
33 import org.opendaylight.openflowplugin.openflow.md.core.session.RolePushTask;
34 import org.opendaylight.openflowplugin.openflow.md.core.session.RolePushException;
35 import org.opendaylight.openflowplugin.openflow.md.util.RoleUtil;
36 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.ConcurrentHashMap;
39 import com.google.common.util.concurrent.ListeningExecutorService;
40 import com.google.common.util.concurrent.MoreExecutors;
41 import com.google.common.util.concurrent.Futures;
42 import com.google.common.util.concurrent.FutureCallback;
43 import java.util.concurrent.ArrayBlockingQueue;
44 import com.google.common.util.concurrent.ListenableFuture;
45 import com.google.common.util.concurrent.CheckedFuture;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 public class OfEntityManager implements TransactionChainListener{
50     private static final Logger LOG = LoggerFactory.getLogger(OfEntityManager.class);
51
52     private static final QName ENTITY_QNAME =
53         org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.core.general.entity.rev150820.Entity.QNAME;
54     private static final QName ENTITY_NAME = QName.create(ENTITY_QNAME, "name");
55
56     private DataBroker dataBroker;
57     private EntityOwnershipService entityOwnershipService;
58     private final OpenflowOwnershipListener ownershipListener;
59     private final AtomicBoolean registeredListener = new AtomicBoolean();
60     private ConcurrentHashMap<Entity, MDSwitchMetaData> entsession;
61     private ConcurrentHashMap<Entity, EntityOwnershipCandidateRegistration> entRegistrationMap;
62     private final String DEVICE_TYPE = "openflow";
63
64     private final ListeningExecutorService pool;
65
66     public OfEntityManager( EntityOwnershipService entityOwnershipService ) {
67         this.entityOwnershipService = entityOwnershipService;
68         ownershipListener = new OpenflowOwnershipListener(this);
69         entsession = new ConcurrentHashMap<>();
70         entRegistrationMap = new ConcurrentHashMap<>();
71         ThreadPoolLoggingExecutor delegate = new ThreadPoolLoggingExecutor(
72             20, 20, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), "ofEntity");
73         pool =  MoreExecutors.listeningDecorator(delegate);
74     }
75
76     public void setDataBroker(DataBroker dbBroker) {
77         this.dataBroker = dbBroker;
78     }
79
80     public void requestOpenflowEntityOwnership(final ModelDrivenSwitch ofSwitch,
81                                                final SessionContext context,
82                                                final NotificationQueueWrapper wrappedNotification,
83                                                final RpcProviderRegistry rpcProviderRegistry) {
84         MDSwitchMetaData entityMetaData =
85                 new MDSwitchMetaData(ofSwitch,context,wrappedNotification,rpcProviderRegistry);
86
87         if (registeredListener.compareAndSet(false, true)) {
88             entityOwnershipService.registerListener(DEVICE_TYPE, ownershipListener);
89         }
90         final Entity entity = new Entity(DEVICE_TYPE, ofSwitch.getNodeId().getValue());
91         entsession.put(entity, entityMetaData);
92
93         //Register as soon as possible to avoid missing any entity ownership change event
94         final EntityOwnershipCandidateRegistration entityRegistration;
95         try {
96             entityRegistration = entityOwnershipService.registerCandidate(entity);
97             entRegistrationMap.put(entity, entityRegistration);
98             LOG.info("requestOpenflowEntityOwnership: Registered controller for the ownership of {}", ofSwitch.getNodeId() );
99         } catch (CandidateAlreadyRegisteredException e) {
100             // we can log and move for this error, as listener is present and role changes will be served.
101             LOG.error("requestOpenflowEntityOwnership : Controller registration for ownership of {} failed ", ofSwitch.getNodeId(), e );
102         }
103
104         Optional <EntityOwnershipState> entityOwnershipStateOptional =
105                 entityOwnershipService.getOwnershipState(entity);
106
107         if (entityOwnershipStateOptional.isPresent()) {
108             final EntityOwnershipState entityOwnershipState = entityOwnershipStateOptional.get();
109             if (entityOwnershipState.hasOwner()) {
110                 final OfpRole newRole ;
111                 if (entityOwnershipState.isOwner()) {
112                     LOG.info("requestOpenflowEntityOwnership: Set controller as a MASTER controller " +
113                             "because it's the OWNER of the {}", ofSwitch.getNodeId());
114                     newRole =  OfpRole.BECOMEMASTER;
115                     entsession.get(entity).getOfSwitch().setEntityOwnership(true);
116                     registerRoutedRPCForSwitch(entsession.get(entity));
117                 } else {
118                     LOG.info("requestOpenflowEntityOwnership: Set controller as a SLAVE controller " +
119                             "because it's is not the owner of the {}", ofSwitch.getNodeId());
120                     newRole = OfpRole.BECOMESLAVE;
121                     entsession.get(entity).getOfSwitch().setEntityOwnership(false);
122                 }
123                 RolePushTask task = new RolePushTask(newRole, context);
124                 ListenableFuture<Boolean> rolePushResult = pool.submit(task);
125                 CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
126                     RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
127                 Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
128                     @Override
129                     public void onSuccess(Boolean result){
130                         LOG.info("requestOpenflowEntityOwnership: Controller is now {} of the {}",
131                                 newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE",ofSwitch.getNodeId() );
132
133                         sendNodeAddedNotification(entsession.get(entity));
134                     }
135                     @Override
136                     public void onFailure(Throwable t){
137                         LOG.warn("requestOpenflowEntityOwnership: Controller is not able to set " +
138                                 "the role for {}",ofSwitch.getNodeId(), t);
139
140                         if(newRole == OfpRole.BECOMEMASTER) {
141                             LOG.info("requestOpenflowEntityOwnership: ..and controller is the owner of the " +
142                                     "device {}. Closing the registration, so other controllers can try to " +
143                                     "become owner and attempt to be master controller.",ofSwitch.getNodeId());
144
145                             EntityOwnershipCandidateRegistration ownershipRegistrent = entRegistrationMap.get(entity);
146                             if (ownershipRegistrent != null) {
147                                 ownershipRegistrent.close();
148                                 entRegistrationMap.remove(entity);
149                             }
150
151                             LOG.info("requestOpenflowEntityOwnership: ..and registering it back to participate" +
152                                     " in ownership of the entity.");
153
154                             EntityOwnershipCandidateRegistration entityRegistration;
155                             try {
156                                 entityRegistration = entityOwnershipService.registerCandidate(entity);
157                                 entRegistrationMap.put(entity, entityRegistration);
158                                 LOG.info("requestOpenflowEntityOwnership: re-registered controller for " +
159                                         "ownership of the {}", ofSwitch.getNodeId() );
160                             } catch (CandidateAlreadyRegisteredException e) {
161                                 // we can log and move for this error, as listener is present and role changes will be served.
162                                 LOG.error("requestOpenflowEntityOwnership: *Surprisingly* Entity is already " +
163                                         "registered with EntityOwnershipService : {}", ofSwitch.getNodeId(), e );
164                             }
165
166                         } else {
167                                 LOG.error("requestOpenflowEntityOwnership : Not able to set role {} for {}"
168                                         , newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE", ofSwitch.getNodeId());
169                         }
170                     }
171                  });
172              }
173          }
174     }
175
176     public void setSlaveRole(SessionContext sessionContext) {
177         OfpRole newRole ;
178         newRole = OfpRole.BECOMESLAVE;
179         if (sessionContext != null) {
180             final BigInteger targetSwitchDPId = sessionContext.getFeatures().getDatapathId();
181             LOG.debug("setSlaveRole: Set controller as a SLAVE controller for {}", targetSwitchDPId.toString());
182
183             RolePushTask task = new RolePushTask(newRole, sessionContext);
184             ListenableFuture<Boolean> rolePushResult = pool.submit(task);
185             final CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
186                 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
187             Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
188                 @Override
189                 public void onSuccess(Boolean result){
190                     LOG.debug("setSlaveRole: Controller is set as a SLAVE for {}", targetSwitchDPId.toString());
191                 }
192                 @Override
193                 public void onFailure(Throwable e){
194                     LOG.error("setSlaveRole: Role request to set controller as a SLAVE failed for {}",
195                             targetSwitchDPId.toString(), e);
196                 }
197             });
198         } else {
199             LOG.warn("setSlaveRole: sessionContext is not set. Device is not connected anymore");
200         }
201     }
202
203     public void onDeviceOwnershipChanged(final EntityOwnershipChange ownershipChange) {
204         final OfpRole newRole;
205         final Entity entity = ownershipChange.getEntity();
206         SessionContext sessionContext = entsession.get(entity)!=null?entsession.get(entity).getContext():null;
207         if (ownershipChange.isOwner()) {
208             LOG.info("onDeviceOwnershipChanged: Set controller as a MASTER controller because " +
209                     "it's the OWNER of the {}", entity);
210             newRole =  OfpRole.BECOMEMASTER;
211         }
212         else {
213
214             newRole =  OfpRole.BECOMESLAVE;
215             if(sessionContext != null && ownershipChange.hasOwner()) {
216                 LOG.info("onDeviceOwnershipChanged: Set controller as a SLAVE controller because " +
217                         "it's not the OWNER of the {}", entity);
218                 entsession.get(entity).getOfSwitch().setEntityOwnership(newRole==OfpRole.BECOMEMASTER);
219                 setSlaveRole(sessionContext);
220                 sendNodeAddedNotification(entsession.get(entity));
221                 if(ownershipChange.wasOwner()) {
222                     deregisterRoutedRPCForSwitch(entsession.get(entity));
223                     // You don't have to explicitly set role to Slave in this case,
224                     // because other controller will be taking over the master role
225                     // and that will force other controller to become slave.
226                 }
227             }
228             return;
229         }
230         if (sessionContext != null) {
231             //Register the RPC, given *this* controller instance is going to be master owner.
232             //If role registration fails for this node, it will deregister as a candidate for
233             //ownership and that will make this controller non-owner and it will deregister the
234             // router rpc.
235             entsession.get(entity).getOfSwitch().setEntityOwnership(newRole==OfpRole.BECOMEMASTER);
236             registerRoutedRPCForSwitch(entsession.get(entity));
237
238             final String targetSwitchDPId = sessionContext.getFeatures().getDatapathId().toString();
239             RolePushTask task = new RolePushTask(newRole, sessionContext);
240             ListenableFuture<Boolean> rolePushResult = pool.submit(task);
241             final CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
242                 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
243             Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
244                 @Override
245                 public void onSuccess(Boolean result){
246                     LOG.info("onDeviceOwnershipChanged: Controller is successfully set as a " +
247                             "MASTER controller for {}", targetSwitchDPId);
248                     entsession.get(entity).getOfSwitch().setEntityOwnership(newRole==OfpRole.BECOMEMASTER);
249                     entsession.get(entity).getOfSwitch().sendEmptyTableFeatureRequest();
250                     sendNodeAddedNotification(entsession.get(entity));
251
252                 }
253                 @Override
254                 public void onFailure(Throwable e){
255
256                     LOG.warn("onDeviceOwnershipChanged: Controller is not able to set the " +
257                             "MASTER role for {}.", targetSwitchDPId,e);
258                     if(newRole == OfpRole.BECOMEMASTER) {
259                         LOG.info("onDeviceOwnershipChanged: ..and this *instance* is owner of the device {}. " +
260                                 "Closing the registration, so other entity can become owner " +
261                                 "and attempt to be master controller.",targetSwitchDPId);
262
263                         EntityOwnershipCandidateRegistration ownershipRegistrent = entRegistrationMap.get(entity);
264                         if (ownershipRegistrent != null) {
265                             ownershipRegistrent.close();
266                             MDSwitchMetaData switchMetadata = entsession.get(entity);
267                             if(switchMetadata != null){
268                                 //We can probably leave deregistration till the node ownerhsip change.
269                                 //But that can probably cause some race condition.
270                                 deregisterRoutedRPCForSwitch(switchMetadata);
271                             }
272                         }
273
274                         LOG.info("onDeviceOwnershipChanged: ..and registering it back to participate in " +
275                                 "ownership and re-try");
276
277                         EntityOwnershipCandidateRegistration entityRegistration;
278                         try {
279                             entityRegistration = entityOwnershipService.registerCandidate(entity);
280                             entRegistrationMap.put(entity, entityRegistration);
281                             LOG.info("onDeviceOwnershipChanged: re-registered candidate for " +
282                                     "ownership of the {}", targetSwitchDPId );
283                         } catch (CandidateAlreadyRegisteredException ex) {
284                             // we can log and move for this error, as listener is present and role changes will be served.
285                             LOG.error("onDeviceOwnershipChanged: *Surprisingly* Entity is already " +
286                                     "registered with EntityOwnershipService : {}", targetSwitchDPId, ex );
287                         }
288
289                     } else {
290                         LOG.error("onDeviceOwnershipChanged : Not able to set role {} for " +
291                                 " {}", newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE", targetSwitchDPId);
292                     }
293                 }
294             });
295         } else {
296             LOG.warn("onDeviceOwnershipChanged: sessionContext is not available. Releasing ownership of the device");
297             EntityOwnershipCandidateRegistration ownershipRegistrent = entRegistrationMap.get(entity);
298             if (ownershipRegistrent != null) {
299                 ownershipRegistrent.close();
300             }
301         }
302     }
303
304     public void unregisterEntityOwnershipRequest(NodeId nodeId) {
305         Entity entity = new Entity(DEVICE_TYPE, nodeId.getValue());
306         entsession.remove(entity);
307         EntityOwnershipCandidateRegistration entRegCandidate = entRegistrationMap.get(entity);
308         if(entRegCandidate != null){
309             LOG.info("unregisterEntityOwnershipRequest: Unregister controller entity ownership " +
310                     "request for {}", nodeId);
311             entRegCandidate.close();
312             entRegistrationMap.remove(entity);
313         }
314     }
315
316     @Override
317     public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
318            final Throwable cause) {
319     }
320
321     @Override
322     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
323        // NOOP
324     }
325
326     private void registerRoutedRPCForSwitch(MDSwitchMetaData entityMetadata) {
327         // Routed RPC registration is only done when *this* instance is owner of
328         // the entity.
329         if(entityMetadata.getOfSwitch().isEntityOwner()) {
330             if (!entityMetadata.isRPCRegistrationDone.get()) {
331                 entityMetadata.setIsRPCRegistrationDone(true);
332                 CompositeObjectRegistration<ModelDrivenSwitch> registration =
333                         entityMetadata.getOfSwitch().register(entityMetadata.getRpcProviderRegistry());
334
335                 entityMetadata.getContext().setProviderRegistration(registration);
336
337                 LOG.info("registerRoutedRPCForSwitch: Registered routed rpc for ModelDrivenSwitch {}",
338                         entityMetadata.getOfSwitch().getNodeId().getValue());
339             }
340         } else {
341             LOG.info("registerRoutedRPCForSwitch: Skipping routed rpc registration for ModelDrivenSwitch {}",
342                     entityMetadata.getOfSwitch().getNodeId().getValue());
343         }
344     }
345
346     private void deregisterRoutedRPCForSwitch(MDSwitchMetaData entityMetadata) {
347
348         CompositeObjectRegistration<ModelDrivenSwitch> registration = entityMetadata.getContext().getProviderRegistration();
349         if (null != registration) {
350             registration.close();
351             entityMetadata.getContext().setProviderRegistration(null);
352         }
353         LOG.info("deregisterRoutedRPCForSwitch: De-registered routed rpc for ModelDrivenSwitch {}",
354                 entityMetadata.getOfSwitch().getNodeId().getValue());
355     }
356
357     private void sendNodeAddedNotification(MDSwitchMetaData entityMetadata) {
358         //Node added notification need to be sent irrespective of whether
359         // *this* instance is owner of the entity or not. Because yang notifications
360         // are local, and we should maintain the behavior across the application.
361         LOG.info("sendNodeAddedNotification: Node Added notification is sent for ModelDrivenSwitch {}",
362                 entityMetadata.getOfSwitch().getNodeId().getValue());
363
364         entityMetadata.getContext().getNotificationEnqueuer().enqueueNotification(
365                 entityMetadata.getWrappedNotification());
366
367         //Send multipart request to get other details of the switch.
368         entityMetadata.getOfSwitch().requestSwitchDetails();
369     }
370
371     private class MDSwitchMetaData {
372
373         final private ModelDrivenSwitch ofSwitch;
374         final private SessionContext context;
375         final private NotificationQueueWrapper wrappedNotification;
376         final private RpcProviderRegistry rpcProviderRegistry;
377         final private AtomicBoolean isRPCRegistrationDone = new AtomicBoolean(false);
378
379         MDSwitchMetaData(ModelDrivenSwitch ofSwitch,
380                          SessionContext context,
381                          NotificationQueueWrapper wrappedNotification,
382                          RpcProviderRegistry rpcProviderRegistry) {
383             this.ofSwitch = ofSwitch;
384             this.context = context;
385             this.wrappedNotification = wrappedNotification;
386             this.rpcProviderRegistry = rpcProviderRegistry;
387         }
388
389         public ModelDrivenSwitch getOfSwitch() {
390             return ofSwitch;
391         }
392
393         public SessionContext getContext() {
394             return context;
395         }
396
397         public NotificationQueueWrapper getWrappedNotification() {
398             return wrappedNotification;
399         }
400
401         public RpcProviderRegistry getRpcProviderRegistry() {
402             return rpcProviderRegistry;
403         }
404
405         public AtomicBoolean getIsRPCRegistrationDone() {
406             return isRPCRegistrationDone;
407         }
408
409         public void setIsRPCRegistrationDone(boolean isRPCRegistrationDone) {
410             this.isRPCRegistrationDone.set(isRPCRegistrationDone);
411         }
412     }
413 }