From: Robert Varga Date: Sun, 9 Feb 2014 22:41:45 +0000 (+0100) Subject: Rework NETCONF interfaces X-Git-Tag: autorelease-tag-v20140601202136_82eb3f9~472^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=950be361f30f0dc9df109c1c649deb92d9cebb05;hp=-c Rework NETCONF interfaces NetconfSession and NetconfClientSessionListener are supposed to be interfaces, not classes. Convert them to such and introduce utility classes which retain their current functionality. Remove an unused method and fix type safety warnings while we're at it. Change-Id: Id7d78c831e3c3d46abb4379efe4a5ca353cd55ff Signed-off-by: Robert Varga --- 950be361f30f0dc9df109c1c649deb92d9cebb05 diff --git a/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/AbstractNetconfSession.java b/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/AbstractNetconfSession.java new file mode 100644 index 0000000000..bd75c27dd6 --- /dev/null +++ b/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/AbstractNetconfSession.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.netconf.api; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; + +import java.io.IOException; + +import org.opendaylight.protocol.framework.AbstractProtocolSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractNetconfSession> extends AbstractProtocolSession implements NetconfSession { + private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSession.class); + private final L sessionListener; + private final long sessionId; + private boolean up = false; + + protected final Channel channel; + + protected AbstractNetconfSession(L sessionListener, Channel channel, long sessionId) { + this.sessionListener = sessionListener; + this.channel = channel; + this.sessionId = sessionId; + logger.debug("Session {} created", toString()); + } + + protected abstract S thisInstance(); + + @Override + public void close() { + channel.close(); + up = false; + sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed")); + } + + @Override + protected void handleMessage(NetconfMessage netconfMessage) { + logger.debug("handling incoming message"); + sessionListener.onMessage(thisInstance(), netconfMessage); + } + + @Override + public ChannelFuture sendMessage(NetconfMessage netconfMessage) { + return channel.writeAndFlush(netconfMessage); + } + + @Override + protected void endOfInput() { + logger.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up" + : "initialized"); + if (isUp()) { + this.sessionListener.onSessionDown(thisInstance(), new IOException("End of input detected. Close the session.")); + } + } + + @Override + protected void sessionUp() { + logger.debug("Session {} up", toString()); + sessionListener.onSessionUp(thisInstance()); + this.up = true; + } + + @Override + public String toString() { + final StringBuffer sb = new StringBuffer("ServerNetconfSession{"); + sb.append("sessionId=").append(sessionId); + sb.append('}'); + return sb.toString(); + } + + public final boolean isUp() { + return up; + } + + public final long getSessionId() { + return sessionId; + } +} + diff --git a/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfSession.java b/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfSession.java index 4770cb128f..e52e71ceea 100644 --- a/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfSession.java +++ b/opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfSession.java @@ -7,78 +7,10 @@ */ package org.opendaylight.controller.netconf.api; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import java.io.IOException; +import org.opendaylight.protocol.framework.ProtocolSession; -import org.opendaylight.protocol.framework.AbstractProtocolSession; -import org.opendaylight.protocol.framework.SessionListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class NetconfSession extends AbstractProtocolSession { - private static final Logger logger = LoggerFactory.getLogger(NetconfSession.class); - private final SessionListener sessionListener; - private final long sessionId; - private boolean up = false; - - protected final Channel channel; - - protected NetconfSession(SessionListener sessionListener, Channel channel, long sessionId) { - this.sessionListener = sessionListener; - this.channel = channel; - this.sessionId = sessionId; - logger.debug("Session {} created", toString()); - } - - @Override - public void close() { - channel.close(); - up = false; - sessionListener.onSessionTerminated(this, new NetconfTerminationReason("Session closed")); - } - - @Override - protected void handleMessage(NetconfMessage netconfMessage) { - logger.debug("handling incoming message"); - sessionListener.onMessage(this, netconfMessage); - } - - public ChannelFuture sendMessage(NetconfMessage netconfMessage) { - return channel.writeAndFlush(netconfMessage); - } - - @Override - protected void endOfInput() { - logger.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up" - : "initialized"); - if (isUp()) { - this.sessionListener.onSessionDown(this, new IOException("End of input detected. Close the session.")); - } - } - - @Override - protected void sessionUp() { - logger.debug("Session {} up", toString()); - sessionListener.onSessionUp(this); - this.up = true; - } - - @Override - public String toString() { - final StringBuffer sb = new StringBuffer("ServerNetconfSession{"); - sb.append("sessionId=").append(sessionId); - sb.append('}'); - return sb.toString(); - } - - public final boolean isUp() { - return up; - } - - public final long getSessionId() { - return sessionId; - } +public interface NetconfSession extends ProtocolSession { + ChannelFuture sendMessage(NetconfMessage message); } - diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/AbstractNetconfClientNotifySessionListener.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/AbstractNetconfClientNotifySessionListener.java index aee4085599..6ae966d1f7 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/AbstractNetconfClientNotifySessionListener.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/AbstractNetconfClientNotifySessionListener.java @@ -15,7 +15,7 @@ import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; /** * Class extending {@link NetconfClientSessionListener} to provide notification capability. */ -public abstract class AbstractNetconfClientNotifySessionListener extends NetconfClientSessionListener { +public abstract class AbstractNetconfClientNotifySessionListener extends SimpleNetconfClientSessionListener { /* * Maybe some capabilities could be expressed as internal NetconfClientSessionListener handlers. * It would enable NetconfClient functionality to be extended by using namespace handlers. diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java index a9dd2c3394..4cdca208bc 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClient.java @@ -31,6 +31,10 @@ import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.Sets; +/** + * @deprecated Use {@link NetconfClientDispatcher.createClient()} or {@link NetconfClientDispatcher.createReconnectingClient()} instead. + */ +@Deprecated public class NetconfClient implements Closeable { private static final Logger logger = LoggerFactory.getLogger(NetconfClient.class); @@ -53,7 +57,7 @@ public class NetconfClient implements Closeable { private NetconfClient(String clientLabelForLogging, InetSocketAddress address, ReconnectStrategy strat, NetconfClientDispatcher netconfClientDispatcher) throws InterruptedException { this.label = clientLabelForLogging; dispatch = netconfClientDispatcher; - sessionListener = new NetconfClientSessionListener(); + sessionListener = new SimpleNetconfClientSessionListener(); Future clientFuture = dispatch.createClient(address, sessionListener, strat); this.address = address; clientSession = get(clientFuture); @@ -74,7 +78,8 @@ public class NetconfClient implements Closeable { return new NetconfClient(clientLabelForLogging,address,strategy,netconfClientDispatcher); } - public static NetconfClient clientFor(String clientLabelForLogging, InetSocketAddress address, ReconnectStrategy strategy, NetconfClientDispatcher netconfClientDispatcher,NetconfClientSessionListener listener) throws InterruptedException { + public static NetconfClient clientFor(String clientLabelForLogging, InetSocketAddress address, + ReconnectStrategy strategy, NetconfClientDispatcher netconfClientDispatcher, NetconfClientSessionListener listener) throws InterruptedException { return new NetconfClient(clientLabelForLogging,address,strategy,netconfClientDispatcher,listener); } @@ -102,7 +107,7 @@ public class NetconfClient implements Closeable { } public Future sendRequest(NetconfMessage message) { - return sessionListener.sendRequest(message); + return ((SimpleNetconfClientSessionListener)sessionListener).sendRequest(message); } /** @@ -122,7 +127,7 @@ public class NetconfClient implements Closeable { final Stopwatch stopwatch = new Stopwatch().start(); try { - return sessionListener.sendRequest(message).get(attempts * attemptMsDelay, TimeUnit.MILLISECONDS); + return sendRequest(message).get(attempts * attemptMsDelay, TimeUnit.MILLISECONDS); } finally { stopwatch.stop(); logger.debug("Total time spent waiting for response from {}: {} ms", address, stopwatch.elapsed(TimeUnit.MILLISECONDS)); diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java index dd08bf565c..bff2a54c58 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientDispatcher.java @@ -17,10 +17,10 @@ import io.netty.util.concurrent.Promise; import java.io.Closeable; import java.net.InetSocketAddress; -import org.opendaylight.controller.netconf.api.NetconfSession; import org.opendaylight.controller.netconf.util.AbstractChannelInitializer; import org.opendaylight.protocol.framework.AbstractDispatcher; import org.opendaylight.protocol.framework.ReconnectStrategy; +import org.opendaylight.protocol.framework.ReconnectStrategyFactory; import org.opendaylight.protocol.framework.SessionListenerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +29,7 @@ import com.google.common.base.Optional; public class NetconfClientDispatcher extends AbstractDispatcher implements Closeable { - private static final Logger logger = LoggerFactory.getLogger(NetconfClient.class); + private static final Logger logger = LoggerFactory.getLogger(NetconfClientDispatcher.class); private final NetconfClientSessionNegotiatorFactory negotatorFactory; private final HashedWheelTimer timer; @@ -62,7 +62,21 @@ public class NetconfClientDispatcher extends AbstractDispatcher createReconnectingClient(final InetSocketAddress address, + final NetconfClientSessionListener listener, + final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy) { + final ClientChannelInitializer init = new ClientChannelInitializer(negotatorFactory, listener); + + return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy, + new PipelineInitializer() { + @Override + public void initializeChannel(final SocketChannel ch, final Promise promise) { + init.initialize(ch, promise); + } + }); + } + + private static class ClientChannelInitializer extends AbstractChannelInitializer { private final NetconfClientSessionNegotiatorFactory negotiatorFactory; private final NetconfClientSessionListener sessionListener; @@ -74,12 +88,7 @@ public class NetconfClientDispatcher extends AbstractDispatcher promise) { - super.initialize(ch,promise); - } - - @Override - protected void initializeAfterDecoder(SocketChannel ch, Promise promise) { + protected void initializeAfterDecoder(SocketChannel ch, Promise promise) { ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator( new SessionListenerFactory() { @Override @@ -88,8 +97,8 @@ public class NetconfClientDispatcher extends AbstractDispatcher { private static final Logger logger = LoggerFactory.getLogger(NetconfClientSession.class); private final Collection capabilities; - public NetconfClientSession(SessionListener sessionListener, Channel channel, long sessionId, + public NetconfClientSession(NetconfClientSessionListener sessionListener, Channel channel, long sessionId, Collection capabilities) { super(sessionListener,channel,sessionId); this.capabilities = capabilities; @@ -35,4 +32,8 @@ public class NetconfClientSession extends NetconfSession { return capabilities; } + @Override + protected NetconfClientSession thisInstance() { + return this; + } } diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java index 1ac2e7e264..21be3a8cab 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java @@ -1,115 +1,14 @@ /* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.netconf.client; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.concurrent.Promise; - -import java.util.ArrayDeque; -import java.util.Queue; - -import javax.annotation.concurrent.GuardedBy; - -import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfSessionListener; -import org.opendaylight.controller.netconf.api.NetconfTerminationReason; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -public class NetconfClientSessionListener implements NetconfSessionListener { - private static final class RequestEntry { - final Promise promise; - final NetconfMessage request; - - public RequestEntry(Promise future, NetconfMessage request) { - this.promise = Preconditions.checkNotNull(future); - this.request = Preconditions.checkNotNull(request); - } - } - - private static final Logger logger = LoggerFactory.getLogger(NetconfClientSessionListener.class); - - @GuardedBy("this") - private final Queue requests = new ArrayDeque<>(); - - @GuardedBy("this") - private NetconfClientSession clientSession; - - @GuardedBy("this") - private void dispatchRequest() { - while (!requests.isEmpty()) { - final RequestEntry e = requests.peek(); - if (e.promise.setUncancellable()) { - logger.debug("Sending message {}", e.request); - clientSession.sendMessage(e.request); - break; - } - - logger.debug("Message {} has been cancelled, skipping it", e.request); - requests.poll(); - } - } - - @Override - public final synchronized void onSessionUp(NetconfClientSession clientSession) { - this.clientSession = Preconditions.checkNotNull(clientSession); - logger.debug("Client session {} went up", clientSession); - dispatchRequest(); - } - - private synchronized void tearDown(final Exception cause) { - final RequestEntry e = requests.poll(); - if (e != null) { - e.promise.setFailure(cause); - } - - this.clientSession = null; - } - - @Override - public final void onSessionDown(NetconfClientSession clientSession, Exception e) { - logger.debug("Client Session {} went down unexpectedly", clientSession, e); - tearDown(e); - } - - @Override - public final void onSessionTerminated(NetconfClientSession clientSession, - NetconfTerminationReason netconfTerminationReason) { - logger.debug("Client Session {} terminated, reason: {}", clientSession, - netconfTerminationReason.getErrorMessage()); - tearDown(new RuntimeException(netconfTerminationReason.getErrorMessage())); - } - - @Override - public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) { - logger.debug("New message arrived: {}", message); - - final RequestEntry e = requests.poll(); - if (e != null) { - e.promise.setSuccess(message); - dispatchRequest(); - } else { - logger.info("Ignoring unsolicited message {}", message); - } - } - - final synchronized Future sendRequest(NetconfMessage message) { - final RequestEntry req = new RequestEntry(GlobalEventExecutor.INSTANCE.newPromise(), message); - requests.add(req); - if (clientSession != null) { - dispatchRequest(); - } +public interface NetconfClientSessionListener extends NetconfSessionListener { - return req.promise; - } } diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java index 100b98c15a..3c2e814d89 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiator.java @@ -8,11 +8,17 @@ package org.opendaylight.controller.netconf.client; -import com.google.common.base.Function; -import com.google.common.collect.Collections2; import io.netty.channel.Channel; import io.netty.util.Timer; import io.netty.util.concurrent.Promise; + +import java.util.Collection; +import java.util.List; + +import javax.annotation.Nullable; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpression; + import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfSessionPreferences; import org.opendaylight.controller.netconf.util.AbstractNetconfSessionNegotiator; @@ -20,21 +26,17 @@ import org.opendaylight.controller.netconf.util.xml.XMLNetconfUtil; import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.xml.XmlUtil; -import org.opendaylight.protocol.framework.SessionListener; import org.w3c.dom.Document; import org.w3c.dom.Node; -import javax.annotation.Nullable; -import javax.xml.xpath.XPathConstants; -import javax.xml.xpath.XPathExpression; -import java.util.Collection; -import java.util.List; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; public class NetconfClientSessionNegotiator extends - AbstractNetconfSessionNegotiator { + AbstractNetconfSessionNegotiator { protected NetconfClientSessionNegotiator(NetconfSessionPreferences sessionPreferences, - Promise promise, Channel channel, Timer timer, SessionListener sessionListener, + Promise promise, Channel channel, Timer timer, NetconfClientSessionListener sessionListener, long connectionTimeoutMillis) { super(sessionPreferences, promise, channel, timer, sessionListener, connectionTimeoutMillis); } @@ -69,7 +71,7 @@ public class NetconfClientSessionNegotiator extends } @Override - protected NetconfClientSession getSession(SessionListener sessionListener, Channel channel, NetconfMessage message) { + protected NetconfClientSession getSession(NetconfClientSessionListener sessionListener, Channel channel, NetconfMessage message) { return new NetconfClientSession(sessionListener, channel, extractSessionId(message.getDocument()), getCapabilities(message.getDocument())); } diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiatorFactory.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiatorFactory.java index db6c024e5a..e678a601ff 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiatorFactory.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionNegotiatorFactory.java @@ -8,11 +8,13 @@ package org.opendaylight.controller.netconf.client; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import io.netty.channel.Channel; import io.netty.util.Timer; import io.netty.util.concurrent.Promise; + +import java.io.IOException; +import java.io.InputStream; + import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfSessionPreferences; import org.opendaylight.controller.netconf.util.xml.XmlUtil; @@ -21,18 +23,17 @@ import org.opendaylight.protocol.framework.SessionNegotiator; import org.opendaylight.protocol.framework.SessionNegotiatorFactory; import org.xml.sax.SAXException; -import java.io.IOException; -import java.io.InputStream; - -public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorFactory { +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; - private final Timer timer; +public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorFactory { private final Optional additionalHeader; private final long connectionTimeoutMillis; + private final Timer timer; public NetconfClientSessionNegotiatorFactory(Timer timer, Optional additionalHeader, long connectionTimeoutMillis) { - this.timer = timer; + this.timer = Preconditions.checkNotNull(timer); this.additionalHeader = additionalHeader; this.connectionTimeoutMillis = connectionTimeoutMillis; } @@ -48,8 +49,8 @@ public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorF } @Override - public SessionNegotiator getSessionNegotiator(SessionListenerFactory sessionListenerFactory, Channel channel, - Promise promise) { + public SessionNegotiator getSessionNegotiator(SessionListenerFactory sessionListenerFactory, Channel channel, + Promise promise) { // Hello message needs to be recreated every time NetconfMessage helloMessage = loadHelloMessageTemplate(); if(this.additionalHeader.isPresent()) { @@ -59,5 +60,4 @@ public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorF return new NetconfClientSessionNegotiator(proposal, promise, channel, timer, sessionListenerFactory.getSessionListener(), connectionTimeoutMillis); } - } diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java index 25beb65179..0737279b04 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfSshClientDispatcher.java @@ -17,12 +17,12 @@ import io.netty.util.concurrent.Promise; import java.io.IOException; import java.net.InetSocketAddress; -import org.opendaylight.controller.netconf.api.NetconfSession; import org.opendaylight.controller.netconf.util.AbstractChannelInitializer; import org.opendaylight.controller.netconf.util.handler.ssh.SshHandler; import org.opendaylight.controller.netconf.util.handler.ssh.authentication.AuthenticationHandler; import org.opendaylight.controller.netconf.util.handler.ssh.client.Invoker; import org.opendaylight.protocol.framework.ReconnectStrategy; +import org.opendaylight.protocol.framework.ReconnectStrategyFactory; import org.opendaylight.protocol.framework.SessionListenerFactory; import com.google.common.base.Optional; @@ -62,7 +62,22 @@ public class NetconfSshClientDispatcher extends NetconfClientDispatcher { }); } - private static final class NetconfSshClientInitializer extends AbstractChannelInitializer { + @Override + public Future createReconnectingClient(final InetSocketAddress address, + final NetconfClientSessionListener listener, + final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy) { + final NetconfSshClientInitializer init = new NetconfSshClientInitializer(authHandler, negotatorFactory, listener); + + return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy, + new PipelineInitializer() { + @Override + public void initializeChannel(final SocketChannel ch, final Promise promise) { + init.initialize(ch, promise); + } + }); + } + + private static final class NetconfSshClientInitializer extends AbstractChannelInitializer { private final AuthenticationHandler authenticationHandler; private final NetconfClientSessionNegotiatorFactory negotiatorFactory; @@ -77,7 +92,7 @@ public class NetconfSshClientDispatcher extends NetconfClientDispatcher { } @Override - public void initialize(SocketChannel ch, Promise promise) { + public void initialize(SocketChannel ch, Promise promise) { try { Invoker invoker = Invoker.subsystem("netconf"); ch.pipeline().addFirst(new SshHandler(authenticationHandler, invoker)); @@ -88,7 +103,7 @@ public class NetconfSshClientDispatcher extends NetconfClientDispatcher { } @Override - protected void initializeAfterDecoder(SocketChannel ch, Promise promise) { + protected void initializeAfterDecoder(SocketChannel ch, Promise promise) { ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(new SessionListenerFactory() { @Override public NetconfClientSessionListener getSessionListener() { diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/SimpleNetconfClientSessionListener.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/SimpleNetconfClientSessionListener.java new file mode 100644 index 0000000000..e96161c29a --- /dev/null +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/SimpleNetconfClientSessionListener.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.client; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.Promise; + +import java.util.ArrayDeque; +import java.util.Queue; + +import javax.annotation.concurrent.GuardedBy; + +import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.netconf.api.NetconfTerminationReason; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class SimpleNetconfClientSessionListener implements NetconfClientSessionListener { + private static final class RequestEntry { + final Promise promise; + final NetconfMessage request; + + public RequestEntry(Promise future, NetconfMessage request) { + this.promise = Preconditions.checkNotNull(future); + this.request = Preconditions.checkNotNull(request); + } + } + + private static final Logger logger = LoggerFactory.getLogger(SimpleNetconfClientSessionListener.class); + + @GuardedBy("this") + private final Queue requests = new ArrayDeque<>(); + + @GuardedBy("this") + private NetconfClientSession clientSession; + + @GuardedBy("this") + private void dispatchRequest() { + while (!requests.isEmpty()) { + final RequestEntry e = requests.peek(); + if (e.promise.setUncancellable()) { + logger.debug("Sending message {}", e.request); + clientSession.sendMessage(e.request); + break; + } + + logger.debug("Message {} has been cancelled, skipping it", e.request); + requests.poll(); + } + } + + @Override + public final synchronized void onSessionUp(NetconfClientSession clientSession) { + this.clientSession = Preconditions.checkNotNull(clientSession); + logger.debug("Client session {} went up", clientSession); + dispatchRequest(); + } + + private synchronized void tearDown(final Exception cause) { + final RequestEntry e = requests.poll(); + if (e != null) { + e.promise.setFailure(cause); + } + + this.clientSession = null; + } + + @Override + public final void onSessionDown(NetconfClientSession clientSession, Exception e) { + logger.debug("Client Session {} went down unexpectedly", clientSession, e); + tearDown(e); + } + + @Override + public final void onSessionTerminated(NetconfClientSession clientSession, + NetconfTerminationReason netconfTerminationReason) { + logger.debug("Client Session {} terminated, reason: {}", clientSession, + netconfTerminationReason.getErrorMessage()); + tearDown(new RuntimeException(netconfTerminationReason.getErrorMessage())); + } + + @Override + public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) { + logger.debug("New message arrived: {}", message); + + final RequestEntry e = requests.poll(); + if (e != null) { + e.promise.setSuccess(message); + dispatchRequest(); + } else { + logger.info("Ignoring unsolicited message {}", message); + } + } + + final synchronized Future sendRequest(NetconfMessage message) { + final RequestEntry req = new RequestEntry(GlobalEventExecutor.INSTANCE.newPromise(), message); + + requests.add(req); + if (clientSession != null) { + dispatchRequest(); + } + + return req.promise; + } +} diff --git a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerDispatcher.java b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerDispatcher.java index 7c5bd0cb21..bd39049c56 100644 --- a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerDispatcher.java +++ b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerDispatcher.java @@ -12,14 +12,14 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.Promise; -import org.opendaylight.controller.netconf.api.NetconfSession; + +import java.net.InetSocketAddress; + import org.opendaylight.controller.netconf.impl.util.DeserializerExceptionHandler; import org.opendaylight.controller.netconf.util.AbstractChannelInitializer; import org.opendaylight.protocol.framework.AbstractDispatcher; -import java.net.InetSocketAddress; - -public class NetconfServerDispatcher extends AbstractDispatcher { +public class NetconfServerDispatcher extends AbstractDispatcher { private final ServerChannelInitializer initializer; @@ -31,15 +31,15 @@ public class NetconfServerDispatcher extends AbstractDispatcher() { + return super.createServer(address, new PipelineInitializer() { @Override - public void initializeChannel(final SocketChannel ch, final Promise promise) { + public void initializeChannel(final SocketChannel ch, final Promise promise) { initializer.initialize(ch, promise); } }); } - public static class ServerChannelInitializer extends AbstractChannelInitializer { + public static class ServerChannelInitializer extends AbstractChannelInitializer { private final NetconfServerSessionNegotiatorFactory negotiatorFactory; private final NetconfServerSessionListenerFactory listenerFactory; @@ -51,11 +51,10 @@ public class NetconfServerDispatcher extends AbstractDispatcher promise) { + protected void initializeAfterDecoder(SocketChannel ch, Promise promise) { ch.pipeline().addLast("deserializerExHandler", new DeserializerExceptionHandler()); ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise)); } - } } diff --git a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSession.java b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSession.java index 1d22fa050b..93d4e55410 100644 --- a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSession.java +++ b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSession.java @@ -15,11 +15,8 @@ import java.util.Date; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.opendaylight.controller.netconf.api.NetconfMessage; -import org.opendaylight.controller.netconf.api.NetconfSession; -import org.opendaylight.controller.netconf.api.NetconfTerminationReason; +import org.opendaylight.controller.netconf.api.AbstractNetconfSession; import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession; -import org.opendaylight.protocol.framework.SessionListener; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.NetconfTcp; @@ -37,7 +34,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -public class NetconfServerSession extends NetconfSession implements NetconfManagementSession { +public final class NetconfServerSession extends AbstractNetconfSession implements NetconfManagementSession { private static final Logger logger = LoggerFactory.getLogger(NetconfServerSession.class); @@ -46,7 +43,7 @@ public class NetconfServerSession extends NetconfSession implements NetconfManag private Date loginTime; private long inRpcSuccess, inRpcFail, outRpcError; - public NetconfServerSession(SessionListener sessionListener, Channel channel, long sessionId, + public NetconfServerSession(NetconfServerSessionListener sessionListener, Channel channel, long sessionId, NetconfServerSessionNegotiator.AdditionalHeader header) { super(sessionListener, channel, sessionId); this.header = header; @@ -108,9 +105,9 @@ public class NetconfServerSession extends NetconfSession implements NetconfManag private Class getTransportForString(String transport) { switch(transport) { - case "ssh" : return NetconfSsh.class; - case "tcp" : return NetconfTcp.class; - default: throw new IllegalArgumentException("Unknown transport type " + transport); + case "ssh" : return NetconfSsh.class; + case "tcp" : return NetconfTcp.class; + default: throw new IllegalArgumentException("Unknown transport type " + transport); } } @@ -119,4 +116,8 @@ public class NetconfServerSession extends NetconfSession implements NetconfManag return dateFormat.format(loginTime); } + @Override + protected NetconfServerSession thisInstance() { + return this; + } } diff --git a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiator.java b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiator.java index 91555861dc..1303d11435 100644 --- a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiator.java +++ b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiator.java @@ -8,33 +8,34 @@ package org.opendaylight.controller.netconf.impl; -import com.google.common.base.Optional; import io.netty.channel.Channel; import io.netty.util.Timer; import io.netty.util.concurrent.Promise; + +import java.net.InetSocketAddress; + import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfServerSessionPreferences; import org.opendaylight.controller.netconf.impl.util.AdditionalHeaderUtil; import org.opendaylight.controller.netconf.util.AbstractNetconfSessionNegotiator; -import org.opendaylight.protocol.framework.SessionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetSocketAddress; +import com.google.common.base.Optional; public class NetconfServerSessionNegotiator extends - AbstractNetconfSessionNegotiator { + AbstractNetconfSessionNegotiator { static final Logger logger = LoggerFactory.getLogger(NetconfServerSessionNegotiator.class); protected NetconfServerSessionNegotiator(NetconfServerSessionPreferences sessionPreferences, - Promise promise, Channel channel, Timer timer, SessionListener sessionListener, + Promise promise, Channel channel, Timer timer, NetconfServerSessionListener sessionListener, long connectionTimeoutMillis) { super(sessionPreferences, promise, channel, timer, sessionListener, connectionTimeoutMillis); } @Override - protected NetconfServerSession getSession(SessionListener sessionListener, Channel channel, NetconfMessage message) { + protected NetconfServerSession getSession(NetconfServerSessionListener sessionListener, Channel channel, NetconfMessage message) { Optional additionalHeader = message.getAdditionalHeader(); AdditionalHeader parsedHeader; diff --git a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiatorFactory.java b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiatorFactory.java index 98462b8025..8086b748d7 100644 --- a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiatorFactory.java +++ b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiatorFactory.java @@ -8,10 +8,15 @@ package org.opendaylight.controller.netconf.impl; -import com.google.common.base.Preconditions; import io.netty.channel.Channel; import io.netty.util.Timer; import io.netty.util.concurrent.Promise; + +import java.io.InputStream; + +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpression; + import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfServerSessionPreferences; import org.opendaylight.controller.netconf.impl.mapping.CapabilityProvider; @@ -27,11 +32,9 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; -import javax.xml.xpath.XPathConstants; -import javax.xml.xpath.XPathExpression; -import java.io.InputStream; +import com.google.common.base.Preconditions; -public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorFactory { +public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorFactory { public static final String SERVER_HELLO_XML_LOCATION = "/server_hello.xml"; @@ -59,8 +62,8 @@ public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorF } @Override - public SessionNegotiator getSessionNegotiator(SessionListenerFactory sessionListenerFactory, Channel channel, - Promise promise) { + public SessionNegotiator getSessionNegotiator(SessionListenerFactory sessionListenerFactory, Channel channel, + Promise promise) { long sessionId = idProvider.getNextSessionId(); NetconfServerSessionPreferences proposal = new NetconfServerSessionPreferences(createHelloMessage(sessionId), @@ -97,6 +100,6 @@ public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorF } private synchronized Document getHelloTemplateClone() { - return (Document) this.helloMessageTemplate.cloneNode(true); + return (Document) helloMessageTemplate.cloneNode(true); } } diff --git a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfOperationRouterImpl.java b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfOperationRouterImpl.java index bc68db85de..ece9d47ee9 100644 --- a/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfOperationRouterImpl.java +++ b/opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfOperationRouterImpl.java @@ -234,10 +234,6 @@ public class NetconfOperationRouterImpl implements NetconfOperationRouter { private class NetconfOperationExecution implements NetconfOperationFilterChain { private final NetconfOperation operationWithHighestPriority; - private NetconfOperationExecution(NetconfOperation operationWithHighestPriority) { - this.operationWithHighestPriority = operationWithHighestPriority; - } - public NetconfOperationExecution(TreeMap> sortedPriority, HandlingPriority highestFoundPriority) { operationWithHighestPriority = sortedPriority.get(highestFoundPriority).iterator().next(); diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java index 5b4b3d02ea..7068de8526 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java @@ -18,9 +18,9 @@ import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncod import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder; import org.opendaylight.controller.netconf.util.messages.FramingMechanism; -public abstract class AbstractChannelInitializer { +public abstract class AbstractChannelInitializer { - public void initialize(SocketChannel ch, Promise promise){ + public void initialize(SocketChannel ch, Promise promise){ ch.pipeline().addLast("aggregator", new NetconfMessageAggregator(FramingMechanism.EOM)); ch.pipeline().addLast(new NetconfXMLToMessageDecoder()); initializeAfterDecoder(ch, promise); @@ -28,6 +28,6 @@ public abstract class AbstractChannelInitializer { ch.pipeline().addLast(new NetconfMessageToXMLEncoder()); } - protected abstract void initializeAfterDecoder(SocketChannel ch, Promise promise); + protected abstract void initializeAfterDecoder(SocketChannel ch, Promise promise); } diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java index a485a4ea94..9c35c7225f 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java @@ -8,8 +8,6 @@ package org.opendaylight.controller.netconf.util; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -20,8 +18,12 @@ import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; + +import java.util.concurrent.TimeUnit; + +import org.opendaylight.controller.netconf.api.AbstractNetconfSession; import org.opendaylight.controller.netconf.api.NetconfMessage; -import org.opendaylight.controller.netconf.api.NetconfSession; +import org.opendaylight.controller.netconf.api.NetconfSessionListener; import org.opendaylight.controller.netconf.api.NetconfSessionPreferences; import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory; import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator; @@ -31,23 +33,23 @@ import org.opendaylight.controller.netconf.util.xml.XmlElement; import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.protocol.framework.AbstractSessionNegotiator; -import org.opendaylight.protocol.framework.SessionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.NodeList; -import java.util.concurrent.TimeUnit; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; -public abstract class AbstractNetconfSessionNegotiator

- extends AbstractSessionNegotiator { +public abstract class AbstractNetconfSessionNegotiator

, L extends NetconfSessionListener> +extends AbstractSessionNegotiator { private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class); public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler"; protected final P sessionPreferences; - private final SessionListener sessionListener; + private final L sessionListener; private Timeout timeout; /** @@ -62,7 +64,7 @@ public abstract class AbstractNetconfSessionNegotiator

promise, Channel channel, Timer timer, - SessionListener sessionListener, long connectionTimeoutMillis) { + L sessionListener, long connectionTimeoutMillis) { super(promise, channel); this.sessionPreferences = sessionPreferences; this.timer = timer; @@ -168,7 +170,7 @@ public abstract class AbstractNetconfSessionNegotiator