X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=openflowplugin%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fopenflow%2Fmd%2Fcore%2FConnectionConductorImpl.java;h=e9ad1e3d1ba04e4fb14cdaac51ce850ded1fe204;hb=62a2522261049627e33ff4ae80120ffb530e7f73;hp=e1d0df49ea8e51df9b97c9f7c7d9a4140a64433e;hpb=11b4838abc501a3b0c262120632cfbadba4bd1b0;p=openflowplugin.git diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java index e1d0df49ea..e9ad1e3d1b 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2013-2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2013, 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -8,14 +8,16 @@ package org.opendaylight.openflowplugin.openflow.md.core; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext; import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler; import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener; @@ -33,8 +35,6 @@ import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListenerIm import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil; import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder; @@ -45,17 +45,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestDescCaseBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestGroupFeaturesCaseBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestMeterFeaturesCaseBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestPortDescCaseBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener; @@ -65,8 +60,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.Futures; - /** * @author mirehak */ @@ -111,21 +104,21 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, private int conductorId; private int ingressMaxQueueSize; + private HandshakeContext handshakeContext; /** - * @param connectionAdapter + * @param connectionAdapter connection adaptor for switch */ public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) { this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE); } /** - * @param connectionAdapter - * @param ingressMaxQueueSize - * ingress queue limit (blocking) + * @param connectionAdapter connection adaptor for switch + * @param ingressMaxQueueSize ingress queue limit (blocking) */ public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, - int ingressMaxQueueSize) { + int ingressMaxQueueSize) { this.connectionAdapter = connectionAdapter; this.ingressMaxQueueSize = ingressMaxQueueSize; conductorState = CONDUCTOR_STATE.HANDSHAKING; @@ -144,7 +137,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit, handshakeThreadLimit, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), "OFHandshake-" - + conductorId); + + conductorId); connectionAdapter.setMessageListener(this); connectionAdapter.setSystemListener(this); @@ -162,8 +155,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, } /** - * @param errorHandler - * the errorHandler to set + * @param errorHandler the errorHandler to set */ @Override public void setErrorHandler(ErrorHandler errorHandler) { @@ -207,8 +199,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, /** * @param message - * @param queueType - * enqueue type + * @param queueType enqueue type */ private void enqueueMessage(OfHeader message, QueueType queueType) { queue.push(message, this, queueType); @@ -272,8 +263,11 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onPortStatusMessage(PortStatusMessage message) { - processPortStatusMsg(message); - enqueueMessage(message); + try { + processPortStatusMsg(message); + } finally { + enqueueMessage(message); + } } protected void processPortStatusMsg(PortStatus msg) { @@ -295,9 +289,14 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, "can't get bandwidth info from port: {}, aborting port update", msg.toString()); } else { - this.getSessionContext().getPhysicalPorts().put(portNumber, msg); - this.getSessionContext().getPortsBandwidth() - .put(portNumber, portBandwidth); + if (null != this.sessionContext) { + //FIXME these two properties are never used in code + this.getSessionContext().getPhysicalPorts().put(portNumber, msg); + this.getSessionContext().getPortsBandwidth() + .put(portNumber, portBandwidth); + } else { + LOG.warn("Trying to process update port message before session context was created."); + } } } @@ -365,8 +364,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, } /** - * @param conductorState - * the connectionState to set + * @param conductorState the connectionState to set */ @Override public void setConductorState(CONDUCTOR_STATE conductorState) { @@ -379,10 +377,12 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, } /** - * @param expectedState + * @param expectedState connection conductor state */ protected void checkState(CONDUCTOR_STATE expectedState) { if (!conductorState.equals(expectedState)) { + LOG.warn("State of connection to switch {} is not correct, " + + "terminating the connection", connectionAdapter.getRemoteAddress()); throw new IllegalStateException("Expected state: " + expectedState + ", actual state:" + conductorState); } @@ -445,6 +445,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, public void onConnectionReady() { LOG.debug("connection is ready-to-use"); if (!firstHelloProcessed) { + checkState(CONDUCTOR_STATE.HANDSHAKING); HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper( null, handshakeManager, connectionAdapter); hsPool.execute(handshakeStepWrapper); @@ -456,17 +457,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput, - Short negotiatedVersion) { + Short negotiatedVersion) { postHandshakeBasic(featureOutput, negotiatedVersion); - - // post-handshake actions - if (version == OFConstants.OFP_VERSION_1_3) { - requestPorts(); - requestGroupFeatures(); - requestMeterFeatures(); - } - - requestDesc(); } @Override @@ -477,12 +469,12 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, /** * used by tests - * - * @param featureOutput - * @param negotiatedVersion + * + * @param featureOutput feature request output + * @param negotiatedVersion negotiated openflow connection version */ protected void postHandshakeBasic(GetFeaturesOutput featureOutput, - Short negotiatedVersion) { + Short negotiatedVersion) { version = negotiatedVersion; if (version == OFConstants.OFP_VERSION_1_0) { // Because the GetFeaturesOutput contains information about the port @@ -495,103 +487,65 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, enqueueMessage(featureOutput); } - OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion); + SessionContext sessionContext = OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion); hsPool.shutdown(); hsPool.purge(); conductorState = CONDUCTOR_STATE.WORKING; QueueKeeperFactory.plugQueue(queueProcessor, queue); } - /* - * Send an OFPMP_DESC request message to the switch - */ - private void requestDesc() { - MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder(); - builder.setType(MultipartType.OFPMPDESC); - builder.setVersion(getVersion()); - builder.setFlags(new MultipartRequestFlags(false)); - builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder() - .build()); - builder.setXid(getSessionContext().getNextXid()); - getConnectionAdapter().multipartRequest(builder.build()); - } - - private void requestPorts() { - MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder(); - builder.setType(MultipartType.OFPMPPORTDESC); - builder.setVersion(getVersion()); - builder.setFlags(new MultipartRequestFlags(false)); - builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder() - .build()); - builder.setXid(getSessionContext().getNextXid()); - getConnectionAdapter().multipartRequest(builder.build()); - } - - private void requestGroupFeatures() { - MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder(); - mprInput.setType(MultipartType.OFPMPGROUPFEATURES); - mprInput.setVersion(getVersion()); - mprInput.setFlags(new MultipartRequestFlags(false)); - mprInput.setXid(getSessionContext().getNextXid()); - - MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild = new MultipartRequestGroupFeaturesCaseBuilder(); - mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build()); - - LOG.debug("Send group features statistics request :{}", - mprGroupFeaturesBuild); - getConnectionAdapter().multipartRequest(mprInput.build()); - - } - - private void requestMeterFeatures() { - MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder(); - mprInput.setType(MultipartType.OFPMPMETERFEATURES); - mprInput.setVersion(getVersion()); - mprInput.setFlags(new MultipartRequestFlags(false)); - mprInput.setXid(getSessionContext().getNextXid()); - - MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild = new MultipartRequestMeterFeaturesCaseBuilder(); - mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build()); - - LOG.debug("Send meter features statistics request :{}", - mprMeterFeaturesBuild); - getConnectionAdapter().multipartRequest(mprInput.build()); - - } - /** - * @param isBitmapNegotiationEnable - * the isBitmapNegotiationEnable to set + * @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set */ public void setBitmapNegotiationEnable(boolean isBitmapNegotiationEnable) { this.isBitmapNegotiationEnable = isBitmapNegotiationEnable; } - protected void shutdownPool() { - hsPool.shutdownNow(); - LOG.debug("pool is terminated: {}", hsPool.isTerminated()); + @Override + public void setId(int conductorId) { + this.conductorId = conductorId; + } + + @Override + public void close() { + conductorState = CONDUCTOR_STATE.RIP; + if (handshakeContext != null) { + try { + handshakeContext.close(); + } catch (Exception e) { + LOG.warn("Closing handshake context failed: {}", e.getMessage()); + LOG.debug("Detail in hanshake context close:", e); + } + } else { + //This condition will occure when Old Helium openflowplugin implementation will be used. + shutdownPoolPolitely(); + } } - protected void shutdownPoolPolitely() { + private void shutdownPoolPolitely() { + LOG.debug("Terminating handshake pool for node {}", connectionAdapter.getRemoteAddress()); hsPool.shutdown(); try { hsPool.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException e) { - LOG.info("Error while awaiting termination on pool. Will use shutdownNow method."); - shutdownPool(); + LOG.debug("Error while awaiting termination of pool. Will force shutdown now."); + } finally { + hsPool.purge(); + if (!hsPool.isTerminated()) { + hsPool.shutdownNow(); + } + LOG.debug("is handshake pool for node {} is terminated : {}", + connectionAdapter.getRemoteAddress(), hsPool.isTerminated()); } - hsPool.purge(); - LOG.debug("pool is terminated: {}", hsPool.isTerminated()); } @Override - public void setId(int conductorId) { - this.conductorId = conductorId; + public void setHandshakeContext(HandshakeContext handshakeContext) { + this.handshakeContext = handshakeContext; } - @Override - public void close() { - shutdownPoolPolitely(); - conductorState = CONDUCTOR_STATE.RIP; + @VisibleForTesting + ThreadPoolExecutor getHsPool() { + return hsPool; } }