* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.openflowjava.protocol.impl.core.connection;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModOutput;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
protected Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> responseCache;
- AbstractConnectionAdapter(@Nonnull final Channel channel, @Nullable final InetSocketAddress address,
+ AbstractConnectionAdapter(@NonNull final Channel channel, @Nullable final InetSocketAddress address,
@Nullable final int channelOutboundQueueSize) {
- this.channel = Preconditions.checkNotNull(channel);
+ this.channel = requireNonNull(channel);
this.address = address;
responseCache = CacheBuilder.newBuilder().concurrencyLevel(1)
.expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES).removalListener(REMOVAL_LISTENER).build();
- LOG.info("The channel outbound queue size:{}", channelOutboundQueueSize);
+ LOG.debug("The channel outbound queue size:{}", channelOutboundQueueSize);
this.output = new ChannelOutboundQueue(channel, channelOutboundQueueSize, address);
channel.pipeline().addLast(output);
}
}
@Override
- public Future<RpcResult<BarrierOutput>> barrier(final BarrierInput input) {
+ public ListenableFuture<RpcResult<BarrierOutput>> barrier(final BarrierInput input) {
return sendToSwitchExpectRpcResultFuture(input, BarrierOutput.class, "barrier-input sending failed");
}
@Override
- public Future<RpcResult<EchoOutput>> echo(final EchoInput input) {
+ public ListenableFuture<RpcResult<EchoOutput>> echo(final EchoInput input) {
return sendToSwitchExpectRpcResultFuture(input, EchoOutput.class, "echo-input sending failed");
}
@Override
- public Future<RpcResult<Void>> echoReply(final EchoReplyInput input) {
+ public ListenableFuture<RpcResult<EchoReplyOutput>> echoReply(final EchoReplyInput input) {
return sendToSwitchFuture(input, "echo-reply sending failed");
}
@Override
- public Future<RpcResult<Void>> experimenter(final ExperimenterInput input) {
+ public ListenableFuture<RpcResult<ExperimenterOutput>> experimenter(final ExperimenterInput input) {
return sendToSwitchFuture(input, "experimenter sending failed");
}
@Override
- public Future<RpcResult<Void>> flowMod(final FlowModInput input) {
+ public ListenableFuture<RpcResult<FlowModOutput>> flowMod(final FlowModInput input) {
return sendToSwitchFuture(input, "flow-mod sending failed");
}
@Override
- public Future<RpcResult<GetConfigOutput>> getConfig(final GetConfigInput input) {
+ public ListenableFuture<RpcResult<GetConfigOutput>> getConfig(final GetConfigInput input) {
return sendToSwitchExpectRpcResultFuture(input, GetConfigOutput.class, "get-config-input sending failed");
}
@Override
- public Future<RpcResult<GetFeaturesOutput>> getFeatures(final GetFeaturesInput input) {
+ public ListenableFuture<RpcResult<GetFeaturesOutput>> getFeatures(final GetFeaturesInput input) {
return sendToSwitchExpectRpcResultFuture(input, GetFeaturesOutput.class, "get-features-input sending failed");
}
@Override
- public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(final GetQueueConfigInput input) {
+ public ListenableFuture<RpcResult<GetQueueConfigOutput>> getQueueConfig(final GetQueueConfigInput input) {
return sendToSwitchExpectRpcResultFuture(input, GetQueueConfigOutput.class,
"get-queue-config-input sending failed");
}
@Override
- public Future<RpcResult<Void>> groupMod(final GroupModInput input) {
+ public ListenableFuture<RpcResult<GroupModOutput>> groupMod(final GroupModInput input) {
return sendToSwitchFuture(input, "group-mod-input sending failed");
}
@Override
- public Future<RpcResult<Void>> hello(final HelloInput input) {
+ public ListenableFuture<RpcResult<HelloOutput>> hello(final HelloInput input) {
return sendToSwitchFuture(input, "hello-input sending failed");
}
@Override
- public Future<RpcResult<Void>> meterMod(final MeterModInput input) {
+ public ListenableFuture<RpcResult<MeterModOutput>> meterMod(final MeterModInput input) {
return sendToSwitchFuture(input, "meter-mod-input sending failed");
}
@Override
- public Future<RpcResult<Void>> packetOut(final PacketOutInput input) {
+ public ListenableFuture<RpcResult<PacketOutOutput>> packetOut(final PacketOutInput input) {
return sendToSwitchFuture(input, "packet-out-input sending failed");
}
@Override
- public Future<RpcResult<Void>> multipartRequest(final MultipartRequestInput input) {
+ public ListenableFuture<RpcResult<MultipartRequestOutput>> multipartRequest(final MultipartRequestInput input) {
return sendToSwitchFuture(input, "multi-part-request sending failed");
}
@Override
- public Future<RpcResult<Void>> portMod(final PortModInput input) {
+ public ListenableFuture<RpcResult<PortModOutput>> portMod(final PortModInput input) {
return sendToSwitchFuture(input, "port-mod-input sending failed");
}
@Override
- public Future<RpcResult<RoleRequestOutput>> roleRequest(final RoleRequestInput input) {
+ public ListenableFuture<RpcResult<RoleRequestOutput>> roleRequest(final RoleRequestInput input) {
return sendToSwitchExpectRpcResultFuture(input, RoleRequestOutput.class,
"role-request-config-input sending failed");
}
@Override
- public Future<RpcResult<Void>> setConfig(final SetConfigInput input) {
+ public ListenableFuture<RpcResult<SetConfigOutput>> setConfig(final SetConfigInput input) {
return sendToSwitchFuture(input, "set-config-input sending failed");
}
@Override
- public Future<RpcResult<Void>> tableMod(final TableModInput input) {
+ public ListenableFuture<RpcResult<TableModOutput>> tableMod(final TableModInput input) {
return sendToSwitchFuture(input, "table-mod-input sending failed");
}
@Override
- public Future<RpcResult<GetAsyncOutput>> getAsync(final GetAsyncInput input) {
+ public ListenableFuture<RpcResult<GetAsyncOutput>> getAsync(final GetAsyncInput input) {
return sendToSwitchExpectRpcResultFuture(input, GetAsyncOutput.class, "get-async-input sending failed");
}
@Override
- public Future<RpcResult<Void>> setAsync(final SetAsyncInput input) {
+ public ListenableFuture<RpcResult<SetAsyncOutput>> setAsync(final SetAsyncInput input) {
return sendToSwitchFuture(input, "set-async-input sending failed");
}
protected <I extends OfHeader, O extends OfHeader> ListenableFuture<RpcResult<O>>
sendToSwitchExpectRpcResultFuture(final I input, final Class<O> responseClazz,
final String failureInfo) {
- final RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz.getName());
+ final RpcResponseKey key = new RpcResponseKey(input.getXid().toJava(), responseClazz.getName());
final ResponseExpectedRpcListener<O> listener = new ResponseExpectedRpcListener<>(input, failureInfo,
responseCache, key);
return enqueueMessage(listener);
* <li>else {@link RpcResult} will contain errors and failed status</li>
* </ul>
*/
- protected ListenableFuture<RpcResult<Void>> sendToSwitchFuture(final DataObject input, final String failureInfo) {
- return enqueueMessage(new SimpleRpcListener(input, failureInfo));
+ protected <O extends DataObject> ListenableFuture<RpcResult<O>> sendToSwitchFuture(final Object input,
+ final String failureInfo) {
+ SimpleRpcListener<O> listener = new SimpleRpcListener(input, failureInfo);
+ return enqueueMessage(listener);
}
private <T> ListenableFuture<RpcResult<T>> enqueueMessage(final AbstractRpcListener<T> promise) {