/**
- * Copyright (c) 2013-2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2013, 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil;
import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestDescCaseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestGroupFeaturesCaseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestMeterFeaturesCaseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestPortDescCaseBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
private HandshakeContext handshakeContext;
/**
- * @param connectionAdapter
+ * @param connectionAdapter connection adaptor for switch
*/
public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE);
}
/**
- * @param connectionAdapter
- * @param ingressMaxQueueSize
- * ingress queue limit (blocking)
+ * @param connectionAdapter connection adaptor for switch
+ * @param ingressMaxQueueSize ingress queue limit (blocking)
*/
public ConnectionConductorImpl(ConnectionAdapter connectionAdapter,
- int ingressMaxQueueSize) {
+ int ingressMaxQueueSize) {
this.connectionAdapter = connectionAdapter;
this.ingressMaxQueueSize = ingressMaxQueueSize;
conductorState = CONDUCTOR_STATE.HANDSHAKING;
hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit,
handshakeThreadLimit, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), "OFHandshake-"
- + conductorId);
+ + conductorId);
connectionAdapter.setMessageListener(this);
connectionAdapter.setSystemListener(this);
}
/**
- * @param errorHandler
- * the errorHandler to set
+ * @param errorHandler the errorHandler to set
*/
@Override
public void setErrorHandler(ErrorHandler errorHandler) {
/**
* @param message
- * @param queueType
- * enqueue type
+ * @param queueType enqueue type
*/
private void enqueueMessage(OfHeader message, QueueType queueType) {
queue.push(message, this, queueType);
@Override
public void onPortStatusMessage(PortStatusMessage message) {
- processPortStatusMsg(message);
- enqueueMessage(message);
+ try {
+ processPortStatusMsg(message);
+ } finally {
+ enqueueMessage(message);
+ }
}
protected void processPortStatusMsg(PortStatus msg) {
"can't get bandwidth info from port: {}, aborting port update",
msg.toString());
} else {
- this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
- this.getSessionContext().getPortsBandwidth()
- .put(portNumber, portBandwidth);
+ if (null != this.sessionContext) {
+ //FIXME these two properties are never used in code
+ this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
+ this.getSessionContext().getPortsBandwidth()
+ .put(portNumber, portBandwidth);
+ } else {
+ LOG.warn("Trying to process update port message before session context was created.");
+ }
}
}
}
/**
- * @param conductorState
- * the connectionState to set
+ * @param conductorState the connectionState to set
*/
@Override
public void setConductorState(CONDUCTOR_STATE conductorState) {
}
/**
- * @param expectedState
+ * @param expectedState connection conductor state
*/
protected void checkState(CONDUCTOR_STATE expectedState) {
if (!conductorState.equals(expectedState)) {
+ LOG.warn("State of connection to switch {} is not correct, "
+ + "terminating the connection", connectionAdapter.getRemoteAddress());
throw new IllegalStateException("Expected state: " + expectedState
+ ", actual state:" + conductorState);
}
public void onConnectionReady() {
LOG.debug("connection is ready-to-use");
if (!firstHelloProcessed) {
+ checkState(CONDUCTOR_STATE.HANDSHAKING);
HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
null, handshakeManager, connectionAdapter);
hsPool.execute(handshakeStepWrapper);
public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
Short negotiatedVersion) {
postHandshakeBasic(featureOutput, negotiatedVersion);
-
- // post-handshake actions
- if (version == OFConstants.OFP_VERSION_1_3) {
- requestPorts();
- requestGroupFeatures();
- requestMeterFeatures();
- }
-
- requestDesc();
}
@Override
/**
* used by tests
*
- * @param featureOutput
- * @param negotiatedVersion
+ * @param featureOutput feature request output
+ * @param negotiatedVersion negotiated openflow connection version
*/
protected void postHandshakeBasic(GetFeaturesOutput featureOutput,
- Short negotiatedVersion) {
+ Short negotiatedVersion) {
version = negotiatedVersion;
if (version == OFConstants.OFP_VERSION_1_0) {
// Because the GetFeaturesOutput contains information about the port
enqueueMessage(featureOutput);
}
- OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
+ SessionContext sessionContext = OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
hsPool.shutdown();
hsPool.purge();
conductorState = CONDUCTOR_STATE.WORKING;
QueueKeeperFactory.plugQueue(queueProcessor, queue);
}
- /*
- * Send an OFPMP_DESC request message to the switch
- */
- private void requestDesc() {
- MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
- builder.setType(MultipartType.OFPMPDESC);
- builder.setVersion(getVersion());
- builder.setFlags(new MultipartRequestFlags(false));
- builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder()
- .build());
- builder.setXid(getSessionContext().getNextXid());
- getConnectionAdapter().multipartRequest(builder.build());
- }
-
- private void requestPorts() {
- MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
- builder.setType(MultipartType.OFPMPPORTDESC);
- builder.setVersion(getVersion());
- builder.setFlags(new MultipartRequestFlags(false));
- builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder()
- .build());
- builder.setXid(getSessionContext().getNextXid());
- getConnectionAdapter().multipartRequest(builder.build());
- }
-
- private void requestGroupFeatures() {
- MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
- mprInput.setType(MultipartType.OFPMPGROUPFEATURES);
- mprInput.setVersion(getVersion());
- mprInput.setFlags(new MultipartRequestFlags(false));
- mprInput.setXid(getSessionContext().getNextXid());
-
- MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild = new MultipartRequestGroupFeaturesCaseBuilder();
- mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build());
-
- LOG.debug("Send group features statistics request :{}",
- mprGroupFeaturesBuild);
- getConnectionAdapter().multipartRequest(mprInput.build());
-
- }
-
- private void requestMeterFeatures() {
- MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
- mprInput.setType(MultipartType.OFPMPMETERFEATURES);
- mprInput.setVersion(getVersion());
- mprInput.setFlags(new MultipartRequestFlags(false));
- mprInput.setXid(getSessionContext().getNextXid());
-
- MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild = new MultipartRequestMeterFeaturesCaseBuilder();
- mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build());
-
- LOG.debug("Send meter features statistics request :{}",
- mprMeterFeaturesBuild);
- getConnectionAdapter().multipartRequest(mprInput.build());
-
- }
-
/**
- * @param isBitmapNegotiationEnable
- * the isBitmapNegotiationEnable to set
+ * @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set
*/
public void setBitmapNegotiationEnable(boolean isBitmapNegotiationEnable) {
this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
@Override
public void close() {
conductorState = CONDUCTOR_STATE.RIP;
+ if (handshakeContext != null) {
+ try {
+ handshakeContext.close();
+ } catch (Exception e) {
+ LOG.warn("Closing handshake context failed: {}", e.getMessage());
+ LOG.debug("Detail in hanshake context close:", e);
+ }
+ } else {
+ //This condition will occure when Old Helium openflowplugin implementation will be used.
+ shutdownPoolPolitely();
+ }
+ }
+
+ private void shutdownPoolPolitely() {
+ LOG.debug("Terminating handshake pool for node {}", connectionAdapter.getRemoteAddress());
+ hsPool.shutdown();
try {
- handshakeContext.close();
- } catch (Exception e) {
- LOG.warn("Closing handshake context failed: {}", e.getMessage());
- LOG.debug("Detail in hanshake context close:", e);
+ hsPool.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.debug("Error while awaiting termination of pool. Will force shutdown now.");
+ } finally {
+ hsPool.purge();
+ if (!hsPool.isTerminated()) {
+ hsPool.shutdownNow();
+ }
+ LOG.debug("is handshake pool for node {} is terminated : {}",
+ connectionAdapter.getRemoteAddress(), hsPool.isTerminated());
}
}