/**
* General API for all OFP Context
*/
-public interface OFPContext extends ClusterLifecycleSupervisor, ClusterInitializationPhaseHandler {
-
- void setState(CONTEXT_STATE contextState);
-
+public interface OFPContext extends AutoCloseable, ClusterLifecycleSupervisor, ClusterInitializationPhaseHandler {
/**
* Context state
*/
}
/**
+ * Get actual context state
* @return actual context state
*/
CONTEXT_STATE getState();
/**
* About to stop services in cluster not master anymore or going down
* @return Future most of services need time to be closed
- * @param deviceDisconnected true if clustering services stopping by device disconnect
+ * @param connectionInterrupted true if clustering services stopping by device disconnect
*/
- default ListenableFuture<Void> stopClusterServices(final boolean deviceDisconnected){
+ default ListenableFuture<Void> stopClusterServices(boolean connectionInterrupted) {
return Futures.immediateFailedFuture(new RejectedExecutionException("Cannot stop abstract services, check implementation of cluster services"));
}
/**
+ * Get cluster singleton service identifier
* @return cluster singleton service identifier
*/
ServiceGroupIdentifier getServiceIdentifier();
/**
+ * Get device info
* @return device info
*/
DeviceInfo getDeviceInfo();
+ @Override
+ void close();
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow;
+
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceLifecycleSupervisor;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
+
+/**
+ * This interface is responsible for managing lifecycle of itself and all it's associated contexts.
+ * Every manager that implements this interface must handle connection initialization and termination chain
+ * by implementing methods from {@link org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceLifecycleSupervisor},
+ * then it must handle initialization and termination chain of it's associated context by implementing methods from
+ * {@link org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler} and
+ * {@link org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler} and also removal
+ * of these contexts from it's internal map of contexts by implementing methods from
+ * {@link org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler}. And at last, it must
+ * handle it's own full termination by implementing {@link AutoCloseable#close()}
+ */
+public interface OFPManager extends
+ DeviceLifecycleSupervisor,
+ DeviceInitializationPhaseHandler,
+ DeviceTerminationPhaseHandler,
+ DeviceRemovedHandler,
+ AutoCloseable {
+
+ @Override
+ void close();
+}
* Method registers handler responsible handling operations related to connected device after
* device is connected.
*
- * @param deviceConnectedHandler
+ * @param deviceConnectedHandler device connected handler
*/
void setDeviceConnectedHandler(DeviceConnectedHandler deviceConnectedHandler);
void setEchoReplyTimeout(long echoReplyTimeout);
*/
public interface DeviceContext extends
OFPContext,
- AutoCloseable,
DeviceReplyProcessor,
TxFacade,
DeviceRegistry,
*/
ItemLifeCycleRegistry getItemLifeCycleSourceRegistry();
- @Override
- void close();
-
void setSwitchFeaturesMandatory(boolean switchFeaturesMandatory);
void putLifecycleServiceIntoTxChainManager(LifecycleService lifecycleService);
import com.google.common.util.concurrent.CheckedFuture;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.openflowplugin.api.openflow.OFPManager;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceConnectedHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceLifecycleSupervisor;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.translator.TranslatorLibrarian;
/**
* registering transaction chain for each DeviceContext. Each device
* has its own device context managed by this manager.
*/
-public interface DeviceManager extends DeviceConnectedHandler, DeviceDisconnectedHandler, DeviceLifecycleSupervisor,
- DeviceInitializationPhaseHandler, DeviceTerminationPhaseHandler, TranslatorLibrarian, AutoCloseable {
-
+public interface DeviceManager extends
+ OFPManager,
+ DeviceConnectedHandler,
+ DeviceDisconnectedHandler,
+ TranslatorLibrarian {
/**
* invoked after all services injected
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
/**
- * Created by Martin Bobak <mbobak@cisco.com> on 26.2.2015.
+ * Represents handler for new connected device that will propagate information about
+ * established connection with device. It is important for correct order in device connection chain.
*/
public interface DeviceConnectedHandler {
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
/**
- * Created by Martin Bobak <mbobak@cisco.com> on 22.4.2015.
+ * Represents handler for just disconnected device that will propagate device's
+ * connection context. It is important for correct order in device disconnection chain.
*/
-
public interface DeviceDisconnectedHandler {
+
/**
* Method is used to propagate information about closed connection with device.
* It propagates connected device's connection context.
* @param lifecycleService - cluster singleton service
* @throws Exception - needs to be catch in ConnectionHandler implementation
*/
- void onDeviceContextLevelUp(final DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception;
+ void onDeviceContextLevelUp(final @CheckForNull DeviceInfo deviceInfo,
+ final @CheckForNull LifecycleService lifecycleService) throws Exception;
}
* Interface has to implement all relevant manager to correctly handling
* device initialization and termination phase. Methods are used for order
* handlers in initialization/termination phase. Ordering is easily changed
- * programicaly by definition.
+ * by definition.
*
*/
public interface DeviceLifecycleSupervisor {
* Method sets relevant {@link DeviceInitializationPhaseHandler} for building
* handler's chain for new Device initial phase.
*
- * @param handler
+ * @param handler initialization phase handler
*/
void setDeviceInitializationPhaseHandler(DeviceInitializationPhaseHandler handler);
* Method sets relevant {@link DeviceInitializationPhaseHandler} for annihilating
* handler's chain for dead Device termination phase.
*
- * @param handler
+ * @param handler termination phase handler
*/
void setDeviceTerminationPhaseHandler(DeviceTerminationPhaseHandler handler);
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.device.handlers;
+
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+
+/**
+ * Represents handler for device that was disconnected but needs to be removed from it's manager.
+ */
+public interface DeviceRemovedHandler {
+
+ /**
+ * Method is used to propagate information about device being removed from manager.
+ */
+ void onDeviceRemoved(DeviceInfo deviceInfo);
+}
/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
/**
* Method represents a termination cycle for {@link DeviceContext}.
+ * @param deviceInfo {@link org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo}
*
- * @param deviceInfo - {@link DeviceInfo}
*/
- void onDeviceContextLevelDown(@CheckForNull DeviceInfo deviceInfo);
+ void onDeviceContextLevelDown(final @CheckForNull DeviceInfo deviceInfo);
}
package org.opendaylight.openflowplugin.api.openflow.lifecycle;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import javax.annotation.CheckForNull;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterLifecycleSupervisor;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
/**
* Service for starting or stopping all services in plugin in cluster
*/
-public interface LifecycleService extends ClusterSingletonService, AutoCloseable, ClusterLifecycleSupervisor, ClusterInitializationPhaseHandler {
+public interface LifecycleService extends ClusterSingletonService, OFPContext {
/**
* This method registers lifecycle service to the given provider
*/
void registerService(final ClusterSingletonServiceProvider singletonServiceProvider);
+ /**
+ * This method registers device removed handler what will be executed when device should be removed
+ * from managers,
+ * @param deviceRemovedHandler device removed handler
+ */
+ void registerDeviceRemovedHandler(final @CheckForNull DeviceRemovedHandler deviceRemovedHandler);
+
/**
* Setter for device context
* @param deviceContext actual device context created per device
* {@link org.opendaylight.openflowplugin.api.openflow.device.RequestContext} to perform requests.
* <p>
*/
-public interface RpcContext extends RequestContextStack, AutoCloseable, OFPContext {
+public interface RpcContext extends RequestContextStack, OFPContext {
<S extends RpcService> void registerRpcServiceImplementation(Class<S> serviceClass, S serviceInstance);
<S extends RpcService> S lookupRpcService(Class<S> serviceClass);
<S extends RpcService> void unregisterRpcServiceImplementation(Class<S> serviceClass);
- @Override
- void close();
-
void setStatisticsRpcEnabled(boolean isStatisticsRpcEnabled);
-
- boolean isStatisticsRpcEnabled();
}
package org.opendaylight.openflowplugin.api.openflow.rpc;
+import org.opendaylight.openflowplugin.api.openflow.OFPManager;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceLifecycleSupervisor;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
/**
* The RPC Manager will maintain an RPC Context for each online switch. RPC context for device is created when
* {@link DeviceInitializationPhaseHandler#onDeviceContextLevelUp(DeviceInfo, org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService)}
* is called.
*/
-public interface RpcManager extends DeviceLifecycleSupervisor, DeviceInitializationPhaseHandler, AutoCloseable, DeviceTerminationPhaseHandler {
+public interface RpcManager extends OFPManager {
void setStatisticsRpcEnabled(boolean statisticsRpcEnabled);
}
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
/**
* Context for statistics
*/
-public interface StatisticsContext extends RequestContextStack, AutoCloseable, OFPContext {
+public interface StatisticsContext extends RequestContextStack, OFPContext {
/**
* Gather data from device
*/
ItemLifecycleListener getItemLifeCycleListener();
- @Override
- void close();
-
/**
* On / Off scheduling
* @param schedulingEnabled true if scheduling should be enabled
package org.opendaylight.openflowplugin.api.openflow.statistics;
+import org.opendaylight.openflowplugin.api.openflow.OFPManager;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceLifecycleSupervisor;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
/**
* Manager to start or stop scheduling statistics
*/
-public interface StatisticsManager extends DeviceLifecycleSupervisor, DeviceInitializationPhaseHandler,
- DeviceTerminationPhaseHandler, AutoCloseable {
+public interface StatisticsManager extends OFPManager {
/**
* Start scheduling statistic gathering for given device info
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper{
+public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
return translatorLibrary;
}
- @Override
- public synchronized void close() {
- LOG.debug("closing deviceContext: {}, nodeId:{}",
- getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress(),
- getDeviceInfo().getLOGValue());
- // NOOP
- throw new UnsupportedOperationException("Autocloseble.close will be removed soon");
- }
-
@Override
public void setCurrentBarrierTimeout(final Timeout timeout) {
barrierTaskTimeout = timeout;
@Override
public void onPublished() {
Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState()));
- setState(CONTEXT_STATE.WORKING);
+ this.state = CONTEXT_STATE.WORKING;
primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) {
switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
}
@Override
- public void setState(CONTEXT_STATE state) {
- this.state = state;
- }
-
- @Override
- public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
-
- ListenableFuture<Void> deactivateTxManagerFuture =
- initialized ? transactionChainManager.deactivateTransactionManager() : Futures.immediateFuture(null);
+ public ListenableFuture<Void> stopClusterServices(boolean connectionInterrupted) {
+ final ListenableFuture<Void> deactivateTxManagerFuture = initialized
+ ? transactionChainManager.deactivateTransactionManager()
+ : Futures.immediateFuture(null);
- if (!deviceDisconnected) {
- ListenableFuture<Void> makeSlaveFuture = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
+ if (!connectionInterrupted) {
+ final ListenableFuture<Void> makeSlaveFuture = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
@Nullable
@Override
public Void apply(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
public void onFailure(final Throwable throwable) {
LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getLOGValue());
LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable);
- myManager.removeDeviceFromOperationalDS(deviceInfo);
}
});
return Futures.transform(deactivateTxManagerFuture, new AsyncFunction<Void, Void>() {
@Override
public ListenableFuture<Void> apply(Void aVoid) throws Exception {
- return makeSlaveFuture;
+ // Add fallback to remove device from operational DS if setting slave fails
+ return Futures.withFallback(makeSlaveFuture, t ->
+ myManager.removeDeviceFromOperationalDS(deviceInfo));
}
});
} else {
return this.deviceInfo;
}
+ @Override
+ public void close() {
+ if (CONTEXT_STATE.TERMINATION.equals(getState())){
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DeviceContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
+ }
+ } else {
+ this.state = CONTEXT_STATE.TERMINATION;
+ }
+ }
+
@Override
public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){
if (initialized) {
@Override
public void replaceConnectionContext(final ConnectionContext connectionContext){
// Act like we are initializing the context
- setState(CONTEXT_STATE.INITIALIZATION);
+ this.state = CONTEXT_STATE.INITIALIZATION;
this.primaryConnectionContext = connectionContext;
this.onPublished();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
}
+
final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
+
if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
.setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
+
setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
+
final TimerTask timerTask = timeout -> {
if (!setRoleOutputFuture.isDone()) {
LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT);
setRoleOutputFuture.cancel(true);
}
};
+
hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS);
} else {
LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
return Futures.immediateFuture(null);
}
+
return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
}
LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
deviceContext.onPublished();
+ lifecycleService.registerDeviceRemovedHandler(this);
lifecycleService.registerService(this.singletonServiceProvider);
}
@Override
public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
Preconditions.checkArgument(connectionContext != null);
+ final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
- DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
/*
* This part prevent destroy another device context. Throwing here an exception result to propagate close connection
* in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
// Add Disconnect handler
- connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
+ connectionContext.setDeviceDisconnectedHandler(this);
+
// Cache this for clarity
final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
- //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
+ // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
connectionAdapter.setPacketInFiltering(true);
final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
@Override
public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
-
- LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
- }
-
updatePacketInRateLimiters();
- if (Objects.nonNull(lifecycleService)) {
- try {
- lifecycleService.close();
- LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
- } catch (Exception e) {
- LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
- }
- }
-
- deviceContexts.remove(deviceInfo);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
- }
-
+ Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close);
}
@Override
final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
- if (null == deviceCtx) {
+ if (Objects.isNull(deviceCtx)) {
LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
return;
}
return;
}
- deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
+ deviceCtx.close();
if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
- /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
+ // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
}
- //TODO: Auxiliary connections supported ?
- /* Device is disconnected and so we need to close TxManager */
+
+ // TODO: Auxiliary connections supported ?
+ // Device is disconnected and so we need to close TxManager
final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
Futures.addCallback(future, new FutureCallback<Void>() {
-
@Override
public void onSuccess(final Void result) {
LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
}
});
- /* Add timer for Close TxManager because it could fain ind cluster without notification */
+
+ // Add timer for Close TxManager because it could fail in cluster without notification
final TimerTask timerTask = timeout -> {
if (!future.isDone()) {
LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
future.cancel(false);
}
};
+
hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
}
}
}
- @VisibleForTesting
- void setDeviceContext(final DeviceInfo deviceInfo, final DeviceContext deviceContext) {
- this.deviceContexts.putIfAbsent(deviceInfo, deviceContext);
- }
+ public void onDeviceRemoved(DeviceInfo deviceInfo) {
+ deviceContexts.remove(deviceInfo);
+ LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
- @VisibleForTesting
- int getDeviceContextCount() {
- return this.deviceContexts.size();
+ lifecycleServices.remove(deviceInfo);
+ LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
}
-
-
}
import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.base.Verify;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext.CONNECTION_STATE;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
public class LifecycleServiceImpl implements LifecycleService {
private static final Logger LOG = LoggerFactory.getLogger(LifecycleServiceImpl.class);
-
- private boolean inClosing = false;
private DeviceContext deviceContext;
private RpcContext rpcContext;
private StatisticsContext statContext;
private ClusterSingletonServiceRegistration registration;
private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
+ private final List<DeviceRemovedHandler> deviceRemovedHandlers = new ArrayList<>();
+ private volatile CONTEXT_STATE state = CONTEXT_STATE.INITIALIZATION;
@Override
public void instantiateServiceInstance() {
- LOG.info("Starting clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
+ LOG.info("Starting clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
- if (!this.clusterInitializationPhaseHandler.onContextInstantiateService(null)) {
- this.closeConnection();
+ if (!clusterInitializationPhaseHandler.onContextInstantiateService(null)) {
+ closeConnection();
}
-
}
@Override
public ListenableFuture<Void> closeServiceInstance() {
- LOG.info("Closing clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
-
final boolean connectionInterrupted =
this.deviceContext
.getPrimaryConnectionContext()
.getConnectionState()
.equals(ConnectionContext.CONNECTION_STATE.RIP);
- // If connection was interrupted and we are not trying to close service, then we received something
- // we do not wanted to receive, so do not continue
- if (connectionInterrupted && !inClosing) {
- LOG.warn("Failed to close clustering MASTER services for node {} because they are already closed",
- LifecycleServiceImpl.this.deviceContext.getDeviceInfo().getLOGValue());
-
- return Futures.immediateCancelledFuture();
- }
-
// Chain all jobs that will stop our services
final List<ListenableFuture<Void>> futureList = new ArrayList<>();
futureList.add(statContext.stopClusterServices(connectionInterrupted));
futureList.add(rpcContext.stopClusterServices(connectionInterrupted));
futureList.add(deviceContext.stopClusterServices(connectionInterrupted));
- // When we stopped all jobs then we are not in closing state anymore (at least from plugin perspective)
return Futures.transform(Futures.successfulAsList(futureList), new Function<List<Void>, Void>() {
@Nullable
@Override
public Void apply(@Nullable List<Void> input) {
- LOG.debug("Closed clustering MASTER services for node {}",
- LifecycleServiceImpl.this.deviceContext.getDeviceInfo().getLOGValue());
+ LOG.debug("Closed clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
return null;
}
});
@Override
public ServiceGroupIdentifier getIdentifier() {
+ return getServiceIdentifier();
+ }
+
+ @Override
+ public CONTEXT_STATE getState() {
+ return this.state;
+ }
+
+ @Override
+ public ServiceGroupIdentifier getServiceIdentifier() {
return deviceContext.getServiceIdentifier();
}
+ @Override
+ public DeviceInfo getDeviceInfo() {
+ return deviceContext.getDeviceInfo();
+ }
@Override
- public void close() throws Exception {
- // If we are still registered and we are not already closing, then close the registration
- if (Objects.nonNull(registration) && !inClosing) {
- inClosing = true;
- registration.close();
- registration = null;
+ public void close() {
+ if (CONTEXT_STATE.TERMINATION.equals(getState())){
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LifecycleService is already in TERMINATION state.");
+ }
+ } else {
+ this.state = CONTEXT_STATE.TERMINATION;
+
+ // We are closing, so cleanup all managers now
+ deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(getDeviceInfo()));
+
+ // If we are still registered and we are not already closing, then close the registration
+ if (Objects.nonNull(registration)) {
+ try {
+ LOG.debug("Closing clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
+ registration.close();
+ } catch (Exception e) {
+ LOG.debug("Failed to close clustering MASTER services for node {} with exception: ",
+ getDeviceInfo().getLOGValue(), e);
+ }
+ }
}
}
@Override
public void registerService(final ClusterSingletonServiceProvider singletonServiceProvider) {
- LOG.info("Registering clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
+ LOG.debug("Registered clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
- //lifecycle service -> device context -> statistics context -> rpc context -> role context -> lifecycle service
+ // lifecycle service -> device context -> statistics context -> rpc context -> role context -> lifecycle service
this.clusterInitializationPhaseHandler = deviceContext;
this.deviceContext.setLifecycleInitializationPhaseHandler(this.statContext);
this.statContext.setLifecycleInitializationPhaseHandler(this.rpcContext);
this.rpcContext.setLifecycleInitializationPhaseHandler(this);
//Set initial submit handler
this.statContext.setInitialSubmitHandler(this.deviceContext);
- //Register cluster singleton service
- this.registration = singletonServiceProvider.registerClusterSingletonService(this);
+
+ // Register cluster singleton service
+ try {
+ this.registration = Verify.verifyNotNull(singletonServiceProvider.registerClusterSingletonService(this));
+ LOG.info("Registered clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
+ } catch (Exception e) {
+ LOG.warn("Failed to register cluster singleton service for node {}, with exception: {}", getDeviceInfo(), e);
+ closeConnection();
+ }
+ }
+
+ @Override
+ public void registerDeviceRemovedHandler(final DeviceRemovedHandler deviceRemovedHandler) {
+ if (!deviceRemovedHandlers.contains(deviceRemovedHandler)) {
+ deviceRemovedHandlers.add(deviceRemovedHandler);
+ }
}
@Override
@Override
public void closeConnection() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing connection for node {}.", getDeviceInfo().getLOGValue());
+ }
+
this.deviceContext.shutdownConnection();
}
@Override
public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
-
- if (ConnectionContext.CONNECTION_STATE.RIP.equals(connectionContext.getConnectionState())) {
+ if (CONNECTION_STATE.RIP.equals(connectionContext.getConnectionState())) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Connection to the device {} was interrupted.", this.deviceContext.getDeviceInfo().getLOGValue());
+ LOG.debug("Connection to the device {} was interrupted.", getDeviceInfo().getLOGValue());
}
+
return false;
}
private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
- public DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
+ DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
this.deviceFlowRegistryFill = deviceFlowRegistryFill;
}
.filter(Objects::nonNull)
.count();
- LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceContext.getDeviceInfo().getLOGValue());
+ LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, getDeviceInfo().getLOGValue());
}
}
public void onFailure(Throwable t) {
if (deviceFlowRegistryFill.isCancelled()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceContext.getDeviceInfo().getLOGValue());
+ LOG.debug("Cancelled filling flow registry with flows for node: {}", getDeviceInfo().getLOGValue());
}
} else {
- LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceContext.getDeviceInfo().getLOGValue(), t);
+ LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", getDeviceInfo().getLOGValue(), t);
}
}
}
package org.opendaylight.openflowplugin.impl.rpc;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
+import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
// TODO: add private Sal salBroker
private final ConcurrentMap<Class<?>, RoutedRpcRegistration<?>> rpcRegistrations = new ConcurrentHashMap<>();
private final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
- private CONTEXT_STATE state;
+ private volatile CONTEXT_STATE state = CONTEXT_STATE.INITIALIZATION;
private final DeviceInfo deviceInfo;
private final DeviceContext deviceContext;
private final ExtensionConverterProvider extensionConverterProvider;
this.messageSpy = Preconditions.checkNotNull(messageSpy);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.nodeInstanceIdentifier = nodeInstanceIdentifier;
-
- tracker = new Semaphore(maxRequests, true);
+ this.tracker = new Semaphore(maxRequests, true);
this.extensionConverterProvider = extensionConverterProvider;
this.notificationPublishService = notificationPublishService;
- setState(CONTEXT_STATE.WORKING);
this.deviceInfo = deviceInfo;
this.deviceContext = deviceContext;
this.convertorExecutor = convertorExecutor;
public void close() {
if (CONTEXT_STATE.TERMINATION.equals(getState())){
if (LOG.isDebugEnabled()) {
- LOG.debug("RpcContext is already in TERMINATION state.");
+ LOG.debug("RpcContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
}
} else {
- setState(CONTEXT_STATE.TERMINATION);
- for (final Iterator<Entry<Class<?>, RoutedRpcRegistration<?>>> iterator = Iterators
- .consumingIterator(rpcRegistrations.entrySet().iterator()); iterator.hasNext(); ) {
- final RoutedRpcRegistration<?> rpcRegistration = iterator.next().getValue();
- rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier);
- rpcRegistration.close();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType().getSimpleName(),
- nodeInstanceIdentifier.getKey().getId().getValue());
- }
+ try {
+ stopClusterServices(true).get();
+ } catch (Exception e) {
+ LOG.debug("Failed to close RpcContext for node {} with exception: ", getDeviceInfo().getLOGValue(), e);
}
+
+ this.state = CONTEXT_STATE.TERMINATION;
}
}
this.isStatisticsRpcEnabled = isStatisticsRpcEnabled;
}
- @Override
- public boolean isStatisticsRpcEnabled() {
- return isStatisticsRpcEnabled;
- }
-
@Override
public CONTEXT_STATE getState() {
return this.state;
}
- @Override
- public void setState(CONTEXT_STATE state) {
- this.state = state;
- }
-
@Override
public ServiceGroupIdentifier getServiceIdentifier() {
return this.deviceInfo.getServiceIdentifier();
}
@Override
- public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
- MdSalRegistrationUtils.unregisterServices(this);
- return Futures.immediateFuture(null);
+ public ListenableFuture<Void> stopClusterServices(boolean connectionInterrupted) {
+ if (CONTEXT_STATE.TERMINATION.equals(getState())) {
+ return Futures.immediateCancelledFuture();
+ }
+
+ return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
+ @Nullable
+ @Override
+ public Void apply(@Nullable Object input) {
+ for (final Iterator<Entry<Class<?>, RoutedRpcRegistration<?>>> iterator = Iterators
+ .consumingIterator(rpcRegistrations.entrySet().iterator()); iterator.hasNext(); ) {
+ final RoutedRpcRegistration<?> rpcRegistration = iterator.next().getValue();
+ rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier);
+ rpcRegistration.close();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType().getSimpleName(),
+ nodeInstanceIdentifier.getKey().getId().getValue());
+ }
+ }
+
+ return null;
+ }
+ });
}
@Override
@Override
public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
-
if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
return false;
}
MdSalRegistrationUtils.registerServices(this, deviceContext, extensionConverterProvider, convertorExecutor);
+
if (isStatisticsRpcEnabled) {
MdSalRegistrationUtils.registerStatCompatibilityServices(
this,
notificationPublishService,
convertorExecutor);
}
+
return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext);
}
}
import com.google.common.base.Verify;
import com.google.common.collect.Iterators;
import java.util.Iterator;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
private static final Logger LOG = LoggerFactory.getLogger(RpcManagerImpl.class);
private final RpcProviderRegistry rpcProviderRegistry;
private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
- private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
+ private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler;
private final int maxRequestsQuota;
private final ConcurrentMap<DeviceInfo, RpcContext> contexts = new ConcurrentHashMap<>();
private boolean isStatisticsRpcEnabled;
Verify.verify(contexts.putIfAbsent(deviceInfo, rpcContext) == null, "RpcCtx still not closed for node {}", deviceInfo.getNodeId());
lifecycleService.setRpcContext(rpcContext);
+ lifecycleService.registerDeviceRemovedHandler(this);
rpcContext.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
// finish device initialization cycle back to DeviceManager
@Override
public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
- final RpcContext removedContext = contexts.remove(deviceInfo);
- if (removedContext != null) {
- LOG.debug("Unregister RPCs services for node {}", deviceInfo.getLOGValue());
- removedContext.close();
- }
- deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
+ Optional.ofNullable(contexts.get(deviceInfo)).ifPresent(OFPContext::close);
+ deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo);
}
@Override
public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
- this.deviceTerminPhaseHandler = handler;
+ this.deviceTerminationPhaseHandler = handler;
}
/**
public void setStatisticsRpcEnabled(boolean statisticsRpcEnabled) {
isStatisticsRpcEnabled = statisticsRpcEnabled;
}
+
+ @Override
+ public void onDeviceRemoved(DeviceInfo deviceInfo) {
+ contexts.remove(deviceInfo);
+ LOG.debug("Rpc context removed for node {}", deviceInfo.getLOGValue());
+ }
}
package org.opendaylight.openflowplugin.impl.statistics;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);
itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
statListForCollectingInitialization();
- setState(CONTEXT_STATE.INITIALIZATION);
+ this.state = CONTEXT_STATE.INITIALIZATION;
this.deviceInfo = deviceInfo;
this.myManager = myManager;
this.lastDataGathering = null;
public void close() {
if (CONTEXT_STATE.TERMINATION.equals(getState())) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Statistics context is already in state TERMINATION.");
+ LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
}
} else {
- stopGatheringData();
- setState(CONTEXT_STATE.TERMINATION);
- schedulingEnabled = false;
+ try {
+ stopClusterServices(true).get();
+ } catch (Exception e) {
+ LOG.debug("Failed to close StatisticsContext for node {} with exception: ", getDeviceInfo().getLOGValue(), e);
+ }
+
+ this.state = CONTEXT_STATE.TERMINATION;
+
for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
iterator.hasNext(); ) {
RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
}
+
if (null != pollTimeout && !pollTimeout.isExpired()) {
pollTimeout.cancel();
}
return this.state;
}
- @Override
- public void setState(CONTEXT_STATE state) {
- this.state = state;
- }
-
@Override
public ServiceGroupIdentifier getServiceIdentifier() {
return this.deviceInfo.getServiceIdentifier();
}
@Override
- public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
- stopGatheringData();
- myManager.stopScheduling(deviceInfo);
- return Futures.immediateFuture(null);
+ public ListenableFuture<Void> stopClusterServices(boolean connectionInterrupted) {
+ if (CONTEXT_STATE.TERMINATION.equals(getState())) {
+ return Futures.immediateCancelledFuture();
+ }
+
+ return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
+ @Nullable
+ @Override
+ public Void apply(@Nullable Object input) {
+ schedulingEnabled = false;
+ stopGatheringData();
+ return null;
+ }
+ });
}
@Override
if (LOG.isDebugEnabled()) {
LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue());
}
- this.lastDataGathering.cancel(true);
+
+ lastDataGathering.cancel(true);
}
}
@Override
public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
-
if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
return false;
import javax.annotation.Nonnull;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
private final ConvertorExecutor converterExecutor;
private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
- private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
+ private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler;
private final ConcurrentMap<DeviceInfo, StatisticsContext> contexts = new ConcurrentHashMap<>();
lifecycleService,
converterExecutor,
this);
+
Verify.verify(
contexts.putIfAbsent(deviceInfo, statisticsContext) == null,
"StatisticsCtx still not closed for Node {}", deviceInfo.getLOGValue()
);
+
lifecycleService.setStatContext(statisticsContext);
+ lifecycleService.registerDeviceRemovedHandler(this);
deviceInitPhaseHandler.onDeviceContextLevelUp(deviceInfo, lifecycleService);
}
if (LOG.isDebugEnabled()) {
LOG.debug("POLLING ALL STATISTICS for device: {}", deviceInfo.getNodeId());
}
+
timeCounter.markStart();
final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
@Override
public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
- final StatisticsContext statisticsContext = contexts.remove(deviceInfo);
- if (null != statisticsContext) {
- LOG.debug("Removing device context from stack. No more statistics gathering for device: {}", deviceInfo.getLOGValue());
- statisticsContext.close();
- }
- deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
+ Optional.ofNullable(contexts.get(deviceInfo)).ifPresent(OFPContext::close);
+ deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo);
}
@Override
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping statistics scheduling for device: {}", deviceInfo.getNodeId());
}
+
final StatisticsContext statisticsContext = contexts.get(deviceInfo);
if (statisticsContext == null) {
LOG.warn("Statistics context not found for device: {}", deviceInfo.getNodeId());
return;
}
+
statisticsContext.setSchedulingEnabled(false);
}
controlServiceRegistration.close();
controlServiceRegistration = null;
}
+
for (final Iterator<StatisticsContext> iterator = Iterators.consumingIterator(contexts.values().iterator());
iterator.hasNext();) {
iterator.next().close();
@Override
public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
- this.deviceTerminPhaseHandler = handler;
+ this.deviceTerminationPhaseHandler = handler;
}
@Override
this.isStatisticsPollingOn = isStatisticsPollingOn;
}
+ public void onDeviceRemoved(DeviceInfo deviceInfo) {
+ contexts.remove(deviceInfo);
+ LOG.debug("Statistics context removed for node {}", deviceInfo.getLOGValue());
+ }
}
static void translateAndWriteReply(final MultipartType type, final DeviceContext dContext,
final InstanceIdentifier<Node> nodeII, final Collection<MultipartReply> result,
final ConvertorExecutor convertorExecutor) {
- try {
- result.stream()
- .map(MultipartReply::getMultipartReplyBody)
- .forEach(multipartReplyBody -> {
- if (!(writeDesc(type, multipartReplyBody, dContext, nodeII)
- || writeTableFeatures(type, multipartReplyBody, dContext, nodeII, convertorExecutor)
- || writeMeterFeatures(type, multipartReplyBody, dContext, nodeII)
- || writeGroupFeatures(type, multipartReplyBody, dContext, nodeII)
- || writePortDesc(type, multipartReplyBody, dContext, nodeII))) {
- throw new IllegalArgumentException("Unexpected MultipartType " + type);
- }
- });
- } catch (final Exception e) {
- LOG.debug("translateAndWriteReply: Failed to write node {} to DS ", dContext.getDeviceInfo().getNodeId().toString(), e);
+ if (Objects.nonNull(result)) {
+ try {
+ result.stream()
+ .map(MultipartReply::getMultipartReplyBody)
+ .forEach(multipartReplyBody -> {
+ if (!(writeDesc(type, multipartReplyBody, dContext, nodeII)
+ || writeTableFeatures(type, multipartReplyBody, dContext, nodeII, convertorExecutor)
+ || writeMeterFeatures(type, multipartReplyBody, dContext, nodeII)
+ || writeGroupFeatures(type, multipartReplyBody, dContext, nodeII)
+ || writePortDesc(type, multipartReplyBody, dContext, nodeII))) {
+ throw new IllegalArgumentException("Unexpected MultipartType " + type);
+ }
+ });
+ } catch (final Exception e) {
+ LOG.debug("translateAndWriteReply: Failed to write node {} to DS ", dContext.getDeviceInfo().getNodeId().toString(), e);
+ }
+ } else {
+ LOG.debug("translateAndWriteReply: Failed to write node {} to DS because we failed to gather device" +
+ "info.",
+ dContext.getDeviceInfo().getNodeId().toString());
}
}
final Xid xid = requestContext.getXid();
+ if (Objects.isNull(xid)) {
+ LOG.debug("Xid is not present, so cancelling node static info gathering.");
+ return Futures.immediateCancelledFuture();
+ }
+
LOG.trace("Hooking xid {} to device context - precaution.", reserved);
final MultiMsgCollector multiMsgCollector = deviceContext.getMultiMsgCollector(requestContext);
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
private DeviceFlowRegistry deviceFlowRegistry;
@Mock
private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
+ @Mock
+ private ClusterSingletonServiceRegistration clusterSingletonServiceRegistration;
private LifecycleService lifecycleService;
Mockito.when(deviceFlowRegistry.fill()).thenReturn(Futures.immediateFuture(null));
Mockito.when(connectionContext.getConnectionState()).thenReturn(ConnectionContext.CONNECTION_STATE.WORKING);
Mockito.when(deviceInfo.getLOGValue()).thenReturn(TEST_NODE);
+ Mockito.when(clusterSingletonServiceProvider.registerClusterSingletonService(Mockito.any()))
+ .thenReturn(clusterSingletonServiceRegistration);
Mockito.when(deviceContext.stopClusterServices(Mockito.anyBoolean())).thenReturn(Futures.immediateFuture(null));
Mockito.when(statContext.stopClusterServices(Mockito.anyBoolean())).thenReturn(Futures.immediateFuture(null));
@Test
public void closeServiceInstance() throws Exception {
- lifecycleService.closeServiceInstance();
+ lifecycleService.closeServiceInstance().get();
Mockito.verify(statContext).stopClusterServices(false);
Mockito.verify(deviceContext).stopClusterServices(false);
Mockito.verify(rpcContext).stopClusterServices(false);
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Mockito.when(deviceContext.getMessageSpy()).thenReturn(messageSpy);
Mockito.when(deviceInfo.getNodeId()).thenReturn(nodeKey.getId());
Mockito.when(rpcProviderRegistry.addRoutedRpcImplementation(
- Matchers.<Class<RpcService>>any(), Matchers.any(RpcService.class)))
+ Matchers.any(), Matchers.any(RpcService.class)))
.thenReturn(routedRpcRegistration);
Mockito.when(contexts.remove(deviceInfo)).thenReturn(removedContexts);
Mockito.when(lifecycleService.getDeviceContext()).thenReturn(deviceContext);
*/
@Test
public void onDeviceContextLevelDown1() {
- rpcManager.addRecordToContexts(deviceInfo,removedContexts);
+ rpcManager.addRecordToContexts(deviceInfo, removedContexts);
rpcManager.onDeviceContextLevelDown(deviceInfo);
verify(removedContexts,times(1)).close();
verify(deviceTerminationPhaseHandler,times(1)).onDeviceContextLevelDown(deviceInfo);
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
statisticsManager.onDeviceContextLevelDown(deviceInfo);
verify(statisticContext).close();
verify(mockedTerminationPhaseHandler).onDeviceContextLevelDown(deviceInfo);
- Assert.assertEquals(0, contextsMap.size());
+ Assert.assertEquals(1, contextsMap.size());
}
private static Map<DeviceInfo, StatisticsContext> getContextsMap(final StatisticsManagerImpl statisticsManager)