2 * Copyright (c) 2013, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core.role;
11 import java.math.BigInteger;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.concurrent.TimeUnit;
14 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
15 import com.google.common.base.Optional;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
20 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
21 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
22 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
24 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
25 import org.opendaylight.openflowplugin.api.openflow.md.ModelDrivenSwitch;
26 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
27 import org.opendaylight.openflowplugin.openflow.md.core.sal.OpenflowPluginConfig;
28 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
29 import org.opendaylight.yangtools.yang.common.QName;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
31 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
32 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.common.config.impl.rev140326.OfpRole;
34 import org.opendaylight.openflowplugin.openflow.md.core.session.RolePushTask;
35 import org.opendaylight.openflowplugin.openflow.md.core.session.RolePushException;
36 import org.opendaylight.openflowplugin.openflow.md.util.RoleUtil;
37 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.ConcurrentHashMap;
40 import com.google.common.util.concurrent.ListeningExecutorService;
41 import com.google.common.util.concurrent.MoreExecutors;
42 import com.google.common.util.concurrent.Futures;
43 import com.google.common.util.concurrent.FutureCallback;
44 import com.google.common.util.concurrent.ListenableFuture;
45 import com.google.common.util.concurrent.CheckedFuture;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
49 public class OfEntityManager implements TransactionChainListener{
50 private static final Logger LOG = LoggerFactory.getLogger(OfEntityManager.class);
52 private static final QName ENTITY_QNAME =
53 org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.core.general.entity.rev150820.Entity.QNAME;
54 private static final QName ENTITY_NAME = QName.create(ENTITY_QNAME, "name");
56 private DataBroker dataBroker;
57 private EntityOwnershipService entityOwnershipService;
58 private final OpenflowOwnershipListener ownershipListener;
59 private final AtomicBoolean registeredListener = new AtomicBoolean();
60 private ConcurrentHashMap<Entity, MDSwitchMetaData> entsession;
61 private ConcurrentHashMap<Entity, EntityOwnershipCandidateRegistration> entRegistrationMap;
62 private final String DEVICE_TYPE = "openflow";
64 private final ListeningExecutorService pool;
66 private final OpenflowPluginConfig openflowPluginConfig;
68 public OfEntityManager(EntityOwnershipService entityOwnershipService, OpenflowPluginConfig ofPluginConfig) {
69 this.entityOwnershipService = entityOwnershipService;
70 openflowPluginConfig = ofPluginConfig;
71 ownershipListener = new OpenflowOwnershipListener(this);
72 entsession = new ConcurrentHashMap<>();
73 entRegistrationMap = new ConcurrentHashMap<>();
74 ThreadPoolLoggingExecutor delegate = new ThreadPoolLoggingExecutor(
75 20, 20, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), "ofEntity");
76 pool = MoreExecutors.listeningDecorator(delegate);
79 public void setDataBroker(DataBroker dbBroker) {
80 this.dataBroker = dbBroker;
83 public void requestOpenflowEntityOwnership(final ModelDrivenSwitch ofSwitch,
84 final SessionContext context,
85 final NotificationQueueWrapper wrappedNotification,
86 final RpcProviderRegistry rpcProviderRegistry) {
87 MDSwitchMetaData entityMetaData =
88 new MDSwitchMetaData(ofSwitch,context,wrappedNotification,rpcProviderRegistry);
90 if (registeredListener.compareAndSet(false, true)) {
91 entityOwnershipService.registerListener(DEVICE_TYPE, ownershipListener);
93 final Entity entity = new Entity(DEVICE_TYPE, ofSwitch.getNodeId().getValue());
94 entsession.put(entity, entityMetaData);
96 //Register as soon as possible to avoid missing any entity ownership change event
97 final EntityOwnershipCandidateRegistration entityRegistration;
99 entityRegistration = entityOwnershipService.registerCandidate(entity);
100 entRegistrationMap.put(entity, entityRegistration);
101 LOG.info("requestOpenflowEntityOwnership: Registered controller for the ownership of {}", ofSwitch.getNodeId() );
102 } catch (CandidateAlreadyRegisteredException e) {
103 // we can log and move for this error, as listener is present and role changes will be served.
104 LOG.error("requestOpenflowEntityOwnership : Controller registration for ownership of {} failed ", ofSwitch.getNodeId(), e );
107 Optional <EntityOwnershipState> entityOwnershipStateOptional =
108 entityOwnershipService.getOwnershipState(entity);
110 if (entityOwnershipStateOptional.isPresent()) {
111 final EntityOwnershipState entityOwnershipState = entityOwnershipStateOptional.get();
112 if (entityOwnershipState.hasOwner()) {
113 final OfpRole newRole ;
114 if (entityOwnershipState.isOwner()) {
115 LOG.info("requestOpenflowEntityOwnership: Set controller as a MASTER controller " +
116 "because it's the OWNER of the {}", ofSwitch.getNodeId());
117 newRole = OfpRole.BECOMEMASTER;
118 setDeviceOwnershipState(entity,true);
119 registerRoutedRPCForSwitch(entsession.get(entity));
121 LOG.info("requestOpenflowEntityOwnership: Set controller as a SLAVE controller " +
122 "because it's is not the owner of the {}", ofSwitch.getNodeId());
123 newRole = OfpRole.BECOMESLAVE;
124 setDeviceOwnershipState(entity,false);
126 RolePushTask task = new RolePushTask(newRole, context);
127 ListenableFuture<Boolean> rolePushResult = pool.submit(task);
128 CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
129 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
130 Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
132 public void onSuccess(Boolean result){
133 LOG.info("requestOpenflowEntityOwnership: Controller is now {} of the {}",
134 newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE",ofSwitch.getNodeId() );
136 sendNodeAddedNotification(entsession.get(entity));
139 public void onFailure(Throwable t){
140 LOG.warn("requestOpenflowEntityOwnership: Controller is not able to set " +
141 "the role for {}",ofSwitch.getNodeId(), t);
143 if(newRole == OfpRole.BECOMEMASTER) {
144 LOG.info("requestOpenflowEntityOwnership: ..and controller is the owner of the " +
145 "device {}. Closing the registration, so other controllers can try to " +
146 "become owner and attempt to be master controller.",ofSwitch.getNodeId());
148 EntityOwnershipCandidateRegistration ownershipRegistrent = entRegistrationMap.get(entity);
149 if (ownershipRegistrent != null) {
150 ownershipRegistrent.close();
151 entRegistrationMap.remove(entity);
154 LOG.info("requestOpenflowEntityOwnership: ..and registering it back to participate" +
155 " in ownership of the entity.");
157 EntityOwnershipCandidateRegistration entityRegistration;
159 entityRegistration = entityOwnershipService.registerCandidate(entity);
160 entRegistrationMap.put(entity, entityRegistration);
161 LOG.info("requestOpenflowEntityOwnership: re-registered controller for " +
162 "ownership of the {}", ofSwitch.getNodeId() );
163 } catch (CandidateAlreadyRegisteredException e) {
164 // we can log and move for this error, as listener is present and role changes will be served.
165 LOG.error("requestOpenflowEntityOwnership: *Surprisingly* Entity is already " +
166 "registered with EntityOwnershipService : {}", ofSwitch.getNodeId(), e );
170 LOG.error("requestOpenflowEntityOwnership : Not able to set role {} for {}"
171 , newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE", ofSwitch.getNodeId());
179 public void setSlaveRole(SessionContext sessionContext) {
180 OfpRole newRole = OfpRole.BECOMESLAVE;
181 if (sessionContext != null) {
182 final BigInteger targetSwitchDPId = sessionContext.getFeatures().getDatapathId();
183 LOG.debug("setSlaveRole: Set controller as a SLAVE controller for {}", targetSwitchDPId.toString());
185 RolePushTask task = new RolePushTask(newRole, sessionContext);
186 ListenableFuture<Boolean> rolePushResult = pool.submit(task);
187 final CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
188 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
189 Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
191 public void onSuccess(Boolean result){
192 LOG.debug("setSlaveRole: Controller is set as a SLAVE for {}", targetSwitchDPId.toString());
195 public void onFailure(Throwable e){
196 LOG.error("setSlaveRole: Role request to set controller as a SLAVE failed for {}",
197 targetSwitchDPId.toString(), e);
201 LOG.warn("setSlaveRole: sessionContext is not set. Device is not connected anymore");
205 public void onDeviceOwnershipChanged(final EntityOwnershipChange ownershipChange) {
206 final OfpRole newRole;
207 final Entity entity = ownershipChange.getEntity();
208 SessionContext sessionContext = entsession.get(entity)!=null?entsession.get(entity).getContext():null;
209 if (ownershipChange.isOwner()) {
210 LOG.info("onDeviceOwnershipChanged: Set controller as a MASTER controller because " +
211 "it's the OWNER of the {}", entity);
212 newRole = OfpRole.BECOMEMASTER;
216 newRole = OfpRole.BECOMESLAVE;
217 if(sessionContext != null && ownershipChange.hasOwner()) {
218 LOG.info("onDeviceOwnershipChanged: Set controller as a SLAVE controller because " +
219 "it's not the OWNER of the {}", entity);
221 if(ownershipChange.wasOwner()) {
222 setDeviceOwnershipState(entity,false);
223 deregisterRoutedRPCForSwitch(entsession.get(entity));
224 // You don't have to explicitly set role to Slave in this case,
225 // because other controller will be taking over the master role
226 // and that will force other controller to become slave.
228 boolean isOwnershipInitialized = entsession.get(entity).getIsOwnershipInitialized();
229 setDeviceOwnershipState(entity,false);
230 if (!isOwnershipInitialized) {
231 setSlaveRole(sessionContext);
232 sendNodeAddedNotification(entsession.get(entity));
238 if (sessionContext != null) {
239 //Register the RPC, given *this* controller instance is going to be master owner.
240 //If role registration fails for this node, it will deregister as a candidate for
241 //ownership and that will make this controller non-owner and it will deregister the
243 setDeviceOwnershipState(entity,newRole==OfpRole.BECOMEMASTER);
244 registerRoutedRPCForSwitch(entsession.get(entity));
246 final String targetSwitchDPId = sessionContext.getFeatures().getDatapathId().toString();
247 RolePushTask task = new RolePushTask(newRole, sessionContext);
248 ListenableFuture<Boolean> rolePushResult = pool.submit(task);
250 final CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
251 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
252 Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
254 public void onSuccess(Boolean result){
255 LOG.info("onDeviceOwnershipChanged: Controller is successfully set as a " +
256 "MASTER controller for {}", targetSwitchDPId);
257 if(!openflowPluginConfig.skipTableFeatures()) {
258 if(LOG.isDebugEnabled()){
259 LOG.debug("Send table feature request for entity {}",entity.getId());
261 entsession.get(entity).getOfSwitch().sendEmptyTableFeatureRequest();
263 sendNodeAddedNotification(entsession.get(entity));
267 public void onFailure(Throwable e){
269 LOG.warn("onDeviceOwnershipChanged: Controller is not able to set the " +
270 "MASTER role for {}.", targetSwitchDPId,e);
271 if(newRole == OfpRole.BECOMEMASTER) {
272 LOG.info("onDeviceOwnershipChanged: ..and this *instance* is owner of the device {}. " +
273 "Closing the registration, so other entity can become owner " +
274 "and attempt to be master controller.",targetSwitchDPId);
276 EntityOwnershipCandidateRegistration ownershipRegistrent = entRegistrationMap.get(entity);
277 if (ownershipRegistrent != null) {
278 setDeviceOwnershipState(entity,false);
279 ownershipRegistrent.close();
280 MDSwitchMetaData switchMetadata = entsession.get(entity);
281 if(switchMetadata != null){
282 switchMetadata.setIsOwnershipInitialized(false);
283 //We can probably leave deregistration till the node ownerhsip change.
284 //But that can probably cause some race condition.
285 deregisterRoutedRPCForSwitch(switchMetadata);
289 LOG.info("onDeviceOwnershipChanged: ..and registering it back to participate in " +
290 "ownership and re-try");
292 EntityOwnershipCandidateRegistration entityRegistration;
294 entityRegistration = entityOwnershipService.registerCandidate(entity);
295 entRegistrationMap.put(entity, entityRegistration);
296 LOG.info("onDeviceOwnershipChanged: re-registered candidate for " +
297 "ownership of the {}", targetSwitchDPId );
298 } catch (CandidateAlreadyRegisteredException ex) {
299 // we can log and move for this error, as listener is present and role changes will be served.
300 LOG.error("onDeviceOwnershipChanged: *Surprisingly* Entity is already " +
301 "registered with EntityOwnershipService : {}", targetSwitchDPId, ex );
305 LOG.error("onDeviceOwnershipChanged : Not able to set role {} for " +
306 " {}", newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE", targetSwitchDPId);
311 LOG.warn("onDeviceOwnershipChanged: sessionContext is not available. Releasing ownership of the device");
312 EntityOwnershipCandidateRegistration ownershipRegistrant = entRegistrationMap.get(entity);
313 if (ownershipRegistrant != null) {
314 ownershipRegistrant.close();
319 public void unregisterEntityOwnershipRequest(NodeId nodeId) {
320 Entity entity = new Entity(DEVICE_TYPE, nodeId.getValue());
321 entsession.remove(entity);
322 EntityOwnershipCandidateRegistration entRegCandidate = entRegistrationMap.get(entity);
323 if(entRegCandidate != null){
324 LOG.info("unregisterEntityOwnershipRequest: Unregister controller entity ownership " +
325 "request for {}", nodeId);
326 entRegCandidate.close();
327 entRegistrationMap.remove(entity);
332 public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
333 final Throwable cause) {
337 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
341 private void registerRoutedRPCForSwitch(MDSwitchMetaData entityMetadata) {
342 // Routed RPC registration is only done when *this* instance is owner of
344 if(entityMetadata.getOfSwitch().isEntityOwner()) {
345 if (!entityMetadata.isRPCRegistrationDone.get()) {
346 entityMetadata.setIsRPCRegistrationDone(true);
347 CompositeObjectRegistration<ModelDrivenSwitch> registration =
348 entityMetadata.getOfSwitch().register(entityMetadata.getRpcProviderRegistry());
350 entityMetadata.getContext().setProviderRegistration(registration);
352 LOG.info("registerRoutedRPCForSwitch: Registered routed rpc for ModelDrivenSwitch {}",
353 entityMetadata.getOfSwitch().getNodeId().getValue());
356 LOG.info("registerRoutedRPCForSwitch: Skipping routed rpc registration for ModelDrivenSwitch {}",
357 entityMetadata.getOfSwitch().getNodeId().getValue());
361 private void deregisterRoutedRPCForSwitch(MDSwitchMetaData entityMetadata) {
363 CompositeObjectRegistration<ModelDrivenSwitch> registration = entityMetadata.getContext().getProviderRegistration();
364 if (null != registration) {
365 registration.close();
366 entityMetadata.getContext().setProviderRegistration(null);
367 entityMetadata.setIsRPCRegistrationDone(false);
369 LOG.info("deregisterRoutedRPCForSwitch: De-registered routed rpc for ModelDrivenSwitch {}",
370 entityMetadata.getOfSwitch().getNodeId().getValue());
373 private void sendNodeAddedNotification(MDSwitchMetaData entityMetadata) {
374 //Node added notification need to be sent irrespective of whether
375 // *this* instance is owner of the entity or not. Because yang notifications
376 // are local, and we should maintain the behavior across the application.
377 LOG.info("sendNodeAddedNotification: Node Added notification is sent for ModelDrivenSwitch {}",
378 entityMetadata.getOfSwitch().getNodeId().getValue());
380 entityMetadata.getContext().getNotificationEnqueuer().enqueueNotification(
381 entityMetadata.getWrappedNotification());
383 //Send multipart request to get other details of the switch.
384 entityMetadata.getOfSwitch().requestSwitchDetails();
387 private void setDeviceOwnershipState(Entity entity, boolean isMaster) {
388 MDSwitchMetaData entityMetadata = entsession.get(entity);
389 entityMetadata.setIsOwnershipInitialized(true);
390 entityMetadata.getOfSwitch().setEntityOwnership(isMaster);
393 private class MDSwitchMetaData {
395 final private ModelDrivenSwitch ofSwitch;
396 final private SessionContext context;
397 final private NotificationQueueWrapper wrappedNotification;
398 final private RpcProviderRegistry rpcProviderRegistry;
399 final private AtomicBoolean isRPCRegistrationDone = new AtomicBoolean(false);
400 final private AtomicBoolean isOwnershipInitialized = new AtomicBoolean(false);
402 MDSwitchMetaData(ModelDrivenSwitch ofSwitch,
403 SessionContext context,
404 NotificationQueueWrapper wrappedNotification,
405 RpcProviderRegistry rpcProviderRegistry) {
406 this.ofSwitch = ofSwitch;
407 this.context = context;
408 this.wrappedNotification = wrappedNotification;
409 this.rpcProviderRegistry = rpcProviderRegistry;
412 public ModelDrivenSwitch getOfSwitch() {
416 public SessionContext getContext() {
420 public NotificationQueueWrapper getWrappedNotification() {
421 return wrappedNotification;
424 public RpcProviderRegistry getRpcProviderRegistry() {
425 return rpcProviderRegistry;
428 public AtomicBoolean getIsRPCRegistrationDone() {
429 return isRPCRegistrationDone;
432 public void setIsRPCRegistrationDone(boolean isRPCRegistrationDone) {
433 this.isRPCRegistrationDone.set(isRPCRegistrationDone);
436 public boolean getIsOwnershipInitialized() {
437 return isOwnershipInitialized.get();
440 public void setIsOwnershipInitialized( boolean ownershipState) {
441 this.isOwnershipInitialized.set(ownershipState);