package org.opendaylight.openflowplugin.impl.services;
import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import java.math.BigInteger;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext.CONNECTION_STATE;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.slf4j.LoggerFactory;
-public class SalRoleServiceImpl extends AbstractSimpleService<SetRoleInput, SetRoleOutput> implements SalRoleService {
+public final class SalRoleServiceImpl extends AbstractSimpleService<SetRoleInput, SetRoleOutput> implements SalRoleService {
private static final Logger LOG = LoggerFactory.getLogger(SalRoleServiceImpl.class);
private static final int MAX_RETRIES = 42;
+ private static final Function<Exception, RoleChangeException> EXCEPTION_FUNCTION = new Function<Exception, RoleChangeException>() {
+ @Override
+ public RoleChangeException apply(final Exception input) {
+ if (input instanceof ExecutionException) {
+ final Throwable cause = input.getCause();
+ if (cause instanceof RoleChangeException) {
+ return (RoleChangeException) cause;
+ }
+ } else if (input instanceof RoleChangeException) {
+ return (RoleChangeException) input;
+ }
+
+ return new RoleChangeException(input.getMessage(), input);
+ }
+ };
+
private final DeviceContext deviceContext;
private final RoleService roleService;
private final AtomicReference<OfpRole> lastKnownRoleRef = new AtomicReference<>(OfpRole.NOCHANGE);
public SalRoleServiceImpl(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
super(requestContextStack, deviceContext, SetRoleOutput.class);
- this.deviceContext = deviceContext;
+ this.deviceContext = Preconditions.checkNotNull(deviceContext);
this.roleService = new RoleService(requestContextStack, deviceContext, RoleRequestOutput.class);
nodeId = deviceContext.getPrimaryConnectionContext().getNodeId();
version = deviceContext.getPrimaryConnectionContext().getFeatures().getVersion();
listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
-
}
@Override
- protected OfHeader buildRequest(Xid xid, SetRoleInput input) {
+ protected OfHeader buildRequest(final Xid xid, final SetRoleInput input) {
return null;
}
- public static BigInteger getNextGenerationId(BigInteger generationId) {
- BigInteger nextGenerationId = null;
- if (generationId.compareTo(MAX_GENERATION_ID) < 0) {
- nextGenerationId = generationId.add(BigInteger.ONE);
- } else {
- nextGenerationId = BigInteger.ZERO;
- }
-
- return nextGenerationId;
- }
-
-
@Override
public Future<RpcResult<SetRoleOutput>> setRole(final SetRoleInput input) {
LOG.info("SetRole called with input:{}", input);
- OfpRole lastKnownRole = lastKnownRoleRef.get();
// compare with last known role and set if different. If they are same, then return.
if (lastKnownRoleRef.compareAndSet(input.getControllerRole(), input.getControllerRole())) {
LOG.info("Role to be set is same as the last known role for the device:{}. Hence ignoring.", input.getControllerRole());
- SettableFuture<RpcResult<SetRoleOutput>> resultFuture = SettableFuture.create();
- resultFuture.set(RpcResultBuilder.<SetRoleOutput>success().build());
- return resultFuture;
+ return Futures.immediateFuture(RpcResultBuilder.<SetRoleOutput>success().build());
}
- final SettableFuture<RpcResult<SetRoleOutput>> resultFuture = SettableFuture.create();
-
- RoleChangeTask roleChangeTask = new RoleChangeTask(nodeId, input.getControllerRole(), version, roleService);
+ RoleChangeTask roleChangeTask = new RoleChangeTask(input.getControllerRole());
do {
- ListenableFuture<RpcResult<SetRoleOutput>> deviceCheck = deviceConnectionCheck();
- if (deviceCheck != null) {
- LOG.info("Device {} is disconnected or state is not valid. Giving up on role change", input.getNode());
- return deviceCheck;
+ // Check current connection state
+ final CONNECTION_STATE state = deviceContext.getPrimaryConnectionContext().getConnectionState();
+ switch (state) {
+ case RIP:
+ LOG.info("Device {} has been disconnected", input.getNode());
+ return Futures.immediateFailedFuture(new Exception(String.format(
+ "Device connection doesn't exist anymore. Primary connection status : %s", state)));
+ case WORKING:
+ // We can proceed
+ break;
+ default:
+ LOG.info("Device {} is in state {}, role change is not allowed", input.getNode(), state);
+ return Futures.immediateCheckedFuture(RpcResultBuilder.<SetRoleOutput>failed().build());
}
ListenableFuture<SetRoleOutput> taskFuture = listeningExecutorService.submit(roleChangeTask);
LOG.info("RoleChangeTask submitted for execution");
- CheckedFuture<SetRoleOutput, RoleChangeException> taskFutureChecked = makeCheckedFuture(taskFuture);
+ CheckedFuture<SetRoleOutput, RoleChangeException> taskFutureChecked = Futures.makeChecked(taskFuture, EXCEPTION_FUNCTION);
try {
SetRoleOutput setRoleOutput = taskFutureChecked.checkedGet(10, TimeUnit.SECONDS);
LOG.info("setRoleOutput received after roleChangeTask execution:{}", setRoleOutput);
- resultFuture.set(RpcResultBuilder.<SetRoleOutput>success().withResult(setRoleOutput).build());
lastKnownRoleRef.set(input.getControllerRole());
- return resultFuture;
+ return Futures.immediateFuture(RpcResultBuilder.<SetRoleOutput>success().withResult(setRoleOutput).build());
} catch (TimeoutException | RoleChangeException e) {
roleChangeTask.incrementRetryCounter();
- LOG.info("Exception in setRole(), will retry:" + (MAX_RETRIES - roleChangeTask.getRetryCounter()) + " times.", e);
+ LOG.info("Exception in setRole(), will retry: {} times.",
+ MAX_RETRIES - roleChangeTask.getRetryCounter(), e);
}
} while (roleChangeTask.getRetryCounter() < MAX_RETRIES);
- resultFuture.setException(new RoleChangeException("Set Role failed after " + MAX_RETRIES + "tries on device " + input.getNode().getValue()));
-
- return resultFuture;
+ return Futures.immediateFailedFuture(new RoleChangeException(
+ "Set Role failed after " + MAX_RETRIES + "tries on device " + input.getNode().getValue()));
}
- private ListenableFuture<RpcResult<SetRoleOutput>> deviceConnectionCheck() {
- if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
- ListenableFuture<RpcResult<SetRoleOutput>> resultingFuture = SettableFuture.create();
- switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
- case RIP:
- final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
- deviceContext.getPrimaryConnectionContext().getConnectionState());
- resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
- break;
- default:
- resultingFuture = Futures.immediateCheckedFuture(RpcResultBuilder.<SetRoleOutput>failed().build());
- break;
- }
- return resultingFuture;
+ private static BigInteger getNextGenerationId(final BigInteger generationId) {
+ if (generationId.compareTo(MAX_GENERATION_ID) < 0) {
+ return generationId.add(BigInteger.ONE);
+ } else {
+ return BigInteger.ZERO;
}
- return null;
}
- class RoleChangeTask implements Callable<SetRoleOutput> {
+ private final class RoleChangeTask implements Callable<SetRoleOutput> {
- private final NodeId nodeId;
private final OfpRole ofpRole;
- private final Short version;
- private final RoleService roleService;
private int retryCounter = 0;
- public RoleChangeTask(NodeId nodeId, OfpRole ofpRole, Short version, RoleService roleService) {
- this.nodeId = nodeId;
- this.ofpRole = ofpRole;
- this.version = version;
- this.roleService = roleService;
+ RoleChangeTask(final OfpRole ofpRole) {
+ this.ofpRole = Preconditions.checkNotNull(ofpRole);
}
@Override
public SetRoleOutput call() throws RoleChangeException {
- LOG.info("RoleChangeTask called on device:{} OFPRole:{}", this.nodeId.getValue(), ofpRole);
+ LOG.info("RoleChangeTask called on device:{} OFPRole:{}", nodeId.getValue(), ofpRole);
// we cannot move ahead without having the generation id, so block the thread till we get it.
- BigInteger generationId = null;
- SetRoleOutput setRoleOutput = null;
-
+ final BigInteger generationId;
try {
- generationId = this.roleService.getGenerationIdFromDevice(version).get(10, TimeUnit.SECONDS);
+ generationId = roleService.getGenerationIdFromDevice(version).get(10, TimeUnit.SECONDS);
LOG.info("RoleChangeTask, GenerationIdFromDevice from device is {}", generationId);
-
} catch (Exception e ) {
- LOG.info("Exception in getting generationId for device:{}. Ex:{}" + this.nodeId.getValue(), e);
- throw new RoleChangeException("Exception in getting generationId for device:"+ this.nodeId.getValue(), e);
+ LOG.info("Exception in getting generationId for device:{}", nodeId.getValue(), e);
+ throw new RoleChangeException("Exception in getting generationId for device:"+ nodeId.getValue(), e);
}
-
LOG.info("GenerationId received from device:{} is {}", nodeId.getValue(), generationId);
-
final BigInteger nextGenerationId = getNextGenerationId(generationId);
-
LOG.info("nextGenerationId received from device:{} is {}", nodeId.getValue(), nextGenerationId);
+ final SetRoleOutput setRoleOutput;
try {
setRoleOutput = roleService.submitRoleChange(ofpRole, version, nextGenerationId).get(10 , TimeUnit.SECONDS);
LOG.info("setRoleOutput after submitRoleChange:{}", setRoleOutput);
}
return setRoleOutput;
-
}
public void incrementRetryCounter() {
- this.retryCounter = retryCounter + 1;
+ this.retryCounter++;
}
public int getRetryCounter() {
return retryCounter;
}
}
-
- public static CheckedFuture<SetRoleOutput, RoleChangeException> makeCheckedFuture(ListenableFuture<SetRoleOutput> rolePushResult) {
- return Futures.makeChecked(rolePushResult,
- new Function<Exception, RoleChangeException>() {
- @Override
- public RoleChangeException apply(Exception input) {
- RoleChangeException output = null;
- if (input instanceof ExecutionException) {
- if (input.getCause() instanceof RoleChangeException) {
- output = (RoleChangeException) input.getCause();
- }
- }
-
- if (output == null) {
- output = new RoleChangeException(input.getMessage(), input);
- }
-
- return output;
- }
- });
- }
}