X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Frole%2FRoleContextImpl.java;h=b636c9fb3021aa379c9651fca999c8ff84be70ca;hb=213cd74e317081d8952b9036adba7775aad72f1a;hp=5ddf504fbb066de0a50927683df0f363e042e2c9;hpb=c6f3d36b08270ee45900a789757c81474e72b4aa;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleContextImpl.java index 5ddf504fbb..b636c9fb30 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleContextImpl.java @@ -7,80 +7,71 @@ */ package org.opendaylight.openflowplugin.impl.role; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Preconditions; -import java.util.concurrent.Semaphore; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import io.netty.util.HashedWheelTimer; +import io.netty.util.TimerTask; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; -import org.opendaylight.controller.md.sal.common.api.clustering.Entity; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; -import org.opendaylight.openflowplugin.api.openflow.OFPContext; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; import org.opendaylight.openflowplugin.api.openflow.role.RoleContext; +import org.opendaylight.openflowplugin.api.openflow.role.RoleManager; import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Role context hold information about entity ownership registration, - * register and unregister candidate (main and tx) + * Role context try to make change device role on device */ class RoleContextImpl implements RoleContext { private static final Logger LOG = LoggerFactory.getLogger(RoleContextImpl.class); - private static final int TIMEOUT = 12; - - private final DeviceInfo deviceInfo; - private final EntityOwnershipService entityOwnershipService; - private volatile EntityOwnershipCandidateRegistration entityOwnershipCandidateRegistration = null; - private volatile EntityOwnershipCandidateRegistration txEntityOwnershipCandidateRegistration = null; - - private final Entity entity; - private final Entity txEntity; private SalRoleService salRoleService = null; - - private final Semaphore roleChangeGuard = new Semaphore(1, true); - - private final LifecycleConductor conductor; - private volatile CONTEXT_STATE contextState; - - RoleContextImpl(final DeviceInfo deviceInfo, final EntityOwnershipService entityOwnershipService, final Entity entity, final Entity txEntity, final LifecycleConductor lifecycleConductor) { - this.entityOwnershipService = entityOwnershipService; - this.entity = entity; - this.txEntity = txEntity; + private final HashedWheelTimer hashedWheelTimer; + private final DeviceInfo deviceInfo; + private CONTEXT_STATE state; + private final RoleManager myManager; + private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler; + private final LifecycleService lifecycleService; + + RoleContextImpl(final DeviceInfo deviceInfo, + final HashedWheelTimer hashedWheelTimer, + final RoleManager myManager, + final LifecycleService lifecycleService) { this.deviceInfo = deviceInfo; - this.conductor = lifecycleConductor; - contextState = CONTEXT_STATE.INITIALIZATION; - } - - @Override - public boolean initialization() { - LOG.info("Initialization main candidate for node {}", deviceInfo.getNodeId()); - contextState = CONTEXT_STATE.WORKING; - return registerCandidate(this.entity); - } - - @Override - public void unregisterAllCandidates() { - LOG.info("Role context closed, unregistering all candidates for ownership for node {}", deviceInfo.getNodeId()); - if (isMainCandidateRegistered()) { - unregisterCandidate(this.entity); - } - if (isTxCandidateRegistered()) { - unregisterCandidate(this.txEntity); - } + this.state = CONTEXT_STATE.WORKING; + this.myManager = myManager; + this.hashedWheelTimer = hashedWheelTimer; + this.lifecycleService = lifecycleService; } @Nullable @Override public RequestContext createRequestContext() { - return new AbstractRequestContext(conductor.reserveXidForDeviceMessage(deviceInfo)) { + return new AbstractRequestContext(deviceInfo.reserveXidForDeviceMessage()) { @Override public void close() { } @@ -94,109 +85,120 @@ class RoleContextImpl implements RoleContext { } @Override - public SalRoleService getSalRoleService() { - return this.salRoleService; + public CONTEXT_STATE getState() { + return this.state; } @Override - public Entity getEntity() { - return this.entity; + public void setState(CONTEXT_STATE state) { + this.state = state; } @Override - public Entity getTxEntity() { - return this.txEntity; + public ServiceGroupIdentifier getServiceIdentifier() { + return this.deviceInfo.getServiceIdentifier(); } @Override public DeviceInfo getDeviceInfo() { - return deviceInfo; + return this.deviceInfo; } - @Override - public boolean isMainCandidateRegistered() { - return entityOwnershipCandidateRegistration != null; + public void startupClusterServices() throws ExecutionException, InterruptedException { + Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new RpcResultFutureCallback()); } @Override - public boolean isTxCandidateRegistered() { - return txEntityOwnershipCandidateRegistration != null; - } + public ListenableFuture stopClusterServices(final boolean deviceDisconnected) { + + if (!deviceDisconnected) { + ListenableFuture future = Futures.transform(makeDeviceSlave(), new Function, Void>() { + @Nullable + @Override + public Void apply(@Nullable RpcResult setRoleOutputRpcResult) { + return null; + } + }); - @Override - public boolean registerCandidate(final Entity entity_) { - boolean permit = false; - try { - permit = roleChangeGuard.tryAcquire(TIMEOUT, TimeUnit.SECONDS); - if(permit) { - LOG.debug("Register candidate for entity {}", entity_); - if (entity_.equals(this.entity)) { - entityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity_); - } else { - txEntityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity_); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void aVoid) { + if (LOG.isDebugEnabled()) { + LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue()); + } } - } else { - return false; - } - } catch (final CandidateAlreadyRegisteredException e) { - LOG.warn("Candidate for entity {} is already registered.", entity_.getType()); - return false; - } catch (final InterruptedException e) { - LOG.warn("Cannot acquire semaphore for register entity {} candidate.", entity_.getType()); - return false; - } finally { - if (permit) { - roleChangeGuard.release(); - } + + @Override + public void onFailure(final Throwable throwable) { + LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getLOGValue()); + LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable); + myManager.removeDeviceFromOperationalDS(deviceInfo); + } + }); + return future; + } else { + return myManager.removeDeviceFromOperationalDS(deviceInfo); } - return true; } @Override - public boolean unregisterCandidate(final Entity entity_) { - boolean permit = false; - try { - permit = roleChangeGuard.tryAcquire(TIMEOUT, TimeUnit.SECONDS); - if(permit) { - if (entity_.equals(this.entity)) { - if (entityOwnershipCandidateRegistration != null) { - LOG.debug("Unregister candidate for entity {}", entity_); - entityOwnershipCandidateRegistration.close(); - entityOwnershipCandidateRegistration = null; - } - } else { - if (txEntityOwnershipCandidateRegistration != null) { - LOG.debug("Unregister candidate for tx entity {}", entity_); - txEntityOwnershipCandidateRegistration.close(); - txEntityOwnershipCandidateRegistration = null; - } + public ListenableFuture> makeDeviceSlave(){ + return sendRoleChangeToDevice(OfpRole.BECOMESLAVE); + } + + @VisibleForTesting + ListenableFuture> sendRoleChangeToDevice(final OfpRole newRole) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId()); + } + final Future> setRoleOutputFuture; + if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) { + final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole) + .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build(); + setRoleOutputFuture = this.salRoleService.setRole(setRoleInput); + final TimerTask timerTask = timeout -> { + if (!setRoleOutputFuture.isDone()) { + LOG.warn("New role {} was not propagated to device {} during 5 sec", newRole, deviceInfo.getLOGValue()); + setRoleOutputFuture.cancel(true); } - } else { - return false; - } - } catch (final InterruptedException e) { - LOG.warn("Cannot acquire semaphore for unregister entity {} candidate.", entity_.getType()); - return false; - } finally { - if (permit) { - roleChangeGuard.release(); - } + }; + hashedWheelTimer.newTimeout(timerTask, 5, TimeUnit.SECONDS); + } else { + LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion()); + return Futures.immediateFuture(null); } - return true; + return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture); } @Override - public void close() { - contextState = CONTEXT_STATE.TERMINATION; - unregisterAllCandidates(); + public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) { + this.clusterInitializationPhaseHandler = handler; } - public boolean isMaster(){ - return (txEntityOwnershipCandidateRegistration != null && entityOwnershipCandidateRegistration != null); + @Override + public boolean onContextInstantiateService(final ConnectionContext connectionContext) { + + if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) { + LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue()); + return false; + } + + Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new RpcResultFutureCallback()); + return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext); } - @Override - public CONTEXT_STATE getState() { - return contextState; + private class RpcResultFutureCallback implements FutureCallback> { + @Override + public void onSuccess(@Nullable RpcResult setRoleOutputRpcResult) { + if (LOG.isDebugEnabled()) { + LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue()); + } + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue()); + lifecycleService.closeConnection(); + } } }