/*
 * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
10 package org.opendaylight.openflowjava.protocol.impl.connection;
12 import io.netty.channel.Channel;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.socket.SocketChannel;
15 import io.netty.util.concurrent.GenericFutureListener;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
20 import org.opendaylight.controller.sal.common.util.RpcErrors;
21 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
61 import org.opendaylight.yangtools.yang.binding.DataContainer;
62 import org.opendaylight.yangtools.yang.binding.DataObject;
63 import org.opendaylight.yangtools.yang.binding.Notification;
64 import org.opendaylight.yangtools.yang.common.RpcError;
65 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
66 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
67 import org.opendaylight.yangtools.yang.common.RpcResult;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
71 import com.google.common.base.Preconditions;
72 import com.google.common.cache.Cache;
73 import com.google.common.cache.CacheBuilder;
74 import com.google.common.cache.RemovalListener;
75 import com.google.common.cache.RemovalNotification;
76 import com.google.common.util.concurrent.ListenableFuture;
77 import com.google.common.util.concurrent.SettableFuture;
80 * Handles messages (notifications + rpcs) and connections
82 * @author michal.polkorab
84 public class ConnectionAdapterImpl implements ConnectionFacade {
85 /** after this time, RPC future response objects will be thrown away (in minutes) */
86 public static final int RPC_RESPONSE_EXPIRATION = 1;
88 private static final Logger LOG = LoggerFactory
89 .getLogger(ConnectionAdapterImpl.class);
91 private static final String APPLICATION_TAG = "OPENFLOW_LIBRARY";
92 private static final String TAG = "OPENFLOW";
93 private final Channel channel;
95 /** expiring cache for future rpcResponses */
96 private final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> responseCache;
98 private ConnectionReadyListener connectionReadyListener;
99 private OpenflowProtocolListener messageListener;
100 private SystemNotificationsListener systemListener;
101 private boolean disconnectOccured = false;
105 * @param channel the channel to be set - used for communication
107 public ConnectionAdapterImpl(final SocketChannel channel) {
108 responseCache = CacheBuilder.newBuilder()
110 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
111 .removalListener(new ResponseRemovalListener()).build();
112 this.channel = Preconditions.checkNotNull(channel);
113 LOG.debug("ConnectionAdapter created");
117 public Future<RpcResult<BarrierOutput>> barrier(final BarrierInput input) {
118 return sendToSwitchExpectRpcResultFuture(
119 input, BarrierOutput.class, "barrier-input sending failed");
123 public Future<RpcResult<EchoOutput>> echo(final EchoInput input) {
124 return sendToSwitchExpectRpcResultFuture(
125 input, EchoOutput.class, "echo-input sending failed");
129 public Future<RpcResult<Void>> echoReply(final EchoReplyInput input) {
130 return sendToSwitchFuture(input, "echo-reply sending failed");
134 public Future<RpcResult<Void>> experimenter(final ExperimenterInput input) {
135 return sendToSwitchFuture(input, "experimenter sending failed");
139 public Future<RpcResult<Void>> flowMod(final FlowModInput input) {
140 return sendToSwitchFuture(input, "flow-mod sending failed");
144 public Future<RpcResult<GetConfigOutput>> getConfig(final GetConfigInput input) {
145 return sendToSwitchExpectRpcResultFuture(
146 input, GetConfigOutput.class, "get-config-input sending failed");
150 public Future<RpcResult<GetFeaturesOutput>> getFeatures(
151 final GetFeaturesInput input) {
152 return sendToSwitchExpectRpcResultFuture(
153 input, GetFeaturesOutput.class, "get-features-input sending failed");
157 public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
158 final GetQueueConfigInput input) {
159 return sendToSwitchExpectRpcResultFuture(
160 input, GetQueueConfigOutput.class, "get-queue-config-input sending failed");
164 public Future<RpcResult<Void>> groupMod(final GroupModInput input) {
165 return sendToSwitchFuture(input, "group-mod-input sending failed");
169 public Future<RpcResult<Void>> hello(final HelloInput input) {
170 return sendToSwitchFuture(input, "hello-input sending failed");
174 public Future<RpcResult<Void>> meterMod(final MeterModInput input) {
175 return sendToSwitchFuture(input, "meter-mod-input sending failed");
179 public Future<RpcResult<Void>> packetOut(final PacketOutInput input) {
180 return sendToSwitchFuture(input, "packet-out-input sending failed");
184 public Future<RpcResult<Void>> multipartRequest(final MultipartRequestInput input) {
185 return sendToSwitchFuture(input, "multi-part-request sending failed");
189 public Future<RpcResult<Void>> portMod(final PortModInput input) {
190 return sendToSwitchFuture(input, "port-mod-input sending failed");
194 public Future<RpcResult<RoleRequestOutput>> roleRequest(
195 final RoleRequestInput input) {
196 return sendToSwitchExpectRpcResultFuture(
197 input, RoleRequestOutput.class, "role-request-config-input sending failed");
201 public Future<RpcResult<Void>> setConfig(final SetConfigInput input) {
202 return sendToSwitchFuture(input, "set-config-input sending failed");
206 public Future<RpcResult<Void>> tableMod(final TableModInput input) {
207 return sendToSwitchFuture(input, "table-mod-input sending failed");
211 public Future<RpcResult<GetAsyncOutput>> getAsync(final GetAsyncInput input) {
212 return sendToSwitchExpectRpcResultFuture(
213 input, GetAsyncOutput.class, "get-async-input sending failed");
217 public Future<RpcResult<Void>> setAsync(final SetAsyncInput input) {
218 return sendToSwitchFuture(input, "set-async-input sending failed");
222 public Future<Boolean> disconnect() {
223 ChannelFuture disconnectResult = channel.disconnect();
224 responseCache.invalidateAll();
225 disconnectOccured = true;
227 String failureInfo = "switch disconnecting failed";
228 ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
229 String message = "Check the switch connection";
230 return handleTransportChannelFuture(disconnectResult, failureInfo, errorSeverity, message);
234 public boolean isAlive() {
235 return channel.isOpen();
239 public void setMessageListener(final OpenflowProtocolListener messageListener) {
240 this.messageListener = messageListener;
244 public void consume(final DataObject message) {
245 LOG.debug("ConsumeIntern msg");
246 if (disconnectOccured ) {
249 if (message instanceof Notification) {
251 if (message instanceof DisconnectEvent) {
252 systemListener.onDisconnectEvent((DisconnectEvent) message);
253 responseCache.invalidateAll();
254 disconnectOccured = true;
255 } else if (message instanceof SwitchIdleEvent) {
256 systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
259 else if (message instanceof EchoRequestMessage) {
260 messageListener.onEchoRequestMessage((EchoRequestMessage) message);
261 } else if (message instanceof ErrorMessage) {
262 messageListener.onErrorMessage((ErrorMessage) message);
263 } else if (message instanceof ExperimenterMessage) {
264 messageListener.onExperimenterMessage((ExperimenterMessage) message);
265 } else if (message instanceof FlowRemovedMessage) {
266 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
267 } else if (message instanceof HelloMessage) {
268 LOG.info("Hello received / branch");
269 messageListener.onHelloMessage((HelloMessage) message);
270 } else if (message instanceof MultipartReplyMessage) {
271 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
272 } else if (message instanceof PacketInMessage) {
273 messageListener.onPacketInMessage((PacketInMessage) message);
274 } else if (message instanceof PortStatusMessage) {
275 messageListener.onPortStatusMessage((PortStatusMessage) message);
277 LOG.warn("message listening not supported for type: {}", message.getClass());
280 if (message instanceof OfHeader) {
281 LOG.debug("OFheader msg received");
282 RpcResponseKey key = createRpcResponseKey((OfHeader) message);
283 final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
284 if (listener != null) {
285 LOG.debug("corresponding rpcFuture found");
286 listener.completed((OfHeader)message);
287 LOG.debug("after setting rpcFuture");
288 responseCache.invalidate(key);
290 LOG.warn("received unexpected rpc response: {}", key);
293 LOG.warn("message listening not supported for type: {}", message.getClass());
299 * sends given message to switch, sending result will be reported via return value
300 * @param input message to send
301 * @param failureInfo describes, what type of message caused failure by sending
302 * @return future object, <ul>
303 * <li>if send successful, {@link RpcResult} without errors and successful
304 * status will be returned, </li>
305 * <li>else {@link RpcResult} will contain errors and failed status</li>
308 private ListenableFuture<RpcResult<Void>> sendToSwitchFuture(
309 final DataObject input, final String failureInfo) {
310 final SimpleRpcListener listener = new SimpleRpcListener(failureInfo);
312 LOG.debug("going to flush");
313 channel.writeAndFlush(input).addListener(listener);
314 LOG.debug("flushed");
316 return listener.getResult();
320 * sends given message to switch, sending result or switch response will be reported via return value
321 * @param input message to send
322 * @param responseClazz type of response
323 * @param failureInfo describes, what type of message caused failure by sending
324 * @return future object, <ul>
325 * <li>if send fails, {@link RpcResult} will contain errors and failed status </li>
326 * <li>else {@link RpcResult} will be stored in responseCache and wait for particular timeout
327 * ({@link ConnectionAdapterImpl#RPC_RESPONSE_EXPIRATION}),
328 * <ul><li>either switch will manage to answer
329 * and then corresponding response message will be set into returned future</li>
330 * <li>or response in cache will expire and returned future will be cancelled</li></ul>
334 private <IN extends OfHeader, OUT extends OfHeader> ListenableFuture<RpcResult<OUT>> sendToSwitchExpectRpcResultFuture(
335 final IN input, final Class<OUT> responseClazz, final String failureInfo) {
336 final RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz.getName());
337 final ResponseExpectedRpcListener<OUT> listener =
338 new ResponseExpectedRpcListener<>(failureInfo, responseCache, key);
340 LOG.debug("going to flush");
341 channel.writeAndFlush(input).addListener(listener);
342 LOG.debug("flushed");
344 return listener.getResult();
348 * @param resultFuture
350 * @param errorSeverity
354 private static SettableFuture<Boolean> handleTransportChannelFuture(
355 final ChannelFuture resultFuture, final String failureInfo,
356 final ErrorSeverity errorSeverity, final String message) {
358 final SettableFuture<Boolean> transportResult = SettableFuture.create();
360 resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
363 public void operationComplete(
364 final io.netty.util.concurrent.Future<? super Void> future)
366 transportResult.set(future.isSuccess());
367 if (!future.isSuccess()) {
368 transportResult.setException(future.cause());
372 return transportResult;
379 static RpcError buildRpcError(final String info, final ErrorSeverity severity, final String message,
380 final Throwable cause) {
381 RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message,
382 ErrorType.RPC, cause);
390 protected static RpcError buildTransportError(final String info, final ErrorSeverity severity, final String message,
391 final Throwable cause) {
392 RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message,
393 ErrorType.TRANSPORT, cause);
401 private static RpcResponseKey createRpcResponseKey(final OfHeader message) {
402 return new RpcResponseKey(message.getXid(), message.getImplementedInterface().getName());
408 private ResponseExpectedRpcListener<?> findRpcResponse(final RpcResponseKey key) {
409 return responseCache.getIfPresent(key);
413 public void setSystemListener(final SystemNotificationsListener systemListener) {
414 this.systemListener = systemListener;
418 public void checkListeners() {
419 StringBuffer buffer = new StringBuffer();
420 if (systemListener == null) {
421 buffer.append("SystemListener ");
423 if (messageListener == null) {
424 buffer.append("MessageListener ");
426 if (connectionReadyListener == null) {
427 buffer.append("ConnectionReadyListener ");
430 if (buffer.length() > 0) {
431 throw new IllegalStateException("Missing listeners: " + buffer.toString());
435 static class ResponseRemovalListener implements RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>> {
437 public void onRemoval(
438 final RemovalNotification<RpcResponseKey, ResponseExpectedRpcListener<?>> notification) {
439 notification.getValue().discard();
444 * Class is used ONLY for exiting msgQueue processing thread
445 * @author michal.polkorab
447 static class ExitingDataObject implements DataObject {
449 public Class<? extends DataContainer> getImplementedInterface() {
455 public void fireConnectionReadyNotification() {
456 new Thread(new Runnable() {
459 connectionReadyListener.onConnectionReady();
466 public void setConnectionReadyListener(
467 final ConnectionReadyListener connectionReadyListener) {
468 this.connectionReadyListener = connectionReadyListener;