X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fopenflow%2Fmd%2Fcore%2FConnectionConductorImpl.java;h=e9ad1e3d1ba04e4fb14cdaac51ce850ded1fe204;hb=d249ab7f81708d53c90c02dfca29e0083ac18b0c;hp=3bb0daa2c90f87cbc78f6ed10b151cfbb90442f1;hpb=bd327062ee4c7de7ca13ab1bb8474d2f60d7fa31;p=openflowplugin.git diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java index 3bb0daa2c9..e9ad1e3d1b 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2013-2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2013, 2015 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -8,26 +8,33 @@ package org.opendaylight.openflowplugin.openflow.md.core; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext; +import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor; +import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler; +import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener; +import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager; +import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer; +import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; +import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext; +import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionManager; +import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper; +import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper.QueueType; +import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor; +import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListener; +import org.opendaylight.openflowplugin.api.openflow.md.queue.WaterMarkListenerImpl; import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil; -import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; -import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager; -import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper; -import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper.QueueType; import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory; -import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessor; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.SwitchConfigFlag; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder; @@ -38,18 +45,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestDescCaseBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestGroupFeaturesCaseBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestMeterFeaturesCaseBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestPortDescCaseBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener; @@ -59,23 +60,26 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.Futures; - /** * @author mirehak */ public class ConnectionConductorImpl implements OpenflowProtocolListener, - SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener, NotificationEnqueuer { + SystemNotificationsListener, ConnectionConductor, + ConnectionReadyListener, HandshakeListener, NotificationEnqueuer, + AutoCloseable { - /** ingress queue limit */ + /** + * ingress queue limit + */ private static final int INGRESS_QUEUE_MAX_SIZE = 200; protected static final Logger LOG = LoggerFactory .getLogger(ConnectionConductorImpl.class); - /* variable to make BitMap-based negotiation enabled / disabled. - * it will help while testing and isolating issues related to processing of - * BitMaps from switches. + /* + * variable to make BitMap-based negotiation enabled / disabled. it will + * help while testing and isolating issues related to processing of BitMaps + * from switches. */ private boolean isBitmapNegotiationEnable = true; protected ErrorHandler errorHandler; @@ -94,32 +98,34 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, private HandshakeManager handshakeManager; private boolean firstHelloProcessed; - + private PortFeaturesUtil portFeaturesUtils; private int conductorId; private int ingressMaxQueueSize; + private HandshakeContext handshakeContext; - /** - * @param connectionAdapter + * @param connectionAdapter connection adaptor for switch */ public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) { this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE); } /** - * @param connectionAdapter + * @param connectionAdapter connection adaptor for switch * @param ingressMaxQueueSize ingress queue limit (blocking) */ - public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, int ingressMaxQueueSize) { + public ConnectionConductorImpl(ConnectionAdapter connectionAdapter, + int ingressMaxQueueSize) { this.connectionAdapter = connectionAdapter; this.ingressMaxQueueSize = ingressMaxQueueSize; conductorState = CONDUCTOR_STATE.HANDSHAKING; firstHelloProcessed = false; handshakeManager = new HandshakeManagerImpl(connectionAdapter, - ConnectionConductor.versionOrder.get(0), ConnectionConductor.versionOrder); + ConnectionConductor.versionOrder.get(0), + ConnectionConductor.versionOrder); handshakeManager.setUseVersionBitmap(isBitmapNegotiationEnable); handshakeManager.setHandshakeListener(this); portFeaturesUtils = PortFeaturesUtil.getInstance(); @@ -128,18 +134,23 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void init() { int handshakeThreadLimit = 1; - hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), - "OFHandshake-"+conductorId); - + hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit, + handshakeThreadLimit, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), "OFHandshake-" + + conductorId); + connectionAdapter.setMessageListener(this); connectionAdapter.setSystemListener(this); connectionAdapter.setConnectionReadyListener(this); - queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, ingressMaxQueueSize); + WaterMarkListener waterMarkListener = new WaterMarkListenerImpl( + connectionAdapter); + queue = QueueKeeperFactory.createFairQueueKeeper(queueProcessor, + ingressMaxQueueSize, waterMarkListener); } @Override - public void setQueueProcessor(QueueProcessor queueProcessor) { + public void setQueueProcessor( + QueueProcessor queueProcessor) { this.queueProcessor = queueProcessor; } @@ -157,7 +168,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, new Thread(new Runnable() { @Override public void run() { - LOG.debug("echo request received: " + echoRequestMessage.getXid()); + LOG.debug("echo request received: " + + echoRequestMessage.getXid()); EchoReplyInputBuilder builder = new EchoReplyInputBuilder(); builder.setVersion(echoRequestMessage.getVersion()); builder.setXid(echoRequestMessage.getXid()); @@ -173,14 +185,13 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, enqueueMessage(errorMessage); } - /** * @param message */ private void enqueueMessage(OfHeader message) { enqueueMessage(message, QueueType.DEFAULT); } - + @Override public void enqueueNotification(NotificationQueueWrapper notification) { enqueueMessage(notification); @@ -204,15 +215,15 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, enqueueMessage(message); } - /** - * version negotiation happened as per following steps: - * 1. If HelloMessage version field has same version, continue connection processing. - * If HelloMessage version is lower than supported versions, just disconnect. + * version negotiation happened as per following steps: 1. If HelloMessage + * version field has same version, continue connection processing. If + * HelloMessage version is lower than supported versions, just disconnect. * 2. If HelloMessage contains bitmap and common version found in bitmap - * then continue connection processing. if no common version found, just disconnect. - * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version. - * 4. If Hello message received again with not supported version, just disconnect. + * then continue connection processing. if no common version found, just + * disconnect. 3. If HelloMessage version is not supported, send + * HelloMessage with lower supported version. 4. If Hello message received + * again with not supported version, just disconnect. */ @Override public void onHelloMessage(final HelloMessage hello) { @@ -252,10 +263,13 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onPortStatusMessage(PortStatusMessage message) { - processPortStatusMsg(message); - enqueueMessage(message); + try { + processPortStatusMsg(message); + } finally { + enqueueMessage(message); + } } - + protected void processPortStatusMsg(PortStatus msg) { if (msg.getReason().getIntValue() == 2) { updatePort(msg); @@ -265,22 +279,30 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, deletePort(msg); } } - + protected void updatePort(PortStatus msg) { - Long portNumber = msg.getPortNo(); + Long portNumber = msg.getPortNo(); Boolean portBandwidth = portFeaturesUtils.getPortBandwidth(msg); - - if(portBandwidth == null) { - LOG.debug("can't get bandwidth info from port: {}, aborting port update", msg.toString()); + + if (portBandwidth == null) { + LOG.debug( + "can't get bandwidth info from port: {}, aborting port update", + msg.toString()); } else { - this.getSessionContext().getPhysicalPorts().put(portNumber, msg); - this.getSessionContext().getPortsBandwidth().put(portNumber, portBandwidth); - } + if (null != this.sessionContext) { + //FIXME these two properties are never used in code + this.getSessionContext().getPhysicalPorts().put(portNumber, msg); + this.getSessionContext().getPortsBandwidth() + .put(portNumber, portBandwidth); + } else { + LOG.warn("Trying to process update port message before session context was created."); + } + } } - + protected void deletePort(PortGrouping port) { Long portNumber = port.getPortNo(); - + this.getSessionContext().getPhysicalPorts().remove(portNumber); this.getSessionContext().getPortsBandwidth().remove(portNumber); } @@ -291,12 +313,17 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void run() { if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) { - // idle state in any other conductorState than WORKING means real - // problem and wont be handled by echoReply, but disconnection + // idle state in any other conductorState than WORKING means + // real + // problem and wont be handled by echoReply, but + // disconnection disconnect(); - OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this); + OFSessionUtil.getSessionManager().invalidateOnDisconnect( + ConnectionConductorImpl.this); } else { - LOG.debug("first idle state occured, sessionCtx={}|auxId={}", sessionContext, auxiliaryKey); + LOG.debug( + "first idle state occured, sessionCtx={}|auxId={}", + sessionContext, auxiliaryKey); EchoInputBuilder builder = new EchoInputBuilder(); builder.setVersion(getVersion()); builder.setXid(getSessionContext().getNextXid()); @@ -305,27 +332,30 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, .echo(builder.build()); try { - RpcResult echoReplyValue = echoReplyFuture.get(getMaxTimeout(), - getMaxTimeoutUnit()); + RpcResult echoReplyValue = echoReplyFuture + .get(getMaxTimeout(), getMaxTimeoutUnit()); if (echoReplyValue.isSuccessful()) { setConductorState(CONDUCTOR_STATE.WORKING); } else { - for (RpcError replyError : echoReplyValue.getErrors()) { + for (RpcError replyError : echoReplyValue + .getErrors()) { Throwable cause = replyError.getCause(); LOG.error( "while receiving echoReply in TIMEOUTING state: " + cause.getMessage(), cause); } - //switch issue occurred + // switch issue occurred throw new Exception("switch issue occurred"); } } catch (Exception e) { LOG.error("while waiting for echoReply in TIMEOUTING state: " + e.getMessage()); errorHandler.handleException(e, sessionContext); - //switch is not responding + // switch is not responding disconnect(); - OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this); + OFSessionUtil.getSessionManager() + .invalidateOnDisconnect( + ConnectionConductorImpl.this); } } } @@ -334,8 +364,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, } /** - * @param conductorState - * the connectionState to set + * @param conductorState the connectionState to set */ @Override public void setConductorState(CONDUCTOR_STATE conductorState) { @@ -348,10 +377,12 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, } /** - * @param expectedState + * @param expectedState connection conductor state */ protected void checkState(CONDUCTOR_STATE expectedState) { if (!conductorState.equals(expectedState)) { + LOG.warn("State of connection to switch {} is not correct, " + + "terminating the connection", connectionAdapter.getRemoteAddress()); throw new IllegalStateException("Expected state: " + expectedState + ", actual state:" + conductorState); } @@ -361,6 +392,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, public void onDisconnectEvent(DisconnectEvent arg0) { SessionManager sessionManager = OFSessionUtil.getSessionManager(); sessionManager.invalidateOnDisconnect(this); + close(); } @Override @@ -370,7 +402,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public Future disconnect() { - LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext, auxiliaryKey); + LOG.trace("disconnecting: sessionCtx={}|auxId={}", sessionContext, + auxiliaryKey); Future result = null; if (connectionAdapter.isAlive()) { @@ -379,7 +412,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, LOG.debug("connection already disconnected"); result = Futures.immediateFuture(true); } - + close(); return result; } @@ -411,7 +444,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onConnectionReady() { LOG.debug("connection is ready-to-use"); - if (! firstHelloProcessed) { + if (!firstHelloProcessed) { + checkState(CONDUCTOR_STATE.HANDSHAKING); HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper( null, handshakeManager, connectionAdapter); hsPool.execute(handshakeStepWrapper); @@ -423,121 +457,95 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener, @Override public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput, - Short negotiatedVersion) { + Short negotiatedVersion) { postHandshakeBasic(featureOutput, negotiatedVersion); - - // post-handshake actions - setDefaultConfig(); - if(version == OFConstants.OFP_VERSION_1_3){ - requestPorts(); - requestGroupFeatures(); - requestMeterFeatures(); - } - - requestDesc(); + } + + @Override + public void onHandshakeFailure() { + LOG.info("OF handshake failed, doing cleanup."); + close(); } /** * used by tests - * @param featureOutput - * @param negotiatedVersion + * + * @param featureOutput feature request output + * @param negotiatedVersion negotiated openflow connection version */ protected void postHandshakeBasic(GetFeaturesOutput featureOutput, - Short negotiatedVersion) { + Short negotiatedVersion) { version = negotiatedVersion; if (version == OFConstants.OFP_VERSION_1_0) { - // Because the GetFeaturesOutput contains information about the port - // in OF1.0 (that we would otherwise get from the PortDesc) we have to pass - // it up for parsing to convert into a NodeConnectorUpdate + // Because the GetFeaturesOutput contains information about the port + // in OF1.0 (that we would otherwise get from the PortDesc) we have + // to pass + // it up for parsing to convert into a NodeConnectorUpdate // - // BUG-1988 - this must be the first item in queue in order not to get behind link-up message + // BUG-1988 - this must be the first item in queue in order not to + // get behind link-up message enqueueMessage(featureOutput); } - - OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion); + + SessionContext sessionContext = OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion); hsPool.shutdown(); hsPool.purge(); conductorState = CONDUCTOR_STATE.WORKING; QueueKeeperFactory.plugQueue(queueProcessor, queue); } - private void setDefaultConfig(){ - SetConfigInputBuilder builder = new SetConfigInputBuilder(); - builder.setVersion(getVersion()); - builder.setXid(getSessionContext().getNextXid()); - SwitchConfigFlag flag = SwitchConfigFlag.FRAGNORMAL; - builder.setFlags(flag); - builder.setMissSendLen(OFConstants.OFPCML_NO_BUFFER); - getConnectionAdapter().setConfig(builder.build()); - } - - /* - * Send an OFPMP_DESC request message to the switch - */ - private void requestDesc() { - MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder(); - builder.setType(MultipartType.OFPMPDESC); - builder.setVersion(getVersion()); - builder.setFlags(new MultipartRequestFlags(false)); - builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder().build()); - builder.setXid(getSessionContext().getNextXid()); - getConnectionAdapter().multipartRequest(builder.build()); - } - - private void requestPorts() { - MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder(); - builder.setType(MultipartType.OFPMPPORTDESC); - builder.setVersion(getVersion()); - builder.setFlags(new MultipartRequestFlags(false)); - builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder().build()); - builder.setXid(getSessionContext().getNextXid()); - getConnectionAdapter().multipartRequest(builder.build()); - } - private void requestGroupFeatures(){ - MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder(); - mprInput.setType(MultipartType.OFPMPGROUPFEATURES); - mprInput.setVersion(getVersion()); - mprInput.setFlags(new MultipartRequestFlags(false)); - mprInput.setXid(getSessionContext().getNextXid()); - - MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild = - new MultipartRequestGroupFeaturesCaseBuilder(); - mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build()); - - LOG.debug("Send group features statistics request :{}",mprGroupFeaturesBuild); - getConnectionAdapter().multipartRequest(mprInput.build()); - - } - private void requestMeterFeatures(){ - MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder(); - mprInput.setType(MultipartType.OFPMPMETERFEATURES); - mprInput.setVersion(getVersion()); - mprInput.setFlags(new MultipartRequestFlags(false)); - mprInput.setXid(getSessionContext().getNextXid()); - - MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild = - new MultipartRequestMeterFeaturesCaseBuilder(); - mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build()); - - LOG.debug("Send meter features statistics request :{}",mprMeterFeaturesBuild); - getConnectionAdapter().multipartRequest(mprInput.build()); - - } /** * @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set */ - public void setBitmapNegotiationEnable( - boolean isBitmapNegotiationEnable) { + public void setBitmapNegotiationEnable(boolean isBitmapNegotiationEnable) { this.isBitmapNegotiationEnable = isBitmapNegotiationEnable; } - protected void shutdownPool() { - hsPool.shutdownNow(); - LOG.debug("pool is terminated: {}", hsPool.isTerminated()); - } - @Override public void setId(int conductorId) { this.conductorId = conductorId; } + + @Override + public void close() { + conductorState = CONDUCTOR_STATE.RIP; + if (handshakeContext != null) { + try { + handshakeContext.close(); + } catch (Exception e) { + LOG.warn("Closing handshake context failed: {}", e.getMessage()); + LOG.debug("Detail in hanshake context close:", e); + } + } else { + //This condition will occure when Old Helium openflowplugin implementation will be used. + shutdownPoolPolitely(); + } + } + + private void shutdownPoolPolitely() { + LOG.debug("Terminating handshake pool for node {}", connectionAdapter.getRemoteAddress()); + hsPool.shutdown(); + try { + hsPool.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.debug("Error while awaiting termination of pool. Will force shutdown now."); + } finally { + hsPool.purge(); + if (!hsPool.isTerminated()) { + hsPool.shutdownNow(); + } + LOG.debug("is handshake pool for node {} is terminated : {}", + connectionAdapter.getRemoteAddress(), hsPool.isTerminated()); + } + } + + @Override + public void setHandshakeContext(HandshakeContext handshakeContext) { + this.handshakeContext = handshakeContext; + } + + @VisibleForTesting + ThreadPoolExecutor getHsPool() { + return hsPool; + } }