2 * Copyright (c) 2013, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core.role;
11 import java.math.BigInteger;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.concurrent.TimeUnit;
14 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
15 import com.google.common.base.Optional;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
20 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
21 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
22 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
24 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
25 import org.opendaylight.openflowplugin.api.openflow.md.ModelDrivenSwitch;
26 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
27 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
28 import org.opendaylight.yangtools.yang.common.QName;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
30 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
31 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.common.config.impl.rev140326.OfpRole;
33 import org.opendaylight.openflowplugin.openflow.md.core.session.RolePushTask;
34 import org.opendaylight.openflowplugin.openflow.md.core.session.RolePushException;
35 import org.opendaylight.openflowplugin.openflow.md.util.RoleUtil;
36 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.ConcurrentHashMap;
39 import com.google.common.util.concurrent.ListeningExecutorService;
40 import com.google.common.util.concurrent.MoreExecutors;
41 import com.google.common.util.concurrent.Futures;
42 import com.google.common.util.concurrent.FutureCallback;
43 import java.util.concurrent.ArrayBlockingQueue;
44 import com.google.common.util.concurrent.ListenableFuture;
45 import com.google.common.util.concurrent.CheckedFuture;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
49 public class OfEntityManager implements TransactionChainListener{
50 private static final Logger LOG = LoggerFactory.getLogger(OfEntityManager.class);
52 private static final QName ENTITY_QNAME =
53 org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.core.general.entity.rev150820.Entity.QNAME;
54 private static final QName ENTITY_NAME = QName.create(ENTITY_QNAME, "name");
56 private DataBroker dataBroker;
57 private EntityOwnershipService entityOwnershipService;
58 private final OpenflowOwnershipListener ownershipListener;
59 private final AtomicBoolean registeredListener = new AtomicBoolean();
60 private ConcurrentHashMap<Entity, MDSwitchMetaData> entsession;
61 private ConcurrentHashMap<Entity, EntityOwnershipCandidateRegistration> entRegistrationMap;
62 private final String DEVICE_TYPE = "openflow";
64 private final ListeningExecutorService pool;
66 public OfEntityManager( EntityOwnershipService entityOwnershipService ) {
67 this.entityOwnershipService = entityOwnershipService;
68 ownershipListener = new OpenflowOwnershipListener(this);
69 entsession = new ConcurrentHashMap<>();
70 entRegistrationMap = new ConcurrentHashMap<>();
71 ThreadPoolLoggingExecutor delegate = new ThreadPoolLoggingExecutor(
72 20, 20, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), "ofEntity");
73 pool = MoreExecutors.listeningDecorator(delegate);
76 public void setDataBroker(DataBroker dbBroker) {
77 this.dataBroker = dbBroker;
80 public void requestOpenflowEntityOwnership(final ModelDrivenSwitch ofSwitch,
81 final SessionContext context,
82 final NotificationQueueWrapper wrappedNotification,
83 final RpcProviderRegistry rpcProviderRegistry) {
84 MDSwitchMetaData entityMetaData =
85 new MDSwitchMetaData(ofSwitch,context,wrappedNotification,rpcProviderRegistry);
87 if (registeredListener.compareAndSet(false, true)) {
88 entityOwnershipService.registerListener(DEVICE_TYPE, ownershipListener);
90 final Entity entity = new Entity(DEVICE_TYPE, ofSwitch.getNodeId().getValue());
91 entsession.put(entity, entityMetaData);
93 //Register as soon as possible to avoid missing any entity ownership change event
94 final EntityOwnershipCandidateRegistration entityRegistration;
96 entityRegistration = entityOwnershipService.registerCandidate(entity);
97 entRegistrationMap.put(entity, entityRegistration);
98 LOG.info("requestOpenflowEntityOwnership: Registered controller for the ownership of {}", ofSwitch.getNodeId() );
99 } catch (CandidateAlreadyRegisteredException e) {
100 // we can log and move for this error, as listener is present and role changes will be served.
101 LOG.error("requestOpenflowEntityOwnership : Controller registration for ownership of {} failed ", ofSwitch.getNodeId(), e );
104 Optional <EntityOwnershipState> entityOwnershipStateOptional =
105 entityOwnershipService.getOwnershipState(entity);
107 if (entityOwnershipStateOptional.isPresent()) {
108 final EntityOwnershipState entityOwnershipState = entityOwnershipStateOptional.get();
109 if (entityOwnershipState.hasOwner()) {
110 final OfpRole newRole ;
111 if (entityOwnershipState.isOwner()) {
112 LOG.info("requestOpenflowEntityOwnership: Set controller as a MASTER controller " +
113 "because it's the OWNER of the {}", ofSwitch.getNodeId());
114 newRole = OfpRole.BECOMEMASTER;
115 entsession.get(entity).getOfSwitch().setEntityOwnership(true);
116 registerRoutedRPCForSwitch(entsession.get(entity));
118 LOG.info("requestOpenflowEntityOwnership: Set controller as a SLAVE controller " +
119 "because it's is not the owner of the {}", ofSwitch.getNodeId());
120 newRole = OfpRole.BECOMESLAVE;
121 entsession.get(entity).getOfSwitch().setEntityOwnership(false);
123 RolePushTask task = new RolePushTask(newRole, context);
124 ListenableFuture<Boolean> rolePushResult = pool.submit(task);
125 CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
126 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
127 Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
129 public void onSuccess(Boolean result){
130 LOG.info("requestOpenflowEntityOwnership: Controller is now {} of the {}",
131 newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE",ofSwitch.getNodeId() );
133 sendNodeAddedNotification(entsession.get(entity));
136 public void onFailure(Throwable t){
137 LOG.warn("requestOpenflowEntityOwnership: Controller is not able to set " +
138 "the role for {}",ofSwitch.getNodeId(), t);
140 if(newRole == OfpRole.BECOMEMASTER) {
141 LOG.info("requestOpenflowEntityOwnership: ..and controller is the owner of the " +
142 "device {}. Closing the registration, so other controllers can try to " +
143 "become owner and attempt to be master controller.",ofSwitch.getNodeId());
145 EntityOwnershipCandidateRegistration ownershipRegistrent = entRegistrationMap.get(entity);
146 if (ownershipRegistrent != null) {
147 ownershipRegistrent.close();
148 entRegistrationMap.remove(entity);
151 LOG.info("requestOpenflowEntityOwnership: ..and registering it back to participate" +
152 " in ownership of the entity.");
154 EntityOwnershipCandidateRegistration entityRegistration;
156 entityRegistration = entityOwnershipService.registerCandidate(entity);
157 entRegistrationMap.put(entity, entityRegistration);
158 LOG.info("requestOpenflowEntityOwnership: re-registered controller for " +
159 "ownership of the {}", ofSwitch.getNodeId() );
160 } catch (CandidateAlreadyRegisteredException e) {
161 // we can log and move for this error, as listener is present and role changes will be served.
162 LOG.error("requestOpenflowEntityOwnership: *Surprisingly* Entity is already " +
163 "registered with EntityOwnershipService : {}", ofSwitch.getNodeId(), e );
167 LOG.error("requestOpenflowEntityOwnership : Not able to set role {} for {}"
168 , newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE", ofSwitch.getNodeId());
176 public void setSlaveRole(SessionContext sessionContext) {
178 newRole = OfpRole.BECOMESLAVE;
179 if (sessionContext != null) {
180 final BigInteger targetSwitchDPId = sessionContext.getFeatures().getDatapathId();
181 LOG.debug("setSlaveRole: Set controller as a SLAVE controller for {}", targetSwitchDPId.toString());
183 RolePushTask task = new RolePushTask(newRole, sessionContext);
184 ListenableFuture<Boolean> rolePushResult = pool.submit(task);
185 final CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
186 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
187 Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
189 public void onSuccess(Boolean result){
190 LOG.debug("setSlaveRole: Controller is set as a SLAVE for {}", targetSwitchDPId.toString());
193 public void onFailure(Throwable e){
194 LOG.error("setSlaveRole: Role request to set controller as a SLAVE failed for {}",
195 targetSwitchDPId.toString(), e);
199 LOG.warn("setSlaveRole: sessionContext is not set. Device is not connected anymore");
203 public void onDeviceOwnershipChanged(final EntityOwnershipChange ownershipChange) {
204 final OfpRole newRole;
205 final Entity entity = ownershipChange.getEntity();
206 SessionContext sessionContext = entsession.get(entity)!=null?entsession.get(entity).getContext():null;
207 if (ownershipChange.isOwner()) {
208 LOG.info("onDeviceOwnershipChanged: Set controller as a MASTER controller because " +
209 "it's the OWNER of the {}", entity);
210 newRole = OfpRole.BECOMEMASTER;
214 newRole = OfpRole.BECOMESLAVE;
215 if(sessionContext != null && ownershipChange.hasOwner()) {
216 LOG.info("onDeviceOwnershipChanged: Set controller as a SLAVE controller because " +
217 "it's not the OWNER of the {}", entity);
218 entsession.get(entity).getOfSwitch().setEntityOwnership(newRole==OfpRole.BECOMEMASTER);
219 setSlaveRole(sessionContext);
220 sendNodeAddedNotification(entsession.get(entity));
221 if(ownershipChange.wasOwner()) {
222 deregisterRoutedRPCForSwitch(entsession.get(entity));
223 // You don't have to explicitly set role to Slave in this case,
224 // because other controller will be taking over the master role
225 // and that will force other controller to become slave.
230 if (sessionContext != null) {
231 //Register the RPC, given *this* controller instance is going to be master owner.
232 //If role registration fails for this node, it will deregister as a candidate for
233 //ownership and that will make this controller non-owner and it will deregister the
235 entsession.get(entity).getOfSwitch().setEntityOwnership(newRole==OfpRole.BECOMEMASTER);
236 registerRoutedRPCForSwitch(entsession.get(entity));
238 final String targetSwitchDPId = sessionContext.getFeatures().getDatapathId().toString();
239 RolePushTask task = new RolePushTask(newRole, sessionContext);
240 ListenableFuture<Boolean> rolePushResult = pool.submit(task);
241 final CheckedFuture<Boolean, RolePushException> rolePushResultChecked =
242 RoleUtil.makeCheckedRuleRequestFxResult(rolePushResult);
243 Futures.addCallback(rolePushResult, new FutureCallback<Boolean>(){
245 public void onSuccess(Boolean result){
246 LOG.info("onDeviceOwnershipChanged: Controller is successfully set as a " +
247 "MASTER controller for {}", targetSwitchDPId);
248 entsession.get(entity).getOfSwitch().setEntityOwnership(newRole==OfpRole.BECOMEMASTER);
249 entsession.get(entity).getOfSwitch().sendEmptyTableFeatureRequest();
250 sendNodeAddedNotification(entsession.get(entity));
254 public void onFailure(Throwable e){
256 LOG.warn("onDeviceOwnershipChanged: Controller is not able to set the " +
257 "MASTER role for {}.", targetSwitchDPId,e);
258 if(newRole == OfpRole.BECOMEMASTER) {
259 LOG.info("onDeviceOwnershipChanged: ..and this *instance* is owner of the device {}. " +
260 "Closing the registration, so other entity can become owner " +
261 "and attempt to be master controller.",targetSwitchDPId);
263 EntityOwnershipCandidateRegistration ownershipRegistrent = entRegistrationMap.get(entity);
264 if (ownershipRegistrent != null) {
265 ownershipRegistrent.close();
266 MDSwitchMetaData switchMetadata = entsession.get(entity);
267 if(switchMetadata != null){
268 //We can probably leave deregistration till the node ownerhsip change.
269 //But that can probably cause some race condition.
270 deregisterRoutedRPCForSwitch(switchMetadata);
274 LOG.info("onDeviceOwnershipChanged: ..and registering it back to participate in " +
275 "ownership and re-try");
277 EntityOwnershipCandidateRegistration entityRegistration;
279 entityRegistration = entityOwnershipService.registerCandidate(entity);
280 entRegistrationMap.put(entity, entityRegistration);
281 LOG.info("onDeviceOwnershipChanged: re-registered candidate for " +
282 "ownership of the {}", targetSwitchDPId );
283 } catch (CandidateAlreadyRegisteredException ex) {
284 // we can log and move for this error, as listener is present and role changes will be served.
285 LOG.error("onDeviceOwnershipChanged: *Surprisingly* Entity is already " +
286 "registered with EntityOwnershipService : {}", targetSwitchDPId, ex );
290 LOG.error("onDeviceOwnershipChanged : Not able to set role {} for " +
291 " {}", newRole == OfpRole.BECOMEMASTER?"MASTER":"SLAVE", targetSwitchDPId);
296 LOG.warn("onDeviceOwnershipChanged: sessionContext is not available. Releasing ownership of the device");
297 EntityOwnershipCandidateRegistration ownershipRegistrent = entRegistrationMap.get(entity);
298 if (ownershipRegistrent != null) {
299 ownershipRegistrent.close();
304 public void unregisterEntityOwnershipRequest(NodeId nodeId) {
305 Entity entity = new Entity(DEVICE_TYPE, nodeId.getValue());
306 entsession.remove(entity);
307 EntityOwnershipCandidateRegistration entRegCandidate = entRegistrationMap.get(entity);
308 if(entRegCandidate != null){
309 LOG.info("unregisterEntityOwnershipRequest: Unregister controller entity ownership " +
310 "request for {}", nodeId);
311 entRegCandidate.close();
312 entRegistrationMap.remove(entity);
317 public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
318 final Throwable cause) {
322 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
326 private void registerRoutedRPCForSwitch(MDSwitchMetaData entityMetadata) {
327 // Routed RPC registration is only done when *this* instance is owner of
329 if(entityMetadata.getOfSwitch().isEntityOwner()) {
330 if (!entityMetadata.isRPCRegistrationDone.get()) {
331 entityMetadata.setIsRPCRegistrationDone(true);
332 CompositeObjectRegistration<ModelDrivenSwitch> registration =
333 entityMetadata.getOfSwitch().register(entityMetadata.getRpcProviderRegistry());
335 entityMetadata.getContext().setProviderRegistration(registration);
337 LOG.info("registerRoutedRPCForSwitch: Registered routed rpc for ModelDrivenSwitch {}",
338 entityMetadata.getOfSwitch().getNodeId().getValue());
341 LOG.info("registerRoutedRPCForSwitch: Skipping routed rpc registration for ModelDrivenSwitch {}",
342 entityMetadata.getOfSwitch().getNodeId().getValue());
346 private void deregisterRoutedRPCForSwitch(MDSwitchMetaData entityMetadata) {
348 CompositeObjectRegistration<ModelDrivenSwitch> registration = entityMetadata.getContext().getProviderRegistration();
349 if (null != registration) {
350 registration.close();
351 entityMetadata.getContext().setProviderRegistration(null);
353 LOG.info("deregisterRoutedRPCForSwitch: De-registered routed rpc for ModelDrivenSwitch {}",
354 entityMetadata.getOfSwitch().getNodeId().getValue());
357 private void sendNodeAddedNotification(MDSwitchMetaData entityMetadata) {
358 //Node added notification need to be sent irrespective of whether
359 // *this* instance is owner of the entity or not. Because yang notifications
360 // are local, and we should maintain the behavior across the application.
361 LOG.info("sendNodeAddedNotification: Node Added notification is sent for ModelDrivenSwitch {}",
362 entityMetadata.getOfSwitch().getNodeId().getValue());
364 entityMetadata.getContext().getNotificationEnqueuer().enqueueNotification(
365 entityMetadata.getWrappedNotification());
367 //Send multipart request to get other details of the switch.
368 entityMetadata.getOfSwitch().requestSwitchDetails();
371 private class MDSwitchMetaData {
373 final private ModelDrivenSwitch ofSwitch;
374 final private SessionContext context;
375 final private NotificationQueueWrapper wrappedNotification;
376 final private RpcProviderRegistry rpcProviderRegistry;
377 final private AtomicBoolean isRPCRegistrationDone = new AtomicBoolean(false);
379 MDSwitchMetaData(ModelDrivenSwitch ofSwitch,
380 SessionContext context,
381 NotificationQueueWrapper wrappedNotification,
382 RpcProviderRegistry rpcProviderRegistry) {
383 this.ofSwitch = ofSwitch;
384 this.context = context;
385 this.wrappedNotification = wrappedNotification;
386 this.rpcProviderRegistry = rpcProviderRegistry;
389 public ModelDrivenSwitch getOfSwitch() {
393 public SessionContext getContext() {
397 public NotificationQueueWrapper getWrappedNotification() {
398 return wrappedNotification;
401 public RpcProviderRegistry getRpcProviderRegistry() {
402 return rpcProviderRegistry;
405 public AtomicBoolean getIsRPCRegistrationDone() {
406 return isRPCRegistrationDone;
409 public void setIsRPCRegistrationDone(boolean isRPCRegistrationDone) {
410 this.isRPCRegistrationDone.set(isRPCRegistrationDone);