-/**
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
+
package org.opendaylight.openflowplugin.impl.role;
-import com.google.common.base.Preconditions;
-import java.util.concurrent.Semaphore;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
-import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
-import org.opendaylight.openflowplugin.api.openflow.OFPContext;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
+import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Role context hold information about entity ownership registration,
- * register and unregister candidate (main and tx)
- */
-class RoleContextImpl implements RoleContext {
-
+public class RoleContextImpl implements RoleContext {
private static final Logger LOG = LoggerFactory.getLogger(RoleContextImpl.class);
- private static final int TIMEOUT = 12;
+
+ // Timeout after what we will give up on propagating role
+ private static final long SET_ROLE_TIMEOUT = 10000;
private final DeviceInfo deviceInfo;
- private final EntityOwnershipService entityOwnershipService;
- private volatile EntityOwnershipCandidateRegistration entityOwnershipCandidateRegistration = null;
- private volatile EntityOwnershipCandidateRegistration txEntityOwnershipCandidateRegistration = null;
+ private final HashedWheelTimer timer;
+ private final AtomicReference<ListenableFuture<RpcResult<SetRoleOutput>>> lastRoleFuture = new AtomicReference<>();
+ private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
+ private final Timeout slaveTask;
+ private ContextChainMastershipWatcher contextChainMastershipWatcher;
+ private SalRoleService roleService;
+
+ RoleContextImpl(@Nonnull final DeviceInfo deviceInfo,
+ @Nonnull final HashedWheelTimer timer,
+ final long checkRoleMasterTimeout) {
+ this.deviceInfo = deviceInfo;
+ this.timer = timer;
+ slaveTask = timer.newTimeout((timerTask) -> makeDeviceSlave(), checkRoleMasterTimeout, TimeUnit.MILLISECONDS);
- private final Entity entity;
- private final Entity txEntity;
+ LOG.info("Started timer for setting SLAVE role on device {} if no role will be set in {}s.",
+ deviceInfo,
+ checkRoleMasterTimeout / 1000L);
+ }
- private SalRoleService salRoleService = null;
+ @Override
+ public DeviceInfo getDeviceInfo() {
+ return deviceInfo;
+ }
- private final Semaphore roleChangeGuard = new Semaphore(1, true);
+ @Override
+ public void setRoleService(final SalRoleService salRoleService) {
+ roleService = salRoleService;
+ }
- private final LifecycleConductor conductor;
- private volatile CONTEXT_STATE contextState;
+ @Override
+ public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
+ this.contextChainMastershipWatcher = newWatcher;
+ }
- RoleContextImpl(final DeviceInfo deviceInfo, final EntityOwnershipService entityOwnershipService, final Entity entity, final Entity txEntity, final LifecycleConductor lifecycleConductor) {
- this.entityOwnershipService = entityOwnershipService;
- this.entity = entity;
- this.txEntity = txEntity;
- this.deviceInfo = deviceInfo;
- this.conductor = lifecycleConductor;
- contextState = CONTEXT_STATE.INITIALIZATION;
+ @Override
+ public void close() {
+ changeLastRoleFuture(null);
+ requestContexts.forEach(requestContext -> RequestContextUtil
+ .closeRequestContextWithRpcError(requestContext, "Connection closed."));
+ requestContexts.clear();
}
@Override
- public boolean initialization() {
- LOG.info("Initialization main candidate for node {}", deviceInfo.getNodeId());
- contextState = CONTEXT_STATE.WORKING;
- return registerCandidate(this.entity);
+ public void instantiateServiceInstance() {
+ final ListenableFuture<RpcResult<SetRoleOutput>> future = sendRoleChangeToDevice(OfpRole.BECOMEMASTER);
+ changeLastRoleFuture(future);
+ Futures.addCallback(future, new MasterRoleCallback(), MoreExecutors.directExecutor());
}
@Override
- public void unregisterAllCandidates() {
- LOG.info("Role context closed, unregistering all candidates for ownership for node {}", deviceInfo.getNodeId());
- if (isMainCandidateRegistered()) {
- unregisterCandidate(this.entity);
- }
- if (isTxCandidateRegistered()) {
- unregisterCandidate(this.txEntity);
- }
+ public ListenableFuture<Void> closeServiceInstance() {
+ changeLastRoleFuture(null);
+ return Futures.immediateFuture(null);
}
- @Nullable
@Override
public <T> RequestContext<T> createRequestContext() {
- return new AbstractRequestContext<T>(conductor.reserveXidForDeviceMessage(deviceInfo)) {
+ final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
@Override
public void close() {
+ requestContexts.remove(this);
}
};
- }
- @Override
- public void setSalRoleService(@Nonnull final SalRoleService salRoleService) {
- Preconditions.checkNotNull(salRoleService);
- this.salRoleService = salRoleService;
+ requestContexts.add(ret);
+ return ret;
}
+ @Nonnull
@Override
- public SalRoleService getSalRoleService() {
- return this.salRoleService;
+ public ServiceGroupIdentifier getIdentifier() {
+ return deviceInfo.getServiceIdentifier();
}
- @Override
- public Entity getEntity() {
- return this.entity;
- }
+ private void changeLastRoleFuture(final ListenableFuture<RpcResult<SetRoleOutput>> newFuture) {
+ slaveTask.cancel();
+ lastRoleFuture.getAndUpdate(lastFuture -> {
+ if (Objects.nonNull(lastFuture) && !lastFuture.isCancelled() && !lastFuture.isDone()) {
+ lastFuture.cancel(true);
+ }
- @Override
- public Entity getTxEntity() {
- return this.txEntity;
+ return newFuture;
+ });
}
- @Override
- public DeviceInfo getDeviceInfo() {
- return deviceInfo;
+ private ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
+ final ListenableFuture<RpcResult<SetRoleOutput>> future = sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
+ changeLastRoleFuture(future);
+ Futures.addCallback(future, new SlaveRoleCallback(), MoreExecutors.directExecutor());
+ return future;
}
- @Override
- public boolean isMainCandidateRegistered() {
- return entityOwnershipCandidateRegistration != null;
- }
+ private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
+ LOG.debug("Sending new role {} to device {}", newRole, deviceInfo);
- @Override
- public boolean isTxCandidateRegistered() {
- return txEntityOwnershipCandidateRegistration != null;
- }
+ if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
+ final SetRoleInput setRoleInput = new SetRoleInputBuilder()
+ .setControllerRole(newRole)
+ .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier()))
+ .build();
- @Override
- public boolean registerCandidate(final Entity entity_) {
- boolean permit = false;
- try {
- permit = roleChangeGuard.tryAcquire(TIMEOUT, TimeUnit.SECONDS);
- if(permit) {
- LOG.debug("Register candidate for entity {}", entity_);
- if (entity_.equals(this.entity)) {
- entityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity_);
- } else {
- txEntityOwnershipCandidateRegistration = entityOwnershipService.registerCandidate(entity_);
+ final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture = roleService.setRole(setRoleInput);
+
+ final TimerTask timerTask = timeout -> {
+ if (!setRoleOutputFuture.isDone()) {
+ LOG.warn("New role {} was not propagated to device {} during {} sec", newRole,
+ deviceInfo, SET_ROLE_TIMEOUT);
+ setRoleOutputFuture.cancel(true);
}
- } else {
- return false;
- }
- } catch (final CandidateAlreadyRegisteredException e) {
- LOG.warn("Candidate for entity {} is already registered.", entity_.getType());
- return false;
- } catch (final InterruptedException e) {
- LOG.warn("Cannot acquire semaphore for register entity {} candidate.", entity_.getType());
- return false;
- } finally {
- if (permit) {
- roleChangeGuard.release();
- }
+ };
+
+ timer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.MILLISECONDS);
+ return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
}
- return true;
+
+ LOG.info("Device: {} with version: {} does not support role {}", deviceInfo, deviceInfo.getVersion(), newRole);
+ return Futures.immediateFuture(null);
}
- @Override
- public boolean unregisterCandidate(final Entity entity_) {
- boolean permit = false;
- try {
- permit = roleChangeGuard.tryAcquire(TIMEOUT, TimeUnit.SECONDS);
- if(permit) {
- if (entity_.equals(this.entity)) {
- if (entityOwnershipCandidateRegistration != null) {
- LOG.debug("Unregister candidate for entity {}", entity_);
- entityOwnershipCandidateRegistration.close();
- entityOwnershipCandidateRegistration = null;
- }
- } else {
- if (txEntityOwnershipCandidateRegistration != null) {
- LOG.debug("Unregister candidate for tx entity {}", entity_);
- txEntityOwnershipCandidateRegistration.close();
- txEntityOwnershipCandidateRegistration = null;
- }
- }
- } else {
- return false;
- }
- } catch (final InterruptedException e) {
- LOG.warn("Cannot acquire semaphore for unregister entity {} candidate.", entity_.getType());
- return false;
- } finally {
- if (permit) {
- roleChangeGuard.release();
- }
+ private final class MasterRoleCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
+ @Override
+ public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+ contextChainMastershipWatcher.onMasterRoleAcquired(
+ deviceInfo,
+ ContextChainMastershipState.MASTER_ON_DEVICE);
+ LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo);
}
- return true;
- }
- @Override
- public void close() {
- contextState = CONTEXT_STATE.TERMINATION;
- unregisterAllCandidates();
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ if (!(throwable instanceof CancellationException)) {
+ contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
+ deviceInfo,
+ "Was not able to propagate MASTER role on device. Error: " + throwable.toString());
+ }
+ }
}
- public boolean isMaster(){
- return (txEntityOwnershipCandidateRegistration != null && entityOwnershipCandidateRegistration != null);
- }
+ private final class SlaveRoleCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
+ @Override
+ public void onSuccess(@Nullable final RpcResult<SetRoleOutput> result) {
+ contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
+ LOG.debug("Role SLAVE was successfully set on device, node {}", deviceInfo);
+ }
- @Override
- public CONTEXT_STATE getState() {
- return contextState;
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ if (!(throwable instanceof CancellationException)) {
+ contextChainMastershipWatcher.onSlaveRoleNotAcquired(deviceInfo,
+ "Was not able to propagate SLAVE role on device. Error: " + throwable.toString());
+ }
+ }
}
-}
+}
\ No newline at end of file