2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalCause;
16 import com.google.common.cache.RemovalListener;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import java.net.InetSocketAddress;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.RejectedExecutionException;
24 import java.util.concurrent.TimeUnit;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModOutput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncOutput;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModOutput;
67 import org.opendaylight.yangtools.yang.binding.DataObject;
68 import org.opendaylight.yangtools.yang.common.RpcResult;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
73 * {@link ConnectionAdapter} interface contains couple of OF message handling approaches.
74 * {@link AbstractConnectionAdapter} class contains direct RPC processing from OpenflowProtocolService
75 * {@link org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolService}
77 abstract class AbstractConnectionAdapter implements ConnectionAdapter {
79 private static final Logger LOG = LoggerFactory.getLogger(AbstractConnectionAdapter.class);
81 /** after this time, RPC future response objects will be thrown away (in minutes). */
82 private static final int RPC_RESPONSE_EXPIRATION = 1;
84 private static final Exception QUEUE_FULL_EXCEPTION = new RejectedExecutionException("Output queue is full");
86 protected static final RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>> REMOVAL_LISTENER =
88 if (!notification.getCause().equals(RemovalCause.EXPLICIT)) {
89 notification.getValue().discard();
93 protected final Channel channel;
94 protected final InetSocketAddress address;
95 protected boolean disconnectOccured = false;
96 protected final ChannelOutboundQueue output;
98 /** expiring cache for future rpcResponses. */
99 protected Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> responseCache;
102 AbstractConnectionAdapter(@NonNull final Channel channel, @Nullable final InetSocketAddress address,
103 @Nullable final int channelOutboundQueueSize) {
104 this.channel = requireNonNull(channel);
105 this.address = address;
107 responseCache = CacheBuilder.newBuilder().concurrencyLevel(1)
108 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES).removalListener(REMOVAL_LISTENER).build();
109 LOG.debug("The channel outbound queue size:{}", channelOutboundQueueSize);
110 this.output = new ChannelOutboundQueue(channel, channelOutboundQueueSize, address);
111 channel.pipeline().addLast(output);
115 public Future<Boolean> disconnect() {
116 final ChannelFuture disconnectResult = channel.disconnect();
117 responseCache.invalidateAll();
118 disconnectOccured = true;
120 return handleTransportChannelFuture(disconnectResult);
124 public ListenableFuture<RpcResult<BarrierOutput>> barrier(final BarrierInput input) {
125 return sendToSwitchExpectRpcResultFuture(input, BarrierOutput.class, "barrier-input sending failed");
129 public ListenableFuture<RpcResult<EchoOutput>> echo(final EchoInput input) {
130 return sendToSwitchExpectRpcResultFuture(input, EchoOutput.class, "echo-input sending failed");
134 public ListenableFuture<RpcResult<EchoReplyOutput>> echoReply(final EchoReplyInput input) {
135 return sendToSwitchFuture(input, "echo-reply sending failed");
139 public ListenableFuture<RpcResult<ExperimenterOutput>> experimenter(final ExperimenterInput input) {
140 return sendToSwitchFuture(input, "experimenter sending failed");
144 public ListenableFuture<RpcResult<FlowModOutput>> flowMod(final FlowModInput input) {
145 return sendToSwitchFuture(input, "flow-mod sending failed");
149 public ListenableFuture<RpcResult<GetConfigOutput>> getConfig(final GetConfigInput input) {
150 return sendToSwitchExpectRpcResultFuture(input, GetConfigOutput.class, "get-config-input sending failed");
154 public ListenableFuture<RpcResult<GetFeaturesOutput>> getFeatures(final GetFeaturesInput input) {
155 return sendToSwitchExpectRpcResultFuture(input, GetFeaturesOutput.class, "get-features-input sending failed");
159 public ListenableFuture<RpcResult<GetQueueConfigOutput>> getQueueConfig(final GetQueueConfigInput input) {
160 return sendToSwitchExpectRpcResultFuture(input, GetQueueConfigOutput.class,
161 "get-queue-config-input sending failed");
165 public ListenableFuture<RpcResult<GroupModOutput>> groupMod(final GroupModInput input) {
166 return sendToSwitchFuture(input, "group-mod-input sending failed");
170 public ListenableFuture<RpcResult<HelloOutput>> hello(final HelloInput input) {
171 return sendToSwitchFuture(input, "hello-input sending failed");
175 public ListenableFuture<RpcResult<MeterModOutput>> meterMod(final MeterModInput input) {
176 return sendToSwitchFuture(input, "meter-mod-input sending failed");
180 public ListenableFuture<RpcResult<PacketOutOutput>> packetOut(final PacketOutInput input) {
181 return sendToSwitchFuture(input, "packet-out-input sending failed");
185 public ListenableFuture<RpcResult<MultipartRequestOutput>> multipartRequest(final MultipartRequestInput input) {
186 return sendToSwitchFuture(input, "multi-part-request sending failed");
190 public ListenableFuture<RpcResult<PortModOutput>> portMod(final PortModInput input) {
191 return sendToSwitchFuture(input, "port-mod-input sending failed");
195 public ListenableFuture<RpcResult<RoleRequestOutput>> roleRequest(final RoleRequestInput input) {
196 return sendToSwitchExpectRpcResultFuture(input, RoleRequestOutput.class,
197 "role-request-config-input sending failed");
201 public ListenableFuture<RpcResult<SetConfigOutput>> setConfig(final SetConfigInput input) {
202 return sendToSwitchFuture(input, "set-config-input sending failed");
206 public ListenableFuture<RpcResult<TableModOutput>> tableMod(final TableModInput input) {
207 return sendToSwitchFuture(input, "table-mod-input sending failed");
211 public ListenableFuture<RpcResult<GetAsyncOutput>> getAsync(final GetAsyncInput input) {
212 return sendToSwitchExpectRpcResultFuture(input, GetAsyncOutput.class, "get-async-input sending failed");
216 public ListenableFuture<RpcResult<SetAsyncOutput>> setAsync(final SetAsyncInput input) {
217 return sendToSwitchFuture(input, "set-async-input sending failed");
221 public boolean isAlive() {
222 return channel.isOpen();
226 public boolean isAutoRead() {
227 return channel.config().isAutoRead();
231 public void setAutoRead(final boolean autoRead) {
232 channel.config().setAutoRead(autoRead);
236 public InetSocketAddress getRemoteAddress() {
237 return (InetSocketAddress) channel.remoteAddress();
241 * Used only for testing purposes.
243 * @param cache replacement
246 void setResponseCache(final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache) {
247 this.responseCache = cache;
251 * Return cached RpcListener or {@code null} if not cached.
253 protected ResponseExpectedRpcListener<?> findRpcResponse(final RpcResponseKey key) {
254 return responseCache.getIfPresent(key);
258 * Sends given message to switch, sending result or switch response will be reported via return value.
260 * @param input message to send
261 * @param responseClazz type of response
262 * @param failureInfo describes, what type of message caused failure by sending
263 * @return future object,
265 * <li>if send fails, {@link RpcResult} will contain errors and failed status</li>
266 * <li>else {@link RpcResult} will be stored in responseCache and wait for particular timeout (
267 * {@link ConnectionAdapterImpl#RPC_RESPONSE_EXPIRATION}),
269 * <li>either switch will manage to answer and then corresponding response message will be set into returned
271 * <li>or response in cache will expire and returned future will be cancelled</li>
276 protected <I extends OfHeader, O extends OfHeader> ListenableFuture<RpcResult<O>>
277 sendToSwitchExpectRpcResultFuture(final I input, final Class<O> responseClazz,
278 final String failureInfo) {
279 final RpcResponseKey key = new RpcResponseKey(input.getXid().toJava(), responseClazz.getName());
280 final ResponseExpectedRpcListener<O> listener = new ResponseExpectedRpcListener<>(input, failureInfo,
282 return enqueueMessage(listener);
286 * Sends given message to switch, sending result will be reported via return value.
288 * @param input message to send
289 * @param failureInfo describes, what type of message caused failure by sending
290 * @return future object,
292 * <li>if send successful, {@link RpcResult} without errors and successful status will be returned,</li>
293 * <li>else {@link RpcResult} will contain errors and failed status</li>
296 protected <O extends DataObject> ListenableFuture<RpcResult<O>> sendToSwitchFuture(final Object input,
297 final String failureInfo) {
298 SimpleRpcListener<O> listener = new SimpleRpcListener(input, failureInfo);
299 return enqueueMessage(listener);
302 private <T> ListenableFuture<RpcResult<T>> enqueueMessage(final AbstractRpcListener<T> promise) {
303 LOG.debug("Submitting promise {}", promise);
305 if (!output.enqueue(promise)) {
306 LOG.debug("Message queue is full, rejecting execution");
307 promise.failedRpc(QUEUE_FULL_EXCEPTION);
309 LOG.debug("Promise enqueued successfully");
312 return promise.getResult();
315 private static SettableFuture<Boolean> handleTransportChannelFuture(final ChannelFuture resultFuture) {
317 final SettableFuture<Boolean> transportResult = SettableFuture.create();
319 resultFuture.addListener(future -> {
320 transportResult.set(future.isSuccess());
321 if (!future.isSuccess()) {
322 transportResult.setException(future.cause());
325 return transportResult;