package org.opendaylight.openflowplugin.openflow.md.core;
+import com.google.common.util.concurrent.Futures;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
-import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
-import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil;
import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionManager;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueKeeper.QueueType;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
import org.opendaylight.openflowplugin.api.openflow.md.queue.QueueProcessor;
+import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
+import org.opendaylight.openflowplugin.openflow.md.core.session.PortFeaturesUtil;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperFactory;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.SwitchConfigFlag;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestDescCaseBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestGroupFeaturesCaseBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestMeterFeaturesCaseBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.Futures;
-
/**
* @author mirehak
*/
public class ConnectionConductorImpl implements OpenflowProtocolListener,
- SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener, NotificationEnqueuer {
+ SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener, HandshakeListener, NotificationEnqueuer, AutoCloseable {
- /** ingress queue limit */
+ /**
+ * ingress queue limit
+ */
private static final int INGRESS_QUEUE_MAX_SIZE = 200;
protected static final Logger LOG = LoggerFactory
private HandshakeManager handshakeManager;
private boolean firstHelloProcessed;
-
+
private PortFeaturesUtil portFeaturesUtils;
private int conductorId;
private int ingressMaxQueueSize;
-
+
/**
* @param connectionAdapter
*/
@Override
public void init() {
int handshakeThreadLimit = 1;
- hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit , handshakeThreadLimit, 0L,
- TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
- "OFHandshake-"+conductorId);
-
+ hsPool = new ThreadPoolLoggingExecutor(handshakeThreadLimit, handshakeThreadLimit, 0L,
+ TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
+ "OFHandshake-" + conductorId);
+
connectionAdapter.setMessageListener(this);
connectionAdapter.setSystemListener(this);
connectionAdapter.setConnectionReadyListener(this);
enqueueMessage(errorMessage);
}
-
+
/**
* @param message
*/
private void enqueueMessage(OfHeader message) {
enqueueMessage(message, QueueType.DEFAULT);
}
-
+
@Override
public void enqueueNotification(NotificationQueueWrapper notification) {
enqueueMessage(notification);
/**
* version negotiation happened as per following steps:
* 1. If HelloMessage version field has same version, continue connection processing.
- * If HelloMessage version is lower than supported versions, just disconnect.
+ * If HelloMessage version is lower than supported versions, just disconnect.
* 2. If HelloMessage contains bitmap and common version found in bitmap
- * then continue connection processing. if no common version found, just disconnect.
+ * then continue connection processing. if no common version found, just disconnect.
* 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
* 4. If Hello message received again with not supported version, just disconnect.
*/
processPortStatusMsg(message);
enqueueMessage(message);
}
-
+
protected void processPortStatusMsg(PortStatus msg) {
if (msg.getReason().getIntValue() == 2) {
updatePort(msg);
deletePort(msg);
}
}
-
+
protected void updatePort(PortStatus msg) {
- Long portNumber = msg.getPortNo();
+ Long portNumber = msg.getPortNo();
Boolean portBandwidth = portFeaturesUtils.getPortBandwidth(msg);
-
- if(portBandwidth == null) {
+
+ if (portBandwidth == null) {
LOG.debug("can't get bandwidth info from port: {}, aborting port update", msg.toString());
} else {
this.getSessionContext().getPhysicalPorts().put(portNumber, msg);
- this.getSessionContext().getPortsBandwidth().put(portNumber, portBandwidth);
- }
+ this.getSessionContext().getPortsBandwidth().put(portNumber, portBandwidth);
+ }
}
-
+
protected void deletePort(PortGrouping port) {
Long portNumber = port.getPortNo();
-
+
this.getSessionContext().getPhysicalPorts().remove(portNumber);
this.getSessionContext().getPortsBandwidth().remove(portNumber);
}
}
/**
- * @param conductorState
- * the connectionState to set
+ * @param conductorState the connectionState to set
*/
@Override
public void setConductorState(CONDUCTOR_STATE conductorState) {
public void onDisconnectEvent(DisconnectEvent arg0) {
SessionManager sessionManager = OFSessionUtil.getSessionManager();
sessionManager.invalidateOnDisconnect(this);
+ close();
}
@Override
LOG.debug("connection already disconnected");
result = Futures.immediateFuture(true);
}
-
+ close();
return result;
}
@Override
public void onConnectionReady() {
LOG.debug("connection is ready-to-use");
- if (! firstHelloProcessed) {
+ if (!firstHelloProcessed) {
HandshakeStepWrapper handshakeStepWrapper = new HandshakeStepWrapper(
null, handshakeManager, connectionAdapter);
hsPool.execute(handshakeStepWrapper);
@Override
public void onHandshakeSuccessfull(GetFeaturesOutput featureOutput,
- Short negotiatedVersion) {
+ Short negotiatedVersion) {
postHandshakeBasic(featureOutput, negotiatedVersion);
-
+
// post-handshake actions
- if(version == OFConstants.OFP_VERSION_1_3){
+ if (version == OFConstants.OFP_VERSION_1_3) {
requestPorts();
requestGroupFeatures();
requestMeterFeatures();
}
-
+
requestDesc();
}
+ @Override
+ public void onHandshakeFailure() {
+ LOG.info("OF handshake failed, doing cleanup.");
+ close();
+ }
+
/**
* used by tests
+ *
* @param featureOutput
* @param negotiatedVersion
*/
protected void postHandshakeBasic(GetFeaturesOutput featureOutput,
- Short negotiatedVersion) {
+ Short negotiatedVersion) {
version = negotiatedVersion;
if (version == OFConstants.OFP_VERSION_1_0) {
// Because the GetFeaturesOutput contains information about the port
// BUG-1988 - this must be the first item in queue in order not to get behind link-up message
enqueueMessage(featureOutput);
}
-
+
OFSessionUtil.registerSession(this, featureOutput, negotiatedVersion);
hsPool.shutdown();
hsPool.purge();
builder.setXid(getSessionContext().getNextXid());
getConnectionAdapter().multipartRequest(builder.build());
}
- private void requestGroupFeatures(){
+
+ private void requestGroupFeatures() {
MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
mprInput.setType(MultipartType.OFPMPGROUPFEATURES);
mprInput.setVersion(getVersion());
mprInput.setFlags(new MultipartRequestFlags(false));
mprInput.setXid(getSessionContext().getNextXid());
- MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild =
+ MultipartRequestGroupFeaturesCaseBuilder mprGroupFeaturesBuild =
new MultipartRequestGroupFeaturesCaseBuilder();
mprInput.setMultipartRequestBody(mprGroupFeaturesBuild.build());
- LOG.debug("Send group features statistics request :{}",mprGroupFeaturesBuild);
+ LOG.debug("Send group features statistics request :{}", mprGroupFeaturesBuild);
getConnectionAdapter().multipartRequest(mprInput.build());
-
+
}
- private void requestMeterFeatures(){
+
+ private void requestMeterFeatures() {
MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
mprInput.setType(MultipartType.OFPMPMETERFEATURES);
mprInput.setVersion(getVersion());
new MultipartRequestMeterFeaturesCaseBuilder();
mprInput.setMultipartRequestBody(mprMeterFeaturesBuild.build());
- LOG.debug("Send meter features statistics request :{}",mprMeterFeaturesBuild);
+ LOG.debug("Send meter features statistics request :{}", mprMeterFeaturesBuild);
getConnectionAdapter().multipartRequest(mprInput.build());
-
+
}
+
/**
* @param isBitmapNegotiationEnable the isBitmapNegotiationEnable to set
*/
hsPool.shutdownNow();
LOG.debug("pool is terminated: {}", hsPool.isTerminated());
}
-
+
+ protected void shutdownPoolPolitely() {
+ hsPool.shutdown();
+ try {
+ hsPool.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.info("Error while awaiting termination on pool. Will use shutdownNow method.");
+ shutdownPool();
+ }
+ hsPool.purge();
+ LOG.debug("pool is terminated: {}", hsPool.isTerminated());
+ }
+
@Override
public void setId(int conductorId) {
this.conductorId = conductorId;
}
+
+ @Override
+ public void close() {
+ shutdownPoolPolitely();
+ conductorState = CONDUCTOR_STATE.RIP;
+ }
}
package org.opendaylight.openflowplugin.openflow.md.util;
-import org.junit.Test;
-
-import java.math.BigInteger;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.math.BigInteger;
+import org.junit.Test;
+
/**
* Created by Martin Bobak mbobak@cisco.com on 6/30/14.
*/
private static final byte[] testBytes00 = {0, 0, 0, 0};
private static final byte[] testBytesFF = {(byte) 255, (byte) 255, (byte) 255, (byte) 255};
+ private static final byte[] test3Bytes = {100, 101, 102};
+ private static final byte[] test3Bytes00 = {0, 0, 0};
+ private static final byte[] test3BytesFF = {(byte) 255, (byte) 255, (byte) 255};
+
private static final BigInteger bigInteger = new BigInteger("1684367103");
private static final BigInteger bigIntFF = new BigInteger("4294967295");
+
+ private static final Integer mediumInteger = new Integer("6579558");
+ private static final Integer mediumIntegerFF = new Integer("16777215");
private static final int int00 = 0;
private static final int shortByteLength = 2;
+ private static final int mediumByteLength = 3;
private static final int intByteLength = 4;
@Test
assertEquals(bigIntFF.shiftRight(16).intValue(), unsigned);
}
+ @Test
+ public void testBytesToUnsignedMedium() {
+ long unsigned = ByteUtil.bytesToUnsignedMedium(test3Bytes);
+ assertEquals(mediumInteger.longValue(), unsigned);
+
+ unsigned = ByteUtil.bytesToUnsignedMedium(test3Bytes00);
+ assertEquals(0, unsigned);
+
+ unsigned = ByteUtil.bytesToUnsignedMedium(test3BytesFF);
+ assertEquals(mediumIntegerFF.longValue(), unsigned);
+ }
+
@Test(expected = IllegalArgumentException.class)
public void exceptionTestBytesToUnsignedShort() {
ByteUtil.bytesToUnsignedShort(testBytes);
bytes = ByteUtil.unsignedShortToBytes(intValue);
assertTrue(bytes.length == shortByteLength);
}
+
+ @Test
+ public void testUnsignedMediumToBytes() {
+ long intValue = 255;
+ byte[] bytes = ByteUtil.unsignedMediumToBytes(intValue);
+
+ assertTrue(bytes.length == mediumByteLength);
+
+ intValue += 256;
+ bytes = ByteUtil.unsignedMediumToBytes(intValue);
+ assertTrue(bytes.length == mediumByteLength);
+
+ intValue += 256;
+ bytes = ByteUtil.unsignedMediumToBytes(intValue);
+ assertTrue(bytes.length == mediumByteLength);
+ }
+
}