2 * Copyright (c) 2013, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core.role;
11 import java.math.BigInteger;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.concurrent.TimeUnit;
14 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
15 import com.google.common.base.Optional;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
20 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
21 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
22 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
24 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
25 import org.opendaylight.openflowplugin.api.openflow.md.ModelDrivenSwitch;
26 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
27 import org.opendaylight.openflowplugin.openflow.md.core.sal.OpenflowPluginConfig;
28 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
29 import org.opendaylight.yangtools.yang.common.QName;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
31 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
32 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.common.config.impl.rev140326.OfpRole;
34 import org.opendaylight.openflowplugin.openflow.md.core.session.RolePushTask;
35 import org.opendaylight.openflowplugin.openflow.md.core.session.RolePushException;
36 import org.opendaylight.openflowplugin.openflow.md.util.RoleUtil;
37 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.ConcurrentHashMap;
40 import com.google.common.util.concurrent.ListeningExecutorService;
41 import com.google.common.util.concurrent.MoreExecutors;
42 import com.google.common.util.concurrent.Futures;
43 import com.google.common.util.concurrent.FutureCallback;
44 import com.google.common.util.concurrent.ListenableFuture;
45 import com.google.common.util.concurrent.CheckedFuture;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
49 public class OfEntityManager implements TransactionChainListener{
50 private static final Logger LOG = LoggerFactory.getLogger(OfEntityManager.class);
52 private static final QName ENTITY_QNAME =
53 org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.core.general.entity.rev150820.Entity.QNAME;
54 private static final QName ENTITY_NAME = QName.create(ENTITY_QNAME, "name");
56 private DataBroker dataBroker;
57 private EntityOwnershipService entityOwnershipService;
58 private final OpenflowOwnershipListener ownershipListener;
59 private final AtomicBoolean registeredListener = new AtomicBoolean();
60 private ConcurrentHashMap<Entity, MDSwitchMetaData> entsession;
61 private ConcurrentHashMap<Entity, EntityOwnershipCandidateRegistration> entRegistrationMap;
62 private final String DEVICE_TYPE = "openflow";
64 private final ListeningExecutorService pool;
66 private final OpenflowPluginConfig openflowPluginConfig;
68 public OfEntityManager(EntityOwnershipService entityOwnershipService, OpenflowPluginConfig ofPluginConfig) {
69 this.entityOwnershipService = entityOwnershipService;
70 openflowPluginConfig = ofPluginConfig;
71 ownershipListener = new OpenflowOwnershipListener(this);
72 entsession = new ConcurrentHashMap<>();
73 entRegistrationMap = new ConcurrentHashMap<>();
74 ThreadPoolLoggingExecutor delegate = new ThreadPoolLoggingExecutor(
75 20, 20, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), "ofEntity");
76 pool = MoreExecutors.listeningDecorator(delegate);
79 public void setDataBroker(DataBroker dbBroker) {
80 this.dataBroker = dbBroker;
84 registerEntityOwnershipChangeListener();
87 public void registerEntityOwnershipChangeListener() {
88 if(entityOwnershipService!=null) {
89 if(LOG.isDebugEnabled()) {
90 LOG.debug("registerEntityOwnershipChangeListener: Registering entity ownership change listener for entitier of type {}", DEVICE_TYPE);
92 entityOwnershipService.registerListener(DEVICE_TYPE, ownershipListener);
96 public void requestOpenflowEntityOwnership(final ModelDrivenSwitch ofSwitch,
97 final SessionContext context,
98 final NotificationQueueWrapper wrappedNotification,
99 final RpcProviderRegistry rpcProviderRegistry) {
100 MDSwitchMetaData entityMetaData =
101 new MDSwitchMetaData(ofSwitch,context,wrappedNotification,rpcProviderRegistry);
103 final Entity entity = new Entity(DEVICE_TYPE, ofSwitch.getNodeId().getValue());
104 entsession.put(entity, entityMetaData);
106 //Register as soon as possible to avoid missing any entity ownership change event
107 final EntityOwnershipCandidateRegistration entityRegistration;
109 entityRegistration = entityOwnershipService.registerCandidate(entity);
110 entRegistrationMap.put(entity, entityRegistration);
111 LOG.info("requestOpenflowEntityOwnership: Registered controller for the ownership of {}", ofSwitch.getNodeId() );
112 } catch (CandidateAlreadyRegisteredException e) {
113 // we can log and move for this error, as listener is present and role changes will be served.
114 LOG.error("requestOpenflowEntityOwnership : Controller registration for ownership of {} failed ", ofSwitch.getNodeId(), e );
117 Optional <EntityOwnershipState> entityOwnershipStateOptional =
118 entityOwnershipService.getOwnershipState(entity);
120 if (entityOwnershipStateOptional.isPresent()) {
121 final EntityOwnershipState entityOwnershipState = entityOwnershipStateOptional.get();
122 if (entityOwnershipState.hasOwner()) {
123 final OfpRole newRole ;
124 if (entityOwnershipState.isOwner()) {
125 LOG.info("requestOpenflowEntityOwnership: Set controller as a MASTER controller " +
126 "because it's the OWNER of the {}", ofSwitch.getNodeId());
127 newRole = OfpRole.BECOMEMASTER;
128 setDeviceOwnershipState(entity,true);
129 registerRoutedRPCForSwitch(entsession.get(entity));
131 LOG.info("requestOpenflowEntityOwnership: Set controller as a SLAVE controller " +
132 "because it's is not the owner of the {}", ofSwitch.getNodeId());
133 newRole = OfpRole.BECOMESLAVE;
134 setDeviceOwnershipState(entity,false);
136 RolePushTask task = new RolePushTask(newRole, context);
137 ListenableFuture<Boolean> rolePushResult = pool.submit(task);
138 CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
139 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
140 Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
142 public void onSuccess(Boolean result){
143 LOG.info("requestOpenflowEntityOwnership: Controller is now {} of the {}",
144 newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE",ofSwitch.getNodeId() );
146 sendNodeAddedNotification(entsession.get(entity));
149 public void onFailure(Throwable t){
150 LOG.warn("requestOpenflowEntityOwnership: Controller is not able to set " +
151 "the role for {}",ofSwitch.getNodeId(), t);
153 if(newRole == OfpRole.BECOMEMASTER) {
154 LOG.info("requestOpenflowEntityOwnership: ..and controller is the owner of the " +
155 "device {}. Closing the registration, so other controllers can try to " +
156 "become owner and attempt to be master controller.",ofSwitch.getNodeId());
158 EntityOwnershipCandidateRegistration ownershipRegistrent = entRegistrationMap.get(entity);
159 if (ownershipRegistrent != null) {
160 ownershipRegistrent.close();
161 entRegistrationMap.remove(entity);
164 LOG.info("requestOpenflowEntityOwnership: ..and registering it back to participate" +
165 " in ownership of the entity.");
167 EntityOwnershipCandidateRegistration entityRegistration;
169 entityRegistration = entityOwnershipService.registerCandidate(entity);
170 entRegistrationMap.put(entity, entityRegistration);
171 LOG.info("requestOpenflowEntityOwnership: re-registered controller for " +
172 "ownership of the {}", ofSwitch.getNodeId() );
173 } catch (CandidateAlreadyRegisteredException e) {
174 // we can log and move for this error, as listener is present and role changes will be served.
175 LOG.error("requestOpenflowEntityOwnership: *Surprisingly* Entity is already " +
176 "registered with EntityOwnershipService : {}", ofSwitch.getNodeId(), e );
180 LOG.error("requestOpenflowEntityOwnership : Not able to set role {} for {}"
181 , newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE", ofSwitch.getNodeId());
189 public void setSlaveRole(SessionContext sessionContext) {
190 OfpRole newRole = OfpRole.BECOMESLAVE;
191 if (sessionContext != null) {
192 final BigInteger targetSwitchDPId = sessionContext.getFeatures().getDatapathId();
193 LOG.debug("setSlaveRole: Set controller as a SLAVE controller for {}", targetSwitchDPId.toString());
195 RolePushTask task = new RolePushTask(newRole, sessionContext);
196 ListenableFuture<Boolean> rolePushResult = pool.submit(task);
197 final CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
198 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
199 Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
201 public void onSuccess(Boolean result){
202 LOG.debug("setSlaveRole: Controller is set as a SLAVE for {}", targetSwitchDPId.toString());
205 public void onFailure(Throwable e){
206 LOG.error("setSlaveRole: Role request to set controller as a SLAVE failed for {}",
207 targetSwitchDPId.toString(), e);
211 LOG.warn("setSlaveRole: sessionContext is not set. Device is not connected anymore");
215 public void onDeviceOwnershipChanged(final EntityOwnershipChange ownershipChange) {
216 final OfpRole newRole;
217 final Entity entity = ownershipChange.getEntity();
218 SessionContext sessionContext = entsession.get(entity)!=null?entsession.get(entity).getContext():null;
219 if (ownershipChange.isOwner()) {
220 LOG.info("onDeviceOwnershipChanged: Set controller as a MASTER controller because " +
221 "it's the OWNER of the {}", entity);
222 newRole = OfpRole.BECOMEMASTER;
226 newRole = OfpRole.BECOMESLAVE;
227 if(sessionContext != null && ownershipChange.hasOwner()) {
228 LOG.info("onDeviceOwnershipChanged: Set controller as a SLAVE controller because " +
229 "it's not the OWNER of the {}", entity);
231 if(ownershipChange.wasOwner()) {
232 setDeviceOwnershipState(entity,false);
233 deregisterRoutedRPCForSwitch(entsession.get(entity));
234 // You don't have to explicitly set role to Slave in this case,
235 // because other controller will be taking over the master role
236 // and that will force other controller to become slave.
238 boolean isOwnershipInitialized = entsession.get(entity).getIsOwnershipInitialized();
239 setDeviceOwnershipState(entity,false);
240 if (!isOwnershipInitialized) {
241 setSlaveRole(sessionContext);
242 sendNodeAddedNotification(entsession.get(entity));
248 if (sessionContext != null) {
249 //Register the RPC, given *this* controller instance is going to be master owner.
250 //If role registration fails for this node, it will deregister as a candidate for
251 //ownership and that will make this controller non-owner and it will deregister the
253 setDeviceOwnershipState(entity,newRole==OfpRole.BECOMEMASTER);
254 registerRoutedRPCForSwitch(entsession.get(entity));
256 final String targetSwitchDPId = sessionContext.getFeatures().getDatapathId().toString();
257 RolePushTask task = new RolePushTask(newRole, sessionContext);
258 ListenableFuture<Boolean> rolePushResult = pool.submit(task);
260 final CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
261 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
262 Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
264 public void onSuccess(Boolean result){
265 LOG.info("onDeviceOwnershipChanged: Controller is successfully set as a " +
266 "MASTER controller for {}", targetSwitchDPId);
267 if(!openflowPluginConfig.skipTableFeatures()) {
268 if(LOG.isDebugEnabled()){
269 LOG.debug("Send table feature request for entity {}",entity.getId());
271 entsession.get(entity).getOfSwitch().sendEmptyTableFeatureRequest();
273 sendNodeAddedNotification(entsession.get(entity));
277 public void onFailure(Throwable e){
279 LOG.warn("onDeviceOwnershipChanged: Controller is not able to set the " +
280 "MASTER role for {}.", targetSwitchDPId,e);
281 if(newRole == OfpRole.BECOMEMASTER) {
282 LOG.info("onDeviceOwnershipChanged: ..and this *instance* is owner of the device {}. " +
283 "Closing the registration, so other entity can become owner " +
284 "and attempt to be master controller.",targetSwitchDPId);
286 EntityOwnershipCandidateRegistration ownershipRegistrent = entRegistrationMap.get(entity);
287 if (ownershipRegistrent != null) {
288 setDeviceOwnershipState(entity,false);
289 ownershipRegistrent.close();
290 MDSwitchMetaData switchMetadata = entsession.get(entity);
291 if(switchMetadata != null){
292 switchMetadata.setIsOwnershipInitialized(false);
293 //We can probably leave deregistration till the node ownerhsip change.
294 //But that can probably cause some race condition.
295 deregisterRoutedRPCForSwitch(switchMetadata);
299 LOG.info("onDeviceOwnershipChanged: ..and registering it back to participate in " +
300 "ownership and re-try");
302 EntityOwnershipCandidateRegistration entityRegistration;
304 entityRegistration = entityOwnershipService.registerCandidate(entity);
305 entRegistrationMap.put(entity, entityRegistration);
306 LOG.info("onDeviceOwnershipChanged: re-registered candidate for " +
307 "ownership of the {}", targetSwitchDPId );
308 } catch (CandidateAlreadyRegisteredException ex) {
309 // we can log and move for this error, as listener is present and role changes will be served.
310 LOG.error("onDeviceOwnershipChanged: *Surprisingly* Entity is already " +
311 "registered with EntityOwnershipService : {}", targetSwitchDPId, ex );
315 LOG.error("onDeviceOwnershipChanged : Not able to set role {} for " +
316 " {}", newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE", targetSwitchDPId);
321 LOG.warn("onDeviceOwnershipChanged: sessionContext is not available. Releasing ownership of the device");
322 EntityOwnershipCandidateRegistration ownershipRegistrant = entRegistrationMap.get(entity);
323 if (ownershipRegistrant != null) {
324 ownershipRegistrant.close();
329 public void unregisterEntityOwnershipRequest(NodeId nodeId) {
330 Entity entity = new Entity(DEVICE_TYPE, nodeId.getValue());
331 entsession.remove(entity);
332 EntityOwnershipCandidateRegistration entRegCandidate = entRegistrationMap.get(entity);
333 if(entRegCandidate != null){
334 LOG.info("unregisterEntityOwnershipRequest: Unregister controller entity ownership " +
335 "request for {}", nodeId);
336 entRegCandidate.close();
337 entRegistrationMap.remove(entity);
342 public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
343 final Throwable cause) {
347 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
351 private void registerRoutedRPCForSwitch(MDSwitchMetaData entityMetadata) {
352 // Routed RPC registration is only done when *this* instance is owner of
354 if(entityMetadata.getOfSwitch().isEntityOwner()) {
355 if (!entityMetadata.isRPCRegistrationDone.get()) {
356 entityMetadata.setIsRPCRegistrationDone(true);
357 CompositeObjectRegistration<ModelDrivenSwitch> registration =
358 entityMetadata.getOfSwitch().register(entityMetadata.getRpcProviderRegistry());
360 entityMetadata.getContext().setProviderRegistration(registration);
362 LOG.info("registerRoutedRPCForSwitch: Registered routed rpc for ModelDrivenSwitch {}",
363 entityMetadata.getOfSwitch().getNodeId().getValue());
366 LOG.info("registerRoutedRPCForSwitch: Skipping routed rpc registration for ModelDrivenSwitch {}",
367 entityMetadata.getOfSwitch().getNodeId().getValue());
371 private void deregisterRoutedRPCForSwitch(MDSwitchMetaData entityMetadata) {
373 CompositeObjectRegistration<ModelDrivenSwitch> registration = entityMetadata.getContext().getProviderRegistration();
374 if (null != registration) {
375 registration.close();
376 entityMetadata.getContext().setProviderRegistration(null);
377 entityMetadata.setIsRPCRegistrationDone(false);
379 LOG.info("deregisterRoutedRPCForSwitch: De-registered routed rpc for ModelDrivenSwitch {}",
380 entityMetadata.getOfSwitch().getNodeId().getValue());
383 private void sendNodeAddedNotification(MDSwitchMetaData entityMetadata) {
384 //Node added notification need to be sent irrespective of whether
385 // *this* instance is owner of the entity or not. Because yang notifications
386 // are local, and we should maintain the behavior across the application.
387 LOG.info("sendNodeAddedNotification: Node Added notification is sent for ModelDrivenSwitch {}",
388 entityMetadata.getOfSwitch().getNodeId().getValue());
390 entityMetadata.getContext().getNotificationEnqueuer().enqueueNotification(
391 entityMetadata.getWrappedNotification());
393 //Send multipart request to get other details of the switch.
394 entityMetadata.getOfSwitch().requestSwitchDetails();
397 private void setDeviceOwnershipState(Entity entity, boolean isMaster) {
398 MDSwitchMetaData entityMetadata = entsession.get(entity);
399 entityMetadata.setIsOwnershipInitialized(true);
400 entityMetadata.getOfSwitch().setEntityOwnership(isMaster);
403 private class MDSwitchMetaData {
405 final private ModelDrivenSwitch ofSwitch;
406 final private SessionContext context;
407 final private NotificationQueueWrapper wrappedNotification;
408 final private RpcProviderRegistry rpcProviderRegistry;
409 final private AtomicBoolean isRPCRegistrationDone = new AtomicBoolean(false);
410 final private AtomicBoolean isOwnershipInitialized = new AtomicBoolean(false);
412 MDSwitchMetaData(ModelDrivenSwitch ofSwitch,
413 SessionContext context,
414 NotificationQueueWrapper wrappedNotification,
415 RpcProviderRegistry rpcProviderRegistry) {
416 this.ofSwitch = ofSwitch;
417 this.context = context;
418 this.wrappedNotification = wrappedNotification;
419 this.rpcProviderRegistry = rpcProviderRegistry;
422 public ModelDrivenSwitch getOfSwitch() {
426 public SessionContext getContext() {
430 public NotificationQueueWrapper getWrappedNotification() {
431 return wrappedNotification;
434 public RpcProviderRegistry getRpcProviderRegistry() {
435 return rpcProviderRegistry;
438 public AtomicBoolean getIsRPCRegistrationDone() {
439 return isRPCRegistrationDone;
442 public void setIsRPCRegistrationDone(boolean isRPCRegistrationDone) {
443 this.isRPCRegistrationDone.set(isRPCRegistrationDone);
446 public boolean getIsOwnershipInitialized() {
447 return isOwnershipInitialized.get();
450 public void setIsOwnershipInitialized( boolean ownershipState) {
451 this.isOwnershipInitialized.set(ownershipState);