import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.api.openflow.OpenFlowPluginTimer;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
public interface DeviceContext extends AutoCloseable,
OpenFlowPluginTimer,
DeviceReplyProcessor,
- DeviceDisconnectedHandler,
PortNumberCache {
/**
*/
void removeAuxiliaryConnectionContext(ConnectionContext connectionContext);
-
/**
* Method provides state of device represented by this device context.
*
*/
boolean submitTransaction();
+ /**
+ * Method has to close TxManager ASAP we are notified about Closed Connection
+ * @return sync. future for Slave and MD-SAL completition for Master
+ */
+ ListenableFuture<Void> shuttingDownDataStoreTransactions();
+
/**
* Method exposes transaction created for device
* represented by this context. This read only transaction has a fresh dataStore snapshot.
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceConnectedHandler;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceLifecycleSupervisor;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.translator.TranslatorLibrarian;
/**
* has its own device context managed by this manager.
* Created by Martin Bobak <mbobak@cisco.com> on 25.2.2015.
*/
-public interface DeviceManager extends DeviceConnectedHandler,
- TranslatorLibrarian,
- DeviceLifecycleSupervisor,
- DeviceInitializationPhaseHandler, DeviceTerminationPhaseHandler,
- AutoCloseable {
+public interface DeviceManager extends DeviceConnectedHandler, DeviceDisconnectedHandler, DeviceLifecycleSupervisor,
+ DeviceInitializationPhaseHandler, DeviceTerminationPhaseHandler, TranslatorLibrarian, AutoCloseable {
/**
* Sets notification receiving service
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.FutureFallback;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
private final boolean switchFeaturesMandatory;
private StatisticsContext statisticsContext;
+ private volatile DEVICE_CONTEXT_STATE deviceCtxState;
+
@VisibleForTesting
DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
this.outboundQueueProvider = Preconditions.checkNotNull(outboundQueueProvider);
- primaryConnectionContext.setDeviceDisconnectedHandler(DeviceContextImpl.this);
this.transactionChainManager = new TransactionChainManager(dataBroker, deviceState);
auxiliaryConnectionContexts = new HashMap<>();
deviceFlowRegistry = new DeviceFlowRegistryImpl();
itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
+ deviceCtxState = DEVICE_CONTEXT_STATE.INITIALIZATION;
}
/**
public void removeAuxiliaryConnectionContext(final ConnectionContext connectionContext) {
final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
if (null != connectionDistinguisher) {
+ LOG.debug("auxiliary connection dropped: {}, nodeId:{}", connectionContext.getConnectionAdapter()
+ .getRemoteAddress(), getDeviceState().getNodeId());
auxiliaryConnectionContexts.remove(connectionDistinguisher);
}
}
LOG.debug("closing deviceContext: {}, nodeId:{}",
getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress(),
getDeviceState().getNodeId());
-
- if (deviceState.isValid()) {
- primaryConnectionContext.closeConnection(false);
- tearDown();
- }
- }
-
- private synchronized void tearDown() {
- LOG.trace("tearDown method for node {}", deviceState.getNodeId());
- if (deviceState.isValid()) {
- deviceState.setValid(false);
-
- for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
- connectionContext.closeConnection(false);
- }
-
- deviceGroupRegistry.close();
- deviceFlowRegistry.close();
- deviceMeterRegistry.close();
-
- final ListenableFuture<Void> future = transactionChainManager.shuttingDown();
- Futures.addCallback(future, new FutureCallback<Void>() {
-
- @Override
- public void onSuccess(final Void result) {
- LOG.info("TxChain {} was shutdown successful.", getDeviceState().getNodeId());
- tearDownClean();
- }
-
- @Override
- public void onFailure(final Throwable t) {
- LOG.warn("Shutdown TxChain {} fail.", getDeviceState().getNodeId(), t);
- tearDownClean();
- }
- });
- }
- }
-
- private void tearDownClean() {
- LOG.info("Closing transaction chain manager without cleaning inventory operational");
- transactionChainManager.close();
-
- final LinkedList<DeviceTerminationPhaseHandler> reversedCloseHandlers = new LinkedList<>(closeHandlers);
- Collections.reverse(reversedCloseHandlers);
- for (final DeviceTerminationPhaseHandler deviceContextClosedHandler : reversedCloseHandlers) {
- deviceContextClosedHandler.onDeviceContextLevelDown(this);
- }
- closeHandlers.clear();
- }
-
- @Override
- public void onDeviceDisconnected(final ConnectionContext connectionContext) {
- LOG.info("ConnectionEvent: Device disconnected from controller, Device:{}, NodeId:{}",
- connectionContext.getConnectionAdapter().getRemoteAddress(), connectionContext.getNodeId());
- if (getPrimaryConnectionContext().equals(connectionContext)) {
- try {
- tearDown();
- } catch (final Exception e) {
- LOG.trace("Error closing device context.");
- }
- } else {
- LOG.debug("auxiliary connection dropped: {}, nodeId:{}",
- connectionContext.getConnectionAdapter().getRemoteAddress(),
- getDeviceState().getNodeId());
- final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
- auxiliaryConnectionContexts.remove(connectionDistinguisher);
- }
+ // NOOP
+ throw new UnsupportedOperationException("Autocloseble.close will be removed soon");
}
@Override
@Override
public void onPublished() {
+ Verify.verify(DEVICE_CONTEXT_STATE.INITIALIZATION.equals(deviceCtxState));
+ deviceCtxState = DEVICE_CONTEXT_STATE.WORKING;
primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) {
switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
}
@Override
- public void shutdownConnection() {
+ public synchronized void shutdownConnection() {
LOG.trace("shutdown method for node {}", deviceState.getNodeId());
+ deviceState.setValid(false);
+ if (DEVICE_CONTEXT_STATE.TERMINATION.equals(deviceCtxState)) {
+ LOG.debug("DeviceCtx for Node {} is in termination process.", deviceState.getNodeId());
+ return;
+ }
+ deviceCtxState = DEVICE_CONTEXT_STATE.TERMINATION;
+ for (final Iterator<ConnectionContext> iterator = Iterators.consumingIterator(auxiliaryConnectionContexts
+ .values().iterator()); iterator.hasNext();) {
+ iterator.next().closeConnection(false);
+ }
+ if (ConnectionContext.CONNECTION_STATE.RIP.equals(getPrimaryConnectionContext().getConnectionState())) {
+ LOG.debug("ConnectionCtx for Node {} is in RIP state.", deviceState.getNodeId());
+ return;
+ }
+ /* Terminate Auxiliary Connection */
for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
connectionContext.closeConnection(false);
}
+ /* Terminate Primary Connection */
getPrimaryConnectionContext().closeConnection(true);
+ /* Close all Group Registry */
+ deviceGroupRegistry.close();
+ deviceFlowRegistry.close();
+ deviceMeterRegistry.close();
}
@Override
public DEVICE_CONTEXT_STATE getDeviceContextState() {
- // TODO Auto-generated method stub
- return null;
+ return deviceCtxState;
+ }
+
+ @Override
+ public ListenableFuture<Void> shuttingDownDataStoreTransactions() {
+ return transactionChainManager.shuttingDown();
}
}
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.Iterators;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
import java.util.Collections;
import java.util.Iterator;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@Override
public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
- deviceInitPhaseHandler = handler;
+ this.deviceInitPhaseHandler = handler;
}
@Override
LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
connectionContext.getConnectionAdapter().getRemoteAddress(), connectionContext.getNodeId());
+ // Add Disconnect handler
+ connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
// Cache this for clarity
final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
connectionAdapter, deviceContext);
connectionAdapter.setMessageListener(messageListener);
+ deviceState.setValid(true);
- deviceCtxLevelUp(deviceContext);
+ deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
}
private static DeviceStateImpl createDeviceState(final @Nonnull ConnectionContext connectionContext) {
}
}
- private void deviceCtxLevelUp(final DeviceContext deviceContext) throws Exception {
- deviceContext.getDeviceState().setValid(true);
- LOG.trace("Device context level up called.");
- deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
- }
-
@Override
public TranslatorLibrary oook() {
return translatorLibrary;
@Override
public void close() {
- for (final Iterator<Entry<NodeId, DeviceContext>> iterator = Iterators
- .consumingIterator(deviceContexts.entrySet().iterator()); iterator.hasNext();) {
- iterator.next().getValue().close();
+ for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
+ iterator.hasNext();) {
+ final DeviceContext deviceCtx = iterator.next();
+ deviceCtx.shutdownConnection();
+ deviceCtx.shuttingDownDataStoreTransactions();
}
if (spyPool != null) {
public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
this.deviceTerminPhaseHandler = handler;
}
+
+ @Override
+ public void onDeviceDisconnected(final ConnectionContext connectionContext) {
+ LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
+ Preconditions.checkArgument(connectionContext != null);
+ final NodeId nodeId = connectionContext.getNodeId();
+ final DeviceContext deviceCtx = this.deviceContexts.get(nodeId);
+
+ if (null == deviceCtx) {
+ LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.",
+ connectionContext.getNodeId());
+ return;
+ }
+
+ if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
+ /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
+ deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
+ } else {
+ /* Device is disconnected and so we need to close TxManager */
+ final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
+ Futures.addCallback(future, new FutureCallback<Void>() {
+
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("TxChainManager for device {} is closed successful.", deviceCtx.getDeviceState().getNodeId());
+ deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.warn("TxChainManager for device {} failed by closing.", deviceCtx.getDeviceState().getNodeId(), t);
+ deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx);
+ }
+ });
+ /* Add timer for Close TxManager because it could fain ind cluster without notification */
+ final TimerTask timerTask = new TimerTask() {
+
+ @Override
+ public void run(final Timeout timeout) throws Exception {
+ if (!future.isDone()) {
+ LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.",
+ deviceCtx.getDeviceState().getNodeId());
+ future.cancel(false);
+ }
+ }
+ };
+ deviceCtx.getTimer().newTimeout(timerTask, 10, TimeUnit.SECONDS);
+ }
+ }
}
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
private BindingTransactionChain txChainFactory;
@GuardedBy("txLock")
private boolean submitIsEnabled;
+ @GuardedBy("txLock")
+ private ListenableFuture<Void> lastSubmittedFuture;
public TransactionChainManagerStatus getTransactionChainManagerStatus() {
return transactionChainManagerStatus;
TransactionChainManager(@Nonnull final DataBroker dataBroker, @Nonnull final DeviceState deviceState) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.nodeII = Preconditions.checkNotNull(deviceState.getNodeInstanceIdentifier());
+ this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
+ lastSubmittedFuture = Futures.immediateFuture(null);
LOG.debug("created txChainManager for {}", nodeII);
}
}
}
});
+ lastSubmittedFuture = submitFuture;
wTx = null;
}
return true;
return future;
}
+ @GuardedBy("txLock")
private ListenableFuture<Void> txChainShuttingDown() {
submitIsEnabled = false;
ListenableFuture<Void> future;
if (txChainFactory == null) {
// stay with actual thread
future = Futures.immediateCheckedFuture(null);
+ } else if (wTx == null) {
+ // hijack md-sal thread
+ future = lastSubmittedFuture;
} else {
// hijack md-sal thread
- if (wTx == null) {
- wTx = txChainFactory.newWriteOnlyTransaction();
- }
- final NodeBuilder nodeBuilder = new NodeBuilder().setId(nodeId());
- wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
future = wTx.submit();
wTx = null;
}
}
@Test
- public void testClose() {
+ public void testShutdownConnection() {
final ConnectionAdapter mockedConnectionAdapter = mock(ConnectionAdapter.class);
final InetSocketAddress mockRemoteAddress = InetSocketAddress.createUnresolved("odl-unit.example.org",999);
when(mockedConnectionAdapter.getRemoteAddress()).thenReturn(mockRemoteAddress);
final DeviceTerminationPhaseHandler mockedDeviceContextClosedHandler = mock(DeviceTerminationPhaseHandler.class);
deviceContext.addDeviceContextClosedHandler(mockedDeviceContextClosedHandler);
when(deviceState.isValid()).thenReturn(true);
- deviceContext.close();
- verify(connectionContext).closeConnection(false);
+ deviceContext.shutdownConnection();
+ verify(connectionContext).closeConnection(true);
}
@Test
final DeviceTerminationPhaseHandler deviceContextClosedHandler = mock(DeviceTerminationPhaseHandler.class);
deviceContext.addDeviceContextClosedHandler(deviceContextClosedHandler);
- deviceContext.onDeviceDisconnected(connectionContext);
-
// Mockito.verify(deviceState).setValid(false);
// Mockito.verify(deviceContextClosedHandler).onDeviceContextClosed(deviceContext);
Assert.assertEquals(0, deviceContext.getDeviceFlowRegistry().getAllFlowDescriptors().size());
import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
import org.opendaylight.openflowplugin.openflow.md.util.OpenflowPortsUtil;
@Mock
private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
@Mock
+ private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler;
+ @Mock
private TranslatorLibrary translatorLibrary;
@Mock
private ConnectionContext mockConnectionContext;
TEST_VALUE_GLOBAL_NOTIFICATION_QUOTA, false, barrierIntervalNanos, barrierCountLimit);
deviceManager.setDeviceInitializationPhaseHandler(deviceInitPhaseHandler);
+ deviceManager.setDeviceTerminationPhaseHandler(deviceTerminationPhaseHandler);
return deviceManager;
}
deviceManager.close();
- Mockito.verify(deviceContext).close();
+ Mockito.verify(deviceContext).shutdownConnection();
+ Mockito.verify(deviceContext, Mockito.never()).close();
}
private static ConcurrentHashMap<NodeId, DeviceContext> getContextsCollection(final DeviceManagerImpl deviceManager) throws NoSuchFieldException, IllegalAccessException {