*/
package org.opendaylight.openflowplugin.impl.services;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.math.BigInteger;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import java.util.concurrent.Semaphore;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext.CONNECTION_STATE;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.impl.role.RoleChangeException;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.slf4j.LoggerFactory;
-public class SalRoleServiceImpl extends AbstractSimpleService<SetRoleInput, SetRoleOutput> implements SalRoleService {
+public final class SalRoleServiceImpl extends AbstractSimpleService<SetRoleInput, SetRoleOutput> implements SalRoleService {
private static final Logger LOG = LoggerFactory.getLogger(SalRoleServiceImpl.class);
private final DeviceContext deviceContext;
private final RoleService roleService;
- private final AtomicReference<OfpRole> lastKnownRoleRef = new AtomicReference<>(OfpRole.NOCHANGE);
- private final ListeningExecutorService listeningExecutorService;
- private final NodeId nodeId;
- private final Short version;
+
+ private final Semaphore currentRoleGuard = new Semaphore(1, true);
+
+ @GuardedBy("currentRoleGuard")
+ private OfpRole currentRole = OfpRole.NOCHANGE;
public SalRoleServiceImpl(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
super(requestContextStack, deviceContext, SetRoleOutput.class);
- this.deviceContext = deviceContext;
+ this.deviceContext = Preconditions.checkNotNull(deviceContext);
this.roleService = new RoleService(requestContextStack, deviceContext, RoleRequestOutput.class);
- nodeId = deviceContext.getPrimaryConnectionContext().getNodeId();
- version = deviceContext.getPrimaryConnectionContext().getFeatures().getVersion();
- listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
-
}
@Override
- protected OfHeader buildRequest(Xid xid, SetRoleInput input) {
+ protected OfHeader buildRequest(final Xid xid, final SetRoleInput input) throws ServiceException {
return null;
}
- public static BigInteger getNextGenerationId(BigInteger generationId) {
- BigInteger nextGenerationId = null;
- if (generationId.compareTo(MAX_GENERATION_ID) < 0) {
- nextGenerationId = generationId.add(BigInteger.ONE);
- } else {
- nextGenerationId = BigInteger.ZERO;
- }
-
- return nextGenerationId;
- }
-
-
@Override
public Future<RpcResult<SetRoleOutput>> setRole(final SetRoleInput input) {
LOG.info("SetRole called with input:{}", input);
- OfpRole lastKnownRole = lastKnownRoleRef.get();
-
+ try {
+ currentRoleGuard.acquire();
+ LOG.trace("currentRole lock queue length: {} " + currentRoleGuard.getQueueLength());
+ } catch (final InterruptedException e) {
+ LOG.error("Unexpected exception {} for acquire semaphore for input {}", e, input);
+ return RpcResultBuilder.<SetRoleOutput> failed().buildFuture();
+ }
// compare with last known role and set if different. If they are same, then return.
- if (lastKnownRoleRef.compareAndSet(input.getControllerRole(), input.getControllerRole())) {
- LOG.info("Role to be set is same as the last known role for the device:{}. Hence ignoring.", input.getControllerRole());
- SettableFuture<RpcResult<SetRoleOutput>> resultFuture = SettableFuture.create();
- resultFuture.set(RpcResultBuilder.<SetRoleOutput>success().build());
- return resultFuture;
+ if (currentRole.equals(input.getControllerRole())) {
+ LOG.info("Role to be set is same as the last known role for the device:{}. Hence ignoring.",
+ input.getControllerRole());
+ currentRoleGuard.release();
+ return RpcResultBuilder.<SetRoleOutput> success().buildFuture();
}
- final SettableFuture<RpcResult<SetRoleOutput>> resultFuture = SettableFuture.create();
-
- RoleChangeTask roleChangeTask = new RoleChangeTask(nodeId, input.getControllerRole(), version, roleService);
+ final SettableFuture<RpcResult<SetRoleOutput>> resultFuture = SettableFuture.<RpcResult<SetRoleOutput>> create();
+ repeaterForChangeRole(resultFuture, input, 0);
+ /* Add Callback for release Guard */
+ Futures.addCallback(resultFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
- do {
- ListenableFuture<RpcResult<SetRoleOutput>> deviceCheck = deviceConnectionCheck();
- if (deviceCheck != null) {
- LOG.info("Device {} is disconnected or state is not valid. Giving up on role change", input.getNode());
- return deviceCheck;
+ @Override
+ public void onSuccess(final RpcResult<SetRoleOutput> result) {
+ LOG.debug("SetRoleService for Node: {} is ok Role: {}", input.getNode().getValue(),
+ input.getControllerRole());
+ currentRoleGuard.release();
}
- ListenableFuture<SetRoleOutput> taskFuture = listeningExecutorService.submit(roleChangeTask);
- LOG.info("RoleChangeTask submitted for execution");
- CheckedFuture<SetRoleOutput, RoleChangeException> taskFutureChecked = makeCheckedFuture(taskFuture);
- try {
- SetRoleOutput setRoleOutput = taskFutureChecked.checkedGet(10, TimeUnit.SECONDS);
- LOG.info("setRoleOutput received after roleChangeTask execution:{}", setRoleOutput);
- resultFuture.set(RpcResultBuilder.<SetRoleOutput>success().withResult(setRoleOutput).build());
- lastKnownRoleRef.set(input.getControllerRole());
- return resultFuture;
-
- } catch (TimeoutException | RoleChangeException e) {
- roleChangeTask.incrementRetryCounter();
- LOG.info("Exception in setRole(), will retry:" + (MAX_RETRIES - roleChangeTask.getRetryCounter()) + " times.", e);
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("SetRoleService set Role {} for Node: {} fail . Reason {}", input.getControllerRole(),
+ input.getNode().getValue(), t);
+ currentRoleGuard.release();
}
-
- } while (roleChangeTask.getRetryCounter() < MAX_RETRIES);
-
- resultFuture.setException(new RoleChangeException("Set Role failed after " + MAX_RETRIES + "tries on device " + input.getNode().getValue()));
-
+ });
return resultFuture;
}
- private ListenableFuture<RpcResult<SetRoleOutput>> deviceConnectionCheck() {
- if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
- ListenableFuture<RpcResult<SetRoleOutput>> resultingFuture = SettableFuture.create();
- switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
- case RIP:
- final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
- deviceContext.getPrimaryConnectionContext().getConnectionState());
- resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
- break;
- default:
- resultingFuture = Futures.immediateCheckedFuture(RpcResultBuilder.<SetRoleOutput>failed().build());
- break;
- }
- return resultingFuture;
+ private void repeaterForChangeRole(final SettableFuture<RpcResult<SetRoleOutput>> future, final SetRoleInput input,
+ final int retryCounter) {
+ if (future.isCancelled()) {
+ future.setException(new RoleChangeException(String.format(
+ "Set Role for device %s stop because Future was canceled", input.getNode().getValue())));
+ return;
}
- return null;
- }
-
- class RoleChangeTask implements Callable<SetRoleOutput> {
-
- private final NodeId nodeId;
- private final OfpRole ofpRole;
- private final Short version;
- private final RoleService roleService;
- private int retryCounter = 0;
-
- public RoleChangeTask(NodeId nodeId, OfpRole ofpRole, Short version, RoleService roleService) {
- this.nodeId = nodeId;
- this.ofpRole = ofpRole;
- this.version = version;
- this.roleService = roleService;
+ if (retryCounter >= MAX_RETRIES) {
+ future.setException(new RoleChangeException(String.format("Set Role failed after %s tries on device %s",
+ MAX_RETRIES, input.getNode().getValue())));
+ return;
+ }
+ // Check current connection state
+ final CONNECTION_STATE state = deviceContext.getPrimaryConnectionContext().getConnectionState();
+ switch (state) {
+ case RIP:
+ LOG.info("Device {} has been disconnected", input.getNode());
+ future.setException(new Exception(String.format(
+ "Device connection doesn't exist anymore. Primary connection status : %s", state)));
+ return;
+ case WORKING:
+ // We can proceed
+ LOG.trace("Device {} has been working", input.getNode());
+ break;
+ default:
+ LOG.warn("Device {} is in state {}, role change is not allowed", input.getNode(), state);
+ future.setException(new Exception(String.format("Unexcpected device connection status : %s", state)));
+ return;
}
- @Override
- public SetRoleOutput call() throws RoleChangeException {
- LOG.info("RoleChangeTask called on device:{} OFPRole:{}", this.nodeId.getValue(), ofpRole);
-
- // we cannot move ahead without having the generation id, so block the thread till we get it.
- BigInteger generationId = null;
- SetRoleOutput setRoleOutput = null;
-
- try {
- generationId = this.roleService.getGenerationIdFromDevice(version).get(10, TimeUnit.SECONDS);
- LOG.info("RoleChangeTask, GenerationIdFromDevice from device is {}", generationId);
-
- } catch (Exception e ) {
- LOG.info("Exception in getting generationId for device:{}. Ex:{}" + this.nodeId.getValue(), e);
- throw new RoleChangeException("Exception in getting generationId for device:"+ this.nodeId.getValue(), e);
+ LOG.info("Requesting state change to {}", input.getControllerRole());
+ final ListenableFuture<RpcResult<SetRoleOutput>> changeRoleFuture = tryToChangeRole(input.getControllerRole());
+ Futures.addCallback(changeRoleFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
+
+ @Override
+ public void onSuccess(final RpcResult<SetRoleOutput> result) {
+ if (result.isSuccessful()) {
+ LOG.debug("setRoleOutput received after roleChangeTask execution:{}", result);
+ currentRole = input.getControllerRole();
+ future.set(RpcResultBuilder.<SetRoleOutput> success().withResult(result.getResult()).build());
+ } else {
+ LOG.error("setRole() failed with errors, will retry: {} times.", MAX_RETRIES - retryCounter);
+ repeaterForChangeRole(future, input, (retryCounter + 1));
+ }
}
-
- LOG.info("GenerationId received from device:{} is {}", nodeId.getValue(), generationId);
-
- final BigInteger nextGenerationId = getNextGenerationId(generationId);
-
- LOG.info("nextGenerationId received from device:{} is {}", nodeId.getValue(), nextGenerationId);
-
- try {
- setRoleOutput = roleService.submitRoleChange(ofpRole, version, nextGenerationId).get(10 , TimeUnit.SECONDS);
- LOG.info("setRoleOutput after submitRoleChange:{}", setRoleOutput);
-
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- LOG.error("Exception in making role change for device", e);
- throw new RoleChangeException("Exception in making role change for device:" + nodeId.getValue());
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("Exception in setRole(), will retry: {} times.", t, MAX_RETRIES - retryCounter);
+ repeaterForChangeRole(future, input, (retryCounter + 1));
}
+ });
+ }
- return setRoleOutput;
+ private ListenableFuture<RpcResult<SetRoleOutput>> tryToChangeRole(final OfpRole role) {
+ LOG.info("RoleChangeTask called on device:{} OFPRole:{}", getDeviceInfo().getNodeId().getValue(), role);
- }
-
- public void incrementRetryCounter() {
- this.retryCounter = retryCounter + 1;
- }
+ final Future<BigInteger> generationFuture = roleService.getGenerationIdFromDevice(getVersion());
- public int getRetryCounter() {
- return retryCounter;
- }
+ return Futures.transform(JdkFutureAdapters.listenInPoolThread(generationFuture), (AsyncFunction<BigInteger, RpcResult<SetRoleOutput>>) generationId -> {
+ LOG.debug("RoleChangeTask, GenerationIdFromDevice from device {} is {}", getDeviceInfo().getNodeId().getValue(), generationId);
+ final BigInteger nextGenerationId = getNextGenerationId(generationId);
+ LOG.debug("nextGenerationId received from device:{} is {}", getDeviceInfo().getNodeId().getValue(), nextGenerationId);
+ final Future<RpcResult<SetRoleOutput>> submitRoleFuture = roleService.submitRoleChange(role, getVersion(), nextGenerationId);
+ return JdkFutureAdapters.listenInPoolThread(submitRoleFuture);
+ });
}
- public static CheckedFuture<SetRoleOutput, RoleChangeException> makeCheckedFuture(ListenableFuture<SetRoleOutput> rolePushResult) {
- return Futures.makeChecked(rolePushResult,
- new Function<Exception, RoleChangeException>() {
- @Override
- public RoleChangeException apply(Exception input) {
- RoleChangeException output = null;
- if (input instanceof ExecutionException) {
- if (input.getCause() instanceof RoleChangeException) {
- output = (RoleChangeException) input.getCause();
- }
- }
-
- if (output == null) {
- output = new RoleChangeException(input.getMessage(), input);
- }
-
- return output;
- }
- });
+ private static BigInteger getNextGenerationId(final BigInteger generationId) {
+ if (generationId.compareTo(MAX_GENERATION_ID) < 0) {
+ return generationId.add(BigInteger.ONE);
+ } else {
+ return BigInteger.ZERO;
+ }
}
}