X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fconnection%2FConnectionAdapterImpl.java;h=d2c8f1be4278590ea18de8b3b1c9d0d33e174e7e;hb=156b1d174392438b48a76fd82ad3f84f618e8042;hp=9457373d3ac7ee80551fd1174a4a6544d6cc334c;hpb=8dda3700d7c10753ead351f87d6e4b4699761fe4;p=openflowjava.git diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java index 9457373d..d2c8f1be 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java @@ -1,10 +1,4 @@ -/** - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ +/* Copyright (C)2013 Pantheon Technologies, s.r.o. All rights reserved. */ package org.opendaylight.openflowjava.protocol.impl.connection; @@ -13,11 +7,14 @@ import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.GenericFutureListener; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.sal.common.util.RpcErrors; import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput; @@ -54,6 +51,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener; +import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.common.RpcError; @@ -74,7 +75,7 @@ import com.google.common.util.concurrent.SettableFuture; * @author mirehak * @author michal.polkorab */ -public class ConnectionAdapterImpl implements CommunicationFacade { +public class ConnectionAdapterImpl implements ConnectionFacade { /** after this time, rpc future response objects will be thrown away (in minutes) */ public static final int RPC_RESPONSE_EXPIRATION = 1; @@ -86,11 +87,13 @@ public class ConnectionAdapterImpl implements CommunicationFacade { private static final String TAG = "OPENFLOW"; private Channel channel; private OpenflowProtocolListener messageListener; - private int version; /** expiring cache for future rpcResponses */ protected Cache> responseCache; - - + private SystemNotificationsListener systemListener; + private boolean disconnectOccured = false; + + protected ConnectionReadyListener connectionReadyListener; + /** * default ctor */ @@ -98,22 +101,13 @@ public class ConnectionAdapterImpl implements CommunicationFacade { responseCache = CacheBuilder.newBuilder() .concurrencyLevel(1) .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES) - .removalListener(new RemovalListener>() { - - @Override - public void onRemoval( - RemovalNotification> notification) { - LOG.warn("rpc response discarded: "+notification.getKey()); - notification.getValue().cancel(true); - } - }).build(); + .removalListener(new ResponseRemovalListener()).build(); LOG.info("ConnectionAdapter created"); } /** * @param channel the channel to be set - used for communication */ - @Override public void setChannel(Channel channel) { this.channel = channel; } @@ -221,7 +215,9 @@ public class ConnectionAdapterImpl implements CommunicationFacade { @Override public Future disconnect() { ChannelFuture disconnectResult = channel.disconnect(); - + responseCache.invalidateAll(); + disconnectOccured = true; + String failureInfo = "switch disconnecting failed"; ErrorSeverity errorSeverity = ErrorSeverity.ERROR; String message = "Check the switch connection"; @@ -240,13 +236,21 @@ public class ConnectionAdapterImpl implements CommunicationFacade { @Override public void consume(DataObject message) { - if (message == null) { - LOG.error("message is null"); - } else { - LOG.debug("message is ok"); + LOG.debug("ConsumeIntern msg"); + if (disconnectOccured ) { + return; } if (message instanceof Notification) { - if (message instanceof EchoRequestMessage) { + // System events + if (message instanceof DisconnectEvent) { + systemListener.onDisconnectEvent((DisconnectEvent) message); + responseCache.invalidateAll(); + disconnectOccured = true; + } else if (message instanceof SwitchIdleEvent) { + systemListener.onSwitchIdleEvent((SwitchIdleEvent) message); + } + // OpenFlow messages + else if (message instanceof EchoRequestMessage) { messageListener.onEchoRequestMessage((EchoRequestMessage) message); } else if (message instanceof ErrorMessage) { messageListener.onErrorMessage((ErrorMessage) message); @@ -270,15 +274,19 @@ public class ConnectionAdapterImpl implements CommunicationFacade { } } else { if (message instanceof OfHeader) { + LOG.debug("OFheader msg received"); RpcResponseKey key = createRpcResponseKey((OfHeader) message); - SettableFuture> rpcFuture = findRpcResponse(key); + final SettableFuture> rpcFuture = findRpcResponse(key); if (rpcFuture != null) { - rpcFuture.set(Rpcs.getRpcResult(true, message, null)); + LOG.debug("corresponding rpcFuture found"); + List errors = Collections.emptyList(); + LOG.debug("before setting rpcFuture"); + rpcFuture.set(Rpcs.getRpcResult(true, message, errors)); + LOG.debug("after setting rpcFuture"); responseCache.invalidate(key); } else { LOG.warn("received unexpected rpc response: "+key); } - } else { LOG.warn("message listening not supported for type: "+message.getClass()); } @@ -297,7 +305,9 @@ public class ConnectionAdapterImpl implements CommunicationFacade { */ private SettableFuture> sendToSwitchFuture( DataObject input, final String failureInfo) { + LOG.debug("going to flush"); ChannelFuture resultFuture = channel.writeAndFlush(input); + LOG.debug("flushed"); ErrorSeverity errorSeverity = ErrorSeverity.ERROR; String errorMessage = "check switch connection"; @@ -321,12 +331,18 @@ public class ConnectionAdapterImpl implements CommunicationFacade { */ private SettableFuture> sendToSwitchExpectRpcResultFuture( IN input, Class responseClazz, final String failureInfo) { + LOG.debug("going to flush"); + SettableFuture> rpcResult = SettableFuture.create(); + RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz); + responseCache.put(key, rpcResult); ChannelFuture resultFuture = channel.writeAndFlush(input); + LOG.debug("flushed"); ErrorSeverity errorSeverity = ErrorSeverity.ERROR; String errorMessage = "check switch connection"; + return handleRpcChannelFutureWithResponse(resultFuture, failureInfo, errorSeverity, - errorMessage, input, responseClazz); + errorMessage, input, responseClazz, rpcResult, key); } /** @@ -339,16 +355,18 @@ public class ConnectionAdapterImpl implements CommunicationFacade { final ErrorSeverity errorSeverity, final String errorMessage) { final SettableFuture> rpcResult = SettableFuture.create(); - + LOG.debug("handlerpcchannelfuture"); resultFuture.addListener(new GenericFutureListener>() { @Override public void operationComplete( io.netty.util.concurrent.Future future) throws Exception { - Collection errors = null; + LOG.debug("operation complete"); + Collection errors = Collections.emptyList(); if (future.cause() != null) { + LOG.debug("future.cause != null"); RpcError rpcError = buildRpcError(failureInfo, errorSeverity, errorMessage, future.cause()); errors = Lists.newArrayList(rpcError); @@ -365,19 +383,20 @@ public class ConnectionAdapterImpl implements CommunicationFacade { } /** - * @param input - * @param responseClazz * @param resultFuture * @param failureInfo * @param errorSeverity * @param errorMessage + * @param input + * @param responseClazz + * @param key of rpcResponse * @return */ private SettableFuture> handleRpcChannelFutureWithResponse( ChannelFuture resultFuture, final String failureInfo, final ErrorSeverity errorSeverity, final String errorMessage, - final IN input, Class responseClazz) { - final SettableFuture> rpcResult = SettableFuture.create(); + final IN input, Class responseClazz, final SettableFuture> rpcResult, final RpcResponseKey key) { + LOG.debug("handleRpcchanfuture with response"); resultFuture.addListener(new GenericFutureListener>() { @@ -386,8 +405,10 @@ public class ConnectionAdapterImpl implements CommunicationFacade { io.netty.util.concurrent.Future future) throws Exception { + LOG.debug("operation complete"); + Collection errors = Collections.emptyList(); if (future.cause() != null) { - Collection errors = null; + LOG.debug("ChannelFuture.cause != null"); RpcError rpcError = buildRpcError(failureInfo, errorSeverity, errorMessage, future.cause()); errors = Lists.newArrayList(rpcError); @@ -396,12 +417,12 @@ public class ConnectionAdapterImpl implements CommunicationFacade { (OUT) null, errors) ); + responseCache.invalidate(key); } else { - RpcResponseKey key = new RpcResponseKey(input.getXid(), input.getClass().toString()); - if (responseCache.getIfPresent(key) != null) { - responseCache.invalidate(key); + LOG.debug("ChannelFuture.cause == null"); + if (responseCache.getIfPresent(key) == null) { + LOG.debug("responcache: key wasn't present"); } - responseCache.put(key, rpcResult); } } }); @@ -428,7 +449,9 @@ public class ConnectionAdapterImpl implements CommunicationFacade { io.netty.util.concurrent.Future future) throws Exception { transportResult.set(future.isSuccess()); - transportResult.setException(future.cause()); + if (!future.isSuccess()) { + transportResult.setException(future.cause()); + } } }); return transportResult; @@ -461,7 +484,7 @@ public class ConnectionAdapterImpl implements CommunicationFacade { * @return */ private static RpcResponseKey createRpcResponseKey(OfHeader message) { - return new RpcResponseKey(message.getXid(), message.getClass().toString()); + return new RpcResponseKey(message.getXid(), message.getClass()); } /** @@ -472,12 +495,67 @@ public class ConnectionAdapterImpl implements CommunicationFacade { return (SettableFuture>) responseCache.getIfPresent(key); } - /* (non-Javadoc) - * @see org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter#setVersion(int) - */ @Override - public void setVersion(int version) { - this.version = version; + public void setSystemListener(SystemNotificationsListener systemListener) { + this.systemListener = systemListener; + } + + @Override + public void checkListeners() { + StringBuffer buffer = new StringBuffer(); + if (systemListener == null) { + buffer.append("SystemListener "); + } + if (messageListener == null) { + buffer.append("MessageListener "); + } + if (connectionReadyListener == null) { + buffer.append("ConnectionReadyListener "); + } + + if (buffer.length() > 0) { + throw new IllegalStateException("Missing listeners: " + buffer.toString()); + } } + static class ResponseRemovalListener implements RemovalListener> { + @Override + public void onRemoval( + RemovalNotification> notification) { + SettableFuture future = notification.getValue(); + if (!future.isDone()) { + LOG.warn("rpc response discarded: " + notification.getKey()); + future.cancel(true); + } + } + } + + /** + * Class is used ONLY for exiting msgQueue processing thread + * @author michal.polkorab + */ + static class ExitingDataObject implements DataObject { + @Override + public Class getImplementedInterface() { + return null; + } + } + + @Override + public void fireConnectionReadyNotification() { + new Thread(new Runnable() { + @Override + public void run() { + connectionReadyListener.onConnectionReady(); + } + }).start(); + } + + + @Override + public void setConnectionReadyListener( + ConnectionReadyListener connectionReadyListener) { + this.connectionReadyListener = connectionReadyListener; + } + }