-/**
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
+
package org.opendaylight.openflowplugin.impl.role;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
import io.netty.util.TimerTask;
-
-import java.util.concurrent.ExecutionException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.openflowplugin.api.OFConstants;
-import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
-import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
+import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Role context try to make change device role on device
- */
-class RoleContextImpl implements RoleContext {
-
+public class RoleContextImpl implements RoleContext {
private static final Logger LOG = LoggerFactory.getLogger(RoleContextImpl.class);
- // Timeout in seconds after what we will give up on propagating role
- private static final int SET_ROLE_TIMEOUT = 10;
+ // Timeout after what we will give up on propagating role
+ private static final long SET_ROLE_TIMEOUT = 10000;
- private SalRoleService salRoleService = null;
- private final HashedWheelTimer hashedWheelTimer;
private final DeviceInfo deviceInfo;
- private CONTEXT_STATE state;
- private final RoleManager myManager;
- private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
- private final LifecycleService lifecycleService;
-
- RoleContextImpl(final DeviceInfo deviceInfo,
- final HashedWheelTimer hashedWheelTimer,
- final RoleManager myManager,
- final LifecycleService lifecycleService) {
+ private final HashedWheelTimer timer;
+ private final AtomicReference<ListenableFuture<RpcResult<SetRoleOutput>>> lastRoleFuture = new AtomicReference<>();
+ private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
+ private final Timeout slaveTask;
+ private ContextChainMastershipWatcher contextChainMastershipWatcher;
+ private SalRoleService roleService;
+
+ RoleContextImpl(@Nonnull final DeviceInfo deviceInfo,
+ @Nonnull final HashedWheelTimer timer,
+ final long checkRoleMasterTimeout) {
this.deviceInfo = deviceInfo;
- this.state = CONTEXT_STATE.WORKING;
- this.myManager = myManager;
- this.hashedWheelTimer = hashedWheelTimer;
- this.lifecycleService = lifecycleService;
+ this.timer = timer;
+ slaveTask = timer.newTimeout((timerTask) -> makeDeviceSlave(), checkRoleMasterTimeout, TimeUnit.MILLISECONDS);
+
+ LOG.info("Started timer for setting SLAVE role on device {} if no role will be set in {}s.",
+ deviceInfo,
+ checkRoleMasterTimeout / 1000L);
}
- @Nullable
@Override
- public <T> RequestContext<T> createRequestContext() {
- return new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
- @Override
- public void close() {
- }
- };
+ public DeviceInfo getDeviceInfo() {
+ return deviceInfo;
}
@Override
- public void setSalRoleService(@Nonnull final SalRoleService salRoleService) {
- Preconditions.checkNotNull(salRoleService);
- this.salRoleService = salRoleService;
+ public void setRoleService(final SalRoleService salRoleService) {
+ roleService = salRoleService;
}
@Override
- public CONTEXT_STATE getState() {
- return this.state;
+ public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
+ this.contextChainMastershipWatcher = newWatcher;
}
@Override
- public void setState(CONTEXT_STATE state) {
- this.state = state;
+ public void close() {
+ changeLastRoleFuture(null);
+ requestContexts.forEach(requestContext -> RequestContextUtil
+ .closeRequestContextWithRpcError(requestContext, "Connection closed."));
+ requestContexts.clear();
}
@Override
- public ServiceGroupIdentifier getServiceIdentifier() {
- return this.deviceInfo.getServiceIdentifier();
+ public void instantiateServiceInstance() {
+ final ListenableFuture<RpcResult<SetRoleOutput>> future = sendRoleChangeToDevice(OfpRole.BECOMEMASTER);
+ changeLastRoleFuture(future);
+ Futures.addCallback(future, new MasterRoleCallback(), MoreExecutors.directExecutor());
}
@Override
- public DeviceInfo getDeviceInfo() {
- return this.deviceInfo;
+ public ListenableFuture<Void> closeServiceInstance() {
+ changeLastRoleFuture(null);
+ return Futures.immediateFuture(null);
}
- public void startupClusterServices() throws ExecutionException, InterruptedException {
- Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new RpcResultFutureCallback());
+ @Override
+ public <T> RequestContext<T> createRequestContext() {
+ final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
+ @Override
+ public void close() {
+ requestContexts.remove(this);
+ }
+ };
+
+ requestContexts.add(ret);
+ return ret;
}
+ @Nonnull
@Override
- public ListenableFuture<Void> stopClusterServices(final boolean deviceDisconnected) {
-
- if (!deviceDisconnected) {
- ListenableFuture<Void> future = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
- @Nullable
- @Override
- public Void apply(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
- return null;
- }
- });
-
- Futures.addCallback(future, new FutureCallback<Void>() {
- @Override
- public void onSuccess(@Nullable Void aVoid) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
- }
- }
+ public ServiceGroupIdentifier getIdentifier() {
+ return deviceInfo.getServiceIdentifier();
+ }
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getLOGValue());
- LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable);
- myManager.removeDeviceFromOperationalDS(deviceInfo);
- }
- });
- return future;
- } else {
- return myManager.removeDeviceFromOperationalDS(deviceInfo);
- }
+ private void changeLastRoleFuture(final ListenableFuture<RpcResult<SetRoleOutput>> newFuture) {
+ slaveTask.cancel();
+ lastRoleFuture.getAndUpdate(lastFuture -> {
+ if (Objects.nonNull(lastFuture) && !lastFuture.isCancelled() && !lastFuture.isDone()) {
+ lastFuture.cancel(true);
+ }
+
+ return newFuture;
+ });
}
- @Override
- public ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave(){
- return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
+ private ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
+ final ListenableFuture<RpcResult<SetRoleOutput>> future = sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
+ changeLastRoleFuture(future);
+ Futures.addCallback(future, new SlaveRoleCallback(), MoreExecutors.directExecutor());
+ return future;
}
- @VisibleForTesting
- ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
- }
- final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
+ private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
+ LOG.debug("Sending new role {} to device {}", newRole, deviceInfo);
+
if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
- final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
- .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
- setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
+ final SetRoleInput setRoleInput = new SetRoleInputBuilder()
+ .setControllerRole(newRole)
+ .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier()))
+ .build();
+
+ final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture = roleService.setRole(setRoleInput);
+
final TimerTask timerTask = timeout -> {
if (!setRoleOutputFuture.isDone()) {
- LOG.warn("New role {} was not propagated to device {} during 5 sec", newRole, deviceInfo.getLOGValue());
+ LOG.warn("New role {} was not propagated to device {} during {} sec", newRole,
+ deviceInfo, SET_ROLE_TIMEOUT);
setRoleOutputFuture.cancel(true);
}
};
- hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS);
- } else {
- LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
- return Futures.immediateFuture(null);
+
+ timer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.MILLISECONDS);
+ return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
}
- return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
- }
- @Override
- public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
- this.clusterInitializationPhaseHandler = handler;
+ LOG.info("Device: {} with version: {} does not support role {}", deviceInfo, deviceInfo.getVersion(), newRole);
+ return Futures.immediateFuture(null);
}
- @Override
- public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
-
- if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
- LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
- return false;
+ private final class MasterRoleCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
+ @Override
+ public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+ contextChainMastershipWatcher.onMasterRoleAcquired(
+ deviceInfo,
+ ContextChainMastershipState.MASTER_ON_DEVICE);
+ LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo);
}
- Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new RpcResultFutureCallback());
- return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext);
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ if (!(throwable instanceof CancellationException)) {
+ contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
+ deviceInfo,
+ "Was not able to propagate MASTER role on device. Error: " + throwable.toString());
+ }
+ }
}
- private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
+ private final class SlaveRoleCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
@Override
- public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue());
- }
+ public void onSuccess(@Nullable final RpcResult<SetRoleOutput> result) {
+ contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
+ LOG.debug("Role SLAVE was successfully set on device, node {}", deviceInfo);
}
@Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue());
- lifecycleService.closeConnection();
+ public void onFailure(@Nonnull final Throwable throwable) {
+ if (!(throwable instanceof CancellationException)) {
+ contextChainMastershipWatcher.onSlaveRoleNotAcquired(deviceInfo,
+ "Was not able to propagate SLAVE role on device. Error: " + throwable.toString());
+ }
}
}
-}
+}
\ No newline at end of file