import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
/**
- * @author michal.polkorab
+ * Factory for ChannelInitializer instances.
*
+ * @author michal.polkorab
*/
public class ChannelInitializerFactory {
private boolean useBarrier;
/**
+ * Creates a TCP publishing channel initializer.
+ *
* @return PublishingChannelInitializer that initializes new channels
*/
public TcpChannelInitializer createPublishingChannelInitializer() {
}
/**
+ * Creates a UDP channel initializer.
+ *
* @return PublishingChannelInitializer that initializes new channels
*/
public UdpChannelInitializer createUdpChannelInitializer() {
}
/**
- * @param switchIdleTimeOut
+ * Sets the switch idle timeout.
+ *
+ * @param timeout the timeout
*/
- public void setSwitchIdleTimeout(final long switchIdleTimeOut) {
- this.switchIdleTimeOut = switchIdleTimeOut;
+ public void setSwitchIdleTimeout(final long timeout) {
+ this.switchIdleTimeOut = timeout;
}
/**
- * @param deserializationFactory
+ * Sets the DeserializationFactory.
+ *
+ * @param deserializationFactory the DeserializationFactory
*/
public void setDeserializationFactory(final DeserializationFactory deserializationFactory) {
this.deserializationFactory = deserializationFactory;
}
/**
- * @param serializationFactory
+ * Sets the SerializationFactory.
+ *
+ * @param serializationFactory the SerializationFactory
*/
public void setSerializationFactory(final SerializationFactory serializationFactory) {
this.serializationFactory = serializationFactory;
}
/**
- * @param tlsConfig
+ * Sets the TlsConfiguration.
+ *
+ * @param tlsConfig the TlsConfiguration
*/
public void setTlsConfig(final TlsConfiguration tlsConfig) {
this.tlsConfig = tlsConfig;
}
/**
- * @param switchConnectionHandler
+ * Sets the SwitchConnectionHandler.
+ *
+ * @param switchConnectionHandler the SwitchConnectionHandler
*/
public void setSwitchConnectionHandler(final SwitchConnectionHandler switchConnectionHandler) {
this.switchConnectionHandler = switchConnectionHandler;
}
/**
- * @param useBarrier
+ * Sets whether or not to use a barrier.
*/
public void setUseBarrier(final boolean useBarrier) {
this.useBarrier = useBarrier;
}
-}
\ No newline at end of file
+}
package org.opendaylight.openflowjava.protocol.impl.core;
/**
- * @author martin.uhlir
+ * Interface for a connection initializer.
*
+ * @author martin.uhlir
*/
public interface ConnectionInitializer {
/**
- * Initiates connection towards device
+ * Initiates connection towards device.
+ *
* @param host - host IP
* @param port - port number
*/
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.ReadTimeoutHandler;
-
import java.util.concurrent.TimeUnit;
-
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Detects idle state of switch and informs upper layers
+ * Detects idle state of switch and informs upper layers.
+ *
* @author michal.polkorab
*/
public class IdleHandler extends ReadTimeoutHandler {
private boolean first = true;
/**
- * @param readerIdleTime
- * @param unit
+ * Constructor.
*/
public IdleHandler(final long readerIdleTime, final TimeUnit unit) {
super(readerIdleTime, unit);
import org.slf4j.LoggerFactory;
/**
- * @author michal.polkorab
+ * Decoder for datagram packets.
*
+ * @author michal.polkorab
*/
-public class OFDatagramPacketDecoder extends SimpleChannelInboundHandler<VersionMessageUdpWrapper>{
+public class OFDatagramPacketDecoder extends SimpleChannelInboundHandler<VersionMessageUdpWrapper> {
private static final Logger LOG = LoggerFactory.getLogger(OFDatagramPacketDecoder.class);
private DeserializationFactory deserializationFactory;
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void channelRead0(final ChannelHandlerContext ctx, final VersionMessageUdpWrapper msg)
throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug("UdpVersionMessageWrapper received");
- LOG.debug("<< {}", ByteBufUtils.byteBufToHexString(msg.getMessageBuffer()));
+ LOG.debug("UdpVersionMessageWrapper received");
+ LOG.debug("<< {}", ByteBufUtils.byteBufToHexString(msg.getMessageBuffer()));
}
try {
MessageConsumer consumer = UdpConnectionMap.getMessageConsumer(msg.getAddress());
consumer.consume(dataObject);
}
- } catch(Exception e) {
+ } catch (RuntimeException e) {
LOG.warn("Message deserialization failed", e);
// TODO: delegate exception to allow easier deserialization
// debugging / deserialization problem awareness
}
}
- /**
- * @param deserializationFactory
- */
public void setDeserializationFactory(final DeserializationFactory deserializationFactory) {
this.deserializationFactory = deserializationFactory;
}
package org.opendaylight.openflowjava.protocol.impl.core;
-import java.util.List;
-
-import org.opendaylight.openflowjava.protocol.impl.core.connection.UdpMessageListenerWrapper;
-import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.concurrent.Future;
+import java.util.List;
+import org.opendaylight.openflowjava.protocol.impl.core.connection.UdpMessageListenerWrapper;
+import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * @author michal.polkorab
+ * Encoder for datagram packets.
*
+ * @author michal.polkorab
*/
public class OFDatagramPacketEncoder extends MessageToMessageEncoder<UdpMessageListenerWrapper> {
private SerializationFactory serializationFactory;
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
protected void encode(ChannelHandlerContext ctx,
UdpMessageListenerWrapper wrapper, List<Object> out) throws Exception {
LOG.trace("Encoding");
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer();
serializationFactory.messageToBuffer(wrapper.getMsg().getVersion(), buffer, wrapper.getMsg());
out.add(new DatagramPacket(buffer, wrapper.getAddress()));
- } catch(Exception e) {
+ } catch (RuntimeException e) {
LOG.warn("Message serialization failed: {}", e.getMessage());
Future<Void> newFailedFuture = ctx.newFailedFuture(e);
wrapper.getListener().operationComplete(newFailedFuture);
}
}
- /**
- * @param serializationFactory
- */
public void setSerializationFactory(SerializationFactory serializationFactory) {
this.serializationFactory = serializationFactory;
}
-}
\ No newline at end of file
+}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
-
import java.util.List;
-
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
import org.opendaylight.openflowjava.protocol.api.util.EncodeConstants;
import org.opendaylight.openflowjava.protocol.impl.core.connection.ConnectionAdapterFactory;
import org.slf4j.LoggerFactory;
/**
- * @author michal.polkorab
+ * Handler for datagram packets.
*
+ * @author michal.polkorab
*/
public class OFDatagramPacketHandler extends MessageToMessageDecoder<DatagramPacket> {
private static final Logger LOG = LoggerFactory.getLogger(OFDatagramPacketHandler.class);
- /** Length of OpenFlow 1.3 header */
+ /** Length of OpenFlow 1.3 header. */
public static final byte LENGTH_OF_HEADER = 8;
private static final byte LENGTH_INDEX_IN_HEADER = 2;
- private ConnectionAdapterFactory adapterFactory = new ConnectionAdapterFactoryImpl();
- private SwitchConnectionHandler connectionHandler;
+ private final ConnectionAdapterFactory adapterFactory = new ConnectionAdapterFactoryImpl();
+ private final SwitchConnectionHandler connectionHandler;
/**
- * Default constructor
- * @param sch the switchConnectionHandler that decides
- * what to do with incomming message / channel
+ * Default constructor.
+ *
+ * @param sch the switchConnectionHandler that decides what to do with incomming message / channel
*/
public OFDatagramPacketHandler(SwitchConnectionHandler sch) {
this.connectionHandler = sch;
byte version = bb.readByte();
- if ((version == EncodeConstants.OF13_VERSION_ID) || (version == EncodeConstants.OF10_VERSION_ID)) {
+ if (version == EncodeConstants.OF13_VERSION_ID || version == EncodeConstants.OF10_VERSION_ID) {
LOG.debug("detected version: {}", version);
ByteBuf messageBuffer = bb.slice();
out.add(new VersionMessageUdpWrapper(version, messageBuffer, msg.sender()));
/**
* Transforms OpenFlow Protocol messages to POJOs.
+ *
* @author michal.polkorab
*/
public class OFDecoder extends MessageToMessageDecoder<VersionMessageWrapper> {
public OFDecoder() {
LOG.trace("Creating OFDecoder");
- // TODO: pass as argument
+ // TODO: pass as argument
statisticsCounter = StatisticsCounters.getInstance();
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
protected void decode(ChannelHandlerContext ctx, VersionMessageWrapper msg, List<Object> out) throws Exception {
statisticsCounter.incrementCounter(CounterEventTypes.US_RECEIVED_IN_OFJAVA);
if (LOG.isDebugEnabled()) {
out.add(dataObject);
statisticsCounter.incrementCounter(CounterEventTypes.US_DECODE_SUCCESS);
}
- } catch (Exception e) {
+ } catch (RuntimeException e) {
LOG.warn("Message deserialization failed", e);
statisticsCounter.incrementCounter(CounterEventTypes.US_DECODE_FAIL);
} finally {
}
}
- /**
- * @param deserializationFactory
- */
public void setDeserializationFactory(DeserializationFactory deserializationFactory) {
this.deserializationFactory = deserializationFactory;
}
/**
* Transforms OpenFlow Protocol messages to POJOs.
+ *
* @author michal.polkorab
* @author timotej.kubas
*/
private SerializationFactory serializationFactory;
private final StatisticsCounters statisticsCounters;
- /** Constructor of class */
public OFEncoder() {
statisticsCounters = StatisticsCounters.getInstance();
LOG.trace("Creating OFEncoder");
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
protected void encode(final ChannelHandlerContext ctx, final MessageListenerWrapper wrapper, final ByteBuf out)
throws Exception {
LOG.trace("Encoding");
try {
serializationFactory.messageToBuffer(wrapper.getMsg().getVersion(), out, wrapper.getMsg());
- if(wrapper.getMsg() instanceof FlowModInput){
+ if (wrapper.getMsg() instanceof FlowModInput) {
statisticsCounters.incrementCounter(CounterEventTypes.DS_FLOW_MODS_SENT);
}
statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_SUCCESS);
- } catch(final Exception e) {
+ } catch (RuntimeException e) {
LOG.warn("Message serialization failed ", e);
statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_FAIL);
if (wrapper.getListener() != null) {
}
}
- /**
- * @param serializationFactory
- */
public void setSerializationFactory(final SerializationFactory serializationFactory) {
this.serializationFactory = serializationFactory;
}
/**
* Decodes incoming messages into message frames.
+ *
* @author michal.polkorab
*/
public class OFFrameDecoder extends ByteToMessageDecoder {
- /** Length of OpenFlow header */
+ /** Length of OpenFlow header. */
public static final byte LENGTH_OF_HEADER = 8;
private static final byte LENGTH_INDEX_IN_HEADER = 2;
private static final Logger LOG = LoggerFactory.getLogger(OFFrameDecoder.class);
- private ConnectionFacade connectionFacade;
+ private final ConnectionFacade connectionFacade;
private boolean firstTlsPass = false;
/**
/**
* Detects version of used OpenFlow Protocol and discards unsupported version messages.
+ *
* @author michal.polkorab
*/
public class OFVersionDetector extends ByteToMessageDecoder {
private static final Logger LOG = LoggerFactory.getLogger(OFVersionDetector.class);
- /** IDs of accepted OpenFlow protocol versions */
+ /** IDs of accepted OpenFlow protocol versions. */
private static final List<Byte> OF_VERSIONS = new ArrayList<>(Arrays.asList(
EncodeConstants.OF10_VERSION_ID,
EncodeConstants.OF13_VERSION_ID
import com.google.common.util.concurrent.ListenableFuture;
/**
+ * OnlineProvider.
+ *
* @author mirehak
*/
public interface OnlineProvider {
- /**
- * @return the isOnlineFuture
- */
ListenableFuture<Boolean> getIsOnlineFuture();
-
}
public enum PipelineHandlers {
/**
- * Detects switch idle state
+ * Detects switch idle state.
*/
IDLE_HANDLER,
+
/**
- * Component for handling TLS frames
+ * Component for handling TLS frames.
*/
SSL_HANDLER,
+
/**
- * Decodes incoming messages into message frames
+ * Decodes incoming messages into message frames.
*/
OF_FRAME_DECODER,
+
/**
- * Detects version of incoming OpenFlow Protocol message
+ * Detects version of incoming OpenFlow Protocol message.
*/
OF_VERSION_DETECTOR,
+
/**
- * Transforms OpenFlow Protocol byte messages into POJOs
+ * Transforms OpenFlow Protocol byte messages into POJOs.
*/
OF_DECODER,
+
/**
- * Transforms POJOs into OpenFlow Protocol byte messages
+ * Transforms POJOs into OpenFlow Protocol byte messages.
*/
OF_ENCODER,
+
/**
- * Delegates translated POJOs into MessageConsumer
+ * Delegates translated POJOs into MessageConsumer.
*/
DELEGATING_INBOUND_HANDLER,
+
/**
- * Performs configurable efficient flushing
+ * Performs configurable efficient flushing.
*/
CHANNEL_OUTBOUND_QUEUE_MANAGER,
+
/**
- * Decodes incoming messages into message frames
- * and filters them based on version supported
+ * Decodes incoming messages into message frames and filters them based on version supported.
*/
OF_DATAGRAMPACKET_HANDLER,
+
/**
- * Transforms OpenFlow Protocol datagram messages into POJOs
+ * Transforms OpenFlow Protocol datagram messages into POJOs.
*/
OF_DATAGRAMPACKET_DECODER,
+
/**
- * Transforms POJOs into OpenFlow Protocol datagrams
+ * Transforms POJOs into OpenFlow Protocol datagrams.
*/
OF_DATAGRAMPACKET_ENCODER
-}
\ No newline at end of file
+}
import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
/**
+ * Initializer for protocol channels.
+ *
* @param <C> Channel type
* @author michal.polkorab
*/
private boolean useBarrier;
/**
+ * Sets the SwitchConnectionHandler.
+ *
* @param switchConnectionHandler the switchConnectionHandler to set
*/
public void setSwitchConnectionHandler(final SwitchConnectionHandler switchConnectionHandler) {
}
/**
+ * Sets the switch idle timeout.
+ *
* @param switchIdleTimeout the switchIdleTimeout to set
*/
public void setSwitchIdleTimeout(final long switchIdleTimeout) {
this.switchIdleTimeout = switchIdleTimeout;
}
- /**
- * @param serializationFactory
- */
public void setSerializationFactory(final SerializationFactory serializationFactory) {
this.serializationFactory = serializationFactory;
}
- /**
- * @param deserializationFactory
- */
public void setDeserializationFactory(final DeserializationFactory deserializationFactory) {
this.deserializationFactory = deserializationFactory;
}
- /**
- * @param tlsConfiguration
- */
public void setTlsConfiguration(final TlsConfiguration tlsConfiguration) {
this.tlsConfiguration = tlsConfiguration;
}
- /**
- * @return switch connection handler
- */
public SwitchConnectionHandler getSwitchConnectionHandler() {
return switchConnectionHandler;
}
- /**
- * @return switch idle timeout
- */
public long getSwitchIdleTimeout() {
return switchIdleTimeout;
}
- /**
- * @return serialization factory
- */
public SerializationFactory getSerializationFactory() {
return serializationFactory;
}
- /**
- * @return deserialization factory
- */
public DeserializationFactory getDeserializationFactory() {
return deserializationFactory;
}
- /**
- * @return TLS configuration
- */
public TlsConfiguration getTlsConfiguration() {
return tlsConfiguration;
}
- /**
- * @param useBarrier
- */
public void setUseBarrier(final boolean useBarrier) {
this.useBarrier = useBarrier;
}
- /**
- * @return useBarrrier
- */
public boolean useBarrier() {
return useBarrier;
}
-}
\ No newline at end of file
+}
import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
/**
+ * Server facade interface.
+ *
* @author mirehak
*/
public interface ServerFacade extends ShutdownProvider, OnlineProvider, Runnable {
/**
- * Sets thread configuration
+ * Sets thread configuration.
+ *
* @param threadConfig desired thread configuration
*/
void setThreadConfig(ThreadConfiguration threadConfig);
/**
+ * Shutdown provider interface.
+ *
* @author mirehak
*/
public interface ShutdownProvider {
- /**
- * @return shutdown future
- */
ListenableFuture<Boolean> shutdown();
-
}
package org.opendaylight.openflowjava.protocol.impl.core;
import java.io.IOException;
+import java.security.KeyManagementException;
import java.security.KeyStore;
+import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.Security;
+import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
-
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
-
import org.opendaylight.openflowjava.protocol.api.connection.TlsConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// "TLS" - supports some version of TLS
// Use "TLSv1", "TLSv1.1", "TLSv1.2" for specific TLS version
private static final String PROTOCOL = "TLS";
- private TlsConfiguration tlsConfig;
+ private final TlsConfiguration tlsConfig;
private static final Logger LOG = LoggerFactory
.getLogger(SslContextFactory.class);
/**
+ * Sets the TlsConfiguration.
+ *
* @param tlsConfig
* TLS configuration object, contains keystore locations +
* keystore types
this.tlsConfig = tlsConfig;
}
- /**
- * @return servercontext
- */
public SSLContext getServerContext() {
String algorithm = Security
.getProperty("ssl.KeyManagerFactory.algorithm");
} catch (CertificateException e) {
LOG.warn("CertificateException - Unable to access certificate (check password)."
+ " Failed to initialize the server-side SSLContext", e);
- } catch (Exception e) {
+ } catch (KeyManagementException | KeyStoreException | UnrecoverableKeyException e) {
LOG.warn("Exception - Failed to initialize the server-side SSLContext", e);
}
return serverContext;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
-
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.PathType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Class for storing keys
+ * Class for storing keys.
*
* @author michal.polkorab
*/
}
/**
- * InputStream instance of key - key location is on classpath
+ * InputStream instance of key - key location is on classpath.
+ *
* @param filename keystore location
* @param pathType keystore location type - "classpath" or "path"
*
public static InputStream asInputStream(String filename, PathType pathType) {
InputStream in;
switch (pathType) {
- case CLASSPATH:
- in = SslKeyStore.class.getResourceAsStream(filename);
- if (in == null) {
- throw new IllegalStateException("KeyStore file not found: "
- + filename);
- }
- break;
- case PATH:
- LOG.debug("Current dir using System:"
- + System.getProperty("user.dir"));
- File keystorefile = new File(filename);
- try {
- in = new FileInputStream(keystorefile);
- } catch (FileNotFoundException e) {
- throw new IllegalStateException("KeyStore file not found: "
- + filename,e);
- }
- break;
- default:
- throw new IllegalArgumentException("Unknown path type: " + pathType);
+ case CLASSPATH:
+ in = SslKeyStore.class.getResourceAsStream(filename);
+ if (in == null) {
+ throw new IllegalStateException("KeyStore file not found: "
+ + filename);
+ }
+ break;
+ case PATH:
+ LOG.debug("Current dir using System:"
+ + System.getProperty("user.dir"));
+ File keystorefile = new File(filename);
+ try {
+ in = new FileInputStream(keystorefile);
+ } catch (FileNotFoundException e) {
+ throw new IllegalStateException("KeyStore file not found: "
+ + filename,e);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown path type: " + pathType);
}
return in;
}
package org.opendaylight.openflowjava.protocol.impl.core;
import com.google.common.base.MoreObjects;
-import com.google.common.base.Throwables;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
private final SwitchConnectionConfig config;
private InetAddress address;
- private ConnectionConfigurationImpl(SwitchConnectionConfig config) {
+ ConnectionConfigurationImpl(SwitchConnectionConfig config) {
this.config = config;
try {
address = extractIpAddressBin(config.getAddress());
- } catch(UnknownHostException e) {
- Throwables.propagate(e);
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
}
}
@Override
public TlsConfiguration getTlsConfiguration() {
final Tls tlsConfig = config.getTls();
- if(tlsConfig == null || !(TransportProtocol.TLS.equals(getTransferProtocol()))) {
+ if (tlsConfig == null || !TransportProtocol.TLS.equals(getTransferProtocol())) {
return null;
}
public KeystoreType getTlsTruststoreType() {
return MoreObjects.firstNonNull(tlsConfig.getTruststoreType(), null);
}
+
@Override
public String getTlsTruststore() {
return MoreObjects.firstNonNull(tlsConfig.getTruststore(), null);
}
+
@Override
public KeystoreType getTlsKeystoreType() {
return MoreObjects.firstNonNull(tlsConfig.getKeystoreType(), null);
}
+
@Override
public String getTlsKeystore() {
return MoreObjects.firstNonNull(tlsConfig.getKeystore(), null);
}
+
@Override
- public org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.PathType getTlsKeystorePathType() {
+ public org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.PathType
+ getTlsKeystorePathType() {
return MoreObjects.firstNonNull(tlsConfig.getKeystorePathType(), null);
}
+
@Override
- public org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.PathType getTlsTruststorePathType() {
+ public org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.PathType
+ getTlsTruststorePathType() {
return MoreObjects.firstNonNull(tlsConfig.getTruststorePathType(), null);
}
+
@Override
public String getKeystorePassword() {
return MoreObjects.firstNonNull(tlsConfig.getKeystorePassword(), null);
}
+
@Override
public String getCertificatePassword() {
return MoreObjects.firstNonNull(tlsConfig.getCertificatePassword(), null);
}
+
@Override
public String getTruststorePassword() {
return MoreObjects.firstNonNull(tlsConfig.getTruststorePassword(), null);
}
+
@Override
public List<String> getCipherSuites() {
return tlsConfig.getCipherSuites();
@Override
public ThreadConfiguration getThreadConfiguration() {
final Threads threads = config.getThreads();
- if(threads == null) {
+ if (threads == null) {
return null;
}
import org.opendaylight.openflowjava.protocol.api.keys.MatchEntrySerializerKey;
import org.opendaylight.openflowjava.protocol.api.keys.MessageCodeKey;
import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
+import org.opendaylight.openflowjava.protocol.api.keys.TypeToClassKey;
import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializationFactory;
import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializerRegistryImpl;
import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
import org.opendaylight.openflowjava.protocol.impl.serialization.SerializerRegistryImpl;
-import org.opendaylight.openflowjava.protocol.api.keys.TypeToClassKey;
import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.TransportProtocol;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.oxm.rev150225.MatchField;
import org.slf4j.LoggerFactory;
/**
- * Exposed class for server handling<br>
+ * Exposed class for server handling. <br>
* C - {@link MatchEntrySerializerKey} parameter representing oxm_class (see specification)<br>
* F - {@link MatchEntrySerializerKey} parameter representing oxm_field (see specification)
* @author mirehak
@Override
public ListenableFuture<Boolean> shutdown() {
LOG.debug("Shutdown summoned");
- if(serverFacade == null){
+ if (serverFacade == null) {
LOG.warn("Can not shutdown - not configured or started");
throw new IllegalStateException("SwitchConnectionProvider is not started or not configured.");
}
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public ListenableFuture<Boolean> startup() {
LOG.debug("Startup summoned");
ListenableFuture<Boolean> result = null;
}
new Thread(serverFacade).start();
result = serverFacade.getIsOnlineFuture();
- } catch (final Exception e) {
+ } catch (RuntimeException e) {
final SettableFuture<Boolean> exResult = SettableFuture.create();
exResult.setException(e);
result = exResult;
return result;
}
- /**
- * @return
- */
private ServerFacade createAndConfigureServer() {
LOG.debug("Configuring ..");
ServerFacade server = null;
// TODO : Add option to disable Epoll.
boolean isEpollEnabled = Epoll.isAvailable();
- if ((TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol))) {
+ if (TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol)) {
server = new TcpHandler(connConfig.getAddress(), connConfig.getPort());
final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer();
((TcpHandler) server).setChannelInitializer(channelInitializer);
connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler, isEpollEnabled);
connectionInitializer.setChannelInitializer(channelInitializer);
connectionInitializer.run();
- } else if (TransportProtocol.UDP.equals(transportProtocol)){
+ } else if (TransportProtocol.UDP.equals(transportProtocol)) {
server = new UdpHandler(connConfig.getAddress(), connConfig.getPort());
((UdpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer());
return server;
}
- /**
- * @return servers
- */
public ServerFacade getServerFacade() {
return serverFacade;
}
}
@Override
- public <C extends OxmClassBase, F extends MatchField> void registerMatchEntrySerializer(final MatchEntrySerializerKey<C, F> key,
- final OFGeneralSerializer serializer) {
+ public <C extends OxmClassBase, F extends MatchField> void registerMatchEntrySerializer(
+ final MatchEntrySerializerKey<C, F> key, final OFGeneralSerializer serializer) {
serializerRegistry.registerSerializer(key, serializer);
}
@Override
public void registerExperimenterMessageDeserializer(ExperimenterIdDeserializerKey key,
- OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
+ OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
deserializerRegistry.registerDeserializer(key, deserializer);
}
@Override
public void registerMultipartReplyMessageDeserializer(ExperimenterIdDeserializerKey key,
- OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
+ OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
deserializerRegistry.registerDeserializer(key, deserializer);
}
}
@Override
- public void registerExperimenterMessageSerializer(ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
- OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
+ public void registerExperimenterMessageSerializer(
+ ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
+ OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
serializerRegistry.registerSerializer(key, serializer);
}
serializerRegistry.registerSerializer(key, serializer);
}
- @Override
/**
- * @deprecated Since we have used ExperimenterIdMeterSubTypeSerializerKey as MeterBandSerializer's key, in order to avoid
- * the occurrence of an error, we should discard this function
+ * Deprecated.
+ *
+ * @deprecated Since we have used ExperimenterIdMeterSubTypeSerializerKey as MeterBandSerializer's key, in order
+ * to avoid the occurrence of an error, we should discard this function.
*/
+ @Override
@Deprecated
public void registerMeterBandSerializer(final ExperimenterIdSerializerKey<MeterBandExperimenterCase> key,
final OFSerializer<MeterBandExperimenterCase> serializer) {
}
@Override
- public void registerMeterBandSerializer(final ExperimenterIdMeterSubTypeSerializerKey<MeterBandExperimenterCase> key,
- final OFSerializer<MeterBandExperimenterCase> serializer) {
+ public void registerMeterBandSerializer(
+ final ExperimenterIdMeterSubTypeSerializerKey<MeterBandExperimenterCase> key,
+ final OFSerializer<MeterBandExperimenterCase> serializer) {
serializerRegistry.registerSerializer(key, serializer);
}
return this.connConfig;
}
- @Override
+ @Override
public <K> void registerSerializer(MessageTypeKey<K> key, OFGeneralSerializer serializer) {
serializerRegistry.registerSerializer(key, serializer);
}
@Override
public void registerDeserializer(MessageCodeKey key, OFGeneralDeserializer deserializer) {
- deserializerRegistry.registerDeserializer(key, deserializer);
+ deserializerRegistry.registerDeserializer(key, deserializer);
}
@Override
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.List;
* Default constructor.
*/
public TcpChannelInitializer() {
- this( new DefaultChannelGroup("netty-receiver", null), new ConnectionAdapterFactoryImpl() );
+ this(new DefaultChannelGroup("netty-receiver", null), new ConnectionAdapterFactoryImpl());
}
/**
* Testing constructor.
*/
- protected TcpChannelInitializer( final DefaultChannelGroup channelGroup, final ConnectionAdapterFactory connAdaptorFactory ) {
- allChannels = channelGroup ;
- connectionAdapterFactory = connAdaptorFactory ;
+ protected TcpChannelInitializer(final DefaultChannelGroup channelGroup,
+ final ConnectionAdapterFactory connAdaptorFactory) {
+ allChannels = channelGroup;
+ connectionAdapterFactory = connAdaptorFactory;
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
protected void initChannel(final SocketChannel ch) {
if (ch.remoteAddress() != null) {
final InetAddress switchAddress = ch.remoteAddress().getAddress();
final int port = ch.localAddress().getPort();
final int remotePort = ch.remoteAddress().getPort();
LOG.debug("Incoming connection from (remote address): {}:{} --> :{}",
- switchAddress.toString(), remotePort, port);
+ switchAddress.toString(), remotePort, port);
if (!getSwitchConnectionHandler().accept(switchAddress)) {
ch.disconnect();
final SslHandler ssl = new SslHandler(engine);
final Future<Channel> handshakeFuture = ssl.handshakeFuture();
final ConnectionFacade finalConnectionFacade = connectionFacade;
- handshakeFuture.addListener(new GenericFutureListener<Future<? super Channel>>() {
- @Override
- public void operationComplete(final Future<? super Channel> future) throws Exception {
- finalConnectionFacade.fireConnectionReadyNotification();
- }
- });
+ handshakeFuture.addListener(future -> finalConnectionFacade.fireConnectionReadyNotification());
ch.pipeline().addLast(PipelineHandlers.SSL_HANDLER.name(), ssl);
}
ch.pipeline().addLast(PipelineHandlers.OF_FRAME_DECODER.name(),
if (!tlsPresent) {
connectionFacade.fireConnectionReadyNotification();
}
- } catch (final Exception e) {
+ } catch (RuntimeException e) {
LOG.warn("Failed to initialize channel", e);
ch.close();
}
}
/**
+ * Returns the connection iterator.
+ *
* @return iterator through active connections
*/
public Iterator<Channel> getConnectionIterator() {
}
/**
+ * Returns the number of active channels.
+ *
* @return amount of active channels
*/
public int size() {
package org.opendaylight.openflowjava.protocol.impl.core;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
/**
- * Initializes (TCP) connection to device
- * @author martin.uhlir
+ * Initializes (TCP) connection to device.
*
+ * @author martin.uhlir
*/
public class TcpConnectionInitializer implements ServerFacade,
ConnectionInitializer {
private static final Logger LOG = LoggerFactory
.getLogger(TcpConnectionInitializer.class);
- private EventLoopGroup workerGroup;
+ private final EventLoopGroup workerGroup;
private ThreadConfiguration threadConfig;
private TcpChannelInitializer channelInitializer;
- private Bootstrap b;
- private boolean isEpollEnabled;
+ private Bootstrap bootstrap;
+ private final boolean isEpollEnabled;
/**
- * Constructor
+ * Constructor.
+ *
* @param workerGroup - shared worker group
*/
public TcpConnectionInitializer(EventLoopGroup workerGroup, boolean isEpollEnabled) {
@Override
public void run() {
- b = new Bootstrap();
- if(isEpollEnabled) {
- b.group(workerGroup).channel(EpollSocketChannel.class)
+ bootstrap = new Bootstrap();
+ if (isEpollEnabled) {
+ bootstrap.group(workerGroup).channel(EpollSocketChannel.class)
.handler(channelInitializer);
} else {
- b.group(workerGroup).channel(NioSocketChannel.class)
+ bootstrap.group(workerGroup).channel(NioSocketChannel.class)
.handler(channelInitializer);
}
}
@Override
public void initiateConnection(String host, int port) {
try {
- b.connect(host, port).sync();
+ bootstrap.connect(host, port).sync();
} catch (InterruptedException e) {
LOG.error("Unable to initiate connection", e);
}
}
- /**
- * @param channelInitializer
- */
public void setChannelInitializer(TcpChannelInitializer channelInitializer) {
this.channelInitializer = channelInitializer;
}
package org.opendaylight.openflowjava.protocol.impl.core;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
-import io.netty.util.concurrent.GenericFutureListener;
-
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-
import java.net.InetAddress;
import java.net.InetSocketAddress;
-
import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
/**
* Class implementing server over TCP / TLS for handling incoming connections.
*
final ChannelFuture f;
try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(bossGroup, workerGroup)
.channel(socketChannelClass)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(channelInitializer)
.childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
if (startupAddress != null) {
- f = b.bind(startupAddress.getHostAddress(), port).sync();
+ f = bootstrap.bind(startupAddress.getHostAddress(), port).sync();
} else {
- f = b.bind(port).sync();
+ f = bootstrap.bind(port).sync();
}
} catch (InterruptedException e) {
LOG.error("Interrupted while binding port {}", port, e);
}
/**
- * Shuts down {@link TcpHandler}}
+ * Shuts down {@link TcpHandler}}.
*/
@Override
public ListenableFuture<Boolean> shutdown() {
final SettableFuture<Boolean> result = SettableFuture.create();
workerGroup.shutdownGracefully();
// boss will shutdown as soon, as worker is down
- bossGroup.shutdownGracefully().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Object>>() {
-
- @Override
- public void operationComplete(
- final io.netty.util.concurrent.Future<Object> downResult) throws Exception {
- result.set(downResult.isSuccess());
- if (downResult.cause() != null) {
- result.setException(downResult.cause());
- }
+ bossGroup.shutdownGracefully().addListener(downResult -> {
+ result.set(downResult.isSuccess());
+ if (downResult.cause() != null) {
+ result.setException(downResult.cause());
}
-
});
return result;
}
/**
+ * Returns the number of connected clients / channels.
*
* @return number of connected clients / channels
*/
return isOnlineFuture;
}
- /**
- * @return the port
- */
public int getPort() {
return port;
}
- /**
- * @return the address
- */
public String getAddress() {
return address;
}
- /**
- * @param channelInitializer
- */
public void setChannelInitializer(TcpChannelInitializer channelInitializer) {
this.channelInitializer = channelInitializer;
}
}
/**
- * Initiate event loop groups
+ * Initiate event loop groups.
+ *
* @param threadConfiguration number of threads to be created, if not specified in threadConfig
*/
public void initiateEventLoopGroups(ThreadConfiguration threadConfiguration, boolean isEpollEnabled) {
- if(isEpollEnabled) {
+ if (isEpollEnabled) {
initiateEpollEventLoopGroups(threadConfiguration);
} else {
initiateNioEventLoopGroups(threadConfiguration);
}
/**
- * Initiate Nio event loop groups
+ * Initiate Nio event loop groups.
+ *
* @param threadConfiguration number of threads to be created, if not specified in threadConfig
*/
public void initiateNioEventLoopGroups(ThreadConfiguration threadConfiguration) {
}
/**
- * Initiate Epoll event loop groups with Nio as fall back
- * @param threadConfiguration
+ * Initiate Epoll event loop groups with Nio as fall back.
+ *
+ * @param threadConfiguration the ThreadConfiguration
*/
+ @SuppressWarnings("checkstyle:IllegalCatch")
protected void initiateEpollEventLoopGroups(ThreadConfiguration threadConfiguration) {
try {
socketChannelClass = EpollServerSocketChannel.class;
if (threadConfiguration != null) {
- bossGroup = new EpollEventLoopGroup(threadConfiguration.getBossThreadCount());
+ bossGroup = new EpollEventLoopGroup(threadConfiguration.getBossThreadCount());
workerGroup = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
} else {
bossGroup = new EpollEventLoopGroup();
}
((EpollEventLoopGroup)workerGroup).setIoRatio(100);
return;
- } catch (Throwable ex) {
+ } catch (RuntimeException ex) {
LOG.debug("Epoll initiation failed");
}
initiateNioEventLoopGroups(threadConfiguration);
}
- /**
- * @return workerGroup
- */
public EventLoopGroup getWorkerGroup() {
return workerGroup;
}
-
}
import io.netty.channel.socket.DatagramChannel;
/**
- * @author michal.polkorab
+ * UDP implementation of ChannelInitializer.
*
+ * @author michal.polkorab
*/
public class UdpChannelInitializer extends ProtocolChannelInitializer<DatagramChannel> {
ofDatagramPacketEncoder.setSerializationFactory(getSerializationFactory());
ch.pipeline().addLast(PipelineHandlers.OF_ENCODER.name(), ofDatagramPacketEncoder);
}
-}
\ No newline at end of file
+}
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
import org.opendaylight.openflowjava.protocol.impl.core.connection.MessageConsumer;
/**
* As UDP communication is handled only by one channel, it is needed
- * to store MessageConsumers, so that we know which consumer handles which channel
+ * to store MessageConsumers, so that we know which consumer handles which channel.
* @author michal.polkorab
*/
}
/**
+ * Gets the MessageConsumer for the given address.
+ *
* @param address sender's address
* @return corresponding MessageConsumer
*/
public static MessageConsumer getMessageConsumer(InetSocketAddress address) {
- if(address == null){
+ if (address == null) {
throw new IllegalArgumentException("Address can not be null");
}
return connectionMap.get(address);
}
/**
+ * Adds a connection.
+ *
* @param address sender's address
* @param consumer MessageConsumer to be added / paired with specified address
*/
public static void addConnection(InetSocketAddress address, MessageConsumer consumer) {
- if(address == null){
+ if (address == null) {
throw new IllegalArgumentException("Address can not be null");
}
connectionMap.put(address, consumer);
}
/**
+ * Removes a connection.
+ *
* @param address sender's address
*/
public static void removeConnection(InetSocketAddress address) {
- if(address == null){
+ if (address == null) {
throw new IllegalArgumentException("Address can not be null");
}
connectionMap.remove(address);
}
-}
\ No newline at end of file
+}
package org.opendaylight.openflowjava.protocol.impl.core;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
-import io.netty.util.concurrent.GenericFutureListener;
-
import java.net.InetAddress;
import java.net.InetSocketAddress;
-
import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
/**
* Class implementing server over UDP for handling incoming connections.
*
public void run() {
final ChannelFuture f;
try {
- Bootstrap b = new Bootstrap();
- b.group(group)
- .channel(datagramChannelClass)
- .option(ChannelOption.SO_BROADCAST, false)
- .handler(channelInitializer);
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(group).channel(datagramChannelClass).option(ChannelOption.SO_BROADCAST, false)
+ .handler(channelInitializer);
if (startupAddress != null) {
- f = b.bind(startupAddress.getHostAddress(), port).sync();
+ f = bootstrap.bind(startupAddress.getHostAddress(), port).sync();
} else {
- f = b.bind(port).sync();
+ f = bootstrap.bind(port).sync();
}
} catch (InterruptedException e) {
LOG.error("Interrupted while binding port {}", port, e);
@Override
public ListenableFuture<Boolean> shutdown() {
final SettableFuture<Boolean> result = SettableFuture.create();
- group.shutdownGracefully().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Object>>() {
-
- @Override
- public void operationComplete(
- final io.netty.util.concurrent.Future<Object> downResult) throws Exception {
- result.set(downResult.isSuccess());
- if (downResult.cause() != null) {
- result.setException(downResult.cause());
- }
+ group.shutdownGracefully().addListener(downResult -> {
+ result.set(downResult.isSuccess());
+ if (downResult.cause() != null) {
+ result.setException(downResult.cause());
}
-
});
return result;
}
return isOnlineFuture;
}
- /**
- * @return the port
- */
public int getPort() {
return port;
}
- /**
- * @param channelInitializer
- */
public void setChannelInitializer(UdpChannelInitializer channelInitializer) {
this.channelInitializer = channelInitializer;
}
}
/**
- * Initiate event loop groups
+ * Initiate event loop groups.
+ *
* @param threadConfiguration number of threads to be created, if not specified in threadConfig
*/
public void initiateEventLoopGroups(ThreadConfiguration threadConfiguration, boolean isEpollEnabled) {
- if(isEpollEnabled) {
+ if (isEpollEnabled) {
initiateEpollEventLoopGroups(threadConfiguration);
} else {
initiateNioEventLoopGroups(threadConfiguration);
}
/**
- * Initiate Nio event loop groups
+ * Initiate Nio event loop groups.
+ *
* @param threadConfiguration number of threads to be created, if not specified in threadConfig
*/
public void initiateNioEventLoopGroups(ThreadConfiguration threadConfiguration) {
}
/**
- * Initiate Epoll event loop groups with Nio as fall back
- * @param threadConfiguration
+ * Initiate Epoll event loop groups with Nio as fall back.
+ *
+ * @param threadConfiguration the ThreadConfiguration
*/
+ @SuppressWarnings("checkstyle:IllegalCatch")
protected void initiateEpollEventLoopGroups(ThreadConfiguration threadConfiguration) {
try {
datagramChannelClass = EpollDatagramChannel.class;
group = new EpollEventLoopGroup();
}
return;
- } catch (Throwable ex) {
+ } catch (RuntimeException ex) {
LOG.debug("Epoll initiation failed");
}
//Fallback mechanism
initiateNioEventLoopGroups(threadConfiguration);
}
-}
\ No newline at end of file
+}
import java.net.InetSocketAddress;
/**
- * Wraps received messages (includes version) and sender address
-
+ * Wraps received messages (includes version) and sender address.
+ *
* @author michal.polkorab
*/
public class VersionMessageUdpWrapper extends VersionMessageWrapper {
private final InetSocketAddress address;
/**
+ * Constructor.
+ *
* @param version Openflow wire version
* @param messageBuffer ByteBuf containing binary message
* @param address sender address
this.address = address;
}
- /**
- * @return sender address
- */
public InetSocketAddress getAddress() {
return address;
}
-}
\ No newline at end of file
+}
import io.netty.buffer.ByteBuf;
/**
- * Wraps received messages (includes version)
+ * Wraps received messages (includes version).
+ *
* @author michal.polkorab
*/
public class VersionMessageWrapper {
private final ByteBuf messageBuffer;
/**
- * Constructor
+ * Constructor.
+ *
* @param version version decoded in {@link OFVersionDetector}
* @param messageBuffer message received from {@link OFFrameDecoder}
*/
}
/**
+ * Returns the version version decoded in {@link OFVersionDetector}.
+ *
* @return the version version decoded in {@link OFVersionDetector}
*/
public short getVersion() {
}
/**
+ * Returns the messageBuffer message received from {@link OFFrameDecoder}.
+ *
* @return the messageBuffer message received from {@link OFFrameDecoder}
*/
public ByteBuf getMessageBuffer() {
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
private static final Logger LOG = LoggerFactory.getLogger(AbstractConnectionAdapter.class);
- /** after this time, RPC future response objects will be thrown away (in minutes) */
+ /** after this time, RPC future response objects will be thrown away (in minutes). */
private static final int RPC_RESPONSE_EXPIRATION = 1;
private static final Exception QUEUE_FULL_EXCEPTION = new RejectedExecutionException("Output queue is full");
*/
private static final int DEFAULT_QUEUE_DEPTH = 1024;
- protected static final RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>> REMOVAL_LISTENER = new RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>>() {
- @Override
- public void onRemoval(final RemovalNotification<RpcResponseKey, ResponseExpectedRpcListener<?>> notification) {
+ protected static final RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>> REMOVAL_LISTENER =
+ notification -> {
if (!notification.getCause().equals(RemovalCause.EXPLICIT)) {
notification.getValue().discard();
}
- }
- };
+ };
protected final Channel channel;
protected final InetSocketAddress address;
protected boolean disconnectOccured = false;
protected final ChannelOutboundQueue output;
- /** expiring cache for future rpcResponses */
+ /** expiring cache for future rpcResponses. */
protected Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> responseCache;
}
/**
- * Used only for testing purposes
+ * Used only for testing purposes.
+ *
* @param cache replacement
*/
@VisibleForTesting
}
/**
- * Return cached RpcListener or {@code null} if not cached
- * @return
+ * Return cached RpcListener or {@code null} if not cached.
*/
protected ResponseExpectedRpcListener<?> findRpcResponse(final RpcResponseKey key) {
return responseCache.getIfPresent(key);
}
/**
- * sends given message to switch, sending result or switch response will be reported via return value
+ * Sends given message to switch, sending result or switch response will be reported via return value.
*
* @param input message to send
* @param responseClazz type of response
* </li>
* </ul>
*/
- protected <IN extends OfHeader, OUT extends OfHeader> ListenableFuture<RpcResult<OUT>> sendToSwitchExpectRpcResultFuture(
- final IN input, final Class<OUT> responseClazz, final String failureInfo) {
+ protected <I extends OfHeader, O extends OfHeader> ListenableFuture<RpcResult<O>>
+ sendToSwitchExpectRpcResultFuture(final I input, final Class<O> responseClazz,
+ final String failureInfo) {
final RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz.getName());
- final ResponseExpectedRpcListener<OUT> listener = new ResponseExpectedRpcListener<>(input, failureInfo,
+ final ResponseExpectedRpcListener<O> listener = new ResponseExpectedRpcListener<>(input, failureInfo,
responseCache, key);
return enqueueMessage(listener);
}
/**
- * sends given message to switch, sending result will be reported via return value
+ * Sends given message to switch, sending result will be reported via return value.
*
* @param input message to send
* @param failureInfo describes, what type of message caused failure by sending
return promise.getResult();
}
- /**
- * @param resultFuture
- * @param failureInfo
- * @param errorSeverity
- * @param message
- * @return
- */
private static SettableFuture<Boolean> handleTransportChannelFuture(final ChannelFuture resultFuture) {
final SettableFuture<Boolean> transportResult = SettableFuture.create();
- resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
-
- @Override
- public void operationComplete(final io.netty.util.concurrent.Future<? super Void> future) throws Exception {
- transportResult.set(future.isSuccess());
- if (!future.isSuccess()) {
- transportResult.setException(future.cause());
- }
+ resultFuture.addListener(future -> {
+ transportResult.set(future.isSuccess());
+ if (!future.isSuccess()) {
+ transportResult.setException(future.cause());
}
});
return transportResult;
}
@Override
- protected <IN extends OfHeader, OUT extends OfHeader> ListenableFuture<RpcResult<OUT>> sendToSwitchExpectRpcResultFuture(
- final IN input, final Class<OUT> responseClazz, final String failureInfo) {
+ protected <I extends OfHeader, O extends OfHeader> ListenableFuture<RpcResult<O>> sendToSwitchExpectRpcResultFuture(
+ final I input, final Class<O> responseClazz, final String failureInfo) {
statisticsCounters.incrementCounter(CounterEventTypes.DS_ENTERED_OFJAVA);
return super.sendToSwitchExpectRpcResultFuture(input, responseClazz, failureInfo);
}
/**
* Method is equivalent to {@link MessageConsumer#consume(DataObject)} to prevent missing method
* in every children of {@link AbstractConnectionAdapterStatistics} class, because we overriding
- * original method for {@link StatisticsCounters}
+ * original method for {@link StatisticsCounters}.
*
* @param message from device to processing
*/
private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
- private static enum PipelineState {
+ private enum PipelineState {
/**
* Netty thread is potentially idle, no assumptions
* can be made about its state.
protected boolean shuttingDown;
// Passed to executor to request triggering of flush
- protected final Runnable flushRunnable = new Runnable() {
- @Override
- public void run() {
- flush();
- }
- };
+ protected final Runnable flushRunnable = () -> flush();
AbstractOutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler) {
this.parent = Preconditions.checkNotNull(parent);
}
/**
- * Method has to initialize some child of {@link AbstractStackedOutboundQueue}
+ * Method has to initialize some child of {@link AbstractStackedOutboundQueue}.
*
* @return correct implementation of StacketOutboundqueue
*/
final PipelineState localState = state;
LOG.debug("Synchronize on pipeline state {}", localState);
switch (localState) {
- case READING:
- // Netty thread is currently reading, it will flush the pipeline once it
- // finishes reading. This is a no-op situation.
- break;
- case WRITING:
- case IDLE:
- default:
- // We cannot rely on the change being flushed, schedule a request
- scheduleFlush();
+ case READING:
+ // Netty thread is currently reading, it will flush the pipeline once it
+ // finishes reading. This is a no-op situation.
+ break;
+ case WRITING:
+ case IDLE:
+ default:
+ // We cannot rely on the change being flushed, schedule a request
+ scheduleFlush();
}
}
* Wraps outgoing message and includes listener attached to this message
* which is send to OFEncoder for serialization. Correct wrapper is
* selected by communication pipeline.
- *
- * @param message
- * @param now
*/
void writeMessage(final OfHeader message, final long now) {
final Object wrapper = makeMessageListenerWrapper(message);
* Wraps outgoing message and includes listener attached to this message
* which is send to OFEncoder for serialization. Correct wrapper is
* selected by communication pipeline.
- *
- * @return
*/
protected Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
Preconditions.checkArgument(msg != null);
}
/* NPE are coming from {@link OFEncoder#encode} from catch block and we don't wish to lost it */
- private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = new GenericFutureListener<Future<Void>>() {
-
- private final Logger LOG = LoggerFactory.getLogger(GenericFutureListener.class);
-
- @Override
- public void operationComplete(final Future<Void> future) throws Exception {
- if (future.cause() != null) {
- LOG.warn("Message encoding fail !", future.cause());
- }
+ private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = future -> {
+ if (future.cause() != null) {
+ LOG.warn("Message encoding fail !", future.cause());
}
};
} else {
close();
if (currentQueue.finishShutdown(parent.getChannel())) {
- LOG.debug("Channel {} shutdown complete", parent.getChannel());
+ LOG.debug("Channel {} shutdown complete", parent.getChannel());
} else {
- LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
- rescheduleFlush();
+ LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+ rescheduleFlush();
}
}
}
* A MessageHolder (used in queue) and the actual listener. It is not a thing of beauty,
* but it keeps us from allocating unnecessary objects in the egress path.
*/
-abstract class AbstractRpcListener<T> implements GenericFutureListener<Future<Void>>, ChannelOutboundQueue.MessageHolder<Object> {
+abstract class AbstractRpcListener<T> implements GenericFutureListener<Future<Void>>,
+ ChannelOutboundQueue.MessageHolder<Object> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRpcListener.class);
private static final String APPLICATION_TAG = "OPENFLOW_LIBRARY";
private static final String TAG = "OPENFLOW";
private Object message;
/**
- * Create RcpError object
- * @param info
+ * Create RcpError object.
+ *
+ * @param info error info
* @param severity - error severity
- * @param message
+ * @param message error message
* @param cause - details of reason
- * @return
+ * @return RpcError
*/
static RpcError buildRpcError(final String info, final String message, final Throwable cause) {
return RpcResultBuilder.newError(ErrorType.RPC, TAG, message, APPLICATION_TAG, info, cause);
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.FutureCallback;
-
import io.netty.channel.Channel;
-
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.function.Function;
-
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
-
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class AbstractStackedOutboundQueue implements OutboundQueue {
private static final Logger LOG = LoggerFactory.getLogger(AbstractStackedOutboundQueue.class);
- protected static final AtomicLongFieldUpdater<AbstractStackedOutboundQueue> LAST_XID_OFFSET_UPDATER = AtomicLongFieldUpdater
- .newUpdater(AbstractStackedOutboundQueue.class, "lastXid");
+ protected static final AtomicLongFieldUpdater<AbstractStackedOutboundQueue> LAST_XID_OFFSET_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(AbstractStackedOutboundQueue.class, "lastXid");
@GuardedBy("unflushedSegments")
protected volatile StackedSegment firstSegment;
@GuardedBy("unflushedSegments")
protected void ensureSegment(final StackedSegment first, final int offset) {
final int segmentOffset = offset / StackedSegment.SEGMENT_SIZE;
- LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset, unflushedSegments.size());
+ LOG.debug("Queue {} slow offset {} maps to {} segments {}", this, offset, segmentOffset,
+ unflushedSegments.size());
for (int i = unflushedSegments.size(); i <= segmentOffset; ++i) {
- final StackedSegment newSegment = StackedSegment.create(first.getBaseXid() + (StackedSegment.SEGMENT_SIZE * i));
+ final StackedSegment newSegment = StackedSegment.create(first.getBaseXid()
+ + StackedSegment.SEGMENT_SIZE * i);
LOG.debug("Adding segment {}", newSegment);
unflushedSegments.add(newSegment);
}
* in the corresponding EventLoop.
*
* @param channel Channel onto which we are writing
- * @param now
+ * @param now time stamp
* @return Number of entries written out
*/
int writeEntries(@Nonnull final Channel channel, final long now) {
while (channel.isWritable()) {
final OutboundQueueEntry entry = segment.getEntry(flushOffset);
if (!entry.isCommitted()) {
- LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid() + flushOffset, segment, flushOffset);
+ LOG.debug("Queue {} XID {} segment {} offset {} not committed yet", this, segment.getBaseXid()
+ + flushOffset, segment, flushOffset);
break;
}
/**
* Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
- * and fails all not completed entries (if in final phase)
+ * and fails all not completed entries (if in final phase).
+ *
* @param channel netty channel
* @return true if in final phase, false if a flush is needed
*/
protected OutboundQueueEntry getEntry(final Long xid) {
final StackedSegment fastSegment = firstSegment;
final long calcOffset = xid - fastSegment.getBaseXid();
- Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
+ Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s",
+ xid, fastSegment.getBaseXid());
Verify.verify(calcOffset <= Integer.MAX_VALUE);
final int fastOffset = (int) calcOffset;
}
final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
- LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
+ LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this,
+ xid, slowOffset, segment, segOffset);
return segment.getEntry(segOffset);
}
return fastSegment.getEntry(fastOffset);
}
/**
- * Fails not completed entries in segments and frees completed segments
+ * Fails not completed entries in segments and frees completed segments.
+ *
* @param iterator list of segments to be failed
* @return number of failed entries
*/
return entries;
}
-
}
* writes to be enqueued from any thread, it then schedules a task pipeline task,
* which shuffles messages from the queue into the pipeline.
*
+ * <p>
* Note this is an *Inbound* handler, as it reacts to channel writability changing,
* which in the Netty vocabulary is an inbound event. This has already changed in
* the Netty 5.0.0 API, where Handlers are unified.
private static final Logger LOG = LoggerFactory.getLogger(ChannelOutboundQueue.class);
// Passed to executor to request triggering of flush
- private final Runnable flushRunnable = new Runnable() {
- @Override
- public void run() {
- ChannelOutboundQueue.this.flush();
- }
- };
+ private final Runnable flushRunnable = () -> ChannelOutboundQueue.this.flush();
/*
* Instead of using an AtomicBoolean object, we use these two. It saves us
private final Channel channel;
private final InetSocketAddress address;
- public ChannelOutboundQueue(final Channel channel, final int queueDepth, final InetSocketAddress address) {
+ ChannelOutboundQueue(final Channel channel, final int queueDepth, final InetSocketAddress address) {
Preconditions.checkArgument(queueDepth > 0, "Queue depth has to be positive");
/*
scheduleFlush(channel.pipeline().lastContext().executor());
}
+ private void conditionalFlush(final ChannelHandlerContext ctx) {
+ Preconditions.checkState(ctx.channel().equals(channel),
+ "Inconsistent channel %s with context %s", channel, ctx);
+ conditionalFlush();
+ }
+
/*
* The synchronized keyword should be unnecessary, really, but it enforces
* queue order should something go terribly wrong. It should be completely
* should be able to perform dynamic adjustments here.
* is that additional complexity needed, though?
*/
- if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
+ if (messages % WORKTIME_RECHECK_MSGS == 0 && System.nanoTime() >= deadline) {
LOG.trace("Exceeded allotted work time {}us",
TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
break;
conditionalFlush();
}
- private void conditionalFlush(final ChannelHandlerContext ctx) {
- Preconditions.checkState(ctx.channel().equals(channel), "Inconsistent channel %s with context %s", channel, ctx);
- conditionalFlush();
- }
-
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
import java.net.InetSocketAddress;
/**
+ * Factory for creating ConnectionFacade instances.
+ *
* @author mirehak
* @author michal.polkorab
*/
public interface ConnectionAdapterFactory {
/**
+ * Creates a ConnectionFacade.
+ *
* @param ch {@link Channel} channel
* @param address {@link InetSocketAddress}
- * @param useBarrier
+ * @param useBarrier true to use a barrier, false otherwise
* @return connection adapter tcp-implementation
*/
ConnectionFacade createConnectionFacade(Channel ch, InetSocketAddress address, boolean useBarrier);
import java.net.InetSocketAddress;
/**
+ * Implementation of ConnectionAdapterFactory.
+ *
* @author mirehak
* @author michal.polkorab
*/
public class ConnectionAdapterFactoryImpl implements ConnectionAdapterFactory {
- /**
- * @param ch
- * @return connection adapter tcp-implementation
- */
- @Override
+ @Override
public ConnectionFacade createConnectionFacade(final Channel ch, final InetSocketAddress address,
final boolean useBarrier) {
return new ConnectionAdapterImpl(ch, address, useBarrier);
}
-
}
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
/**
+ * Unifying super interface for a connection.
+ *
* @author michal.polkorab
*/
public interface ConnectionFacade extends MessageConsumer, ConnectionAdapter {
import org.opendaylight.yangtools.yang.binding.DataObject;
/**
+ * Interface for a message consumer.
+ *
* @author mirehak
*/
public interface MessageConsumer {
/**
+ * Invoked to consume a message.
+ *
* @param message to process
*/
void consume(DataObject message);
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
-
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
/**
*/
public class MessageListenerWrapper {
- private OfHeader msg;
- private GenericFutureListener<Future<Void>> listener;
+ private final OfHeader msg;
+ private final GenericFutureListener<Future<Void>> listener;
/**
+ * Constructor.
+ *
* @param msg outgoing message
* @param listener listener attached to channel.write(msg) Future
*/
}
/**
+ * Returns the outgoing message.
+ *
* @return outgoing message (downstream)
*/
public OfHeader getMsg() {
/**
+ * Returns the listener listening on message sending success / failure.
+ *
* @return listener listening on message sending success / failure
*/
public GenericFutureListener<Future<Void>> getListener() {
return listener;
}
-}
\ No newline at end of file
+}
private OutboundQueueException lastException = null;
private Function<OfHeader, Boolean> isCompletedFunction = DEFAULT_IS_COMPLETE;
- void commit(final OfHeader message, final FutureCallback<OfHeader> callback) {
- commit(message, callback, DEFAULT_IS_COMPLETE);
+ void commit(final OfHeader messageToCommit, final FutureCallback<OfHeader> commitCallback) {
+ commit(messageToCommit, commitCallback, DEFAULT_IS_COMPLETE);
}
- void commit(final OfHeader message, final FutureCallback<OfHeader> callback,
- final Function<OfHeader, Boolean> isCompletedFunction) {
+ void commit(final OfHeader messageToCommit, final FutureCallback<OfHeader> commitCallback,
+ final Function<OfHeader, Boolean> isCommitCompletedFunction) {
if (this.completed) {
LOG.warn("Can't commit a completed message.");
- if (callback != null) {
- callback.onFailure(lastException);
+ if (commitCallback != null) {
+ commitCallback.onFailure(lastException);
}
} else {
- this.message = message;
- this.callback = callback;
- this.barrier = message instanceof BarrierInput;
- this.isCompletedFunction = isCompletedFunction;
+ this.message = messageToCommit;
+ this.callback = commitCallback;
+ this.barrier = messageToCommit instanceof BarrierInput;
+ this.isCompletedFunction = isCommitCompletedFunction;
// Volatile write, needs to be last
this.committed = true;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
-abstract class OutboundQueueHandlerRegistrationImpl<T extends OutboundQueueHandler> extends AbstractObjectRegistration<T> implements OutboundQueueHandlerRegistration<T> {
+abstract class OutboundQueueHandlerRegistrationImpl<T extends OutboundQueueHandler>
+ extends AbstractObjectRegistration<T> implements OutboundQueueHandlerRegistration<T> {
protected OutboundQueueHandlerRegistrationImpl(final T instance) {
super(instance);
}
import java.net.InetSocketAddress;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
-/**
- *
- * @param <T>
- */
public class OutboundQueueManagerNoBarrier<T extends OutboundQueueHandler> extends
AbstractOutboundQueueManager<T, StackedOutboundQueueNoBarrier> {
- OutboundQueueManagerNoBarrier(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler) {
+ OutboundQueueManagerNoBarrier(final ConnectionAdapterImpl parent, final InetSocketAddress address,
+ final T handler) {
super(parent, address, handler);
}
*/
package org.opendaylight.openflowjava.protocol.impl.core.connection;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
import java.util.concurrent.TimeoutException;
-
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-
final class ResponseExpectedRpcListener<T extends OfHeader> extends AbstractRpcListener<T> {
private static final Logger LOG = LoggerFactory.getLogger(ResponseExpectedRpcListener.class);
private final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache;
package org.opendaylight.openflowjava.protocol.impl.core.connection;
-
-
/**
+ * RPC response key.
+ *
* @author mirehak
*/
public class RpcResponseKey {
private final long xid;
private final String outputClazz;
- /**
- * @param xid
- * @param outputClazz
- */
+
public RpcResponseKey(long xid, String outputClazz) {
this.xid = xid;
this.outputClazz = outputClazz;
}
/**
- * @return the xid
+ * Returns the xid.
*/
public long getXid() {
return xid;
}
/**
- * @return the outputClazz
+ * Return the outputClazz.
*/
public String getOutputClazz() {
return outputClazz;
final int prime = 31;
int result = 1;
result = prime * result
- + ((outputClazz == null) ? 0 : outputClazz.hashCode());
+ + (outputClazz == null ? 0 : outputClazz.hashCode());
return result;
}
return "RpcResultKey [xid=" + xid + ", outputClazz=" + outputClazz
+ "]";
}
-}
\ No newline at end of file
+}
package org.opendaylight.openflowjava.protocol.impl.core.connection;
final class SimpleRpcListener extends AbstractRpcListener<Void> {
- public SimpleRpcListener(final Object message, final String failureInfo) {
+ SimpleRpcListener(final Object message, final String failureInfo) {
super(message, failureInfo);
}
package org.opendaylight.openflowjava.protocol.impl.core.connection;
import com.google.common.util.concurrent.FutureCallback;
-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Function;
-
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class StackedOutboundQueue extends AbstractStackedOutboundQueue {
private static final Logger LOG = LoggerFactory.getLogger(StackedOutboundQueue.class);
- private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER = AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
+ private static final AtomicLongFieldUpdater<StackedOutboundQueue> BARRIER_XID_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(StackedOutboundQueue.class, "barrierXid");
private volatile long barrierXid = -1;
package org.opendaylight.openflowjava.protocol.impl.core.connection;
import com.google.common.util.concurrent.FutureCallback;
-
import io.netty.channel.Channel;
-
import java.util.function.Function;
-
import javax.annotation.Nonnull;
-
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
/**
- * Size of each individual segment
+ * Size of each individual segment.
*/
static final int SEGMENT_SIZE = 4096;
@Override
public String toString() {
- return MoreObjects.toStringHelper(this).add("baseXid", baseXid).add("endXid", endXid).add("completeCount", completeCount).toString();
+ return MoreObjects.toStringHelper(this).add("baseXid", baseXid).add("endXid", endXid)
+ .add("completeCount", completeCount).toString();
}
long getBaseXid() {
private static boolean completeEntry(final OutboundQueueEntry entry, final OfHeader response) {
if (response instanceof Error) {
final Error err = (Error)response;
- LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(), err.getCodeString());
+ LOG.debug("Device-reported request XID {} failed {}:{}", response.getXid(), err.getTypeString(),
+ err.getCodeString());
entry.fail(new DeviceRequestFailedException("Device-side failure", err));
return true;
}
// If this assumption is changed, this logic will need to be expanded
// to ensure that the requests implied by the barrier are reported as
// completed *after* the barrier.
- LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
+ LOG.trace("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid,
+ baseXid + lastBarrierOffset + 1, xid - 1);
completeRequests(offset);
lastBarrierOffset = offset;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
-
import java.net.InetSocketAddress;
/**
*/
public class UdpMessageListenerWrapper extends MessageListenerWrapper {
- private InetSocketAddress address;
+ private final InetSocketAddress address;
/**
+ * Constructor.
+ *
* @param msg message to be sent
* @param listener listener attached to channel.write(msg) Future
* @param address recipient's address
}
/**
- * @return recipient address
+ * Returns recipient address.
*/
public InetSocketAddress getAddress() {
return address;
}
-}
\ No newline at end of file
+}
import org.opendaylight.openflowjava.protocol.impl.util.ActionDeserializerRegistryHelper;
/**
- * @author michal.polkorab
+ * Helper for registering action deserializer initializers.
*
+ * @author michal.polkorab
*/
public final class ActionDeserializerInitializer {
}
/**
- * Registers action deserializers
+ * Registers action deserializers.
+ *
* @param registry registry to be filled with deserializers
*/
public static void registerDeserializers(DeserializerRegistry registry) {
helper.registerDeserializer(26, new OF13PushPbbActionDeserializer());
helper.registerDeserializer(27, new OF13PopPbbActionDeserializer());
}
-}
\ No newline at end of file
+}
/**
* Util class for init registration of additional deserializers.
+ *
* @author giuseppex.petralia@intel.com
*/
-public class AdditionalMessageDeserializerInitializer {
+public final class AdditionalMessageDeserializerInitializer {
private AdditionalMessageDeserializerInitializer() {
throw new UnsupportedOperationException("Utility class shouldn't be instantiated");
}
import org.opendaylight.yangtools.yang.binding.DataObject;
/**
+ * Factory for deserialization.
+ *
* @author michal.polkorab
* @author timotej.kubas
* @author giuseppex.petralia@intel.com
private final Map<TypeToClassKey, Class<?>> messageClassMap = new ConcurrentHashMap<>();
private DeserializerRegistry registry;
- /**
- * Constructor
- */
public DeserializationFactory() {
TypeToClassMapInitializer.initializeTypeToClassMap(messageClassMap);
}
/**
- * Transforms ByteBuf into correct POJO message
+ * Transforms ByteBuf into correct POJO message.
*
- * @param rawMessage
+ * @param rawMessage the message
* @param version
* version decoded from OpenFlow protocol message
* @return correct POJO as DataObject
}
/**
- * Register new type to class mapping used to assign return type when deserializing message
+ * Register new type to class mapping used to assign return type when deserializing message.
+ *
* @param key type to class key
* @param clazz return class
*/
}
/**
- * Unregister type to class mapping used to assign return type when deserializing message
+ * Unregister type to class mapping used to assign return type when deserializing message.
+ *
* @param key type to class key
* @return true if mapping was successfully removed
*/
return messageClassMap.remove(key) != null;
}
- /**
- * @param registry
- */
public void setRegistry(final DeserializerRegistry registry) {
this.registry = registry;
}
import org.slf4j.LoggerFactory;
/**
- * Stores and registers deserializers
+ * Stores and registers deserializers.
*
* @author michal.polkorab
*/
private Map<MessageCodeKey, OFGeneralDeserializer> registry;
/**
- * Decoder table provisioning
+ * Decoder table provisioning.
*/
@Override
public void init() {
@Override
public void registerDeserializer(MessageCodeKey key, OFGeneralDeserializer deserializer) {
- if ((key == null) || (deserializer == null)) {
+ if (key == null || deserializer == null) {
throw new IllegalArgumentException("MessageCodeKey or Deserializer is null");
}
OFGeneralDeserializer desInRegistry = registry.put(key, deserializer);
import org.opendaylight.openflowjava.protocol.impl.util.InstructionDeserializerRegistryHelper;
/**
- * @author michal.polkorab
+ * Utilities for registering nstruction deserializer initializers.
*
+ * @author michal.polkorab
*/
public final class InstructionDeserializerInitializer {
}
/**
- * Registers instruction deserializers
+ * Registers instruction deserializers.
+ *
* @param registry registry to be filled with deserializers
*/
public static void registerDeserializers(DeserializerRegistry registry) {
helper.registerDeserializer(5, new ClearActionsInstructionDeserializer());
helper.registerDeserializer(6, new MeterInstructionDeserializer());
}
-}
\ No newline at end of file
+}
/**
- * Class used as a key in {@link DeserializerRegistryImpl}
+ * Class used as a key in {@link DeserializerRegistryImpl}.
+ *
* @author michal.polkorab
* @author timotej.kubas
*/
private final short msgVersion;
/**
+ * Constructor.
+ *
* @param msgVersion protocol version
* @param msgType type code of message
*/
}
/**
- * @return the msgType
+ * Returns the msgType.
*/
public short getMsgType() {
return msgType;
}
/**
- * @return the msgVersion
+ * Returns the msgVersion.
*/
public short getMsgVersion() {
return msgVersion;
}
return true;
}
-}
\ No newline at end of file
+}
package org.opendaylight.openflowjava.protocol.impl.deserialization;
import java.util.Map;
+import org.opendaylight.openflowjava.protocol.api.keys.TypeToClassKey;
import org.opendaylight.openflowjava.protocol.api.util.EncodeConstants;
import org.opendaylight.openflowjava.protocol.impl.util.TypeToClassInitHelper;
-import org.opendaylight.openflowjava.protocol.api.keys.TypeToClassKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;