Merge "Use String(byte[], Charset)"
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / AbstractConnectionAdapter.java
index 87119f423c314690e5831c2728d434bce66f8224..7544368d5bcdbb0113fd3099da4e2a71fcfe0b29 100644 (file)
@@ -5,11 +5,11 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalCause;
@@ -22,16 +22,19 @@ import java.net.InetSocketAddress;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
@@ -41,17 +44,26 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModOutput;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
@@ -87,14 +99,14 @@ abstract class AbstractConnectionAdapter implements ConnectionAdapter {
     protected Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> responseCache;
 
 
-    AbstractConnectionAdapter(@Nonnull final Channel channel, @Nullable final InetSocketAddress address,
+    AbstractConnectionAdapter(@NonNull final Channel channel, @Nullable final InetSocketAddress address,
                               @Nullable final int channelOutboundQueueSize) {
-        this.channel = Preconditions.checkNotNull(channel);
+        this.channel = requireNonNull(channel);
         this.address = address;
 
         responseCache = CacheBuilder.newBuilder().concurrencyLevel(1)
                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES).removalListener(REMOVAL_LISTENER).build();
-        LOG.info("The channel outbound queue size:{}", channelOutboundQueueSize);
+        LOG.debug("The channel outbound queue size:{}", channelOutboundQueueSize);
         this.output = new ChannelOutboundQueue(channel, channelOutboundQueueSize, address);
         channel.pipeline().addLast(output);
     }
@@ -109,99 +121,99 @@ abstract class AbstractConnectionAdapter implements ConnectionAdapter {
     }
 
     @Override
-    public Future<RpcResult<BarrierOutput>> barrier(final BarrierInput input) {
+    public ListenableFuture<RpcResult<BarrierOutput>> barrier(final BarrierInput input) {
         return sendToSwitchExpectRpcResultFuture(input, BarrierOutput.class, "barrier-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<EchoOutput>> echo(final EchoInput input) {
+    public ListenableFuture<RpcResult<EchoOutput>> echo(final EchoInput input) {
         return sendToSwitchExpectRpcResultFuture(input, EchoOutput.class, "echo-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<Void>> echoReply(final EchoReplyInput input) {
+    public ListenableFuture<RpcResult<EchoReplyOutput>> echoReply(final EchoReplyInput input) {
         return sendToSwitchFuture(input, "echo-reply sending failed");
     }
 
     @Override
-    public Future<RpcResult<Void>> experimenter(final ExperimenterInput input) {
+    public ListenableFuture<RpcResult<ExperimenterOutput>> experimenter(final ExperimenterInput input) {
         return sendToSwitchFuture(input, "experimenter sending failed");
     }
 
     @Override
-    public Future<RpcResult<Void>> flowMod(final FlowModInput input) {
+    public ListenableFuture<RpcResult<FlowModOutput>> flowMod(final FlowModInput input) {
         return sendToSwitchFuture(input, "flow-mod sending failed");
     }
 
     @Override
-    public Future<RpcResult<GetConfigOutput>> getConfig(final GetConfigInput input) {
+    public ListenableFuture<RpcResult<GetConfigOutput>> getConfig(final GetConfigInput input) {
         return sendToSwitchExpectRpcResultFuture(input, GetConfigOutput.class, "get-config-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<GetFeaturesOutput>> getFeatures(final GetFeaturesInput input) {
+    public ListenableFuture<RpcResult<GetFeaturesOutput>> getFeatures(final GetFeaturesInput input) {
         return sendToSwitchExpectRpcResultFuture(input, GetFeaturesOutput.class, "get-features-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(final GetQueueConfigInput input) {
+    public ListenableFuture<RpcResult<GetQueueConfigOutput>> getQueueConfig(final GetQueueConfigInput input) {
         return sendToSwitchExpectRpcResultFuture(input, GetQueueConfigOutput.class,
                 "get-queue-config-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<Void>> groupMod(final GroupModInput input) {
+    public ListenableFuture<RpcResult<GroupModOutput>> groupMod(final GroupModInput input) {
         return sendToSwitchFuture(input, "group-mod-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<Void>> hello(final HelloInput input) {
+    public ListenableFuture<RpcResult<HelloOutput>> hello(final HelloInput input) {
         return sendToSwitchFuture(input, "hello-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<Void>> meterMod(final MeterModInput input) {
+    public ListenableFuture<RpcResult<MeterModOutput>> meterMod(final MeterModInput input) {
         return sendToSwitchFuture(input, "meter-mod-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<Void>> packetOut(final PacketOutInput input) {
+    public ListenableFuture<RpcResult<PacketOutOutput>> packetOut(final PacketOutInput input) {
         return sendToSwitchFuture(input, "packet-out-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<Void>> multipartRequest(final MultipartRequestInput input) {
+    public ListenableFuture<RpcResult<MultipartRequestOutput>> multipartRequest(final MultipartRequestInput input) {
         return sendToSwitchFuture(input, "multi-part-request sending failed");
     }
 
     @Override
-    public Future<RpcResult<Void>> portMod(final PortModInput input) {
+    public ListenableFuture<RpcResult<PortModOutput>> portMod(final PortModInput input) {
         return sendToSwitchFuture(input, "port-mod-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<RoleRequestOutput>> roleRequest(final RoleRequestInput input) {
+    public ListenableFuture<RpcResult<RoleRequestOutput>> roleRequest(final RoleRequestInput input) {
         return sendToSwitchExpectRpcResultFuture(input, RoleRequestOutput.class,
                 "role-request-config-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<Void>> setConfig(final SetConfigInput input) {
+    public ListenableFuture<RpcResult<SetConfigOutput>> setConfig(final SetConfigInput input) {
         return sendToSwitchFuture(input, "set-config-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<Void>> tableMod(final TableModInput input) {
+    public ListenableFuture<RpcResult<TableModOutput>> tableMod(final TableModInput input) {
         return sendToSwitchFuture(input, "table-mod-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<GetAsyncOutput>> getAsync(final GetAsyncInput input) {
+    public ListenableFuture<RpcResult<GetAsyncOutput>> getAsync(final GetAsyncInput input) {
         return sendToSwitchExpectRpcResultFuture(input, GetAsyncOutput.class, "get-async-input sending failed");
     }
 
     @Override
-    public Future<RpcResult<Void>> setAsync(final SetAsyncInput input) {
+    public ListenableFuture<RpcResult<SetAsyncOutput>> setAsync(final SetAsyncInput input) {
         return sendToSwitchFuture(input, "set-async-input sending failed");
     }
 
@@ -264,7 +276,7 @@ abstract class AbstractConnectionAdapter implements ConnectionAdapter {
     protected <I extends OfHeader, O extends OfHeader> ListenableFuture<RpcResult<O>>
             sendToSwitchExpectRpcResultFuture(final I input, final Class<O> responseClazz,
                     final String failureInfo) {
-        final RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz.getName());
+        final RpcResponseKey key = new RpcResponseKey(input.getXid().toJava(), responseClazz.getName());
         final ResponseExpectedRpcListener<O> listener = new ResponseExpectedRpcListener<>(input, failureInfo,
                 responseCache, key);
         return enqueueMessage(listener);
@@ -281,8 +293,10 @@ abstract class AbstractConnectionAdapter implements ConnectionAdapter {
      *         <li>else {@link RpcResult} will contain errors and failed status</li>
      *         </ul>
      */
-    protected ListenableFuture<RpcResult<Void>> sendToSwitchFuture(final DataObject input, final String failureInfo) {
-        return enqueueMessage(new SimpleRpcListener(input, failureInfo));
+    protected <O extends DataObject> ListenableFuture<RpcResult<O>> sendToSwitchFuture(final Object input,
+                                                                                     final String failureInfo) {
+        SimpleRpcListener<O> listener = new SimpleRpcListener(input, failureInfo);
+        return enqueueMessage(listener);
     }
 
     private <T> ListenableFuture<RpcResult<T>> enqueueMessage(final AbstractRpcListener<T> promise) {