/**
- * Copyright (c) 2013-2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2013, 2015 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
package org.opendaylight.openflowplugin.openflow.md.core;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.api.openflow.md.core.ErrorHandler;
import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
+import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil;
import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestDescCaseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestGroupFeaturesCaseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestMeterFeaturesCaseBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestPortDescCaseBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.Futures;
-
/**
* @author mirehak
*/
private int conductorId;
private int ingressMaxQueueSize;
+ private HandshakeContext handshakeContext;
/**
- * @param connectionAdapter
+ * @param connectionAdapter connection adaptor for switch
*/
public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
this(connectionAdapter, INGRESS_QUEUE_MAX_SIZE);
}
/**
- * @param connectionAdapter
- * @param ingressMaxQueueSize
- * ingress queue limit (blocking)
+ * @param connectionAdapter connection adaptor for switch
+ * @param ingressMaxQueueSize ingress queue limit (blocking)
*/
public ConnectionConductorImpl(ConnectionAdapter connectionAdapter,
- int ingressMaxQueueSize) {
+ int ingressMaxQueueSize) {
this.connectionAdapter = connectionAdapter;
this.ingressMaxQueueSize = ingressMaxQueueSize;
conductorState = CONDUCTOR_STATE.HANDSHAKING;
hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit,
handshakeThreadLimit, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), "OFHandshake-"
- + conductorId);
+ + conductorId);
connectionAdapter.setMessageListener(this);
connectionAdapter.setSystemListener(this);
}
/**
- * @param errorHandler
- * the errorHandler to set
+ * @param errorHandler the errorHandler to set
*/
@Override
public void setErrorHandler(ErrorHandler errorHandler) {
/**
* @param message
- * @param queueType
- * enqueue type
+ * @param queueType enqueue type
*/
private void enqueueMessage(OfHeader message, QueueType queueType) {
queue.push(message, this, queueType);
@Override
public void onPortStatusMessage(PortStatusMessage message) {
- processPortStatusMsg(message);
- enqueueMessage(message);
+ try {
+ processPortStatusMsg(message);
+ } finally {
+ enqueueMessage(message);
+ }
}
protected void processPortStatusMsg(PortStatus msg) {
"can't get bandwidth info from port: {}, aborting port update",
msg.toString());
} else {
- this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
- this.getSessionContext().getPortsBandwidth()
- .put(portNumber, portBandwidth);
+ if (null != this.sessionContext) {
+ //FIXME these two properties are never used in code
+ this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
+ this.getSessionContext().getPortsBandwidth()
+ .put(portNumber, portBandwidth);
+ } else {
+ LOG.warn("Trying to process update port message before session context was created.");
+ }
}
}
}
/**
- * @param conductorState
- * the connectionState to set
+ * @param conductorState the connectionState to set
*/
@Override
public void setConductorState(CONDUCTOR_STATE conductorState) {
}
/**
- * @param expectedState
+ * @param expectedState connection conductor state
*/
protected void checkState(CONDUCTOR_STATE expectedState) {
if (!conductorState.equals(expectedState)) {
+ LOG.warn("State of connection to switch {} is not correct, "
+ + "terminating the connection", connectionAdapter.getRemoteAddress());
throw new IllegalStateException("Expected state: " + expectedState
+ ", actual state:" + conductorState);
}
public void onConnectionReady() {
LOG.debug("connection is ready-to-use");
if (!firstHelloProcessed) {
+ checkState(CONDUCTOR_STATE.HANDSHAKING);
HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
null, handshakeManager, connectionAdapter);
hsPool.execute(handshakeStepWrapper);
@Override
public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
- Short negotiatedVersion) {
+ Short negotiatedVersion) {
postHandshakeBasic(featureOutput, negotiatedVersion);
-
- // post-handshake actions
- if (version == OFConstants.OFP_VERSION_1_3) {
- requestPorts();
- requestGroupFeatures();
- requestMeterFeatures();
- }
-
- requestDesc();
}
@Override
/**
* used by tests
- *
- * @param featureOutput
- * @param negotiatedVersion
+ *
+ * @param featureOutput feature request output
+ * @param negotiatedVersion negotiated openflow connection version
*/
protected void postHandshakeBasic(GetFeaturesOutput featureOutput,
- Short negotiatedVersion) {
+ Short negotiatedVersion) {
version = negotiatedVersion;
if (version == OFConstants.OFP_VERSION_1_0) {
// Because the GetFeaturesOutput contains information about the port
enqueueMessage(featureOutput);
}
- OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
+ SessionContext sessionContext = OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
hsPool.shutdown();
hsPool.purge();
conductorState = CONDUCTOR_STATE.WORKING;
QueueKeeperFactory.plugQueue(queueProcessor, queue);
}
- /*
- * Send an OFPMP_DESC request message to the switch
- */
- private void requestDesc() {
- MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
- builder.setType(MultipartType.OFPMPDESC);
- builder.setVersion(getVersion());
- builder.setFlags(new MultipartRequestFlags(false));
- builder.setMultipartRequestBody(new MultipartRequestDescCaseBuilder()
- .build());
- builder.setXid(getSessionContext().getNextXid());
- getConnectionAdapter().multipartRequest(builder.build());
- }
-
- private void requestPorts() {
- MultipartRequestInputBuilder builder = new MultipartRequestInputBuilder();
- builder.setType(MultipartType.OFPMPPORTDESC);
- builder.setVersion(getVersion());
- builder.setFlags(new MultipartRequestFlags(false));
- builder.setMultipartRequestBody(new MultipartRequestPortDescCaseBuilder()
- .build());
- builder.setXid(getSessionContext().getNextXid());
- getConnectionAdapter().multipartRequest(builder.build());
- }
-
- private void requestGroupFeatures() {
- MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
- mprInput.setType(MultipartType.OFPMPGROUPFEATURES);
- mprInput.setVersion(getVersion());
- mprInput.setFlags(new MultipartRequestFlags(false));
- mprInput.setXid(getSessionContext().getNextXid());
-
- MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild = new MultipartRequestGroupFeaturesCaseBuilder();
- mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build());
-
- LOG.debug("Send group features statistics request :{}",
- mprGroupFeaturesBuild);
- getConnectionAdapter().multipartRequest(mprInput.build());
-
- }
-
- private void requestMeterFeatures() {
- MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
- mprInput.setType(MultipartType.OFPMPMETERFEATURES);
- mprInput.setVersion(getVersion());
- mprInput.setFlags(new MultipartRequestFlags(false));
- mprInput.setXid(getSessionContext().getNextXid());
-
- MultipartRequestMeterFeaturesCaseBuilder mprMeterFeaturesBuild = new MultipartRequestMeterFeaturesCaseBuilder();
- mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build());
-
- LOG.debug("Send meter features statistics request :{}",
- mprMeterFeaturesBuild);
- getConnectionAdapter().multipartRequest(mprInput.build());
-
- }
-
/**
- * @param isBitmapNegotiationEnable
- * the isBitmapNegotiationEnable to set
+ * @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set
*/
public void setBitmapNegotiationEnable(boolean isBitmapNegotiationEnable) {
this.isBitmapNegotiationEnable = isBitmapNegotiationEnable;
}
- protected void shutdownPool() {
- hsPool.shutdownNow();
- LOG.debug("pool is terminated: {}", hsPool.isTerminated());
+ @Override
+ public void setId(int conductorId) {
+ this.conductorId = conductorId;
+ }
+
+ @Override
+ public void close() {
+ conductorState = CONDUCTOR_STATE.RIP;
+ if (handshakeContext != null) {
+ try {
+ handshakeContext.close();
+ } catch (Exception e) {
+ LOG.warn("Closing handshake context failed: {}", e.getMessage());
+ LOG.debug("Detail in hanshake context close:", e);
+ }
+ } else {
+ //This condition will occure when Old Helium openflowplugin implementation will be used.
+ shutdownPoolPolitely();
+ }
}
- protected void shutdownPoolPolitely() {
+ private void shutdownPoolPolitely() {
+ LOG.debug("Terminating handshake pool for node {}", connectionAdapter.getRemoteAddress());
hsPool.shutdown();
try {
hsPool.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- LOG.info("Error while awaiting termination on pool. Will use shutdownNow method.");
- shutdownPool();
+ LOG.debug("Error while awaiting termination of pool. Will force shutdown now.");
+ } finally {
+ hsPool.purge();
+ if (!hsPool.isTerminated()) {
+ hsPool.shutdownNow();
+ }
+ LOG.debug("is handshake pool for node {} is terminated : {}",
+ connectionAdapter.getRemoteAddress(), hsPool.isTerminated());
}
- hsPool.purge();
- LOG.debug("pool is terminated: {}", hsPool.isTerminated());
}
@Override
- public void setId(int conductorId) {
- this.conductorId = conductorId;
+ public void setHandshakeContext(HandshakeContext handshakeContext) {
+ this.handshakeContext = handshakeContext;
}
- @Override
- public void close() {
- shutdownPoolPolitely();
- conductorState = CONDUCTOR_STATE.RIP;
+ @VisibleForTesting
+ ThreadPoolExecutor getHsPool() {
+ return hsPool;
}
}