From d2705d7b11d14d33b7777e4e069001aadcb7c70c Mon Sep 17 00:00:00 2001 From: Michal Rehak Date: Thu, 12 Sep 2013 22:37:27 +0200 Subject: [PATCH] add lib-plugin interaction implementations on library side - startup, shutdown of multiple servers creates chained future result - update yangtool version to 0.5.8-SNAPSHOT Change-Id: Id8f2fe0f5a4050592d62af2c5b416574150b6f9d Signed-off-by: Michal Rehak --- openflow-protocol-api/pom.xml | 12 +- .../connection/ConnectionConfiguration.java | 51 ++++ .../connection/ConnectionAdapterImpl.java | 265 ++++++++++++++---- .../impl/connection/MessageConsumer.java | 23 ++ .../impl/connection/OnlineProvider.java | 24 ++ .../impl/connection/RpcResponseKey.java | 79 ++++++ .../impl/connection/ServerFacade.java | 18 ++ .../impl/connection/ShutdownProvider.java | 25 ++ .../SwitchConnectionProviderImpl.java | 94 +++++++ .../core/PublishingChannelInitializer.java | 22 +- .../protocol/impl/core/TcpHandler.java | 30 +- .../protocol/impl/core/TcpHandlerTest.java | 8 +- .../protocol/impl/core/TlsDetectorTest.java | 2 - .../connection/SwitchConnectionProvider.java | 11 +- 14 files changed, 588 insertions(+), 76 deletions(-) create mode 100644 openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionConfiguration.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/MessageConsumer.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/OnlineProvider.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/RpcResponseKey.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ServerFacade.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ShutdownProvider.java create mode 100644 openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/SwitchConnectionProviderImpl.java diff --git a/openflow-protocol-api/pom.xml b/openflow-protocol-api/pom.xml index 63757a77..65374963 100644 --- a/openflow-protocol-api/pom.xml +++ b/openflow-protocol-api/pom.xml @@ -8,13 +8,17 @@ openflow-protocol-api Openflow Protocol Library - API + + + 0.5.8-SNAPSHOT + org.opendaylight.yangtools yang-maven-plugin - 0.5.7-SNAPSHOT + ${yangtools.version} @@ -41,7 +45,7 @@ org.opendaylight.yangtools maven-sal-api-gen-plugin - 0.5.7-SNAPSHOT + ${yangtools.version} jar @@ -108,12 +112,12 @@ org.opendaylight.yangtools yang-binding - 0.5.7-SNAPSHOT + ${yangtools.version} org.opendaylight.yangtools yang-common - 0.5.7-SNAPSHOT + ${yangtools.version} org.opendaylight.yangtools.model diff --git a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionConfiguration.java b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionConfiguration.java new file mode 100644 index 00000000..16ffde3c --- /dev/null +++ b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionConfiguration.java @@ -0,0 +1,51 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowjava.protocol.api.connection; + +import java.net.InetAddress; + +/** + * @author mirehak + * + */ +public interface ConnectionConfiguration { + + /** + * connection functionality support types + */ + public enum FEATURE_SUPPORT { + /** feature is not supported at all */ + NOT_SUPPORTED, + /** feature is supported */ + SUPPORTED, + /** feature is supported and has to be used by clients */ + REQUIRED + } + + /** + * @return address to bind, if null, all available interfaces will be used + */ + public InetAddress getAddress(); + + /** + * @return port to bind + */ + public int getPort(); + + /** + * @return transport protocol to use + */ + public Object getTransferProtocol(); + + /** + * @return encryption feature support + */ + public FEATURE_SUPPORT getTlsSupport(); + +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java index 0423d25b..5f670132 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ConnectionAdapterImpl.java @@ -14,6 +14,7 @@ import io.netty.util.concurrent.GenericFutureListener; import java.util.Collection; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.sal.common.util.RpcErrors; import org.opendaylight.controller.sal.common.util.Rpcs; @@ -23,8 +24,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput; @@ -35,20 +40,34 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.collect.Lists; import com.google.common.util.concurrent.SettableFuture; @@ -56,12 +75,39 @@ import com.google.common.util.concurrent.SettableFuture; * @author mirehak * */ -public class ConnectionAdapterImpl implements ConnectionAdapter { +public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer { + + /** after this time, rpc future response objects will be thrown away (in minutes) */ + public static final int RPC_RESPONSE_EXPIRATION = 1; + + protected static final Logger LOG = LoggerFactory + .getLogger(ConnectionAdapterImpl.class); private static final String APPLICATION_TAG = "OPENFLOW_LIBRARY"; private static final String TAG = "OPENFLOW"; private Channel channel; private OpenflowProtocolListener messageListener; + /** expiring cache for future rpcResponses */ + protected Cache> responseCache; + + + /** + * default ctor + */ + public ConnectionAdapterImpl() { + responseCache = CacheBuilder.newBuilder() + .concurrencyLevel(1) + .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES) + .removalListener(new RemovalListener>() { + + @Override + public void onRemoval( + RemovalNotification> notification) { + LOG.warn("rpc response discarded: "+notification.getKey()); + notification.getValue().cancel(true); + } + }).build(); + } /** * @param channel the channel to set @@ -72,118 +118,102 @@ public class ConnectionAdapterImpl implements ConnectionAdapter { @Override public Future> barrier(BarrierInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchExpectRpcResultFuture( + input, BarrierOutput.class, "barrier-input sending failed"); } @Override public Future> echo(EchoInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchExpectRpcResultFuture( + input, EchoOutput.class, "echo-input sending failed"); } @Override public Future> echoReply(EchoReplyInput input) { - return sendToSwitchFuture(input, "echo reply sending failed"); + return sendToSwitchFuture(input, "echo-reply sending failed"); } @Override public Future> experimenter(ExperimenterInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchFuture(input, "experimenter sending failed"); } @Override public Future> flowMod(FlowModInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchFuture(input, "flow-mod sending failed"); } @Override public Future> getConfig(GetConfigInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchExpectRpcResultFuture( + input, GetConfigOutput.class, "get-config-input sending failed"); } @Override public Future> getFeatures( GetFeaturesInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchExpectRpcResultFuture( + input, GetFeaturesOutput.class, "get-features-input sending failed"); } @Override public Future> getQueueConfig( GetQueueConfigInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchExpectRpcResultFuture( + input, GetQueueConfigOutput.class, "get-queue-config-input sending failed"); } @Override public Future> groupMod(GroupModInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchFuture(input, "group-mod-input sending failed"); } @Override public Future> hello(HelloInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchFuture(input, "hello-input sending failed"); } @Override public Future> meterMod(MeterModInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchFuture(input, "meter-mod-input sending failed"); } @Override public Future> packetOut(PacketOutInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchFuture(input, "packet-out-input sending failed"); } @Override public Future> portMod(PortModInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchFuture(input, "port-mod-input sending failed"); } @Override public Future> roleRequest( RoleRequestInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchExpectRpcResultFuture( + input, RoleRequestOutput.class, "role-request-config-input sending failed"); } @Override public Future> setConfig(SetConfigInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchFuture(input, "set-config-input sending failed"); } @Override public Future> tableMod(TableModInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchFuture(input, "table-mod-input sending failed"); } - /* (non-Javadoc) - * @see org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolService#getAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput) - */ @Override public Future> getAsync(GetAsyncInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchExpectRpcResultFuture( + input, GetAsyncOutput.class, "get-async-input sending failed"); } - /* (non-Javadoc) - * @see org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolService#setAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput) - */ @Override public Future> setAsync(SetAsyncInput input) { - // TODO Auto-generated method stub - return null; + return sendToSwitchFuture(input, "set-async-input sending failed"); } @Override @@ -205,18 +235,90 @@ public class ConnectionAdapterImpl implements ConnectionAdapter { public void setMessageListener(OpenflowProtocolListener messageListener) { this.messageListener = messageListener; } + + @Override + public void consume(DataObject message) { + if (message instanceof Notification) { + if (message instanceof EchoRequestMessage) { + messageListener.onEchoRequestMessage((EchoRequestMessage) message); + } else if (message instanceof ErrorMessage) { + messageListener.onErrorMessage((ErrorMessage) message); + } else if (message instanceof ExperimenterMessage) { + messageListener.onExperimenterMessage((ExperimenterMessage) message); + } else if (message instanceof FlowRemovedMessage) { + messageListener.onFlowRemovedMessage((FlowRemovedMessage) message); + } else if (message instanceof HelloMessage) { + messageListener.onHelloMessage((HelloMessage) message); + } else if (message instanceof MultipartReplyMessage) { + messageListener.onMultipartReplyMessage((MultipartReplyMessage) message); + } else if (message instanceof MultipartRequestMessage) { + messageListener.onMultipartRequestMessage((MultipartRequestMessage) message); + } else if (message instanceof PacketInMessage) { + messageListener.onPacketInMessage((PacketInMessage) message); + } else if (message instanceof PortStatusMessage) { + messageListener.onPortStatusMessage((PortStatusMessage) message); + } else { + LOG.warn("message listening not supported for type: "+message.getClass()); + } + } else { + if (message instanceof OfHeader) { + RpcResponseKey key = createRpcResponseKey((OfHeader) message); + SettableFuture> rpcFuture = findRpcResponse(key); + if (rpcFuture != null) { + rpcFuture.set(Rpcs.getRpcResult(true, message, null)); + responseCache.invalidate(key); + } else { + LOG.warn("received unexpected rpc response: "+key); + } + + } else { + LOG.warn("message listening not supported for type: "+message.getClass()); + } + } + } /** - * @param input - * @return + * sends given message to switch, sending result will be reported via return value + * @param input message to send + * @param failureInfo describes, what type of message caused failure by sending + * @return future object,
    + *
  • if send successful, {@link RpcResult} without errors and successful + * status will be returned,
  • + *
  • else {@link RpcResult} will contain errors and failed status
  • + *
*/ private SettableFuture> sendToSwitchFuture( - EchoReplyInput input, final String failureInfo) { + DataObject input, final String failureInfo) { ChannelFuture resultFuture = channel.writeAndFlush(input); ErrorSeverity errorSeverity = ErrorSeverity.ERROR; - String message = "check switch connection"; - return handleRpcChannelFuture(resultFuture, failureInfo, errorSeverity, message); + String errorMessage = "check switch connection"; + return handleRpcChannelFuture(resultFuture, failureInfo, errorSeverity, errorMessage); + } + + /** + * sends given message to switch, sending result or switch response will be reported via return value + * @param input message to send + * @param responseClazz type of response + * @param failureInfo describes, what type of message caused failure by sending + * @return future object,
    + *
  • if send fails, {@link RpcResult} will contain errors and failed status
  • + *
  • else {@link RpcResult} will be stored in responseCache and wait for particular timeout + * ({@link ConnectionAdapterImpl#RPC_RESPONSE_EXPIRATION}), + *
    • either switch will manage to answer + * and then corresponding response message will be set into returned future
    • + *
    • or response in cache will expire and returned future will be cancelled
    + *
  • + *
+ */ + private SettableFuture> sendToSwitchExpectRpcResultFuture( + IN input, Class responseClazz, final String failureInfo) { + ChannelFuture resultFuture = channel.writeAndFlush(input); + + ErrorSeverity errorSeverity = ErrorSeverity.ERROR; + String errorMessage = "check switch connection"; + return handleRpcChannelFutureWithResponse(resultFuture, failureInfo, errorSeverity, + errorMessage, input, responseClazz); } /** @@ -226,7 +328,7 @@ public class ConnectionAdapterImpl implements ConnectionAdapter { */ private SettableFuture> handleRpcChannelFuture( ChannelFuture resultFuture, final String failureInfo, - final ErrorSeverity errorSeverity, final String message) { + final ErrorSeverity errorSeverity, final String errorMessage) { final SettableFuture> rpcResult = SettableFuture.create(); @@ -240,7 +342,7 @@ public class ConnectionAdapterImpl implements ConnectionAdapter { if (future.cause() != null) { RpcError rpcError = buildRpcError(failureInfo, - errorSeverity, message, future.cause()); + errorSeverity, errorMessage, future.cause()); errors = Lists.newArrayList(rpcError); } @@ -254,12 +356,58 @@ public class ConnectionAdapterImpl implements ConnectionAdapter { return rpcResult; } + /** + * @param input + * @param responseClazz + * @param resultFuture + * @param failureInfo + * @param errorSeverity + * @param errorMessage + * @return + */ + private SettableFuture> handleRpcChannelFutureWithResponse( + ChannelFuture resultFuture, final String failureInfo, + final ErrorSeverity errorSeverity, final String errorMessage, + final IN input, Class responseClazz) { + final SettableFuture> rpcResult = SettableFuture.create(); + + resultFuture.addListener(new GenericFutureListener>() { + + @Override + public void operationComplete( + io.netty.util.concurrent.Future future) + throws Exception { + + if (future.cause() != null) { + Collection errors = null; + RpcError rpcError = buildRpcError(failureInfo, + errorSeverity, errorMessage, future.cause()); + errors = Lists.newArrayList(rpcError); + rpcResult.set(Rpcs.getRpcResult( + future.isSuccess(), + (OUT) null, + errors) + ); + } else { + RpcResponseKey key = new RpcResponseKey(input.getXid(), input.getClass().toString()); + if (responseCache.getIfPresent(key) != null) { + responseCache.invalidate(key); + } + responseCache.put(key, rpcResult); + } + } + }); + return rpcResult; + } + /** * @param resultFuture * @param failureInfo + * @param errorSeverity + * @param message * @return */ - private SettableFuture handleTransportChannelFuture( + private static SettableFuture handleTransportChannelFuture( ChannelFuture resultFuture, final String failureInfo, final ErrorSeverity errorSeverity, final String message) { @@ -272,6 +420,7 @@ public class ConnectionAdapterImpl implements ConnectionAdapter { io.netty.util.concurrent.Future future) throws Exception { transportResult.set(future.isSuccess()); + transportResult.setException(future.cause()); } }); return transportResult; @@ -299,4 +448,20 @@ public class ConnectionAdapterImpl implements ConnectionAdapter { return error; } + /** + * @param message + * @return + */ + private static RpcResponseKey createRpcResponseKey(OfHeader message) { + return new RpcResponseKey(message.getXid(), message.getClass().toString()); + } + + /** + * @return + */ + @SuppressWarnings("unchecked") + private SettableFuture> findRpcResponse(RpcResponseKey key) { + return (SettableFuture>) responseCache.getIfPresent(key); + } + } diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/MessageConsumer.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/MessageConsumer.java new file mode 100644 index 00000000..ca6d7f76 --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/MessageConsumer.java @@ -0,0 +1,23 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowjava.protocol.impl.connection; + +import org.opendaylight.yangtools.yang.binding.DataObject; + +/** + * @author mirehak + * + */ +public interface MessageConsumer { + + /** + * @param message to process + */ + public void consume(DataObject message); +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/OnlineProvider.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/OnlineProvider.java new file mode 100644 index 00000000..dc68ca63 --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/OnlineProvider.java @@ -0,0 +1,24 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowjava.protocol.impl.connection; + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * @author mirehak + * + */ +public interface OnlineProvider { + + /** + * @return the isOnlineFuture + */ + public ListenableFuture getIsOnlineFuture(); + +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/RpcResponseKey.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/RpcResponseKey.java new file mode 100644 index 00000000..1ef1df00 --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/RpcResponseKey.java @@ -0,0 +1,79 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowjava.protocol.impl.connection; + + +/** + * @author mirehak + * + */ +public class RpcResponseKey { + + private final long xid; + private final String outputClazz; + /** + * @param xid + * @param outputClazz + */ + public RpcResponseKey(long xid, String outputClazz) { + super(); + this.xid = xid; + this.outputClazz = outputClazz; + } + + /** + * @return the xid + */ + public long getXid() { + return xid; + } + + /** + * @return the outputClazz + */ + public String getOutputClazz() { + return outputClazz; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((outputClazz == null) ? 0 : outputClazz.hashCode()); + result = prime * result + (int) (xid ^ (xid >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + RpcResponseKey other = (RpcResponseKey) obj; + if (outputClazz == null) { + if (other.outputClazz != null) + return false; + } else if (!outputClazz.equals(other.outputClazz)) + return false; + if (xid != other.xid) + return false; + return true; + } + + @Override + public String toString() { + return "RpcResultKey [xid=" + xid + ", outputClazz=" + outputClazz + + "]"; + } + +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ServerFacade.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ServerFacade.java new file mode 100644 index 00000000..108e53aa --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ServerFacade.java @@ -0,0 +1,18 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowjava.protocol.impl.connection; + +/** + * @author mirehak + * + */ +public interface ServerFacade extends ShutdownProvider, OnlineProvider, Runnable { + + // empty unifying superinterface +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ShutdownProvider.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ShutdownProvider.java new file mode 100644 index 00000000..1be047ae --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/ShutdownProvider.java @@ -0,0 +1,25 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowjava.protocol.impl.connection; + +import com.google.common.util.concurrent.ListenableFuture; + + +/** + * @author mirehak + * + */ +public interface ShutdownProvider { + + /** + * @return shutdown future + */ + public ListenableFuture shutdown(); + +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/SwitchConnectionProviderImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/SwitchConnectionProviderImpl.java new file mode 100644 index 00000000..858abfa7 --- /dev/null +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/connection/SwitchConnectionProviderImpl.java @@ -0,0 +1,94 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.openflowjava.protocol.impl.connection; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Future; + +import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration; +import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler; +import org.opendaylight.openflowjava.protocol.impl.core.TcpHandler; +import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +/** + * @author mirehak + * + */ +public class SwitchConnectionProviderImpl implements SwitchConnectionProvider { + + private static final Logger LOG = LoggerFactory + .getLogger(SwitchConnectionProviderImpl.class); + private SwitchConnectionHandler switchConnectionHandler; + private Set serverLot; + + @Override + public void configure(Collection connConfigs) { + LOG.debug("configurating .."); + + //TODO - add and configure servers according to configuration + serverLot = new HashSet<>(); + serverLot.add(new TcpHandler(6633)); + } + + @Override + public void setSwitchConnectionListener(SwitchConnectionHandler switchConnectionHandler) { + this.switchConnectionHandler = switchConnectionHandler; + } + + @Override + public Future shutdown() { + LOG.debug("shutdown summoned"); + for (ServerFacade server : serverLot) { + server.shutdown(); + } + return null; + } + + @Override + public Future> startup() { + LOG.debug("startup summoned"); + ListenableFuture> result = SettableFuture.create(); + try { + //TODO - check config, status of servers + if (switchConnectionHandler == null) { + throw new IllegalStateException("switchConnectionHandler is not set"); + } + + // starting + List> starterChain = new ArrayList<>(); + for (ServerFacade server : serverLot) { + new Thread(server).start(); + ListenableFuture isOnlineFuture = server.getIsOnlineFuture(); + starterChain.add(isOnlineFuture); + } + + if (!starterChain.isEmpty()) { + result = Futures.allAsList(starterChain); + } else { + throw new IllegalStateException("no servers configured"); + } + } catch (Exception e) { + SettableFuture> exFuture = SettableFuture.create(); + exFuture.setException(e); + result = exFuture; + } + return result; + } + +} diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PublishingChannelInitializer.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PublishingChannelInitializer.java index 0cf8a5bd..5f339fda 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PublishingChannelInitializer.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/PublishingChannelInitializer.java @@ -3,18 +3,22 @@ package org.opendaylight.openflowjava.protocol.impl.core; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.socket.SocketChannel; import java.util.Iterator; +import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler; +import org.opendaylight.openflowjava.protocol.impl.connection.ConnectionAdapterFactory; import org.opendaylight.openflowjava.protocol.impl.core.TcpHandler.COMPONENT_NAMES; /** * @author michal.polkorab * */ -public class PublishingChannelInitializer extends ChannelInitializer { +public class PublishingChannelInitializer extends ChannelInitializer { private DefaultChannelGroup allChannels; + private SwitchConnectionHandler switchConnectionHandler; /** * default ctor @@ -24,9 +28,15 @@ public class PublishingChannelInitializer extends ChannelInitializer { } @Override - protected void initChannel(Channel ch) throws Exception { + protected void initChannel(SocketChannel ch) throws Exception { allChannels.add(ch); + //TODO - create inBoundHandler + if (switchConnectionHandler != null) { + switchConnectionHandler.onSwitchConnected(ConnectionAdapterFactory.createConnectionAdapter(ch)); + //TODO - check OpenflowProtocolListener, set it to inBoundHandler + } ch.pipeline().addLast(COMPONENT_NAMES.TLS_DETECTOR.name(), new TlsDetector()); + //TODO - chain inBoundHandler to pipe } /** @@ -43,6 +53,10 @@ public class PublishingChannelInitializer extends ChannelInitializer { return allChannels.size(); } - - + /** + * @param switchListener the switchListener to set + */ + public void setSwitchConnectionHandler(SwitchConnectionHandler switchListener) { + this.switchConnectionHandler = switchListener; + } } diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java index 5d342fde..2d12c5c7 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java @@ -8,12 +8,15 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import io.netty.util.concurrent.GenericFutureListener; import java.net.InetSocketAddress; +import org.opendaylight.openflowjava.protocol.impl.connection.ServerFacade; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; /** @@ -21,7 +24,7 @@ import com.google.common.util.concurrent.SettableFuture; * * @author michal.polkorab */ -public class TcpHandler extends Thread { +public class TcpHandler implements ServerFacade { private int port; private String address; @@ -111,9 +114,22 @@ public class TcpHandler extends Thread { /** * Shuts down {@link TcpHandler}} */ - public void shutdown() { + @Override + public ListenableFuture shutdown() { + final SettableFuture result = SettableFuture.create(); workerGroup.shutdownGracefully(); - bossGroup.shutdownGracefully(); + // boss will shutdown as soon, as worker is down + bossGroup.shutdownGracefully().addListener(new GenericFutureListener>() { + + @Override + public void operationComplete( + io.netty.util.concurrent.Future downResult) throws Exception { + result.set(downResult.isSuccess()); + result.setException(downResult.cause()); + } + + }); + return result; } /** @@ -144,13 +160,11 @@ public class TcpHandler extends Thread { } else { port = 6633; } - new TcpHandler(port).start(); + new Thread(new TcpHandler(port)).start(); } - /** - * @return the isOnlineFuture - */ - public SettableFuture getIsOnlineFuture() { + @Override + public ListenableFuture getIsOnlineFuture() { return isOnlineFuture; } diff --git a/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandlerTest.java b/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandlerTest.java index 47b2a334..f06f0804 100644 --- a/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandlerTest.java +++ b/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandlerTest.java @@ -45,7 +45,7 @@ public class TcpHandlerTest { @Before public void setUp() throws InterruptedException, ExecutionException { tcphandler = new TcpHandler(0); - tcphandler.start(); + new Thread(tcphandler).start(); tcphandler.getIsOnlineFuture().get(); port = tcphandler.getPort(); address = tcphandler.getAddress(); @@ -53,10 +53,12 @@ public class TcpHandlerTest { /** * stop {@link TcpHandler} + * @throws ExecutionException + * @throws InterruptedException */ @After - public void tearDown() { - tcphandler.shutdown(); + public void tearDown() throws InterruptedException, ExecutionException { + tcphandler.shutdown().get(); } /** diff --git a/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TlsDetectorTest.java b/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TlsDetectorTest.java index 1279ef6b..f0d94116 100644 --- a/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TlsDetectorTest.java +++ b/openflow-protocol-impl/src/test/java/org/opendaylight/openflowjava/protocol/impl/core/TlsDetectorTest.java @@ -7,8 +7,6 @@ import io.netty.channel.embedded.EmbeddedChannel; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.opendaylight.openflowjava.protocol.impl.core.TcpHandler; -import org.opendaylight.openflowjava.protocol.impl.core.TlsDetector; import org.opendaylight.openflowjava.protocol.impl.core.TcpHandler.COMPONENT_NAMES; /** diff --git a/openflow-protocol-spi/src/main/java/org/opendaylight/openflowjava/protocol/spi/connection/SwitchConnectionProvider.java b/openflow-protocol-spi/src/main/java/org/opendaylight/openflowjava/protocol/spi/connection/SwitchConnectionProvider.java index c6f8ba48..2d1a16ca 100644 --- a/openflow-protocol-spi/src/main/java/org/opendaylight/openflowjava/protocol/spi/connection/SwitchConnectionProvider.java +++ b/openflow-protocol-spi/src/main/java/org/opendaylight/openflowjava/protocol/spi/connection/SwitchConnectionProvider.java @@ -8,8 +8,11 @@ package org.opendaylight.openflowjava.protocol.spi.connection; +import java.util.Collection; +import java.util.List; import java.util.concurrent.Future; +import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration; import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler; /** @@ -19,17 +22,16 @@ import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHan public interface SwitchConnectionProvider { /** - * @param configuration contains protocols, ports, addresses and similar connection parameters - * TODO - create configuration interface proposal + * @param configurations list of [protocol, port, address and supported features] */ - public void configure(Object configuration); + public void configure(Collection configurations); /** * start listening to switches, but please don't forget to do * {@link #setSwitchConnectionListener(SwitchConnectionHandler)} first * @return future, triggered to true, when all listening channels are up and running */ - public Future startup(); + public Future> startup(); /** * stop listening to switches @@ -37,7 +39,6 @@ public interface SwitchConnectionProvider { */ public Future shutdown(); - /** * @param switchConListener instance being informed when new switch connects */ -- 2.36.6