Eliminate the use of CompositeObjectRegistration
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / role / OfEntityManager.java
1 /**
2  * Copyright (c) 2013, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core.role;
10
11 import java.math.BigInteger;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.concurrent.TimeUnit;
14 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
15 import com.google.common.base.Optional;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
20 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
21 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
22 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
24 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
25 import org.opendaylight.openflowplugin.api.openflow.md.ModelDrivenSwitch;
26 import org.opendaylight.openflowplugin.api.openflow.md.ModelDrivenSwitchRegistration;
27 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
28 import org.opendaylight.openflowplugin.openflow.md.core.sal.OpenflowPluginConfig;
29 import org.opendaylight.yangtools.yang.common.QName;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
31 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
32 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.common.config.impl.rev140326.OfpRole;
34 import org.opendaylight.openflowplugin.openflow.md.core.session.RolePushTask;
35 import org.opendaylight.openflowplugin.openflow.md.core.session.RolePushException;
36 import org.opendaylight.openflowplugin.openflow.md.util.RoleUtil;
37 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.ConcurrentHashMap;
40 import com.google.common.util.concurrent.ListeningExecutorService;
41 import com.google.common.util.concurrent.MoreExecutors;
42 import com.google.common.util.concurrent.Futures;
43 import com.google.common.util.concurrent.FutureCallback;
44 import com.google.common.util.concurrent.ListenableFuture;
45 import com.google.common.util.concurrent.CheckedFuture;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 public class OfEntityManager implements TransactionChainListener{
50     private static final Logger LOG = LoggerFactory.getLogger(OfEntityManager.class);
51
52     private static final QName ENTITY_QNAME =
53         org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.core.general.entity.rev150820.Entity.QNAME;
54     private static final QName ENTITY_NAME = QName.create(ENTITY_QNAME, "name");
55
56     private DataBroker dataBroker;
57     private EntityOwnershipService entityOwnershipService;
58     private final OpenflowOwnershipListener ownershipListener;
59     private final AtomicBoolean registeredListener = new AtomicBoolean();
60     private ConcurrentHashMap<Entity, MDSwitchMetaData> entsession;
61     private ConcurrentHashMap<Entity, EntityOwnershipCandidateRegistration> entRegistrationMap;
62     private final String DEVICE_TYPE = "openflow";
63
64     private final ListeningExecutorService pool;
65
66     private final OpenflowPluginConfig openflowPluginConfig;
67
68     public OfEntityManager(EntityOwnershipService entityOwnershipService, OpenflowPluginConfig ofPluginConfig) {
69         this.entityOwnershipService = entityOwnershipService;
70         openflowPluginConfig = ofPluginConfig;
71         ownershipListener = new OpenflowOwnershipListener(this);
72         entsession = new ConcurrentHashMap<>();
73         entRegistrationMap = new ConcurrentHashMap<>();
74         ThreadPoolLoggingExecutor delegate = new ThreadPoolLoggingExecutor(
75             20, 20, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), "ofEntity");
76         pool =  MoreExecutors.listeningDecorator(delegate);
77     }
78
79     public void setDataBroker(DataBroker dbBroker) {
80         this.dataBroker = dbBroker;
81     }
82
83     public void init(){
84         registerEntityOwnershipChangeListener();
85     }
86
87     public void registerEntityOwnershipChangeListener() {
88         if(entityOwnershipService!=null) {
89             if(LOG.isDebugEnabled()) {
90                 LOG.debug("registerEntityOwnershipChangeListener: Registering entity ownership change listener for entitier of type {}", DEVICE_TYPE);
91             }
92         entityOwnershipService.registerListener(DEVICE_TYPE, ownershipListener);
93         }
94     }
95
96     public void requestOpenflowEntityOwnership(final ModelDrivenSwitch ofSwitch,
97                                                final SessionContext context,
98                                                final NotificationQueueWrapper wrappedNotification,
99                                                final RpcProviderRegistry rpcProviderRegistry) {
100         MDSwitchMetaData entityMetaData =
101                 new MDSwitchMetaData(ofSwitch,context,wrappedNotification,rpcProviderRegistry);
102
103         final Entity entity = new Entity(DEVICE_TYPE, ofSwitch.getNodeId().getValue());
104         entsession.put(entity, entityMetaData);
105
106         //Register as soon as possible to avoid missing any entity ownership change event
107         final EntityOwnershipCandidateRegistration entityRegistration;
108         try {
109             entityRegistration = entityOwnershipService.registerCandidate(entity);
110             entRegistrationMap.put(entity, entityRegistration);
111             LOG.info("requestOpenflowEntityOwnership: Registered controller for the ownership of {}", ofSwitch.getNodeId() );
112         } catch (CandidateAlreadyRegisteredException e) {
113             // we can log and move for this error, as listener is present and role changes will be served.
114             LOG.error("requestOpenflowEntityOwnership : Controller registration for ownership of {} failed ", ofSwitch.getNodeId(), e );
115         }
116
117         Optional <EntityOwnershipState> entityOwnershipStateOptional =
118                 entityOwnershipService.getOwnershipState(entity);
119
120         if (entityOwnershipStateOptional.isPresent()) {
121             final EntityOwnershipState entityOwnershipState = entityOwnershipStateOptional.get();
122             if (entityOwnershipState.hasOwner()) {
123                 final OfpRole newRole ;
124                 if (entityOwnershipState.isOwner()) {
125                     LOG.info("requestOpenflowEntityOwnership: Set controller as a MASTER controller " +
126                             "because it's the OWNER of the {}", ofSwitch.getNodeId());
127                     newRole =  OfpRole.BECOMEMASTER;
128                     setDeviceOwnershipState(entity,true);
129                     registerRoutedRPCForSwitch(entsession.get(entity));
130                 } else {
131                     LOG.info("requestOpenflowEntityOwnership: Set controller as a SLAVE controller " +
132                             "because it's is not the owner of the {}", ofSwitch.getNodeId());
133                     newRole = OfpRole.BECOMESLAVE;
134                     setDeviceOwnershipState(entity,false);
135                 }
136                 RolePushTask task = new RolePushTask(newRole, context);
137                 ListenableFuture<Boolean> rolePushResult = pool.submit(task);
138                 CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
139                     RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
140                 Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
141                     @Override
142                     public void onSuccess(Boolean result){
143                         LOG.info("requestOpenflowEntityOwnership: Controller is now {} of the {}",
144                                 newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE",ofSwitch.getNodeId() );
145
146                         sendNodeAddedNotification(entsession.get(entity));
147                     }
148                     @Override
149                     public void onFailure(Throwable t){
150                         LOG.warn("requestOpenflowEntityOwnership: Controller is not able to set " +
151                                 "the role for {}",ofSwitch.getNodeId(), t);
152
153                         if(newRole == OfpRole.BECOMEMASTER) {
154                             LOG.info("requestOpenflowEntityOwnership: ..and controller is the owner of the " +
155                                     "device {}. Closing the registration, so other controllers can try to " +
156                                     "become owner and attempt to be master controller.",ofSwitch.getNodeId());
157
158                             EntityOwnershipCandidateRegistration ownershipRegistrent = entRegistrationMap.get(entity);
159                             if (ownershipRegistrent != null) {
160                                 ownershipRegistrent.close();
161                                 entRegistrationMap.remove(entity);
162                             }
163
164                             LOG.info("requestOpenflowEntityOwnership: ..and registering it back to participate" +
165                                     " in ownership of the entity.");
166
167                             EntityOwnershipCandidateRegistration entityRegistration;
168                             try {
169                                 entityRegistration = entityOwnershipService.registerCandidate(entity);
170                                 entRegistrationMap.put(entity, entityRegistration);
171                                 LOG.info("requestOpenflowEntityOwnership: re-registered controller for " +
172                                         "ownership of the {}", ofSwitch.getNodeId() );
173                             } catch (CandidateAlreadyRegisteredException e) {
174                                 // we can log and move for this error, as listener is present and role changes will be served.
175                                 LOG.error("requestOpenflowEntityOwnership: *Surprisingly* Entity is already " +
176                                         "registered with EntityOwnershipService : {}", ofSwitch.getNodeId(), e );
177                             }
178
179                         } else {
180                                 LOG.error("requestOpenflowEntityOwnership : Not able to set role {} for {}"
181                                         , newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE", ofSwitch.getNodeId());
182                         }
183                     }
184                  });
185              }
186          }
187     }
188
189     public void setSlaveRole(SessionContext sessionContext) {
190         OfpRole newRole = OfpRole.BECOMESLAVE;
191         if (sessionContext != null) {
192             final BigInteger targetSwitchDPId = sessionContext.getFeatures().getDatapathId();
193             LOG.debug("setSlaveRole: Set controller as a SLAVE controller for {}", targetSwitchDPId.toString());
194
195             RolePushTask task = new RolePushTask(newRole, sessionContext);
196             ListenableFuture<Boolean> rolePushResult = pool.submit(task);
197             final CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
198                 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
199             Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
200                 @Override
201                 public void onSuccess(Boolean result){
202                     LOG.debug("setSlaveRole: Controller is set as a SLAVE for {}", targetSwitchDPId.toString());
203                 }
204                 @Override
205                 public void onFailure(Throwable e){
206                     LOG.error("setSlaveRole: Role request to set controller as a SLAVE failed for {}",
207                             targetSwitchDPId.toString(), e);
208                 }
209             });
210         } else {
211             LOG.warn("setSlaveRole: sessionContext is not set. Device is not connected anymore");
212         }
213     }
214
215     public void onDeviceOwnershipChanged(final EntityOwnershipChange ownershipChange) {
216         final OfpRole newRole;
217         final Entity entity = ownershipChange.getEntity();
218         SessionContext sessionContext = entsession.get(entity)!=null?entsession.get(entity).getContext():null;
219         if (ownershipChange.isOwner()) {
220             LOG.info("onDeviceOwnershipChanged: Set controller as a MASTER controller because " +
221                     "it's the OWNER of the {}", entity);
222             newRole =  OfpRole.BECOMEMASTER;
223         }
224         else {
225
226             newRole =  OfpRole.BECOMESLAVE;
227             if(sessionContext != null && ownershipChange.hasOwner()) {
228                 LOG.info("onDeviceOwnershipChanged: Set controller as a SLAVE controller because " +
229                         "it's not the OWNER of the {}", entity);
230
231                 if(ownershipChange.wasOwner()) {
232                     setDeviceOwnershipState(entity,false);
233                     deregisterRoutedRPCForSwitch(entsession.get(entity));
234                     // You don't have to explicitly set role to Slave in this case,
235                     // because other controller will be taking over the master role
236                     // and that will force other controller to become slave.
237                 } else {
238                     boolean isOwnershipInitialized = entsession.get(entity).getIsOwnershipInitialized();
239                     setDeviceOwnershipState(entity,false);
240                     if (!isOwnershipInitialized) {
241                         setSlaveRole(sessionContext);
242                         sendNodeAddedNotification(entsession.get(entity));
243                     }
244                 }
245             }
246             return;
247         }
248         if (sessionContext != null) {
249             //Register the RPC, given *this* controller instance is going to be master owner.
250             //If role registration fails for this node, it will deregister as a candidate for
251             //ownership and that will make this controller non-owner and it will deregister the
252             // router rpc.
253             setDeviceOwnershipState(entity,newRole==OfpRole.BECOMEMASTER);
254             registerRoutedRPCForSwitch(entsession.get(entity));
255
256             final String targetSwitchDPId = sessionContext.getFeatures().getDatapathId().toString();
257             RolePushTask task = new RolePushTask(newRole, sessionContext);
258             ListenableFuture<Boolean> rolePushResult = pool.submit(task);
259
260             final CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
261                 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
262             Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
263                 @Override
264                 public void onSuccess(Boolean result){
265                     LOG.info("onDeviceOwnershipChanged: Controller is successfully set as a " +
266                             "MASTER controller for {}", targetSwitchDPId);
267                     if(!openflowPluginConfig.skipTableFeatures()) {
268                         if(LOG.isDebugEnabled()){
269                             LOG.debug("Send table feature request for entity {}",entity.getId());
270                         }
271                         entsession.get(entity).getOfSwitch().sendEmptyTableFeatureRequest();
272                     }
273                     sendNodeAddedNotification(entsession.get(entity));
274
275                 }
276                 @Override
277                 public void onFailure(Throwable e){
278
279                     LOG.warn("onDeviceOwnershipChanged: Controller is not able to set the " +
280                             "MASTER role for {}.", targetSwitchDPId,e);
281                     if(newRole == OfpRole.BECOMEMASTER) {
282                         LOG.info("onDeviceOwnershipChanged: ..and this *instance* is owner of the device {}. " +
283                                 "Closing the registration, so other entity can become owner " +
284                                 "and attempt to be master controller.",targetSwitchDPId);
285
286                         EntityOwnershipCandidateRegistration ownershipRegistrent = entRegistrationMap.get(entity);
287                         if (ownershipRegistrent != null) {
288                             setDeviceOwnershipState(entity,false);
289                             ownershipRegistrent.close();
290                             MDSwitchMetaData switchMetadata = entsession.get(entity);
291                             if(switchMetadata != null){
292                                 switchMetadata.setIsOwnershipInitialized(false);
293                                 //We can probably leave deregistration till the node ownerhsip change.
294                                 //But that can probably cause some race condition.
295                                 deregisterRoutedRPCForSwitch(switchMetadata);
296                             }
297                         }
298
299                         LOG.info("onDeviceOwnershipChanged: ..and registering it back to participate in " +
300                                 "ownership and re-try");
301
302                         EntityOwnershipCandidateRegistration entityRegistration;
303                         try {
304                             entityRegistration = entityOwnershipService.registerCandidate(entity);
305                             entRegistrationMap.put(entity, entityRegistration);
306                             LOG.info("onDeviceOwnershipChanged: re-registered candidate for " +
307                                     "ownership of the {}", targetSwitchDPId );
308                         } catch (CandidateAlreadyRegisteredException ex) {
309                             // we can log and move for this error, as listener is present and role changes will be served.
310                             LOG.error("onDeviceOwnershipChanged: *Surprisingly* Entity is already " +
311                                     "registered with EntityOwnershipService : {}", targetSwitchDPId, ex );
312                         }
313
314                     } else {
315                         LOG.error("onDeviceOwnershipChanged : Not able to set role {} for " +
316                                 " {}", newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE", targetSwitchDPId);
317                     }
318                 }
319             });
320         } else {
321             LOG.warn("onDeviceOwnershipChanged: sessionContext is not available. Releasing ownership of the device");
322             EntityOwnershipCandidateRegistration ownershipRegistrant = entRegistrationMap.get(entity);
323             if (ownershipRegistrant != null) {
324                 ownershipRegistrant.close();
325             }
326         }
327     }
328
329     public void unregisterEntityOwnershipRequest(NodeId nodeId) {
330         Entity entity = new Entity(DEVICE_TYPE, nodeId.getValue());
331         entsession.remove(entity);
332         EntityOwnershipCandidateRegistration entRegCandidate = entRegistrationMap.get(entity);
333         if(entRegCandidate != null){
334             LOG.info("unregisterEntityOwnershipRequest: Unregister controller entity ownership " +
335                     "request for {}", nodeId);
336             entRegCandidate.close();
337             entRegistrationMap.remove(entity);
338         }
339     }
340
341     @Override
342     public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
343            final Throwable cause) {
344     }
345
346     @Override
347     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
348        // NOOP
349     }
350
351     private static void registerRoutedRPCForSwitch(MDSwitchMetaData entityMetadata) {
352         // Routed RPC registration is only done when *this* instance is owner of
353         // the entity.
354         if(entityMetadata.getOfSwitch().isEntityOwner()) {
355             if (!entityMetadata.isRPCRegistrationDone.get()) {
356                 entityMetadata.setIsRPCRegistrationDone(true);
357                 ModelDrivenSwitchRegistration registration =
358                         entityMetadata.getOfSwitch().register(entityMetadata.getRpcProviderRegistry());
359
360                 entityMetadata.getContext().setProviderRegistration(registration);
361
362                 LOG.info("registerRoutedRPCForSwitch: Registered routed rpc for ModelDrivenSwitch {}",
363                         entityMetadata.getOfSwitch().getNodeId().getValue());
364             }
365         } else {
366             LOG.info("registerRoutedRPCForSwitch: Skipping routed rpc registration for ModelDrivenSwitch {}",
367                     entityMetadata.getOfSwitch().getNodeId().getValue());
368         }
369     }
370
371     private static void deregisterRoutedRPCForSwitch(MDSwitchMetaData entityMetadata) {
372
373         ModelDrivenSwitchRegistration registration = entityMetadata.getContext().getProviderRegistration();
374         if (null != registration) {
375             registration.close();
376             entityMetadata.getContext().setProviderRegistration(null);
377             entityMetadata.setIsRPCRegistrationDone(false);
378         }
379         LOG.info("deregisterRoutedRPCForSwitch: De-registered routed rpc for ModelDrivenSwitch {}",
380                 entityMetadata.getOfSwitch().getNodeId().getValue());
381     }
382
383     private static void sendNodeAddedNotification(MDSwitchMetaData entityMetadata) {
384         //Node added notification need to be sent irrespective of whether
385         // *this* instance is owner of the entity or not. Because yang notifications
386         // are local, and we should maintain the behavior across the application.
387         LOG.info("sendNodeAddedNotification: Node Added notification is sent for ModelDrivenSwitch {}",
388                 entityMetadata.getOfSwitch().getNodeId().getValue());
389
390         entityMetadata.getContext().getNotificationEnqueuer().enqueueNotification(
391                 entityMetadata.getWrappedNotification());
392
393         //Send multipart request to get other details of the switch.
394         entityMetadata.getOfSwitch().requestSwitchDetails();
395     }
396
397     private void setDeviceOwnershipState(Entity entity, boolean isMaster) {
398         MDSwitchMetaData entityMetadata = entsession.get(entity);
399         entityMetadata.setIsOwnershipInitialized(true);
400         entityMetadata.getOfSwitch().setEntityOwnership(isMaster);
401     }
402
403     private class MDSwitchMetaData {
404
405         final private ModelDrivenSwitch ofSwitch;
406         final private SessionContext context;
407         final private NotificationQueueWrapper wrappedNotification;
408         final private RpcProviderRegistry rpcProviderRegistry;
409         final private AtomicBoolean isRPCRegistrationDone = new AtomicBoolean(false);
410         final private AtomicBoolean isOwnershipInitialized = new AtomicBoolean(false);
411
412         MDSwitchMetaData(ModelDrivenSwitch ofSwitch,
413                          SessionContext context,
414                          NotificationQueueWrapper wrappedNotification,
415                          RpcProviderRegistry rpcProviderRegistry) {
416             this.ofSwitch = ofSwitch;
417             this.context = context;
418             this.wrappedNotification = wrappedNotification;
419             this.rpcProviderRegistry = rpcProviderRegistry;
420         }
421
422         public ModelDrivenSwitch getOfSwitch() {
423             return ofSwitch;
424         }
425
426         public SessionContext getContext() {
427             return context;
428         }
429
430         public NotificationQueueWrapper getWrappedNotification() {
431             return wrappedNotification;
432         }
433
434         public RpcProviderRegistry getRpcProviderRegistry() {
435             return rpcProviderRegistry;
436         }
437
438         public AtomicBoolean getIsRPCRegistrationDone() {
439             return isRPCRegistrationDone;
440         }
441
442         public void setIsRPCRegistrationDone(boolean isRPCRegistrationDone) {
443             this.isRPCRegistrationDone.set(isRPCRegistrationDone);
444         }
445
446         public boolean getIsOwnershipInitialized() {
447             return isOwnershipInitialized.get();
448         }
449
450         public void setIsOwnershipInitialized( boolean ownershipState) {
451             this.isOwnershipInitialized.set(ownershipState);
452         }
453     }
454 }