X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fconnection%2FConnectionContextImpl.java;h=3fecb793c5dbb63deb6d838f947b75c12af04e1b;hb=7fffede33481db0b1adf36974a1717c13ae7fdac;hp=43f5d67ad886f8f237e6659d21a104b067731639;hpb=124f8d33c4e428d2fd978a652da29f6da2b66801;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java index 43f5d67ad8..3fecb793c5 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java @@ -5,17 +5,23 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ + package org.opendaylight.openflowplugin.impl.connection; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import java.util.Collection; +import java.math.BigInteger; +import java.net.InetSocketAddress; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; -import org.opendaylight.openflowplugin.api.openflow.connection.MultiMsgCollector; +import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext; +import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler; +import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.SessionStatistics; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @@ -23,16 +29,20 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 public class ConnectionContextImpl implements ConnectionContext { private final ConnectionAdapter connectionAdapter; - private CONNECTION_STATE connectionState; + private volatile CONNECTION_STATE connectionState; private FeaturesReply featuresReply; - private final MultiMsgCollector multipartCollector; + private NodeId nodeId; + private DeviceDisconnectedHandler deviceDisconnectedHandler; + private static final Logger LOG = LoggerFactory.getLogger(ConnectionContextImpl.class); + private OutboundQueueProvider outboundQueueProvider; + private OutboundQueueHandlerRegistration outboundQueueHandlerRegistration; + private HandshakeContext handshakeContext; /** * @param connectionAdapter */ public ConnectionContextImpl(final ConnectionAdapter connectionAdapter) { this.connectionAdapter = connectionAdapter; - multipartCollector = new MultiMsgCollectorImpl(); } @Override @@ -40,6 +50,16 @@ public class ConnectionContextImpl implements ConnectionContext { return connectionAdapter; } + @Override + public OutboundQueue getOutboundQueueProvider() { + return this.outboundQueueProvider; + } + + @Override + public void setOutboundQueueProvider(final OutboundQueueProvider outboundQueueProvider) { + this.outboundQueueProvider = outboundQueueProvider; + } + @Override public CONNECTION_STATE getConnectionState() { return connectionState; @@ -47,13 +67,12 @@ public class ConnectionContextImpl implements ConnectionContext { @Override public NodeId getNodeId() { - // TODO Auto-generated method stub - return null; + return nodeId; } @Override - public void setConnectionState(final CONNECTION_STATE connectionState) { - this.connectionState = connectionState; + public void setNodeId(final NodeId nodeId) { + this.nodeId = nodeId; } @Override @@ -61,24 +80,117 @@ public class ConnectionContextImpl implements ConnectionContext { return featuresReply; } + @Override + public void setDeviceDisconnectedHandler(final DeviceDisconnectedHandler deviceDisconnectedHandler) { + this.deviceDisconnectedHandler = deviceDisconnectedHandler; + } + @Override public void setFeatures(final FeaturesReply featuresReply) { this.featuresReply = featuresReply; + } + + @Override + public void closeConnection(boolean propagate) { + if (null == nodeId){ + SessionStatistics.countEvent(connectionAdapter.getRemoteAddress().toString(), SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_OFP); + } else { + SessionStatistics.countEvent(nodeId.toString(), SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_OFP); + } + final BigInteger datapathId = featuresReply != null ? featuresReply.getDatapathId() : BigInteger.ZERO; + LOG.debug("Actively closing connection: {}, datapathId:{}.", + connectionAdapter.getRemoteAddress(), datapathId); + connectionState = ConnectionContext.CONNECTION_STATE.RIP; + + unregisterOutboundQueue(); + closeHandshakeContext(); + + if (getConnectionAdapter().isAlive()) { + getConnectionAdapter().disconnect(); + } + + if (propagate) { + propagateDeviceDisconnectedEvent(); + } + } + private void closeHandshakeContext() { + if (handshakeContext != null) { + try { + handshakeContext.close(); + } catch (Exception e) { + LOG.info("handshake context closing failed: ", e); + } finally { + handshakeContext = null; + } + } + } + + @Override + public void onConnectionClosed() { + if (null == nodeId){ + SessionStatistics.countEvent(connectionAdapter.getRemoteAddress().toString(), SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_DEVICE); + } else { + SessionStatistics.countEvent(nodeId.toString(), SessionStatistics.ConnectionStatus.CONNECTION_DISCONNECTED_BY_DEVICE); + } + connectionState = ConnectionContext.CONNECTION_STATE.RIP; + + final InetSocketAddress remoteAddress = connectionAdapter.getRemoteAddress(); + final Short auxiliaryId; + if (null != getFeatures() && null != getFeatures().getAuxiliaryId()) { + auxiliaryId = getFeatures().getAuxiliaryId(); + } else { + auxiliaryId = 0; + } + + LOG.debug("disconnecting: node={}|auxId={}|connection state = {}", + remoteAddress, + auxiliaryId, + getConnectionState()); + + unregisterOutboundQueue(); + closeHandshakeContext(); + propagateDeviceDisconnectedEvent(); + } + + private void propagateDeviceDisconnectedEvent() { + if (null != deviceDisconnectedHandler) { + final BigInteger datapathId = featuresReply != null ? featuresReply.getDatapathId() : BigInteger.ZERO; + LOG.debug("Propagating connection closed event: {}, datapathId:{}.", + connectionAdapter.getRemoteAddress(), datapathId); + deviceDisconnectedHandler.onDeviceDisconnected(this); + } + } + + @Override + public void setOutboundQueueHandleRegistration(OutboundQueueHandlerRegistration outboundQueueHandlerRegistration) { + this.outboundQueueHandlerRegistration = outboundQueueHandlerRegistration; + } + + private void unregisterOutboundQueue() { + if (outboundQueueHandlerRegistration != null) { + outboundQueueHandlerRegistration.close(); + outboundQueueHandlerRegistration = null; + } + } + + @Override + public synchronized void changeStateToHandshaking() { + connectionState = CONNECTION_STATE.HANDSHAKING; } @Override - public ListenableFuture> registerMultipartMsg(final long xid) { - return multipartCollector.registerMultipartMsg(xid); + public synchronized void changeStateToTimeouting() { + connectionState = CONNECTION_STATE.TIMEOUTING; } @Override - public void addMultipartMsg(final MultipartReply reply) { - multipartCollector.addMultipartMsg(reply); + public synchronized void changeStateToWorking() { + connectionState = CONNECTION_STATE.WORKING; } @Override - public void registerMultipartFutureMsg(final long xid, final SettableFuture> future) { - multipartCollector.registerMultipartFutureMsg(xid, future); + public void setHandshakeContext(HandshakeContext handshakeContext) { + this.handshakeContext = handshakeContext; } }