--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.api;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+
+import java.io.IOException;
+
+import org.opendaylight.protocol.framework.AbstractProtocolSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractNetconfSession<S extends NetconfSession, L extends NetconfSessionListener<S>> extends AbstractProtocolSession<NetconfMessage> implements NetconfSession {
+ private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSession.class);
+ private final L sessionListener;
+ private final long sessionId;
+ private boolean up = false;
+
+ protected final Channel channel;
+
+ protected AbstractNetconfSession(L sessionListener, Channel channel, long sessionId) {
+ this.sessionListener = sessionListener;
+ this.channel = channel;
+ this.sessionId = sessionId;
+ logger.debug("Session {} created", toString());
+ }
+
+ protected abstract S thisInstance();
+
+ @Override
+ public void close() {
+ channel.close();
+ up = false;
+ sessionListener.onSessionTerminated(thisInstance(), new NetconfTerminationReason("Session closed"));
+ }
+
+ @Override
+ protected void handleMessage(NetconfMessage netconfMessage) {
+ logger.debug("handling incoming message");
+ sessionListener.onMessage(thisInstance(), netconfMessage);
+ }
+
+ @Override
+ public ChannelFuture sendMessage(NetconfMessage netconfMessage) {
+ return channel.writeAndFlush(netconfMessage);
+ }
+
+ @Override
+ protected void endOfInput() {
+ logger.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
+ : "initialized");
+ if (isUp()) {
+ this.sessionListener.onSessionDown(thisInstance(), new IOException("End of input detected. Close the session."));
+ }
+ }
+
+ @Override
+ protected void sessionUp() {
+ logger.debug("Session {} up", toString());
+ sessionListener.onSessionUp(thisInstance());
+ this.up = true;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuffer sb = new StringBuffer("ServerNetconfSession{");
+ sb.append("sessionId=").append(sessionId);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ public final boolean isUp() {
+ return up;
+ }
+
+ public final long getSessionId() {
+ return sessionId;
+ }
+}
+
*/
package org.opendaylight.controller.netconf.api;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import java.io.IOException;
+import org.opendaylight.protocol.framework.ProtocolSession;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
-import org.opendaylight.protocol.framework.SessionListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class NetconfSession extends AbstractProtocolSession<NetconfMessage> {
- private static final Logger logger = LoggerFactory.getLogger(NetconfSession.class);
- private final SessionListener<NetconfMessage, NetconfSession, NetconfTerminationReason> sessionListener;
- private final long sessionId;
- private boolean up = false;
-
- protected final Channel channel;
-
- protected NetconfSession(SessionListener<NetconfMessage, NetconfSession, NetconfTerminationReason> sessionListener, Channel channel, long sessionId) {
- this.sessionListener = sessionListener;
- this.channel = channel;
- this.sessionId = sessionId;
- logger.debug("Session {} created", toString());
- }
-
- @Override
- public void close() {
- channel.close();
- up = false;
- sessionListener.onSessionTerminated(this, new NetconfTerminationReason("Session closed"));
- }
-
- @Override
- protected void handleMessage(NetconfMessage netconfMessage) {
- logger.debug("handling incoming message");
- sessionListener.onMessage(this, netconfMessage);
- }
-
- public ChannelFuture sendMessage(NetconfMessage netconfMessage) {
- return channel.writeAndFlush(netconfMessage);
- }
-
- @Override
- protected void endOfInput() {
- logger.debug("Session {} end of input detected while session was in state {}", toString(), isUp() ? "up"
- : "initialized");
- if (isUp()) {
- this.sessionListener.onSessionDown(this, new IOException("End of input detected. Close the session."));
- }
- }
-
- @Override
- protected void sessionUp() {
- logger.debug("Session {} up", toString());
- sessionListener.onSessionUp(this);
- this.up = true;
- }
-
- @Override
- public String toString() {
- final StringBuffer sb = new StringBuffer("ServerNetconfSession{");
- sb.append("sessionId=").append(sessionId);
- sb.append('}');
- return sb.toString();
- }
-
- public final boolean isUp() {
- return up;
- }
-
- public final long getSessionId() {
- return sessionId;
- }
+public interface NetconfSession extends ProtocolSession<NetconfMessage> {
+ ChannelFuture sendMessage(NetconfMessage message);
}
-
/**
* Class extending {@link NetconfClientSessionListener} to provide notification capability.
*/
-public abstract class AbstractNetconfClientNotifySessionListener extends NetconfClientSessionListener {
+public abstract class AbstractNetconfClientNotifySessionListener extends SimpleNetconfClientSessionListener {
/*
* Maybe some capabilities could be expressed as internal NetconfClientSessionListener handlers.
* It would enable NetconfClient functionality to be extended by using namespace handlers.
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
+/**
+ * @deprecated Use {@link NetconfClientDispatcher.createClient()} or {@link NetconfClientDispatcher.createReconnectingClient()} instead.
+ */
+@Deprecated
public class NetconfClient implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(NetconfClient.class);
private NetconfClient(String clientLabelForLogging, InetSocketAddress address, ReconnectStrategy strat, NetconfClientDispatcher netconfClientDispatcher) throws InterruptedException {
this.label = clientLabelForLogging;
dispatch = netconfClientDispatcher;
- sessionListener = new NetconfClientSessionListener();
+ sessionListener = new SimpleNetconfClientSessionListener();
Future<NetconfClientSession> clientFuture = dispatch.createClient(address, sessionListener, strat);
this.address = address;
clientSession = get(clientFuture);
return new NetconfClient(clientLabelForLogging,address,strategy,netconfClientDispatcher);
}
- public static NetconfClient clientFor(String clientLabelForLogging, InetSocketAddress address, ReconnectStrategy strategy, NetconfClientDispatcher netconfClientDispatcher,NetconfClientSessionListener listener) throws InterruptedException {
+ public static NetconfClient clientFor(String clientLabelForLogging, InetSocketAddress address,
+ ReconnectStrategy strategy, NetconfClientDispatcher netconfClientDispatcher, NetconfClientSessionListener listener) throws InterruptedException {
return new NetconfClient(clientLabelForLogging,address,strategy,netconfClientDispatcher,listener);
}
}
public Future<NetconfMessage> sendRequest(NetconfMessage message) {
- return sessionListener.sendRequest(message);
+ return ((SimpleNetconfClientSessionListener)sessionListener).sendRequest(message);
}
/**
final Stopwatch stopwatch = new Stopwatch().start();
try {
- return sessionListener.sendRequest(message).get(attempts * attemptMsDelay, TimeUnit.MILLISECONDS);
+ return sendRequest(message).get(attempts * attemptMsDelay, TimeUnit.MILLISECONDS);
} finally {
stopwatch.stop();
logger.debug("Total time spent waiting for response from {}: {} ms", address, stopwatch.elapsed(TimeUnit.MILLISECONDS));
import java.io.Closeable;
import java.net.InetSocketAddress;
-import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.util.AbstractChannelInitializer;
import org.opendaylight.protocol.framework.AbstractDispatcher;
import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.protocol.framework.SessionListenerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NetconfClientDispatcher extends AbstractDispatcher<NetconfClientSession, NetconfClientSessionListener> implements Closeable {
- private static final Logger logger = LoggerFactory.getLogger(NetconfClient.class);
+ private static final Logger logger = LoggerFactory.getLogger(NetconfClientDispatcher.class);
private final NetconfClientSessionNegotiatorFactory negotatorFactory;
private final HashedWheelTimer timer;
});
}
- private static class ClientChannelInitializer extends AbstractChannelInitializer {
+ public Future<Void> createReconnectingClient(final InetSocketAddress address,
+ final NetconfClientSessionListener listener,
+ final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy) {
+ final ClientChannelInitializer init = new ClientChannelInitializer(negotatorFactory, listener);
+
+ return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy,
+ new PipelineInitializer<NetconfClientSession>() {
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<NetconfClientSession> promise) {
+ init.initialize(ch, promise);
+ }
+ });
+ }
+
+ private static class ClientChannelInitializer extends AbstractChannelInitializer<NetconfClientSession> {
private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
private final NetconfClientSessionListener sessionListener;
}
@Override
- public void initialize(SocketChannel ch, Promise<? extends NetconfSession> promise) {
- super.initialize(ch,promise);
- }
-
- @Override
- protected void initializeAfterDecoder(SocketChannel ch, Promise<? extends NetconfSession> promise) {
+ protected void initializeAfterDecoder(SocketChannel ch, Promise<NetconfClientSession> promise) {
ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(
new SessionListenerFactory<NetconfClientSessionListener>() {
@Override
}
}, ch, promise));
}
-
}
+
@Override
public void close() {
try {
import java.util.Collection;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.NetconfSession;
-import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
-import org.opendaylight.protocol.framework.SessionListener;
+import org.opendaylight.controller.netconf.api.AbstractNetconfSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NetconfClientSession extends NetconfSession {
+public final class NetconfClientSession extends AbstractNetconfSession<NetconfClientSession, NetconfClientSessionListener> {
private static final Logger logger = LoggerFactory.getLogger(NetconfClientSession.class);
private final Collection<String> capabilities;
- public NetconfClientSession(SessionListener<NetconfMessage, NetconfSession, NetconfTerminationReason> sessionListener, Channel channel, long sessionId,
+ public NetconfClientSession(NetconfClientSessionListener sessionListener, Channel channel, long sessionId,
Collection<String> capabilities) {
super(sessionListener,channel,sessionId);
this.capabilities = capabilities;
return capabilities;
}
+ @Override
+ protected NetconfClientSession thisInstance() {
+ return this;
+ }
}
/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.netconf.client;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.Promise;
-
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-import javax.annotation.concurrent.GuardedBy;
-
-import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfSessionListener;
-import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-public class NetconfClientSessionListener implements NetconfSessionListener<NetconfClientSession> {
- private static final class RequestEntry {
- final Promise<NetconfMessage> promise;
- final NetconfMessage request;
-
- public RequestEntry(Promise<NetconfMessage> future, NetconfMessage request) {
- this.promise = Preconditions.checkNotNull(future);
- this.request = Preconditions.checkNotNull(request);
- }
- }
-
- private static final Logger logger = LoggerFactory.getLogger(NetconfClientSessionListener.class);
-
- @GuardedBy("this")
- private final Queue<RequestEntry> requests = new ArrayDeque<>();
-
- @GuardedBy("this")
- private NetconfClientSession clientSession;
-
- @GuardedBy("this")
- private void dispatchRequest() {
- while (!requests.isEmpty()) {
- final RequestEntry e = requests.peek();
- if (e.promise.setUncancellable()) {
- logger.debug("Sending message {}", e.request);
- clientSession.sendMessage(e.request);
- break;
- }
-
- logger.debug("Message {} has been cancelled, skipping it", e.request);
- requests.poll();
- }
- }
-
- @Override
- public final synchronized void onSessionUp(NetconfClientSession clientSession) {
- this.clientSession = Preconditions.checkNotNull(clientSession);
- logger.debug("Client session {} went up", clientSession);
- dispatchRequest();
- }
-
- private synchronized void tearDown(final Exception cause) {
- final RequestEntry e = requests.poll();
- if (e != null) {
- e.promise.setFailure(cause);
- }
-
- this.clientSession = null;
- }
-
- @Override
- public final void onSessionDown(NetconfClientSession clientSession, Exception e) {
- logger.debug("Client Session {} went down unexpectedly", clientSession, e);
- tearDown(e);
- }
-
- @Override
- public final void onSessionTerminated(NetconfClientSession clientSession,
- NetconfTerminationReason netconfTerminationReason) {
- logger.debug("Client Session {} terminated, reason: {}", clientSession,
- netconfTerminationReason.getErrorMessage());
- tearDown(new RuntimeException(netconfTerminationReason.getErrorMessage()));
- }
-
- @Override
- public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) {
- logger.debug("New message arrived: {}", message);
-
- final RequestEntry e = requests.poll();
- if (e != null) {
- e.promise.setSuccess(message);
- dispatchRequest();
- } else {
- logger.info("Ignoring unsolicited message {}", message);
- }
- }
-
- final synchronized Future<NetconfMessage> sendRequest(NetconfMessage message) {
- final RequestEntry req = new RequestEntry(GlobalEventExecutor.INSTANCE.<NetconfMessage>newPromise(), message);
- requests.add(req);
- if (clientSession != null) {
- dispatchRequest();
- }
+public interface NetconfClientSessionListener extends NetconfSessionListener<NetconfClientSession> {
- return req.promise;
- }
}
package org.opendaylight.controller.netconf.client;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
import io.netty.channel.Channel;
import io.netty.util.Timer;
import io.netty.util.concurrent.Promise;
+
+import java.util.Collection;
+import java.util.List;
+
+import javax.annotation.Nullable;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfSessionPreferences;
import org.opendaylight.controller.netconf.util.AbstractNetconfSessionNegotiator;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.protocol.framework.SessionListener;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
-import javax.annotation.Nullable;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpression;
-import java.util.Collection;
-import java.util.List;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
public class NetconfClientSessionNegotiator extends
- AbstractNetconfSessionNegotiator<NetconfSessionPreferences, NetconfClientSession> {
+ AbstractNetconfSessionNegotiator<NetconfSessionPreferences, NetconfClientSession, NetconfClientSessionListener> {
protected NetconfClientSessionNegotiator(NetconfSessionPreferences sessionPreferences,
- Promise<NetconfClientSession> promise, Channel channel, Timer timer, SessionListener sessionListener,
+ Promise<NetconfClientSession> promise, Channel channel, Timer timer, NetconfClientSessionListener sessionListener,
long connectionTimeoutMillis) {
super(sessionPreferences, promise, channel, timer, sessionListener, connectionTimeoutMillis);
}
}
@Override
- protected NetconfClientSession getSession(SessionListener sessionListener, Channel channel, NetconfMessage message) {
+ protected NetconfClientSession getSession(NetconfClientSessionListener sessionListener, Channel channel, NetconfMessage message) {
return new NetconfClientSession(sessionListener, channel, extractSessionId(message.getDocument()),
getCapabilities(message.getDocument()));
}
package org.opendaylight.controller.netconf.client;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.util.Timer;
import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.io.InputStream;
+
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfSessionPreferences;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.protocol.framework.SessionNegotiatorFactory;
import org.xml.sax.SAXException;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorFactory {
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
- private final Timer timer;
+public class NetconfClientSessionNegotiatorFactory implements SessionNegotiatorFactory<NetconfMessage, NetconfClientSession, NetconfClientSessionListener> {
private final Optional<String> additionalHeader;
private final long connectionTimeoutMillis;
+ private final Timer timer;
public NetconfClientSessionNegotiatorFactory(Timer timer, Optional<String> additionalHeader, long connectionTimeoutMillis) {
- this.timer = timer;
+ this.timer = Preconditions.checkNotNull(timer);
this.additionalHeader = additionalHeader;
this.connectionTimeoutMillis = connectionTimeoutMillis;
}
}
@Override
- public SessionNegotiator getSessionNegotiator(SessionListenerFactory sessionListenerFactory, Channel channel,
- Promise promise) {
+ public SessionNegotiator<NetconfClientSession> getSessionNegotiator(SessionListenerFactory<NetconfClientSessionListener> sessionListenerFactory, Channel channel,
+ Promise<NetconfClientSession> promise) {
// Hello message needs to be recreated every time
NetconfMessage helloMessage = loadHelloMessageTemplate();
if(this.additionalHeader.isPresent()) {
return new NetconfClientSessionNegotiator(proposal, promise, channel, timer,
sessionListenerFactory.getSessionListener(), connectionTimeoutMillis);
}
-
}
import java.io.IOException;
import java.net.InetSocketAddress;
-import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.util.AbstractChannelInitializer;
import org.opendaylight.controller.netconf.util.handler.ssh.SshHandler;
import org.opendaylight.controller.netconf.util.handler.ssh.authentication.AuthenticationHandler;
import org.opendaylight.controller.netconf.util.handler.ssh.client.Invoker;
import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
import org.opendaylight.protocol.framework.SessionListenerFactory;
import com.google.common.base.Optional;
});
}
- private static final class NetconfSshClientInitializer extends AbstractChannelInitializer {
+ @Override
+ public Future<Void> createReconnectingClient(final InetSocketAddress address,
+ final NetconfClientSessionListener listener,
+ final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy) {
+ final NetconfSshClientInitializer init = new NetconfSshClientInitializer(authHandler, negotatorFactory, listener);
+
+ return super.createReconnectingClient(address, connectStrategyFactory, reestablishStrategy,
+ new PipelineInitializer<NetconfClientSession>() {
+ @Override
+ public void initializeChannel(final SocketChannel ch, final Promise<NetconfClientSession> promise) {
+ init.initialize(ch, promise);
+ }
+ });
+ }
+
+ private static final class NetconfSshClientInitializer extends AbstractChannelInitializer<NetconfClientSession> {
private final AuthenticationHandler authenticationHandler;
private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
}
@Override
- public void initialize(SocketChannel ch, Promise<? extends NetconfSession> promise) {
+ public void initialize(SocketChannel ch, Promise<NetconfClientSession> promise) {
try {
Invoker invoker = Invoker.subsystem("netconf");
ch.pipeline().addFirst(new SshHandler(authenticationHandler, invoker));
}
@Override
- protected void initializeAfterDecoder(SocketChannel ch, Promise<? extends NetconfSession> promise) {
+ protected void initializeAfterDecoder(SocketChannel ch, Promise<NetconfClientSession> promise) {
ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(new SessionListenerFactory<NetconfClientSessionListener>() {
@Override
public NetconfClientSessionListener getSessionListener() {
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.client;
+
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SimpleNetconfClientSessionListener implements NetconfClientSessionListener {
+ private static final class RequestEntry {
+ final Promise<NetconfMessage> promise;
+ final NetconfMessage request;
+
+ public RequestEntry(Promise<NetconfMessage> future, NetconfMessage request) {
+ this.promise = Preconditions.checkNotNull(future);
+ this.request = Preconditions.checkNotNull(request);
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(SimpleNetconfClientSessionListener.class);
+
+ @GuardedBy("this")
+ private final Queue<RequestEntry> requests = new ArrayDeque<>();
+
+ @GuardedBy("this")
+ private NetconfClientSession clientSession;
+
+ @GuardedBy("this")
+ private void dispatchRequest() {
+ while (!requests.isEmpty()) {
+ final RequestEntry e = requests.peek();
+ if (e.promise.setUncancellable()) {
+ logger.debug("Sending message {}", e.request);
+ clientSession.sendMessage(e.request);
+ break;
+ }
+
+ logger.debug("Message {} has been cancelled, skipping it", e.request);
+ requests.poll();
+ }
+ }
+
+ @Override
+ public final synchronized void onSessionUp(NetconfClientSession clientSession) {
+ this.clientSession = Preconditions.checkNotNull(clientSession);
+ logger.debug("Client session {} went up", clientSession);
+ dispatchRequest();
+ }
+
+ private synchronized void tearDown(final Exception cause) {
+ final RequestEntry e = requests.poll();
+ if (e != null) {
+ e.promise.setFailure(cause);
+ }
+
+ this.clientSession = null;
+ }
+
+ @Override
+ public final void onSessionDown(NetconfClientSession clientSession, Exception e) {
+ logger.debug("Client Session {} went down unexpectedly", clientSession, e);
+ tearDown(e);
+ }
+
+ @Override
+ public final void onSessionTerminated(NetconfClientSession clientSession,
+ NetconfTerminationReason netconfTerminationReason) {
+ logger.debug("Client Session {} terminated, reason: {}", clientSession,
+ netconfTerminationReason.getErrorMessage());
+ tearDown(new RuntimeException(netconfTerminationReason.getErrorMessage()));
+ }
+
+ @Override
+ public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) {
+ logger.debug("New message arrived: {}", message);
+
+ final RequestEntry e = requests.poll();
+ if (e != null) {
+ e.promise.setSuccess(message);
+ dispatchRequest();
+ } else {
+ logger.info("Ignoring unsolicited message {}", message);
+ }
+ }
+
+ final synchronized Future<NetconfMessage> sendRequest(NetconfMessage message) {
+ final RequestEntry req = new RequestEntry(GlobalEventExecutor.INSTANCE.<NetconfMessage>newPromise(), message);
+
+ requests.add(req);
+ if (clientSession != null) {
+ dispatchRequest();
+ }
+
+ return req.promise;
+ }
+}
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.Promise;
-import org.opendaylight.controller.netconf.api.NetconfSession;
+
+import java.net.InetSocketAddress;
+
import org.opendaylight.controller.netconf.impl.util.DeserializerExceptionHandler;
import org.opendaylight.controller.netconf.util.AbstractChannelInitializer;
import org.opendaylight.protocol.framework.AbstractDispatcher;
-import java.net.InetSocketAddress;
-
-public class NetconfServerDispatcher extends AbstractDispatcher<NetconfSession, NetconfServerSessionListener> {
+public class NetconfServerDispatcher extends AbstractDispatcher<NetconfServerSession, NetconfServerSessionListener> {
private final ServerChannelInitializer initializer;
public ChannelFuture createServer(InetSocketAddress address) {
- return super.createServer(address, new PipelineInitializer<NetconfSession>() {
+ return super.createServer(address, new PipelineInitializer<NetconfServerSession>() {
@Override
- public void initializeChannel(final SocketChannel ch, final Promise<NetconfSession> promise) {
+ public void initializeChannel(final SocketChannel ch, final Promise<NetconfServerSession> promise) {
initializer.initialize(ch, promise);
}
});
}
- public static class ServerChannelInitializer extends AbstractChannelInitializer {
+ public static class ServerChannelInitializer extends AbstractChannelInitializer<NetconfServerSession> {
private final NetconfServerSessionNegotiatorFactory negotiatorFactory;
private final NetconfServerSessionListenerFactory listenerFactory;
}
@Override
- protected void initializeAfterDecoder(SocketChannel ch, Promise<? extends NetconfSession> promise) {
+ protected void initializeAfterDecoder(SocketChannel ch, Promise<NetconfServerSession> promise) {
ch.pipeline().addLast("deserializerExHandler", new DeserializerExceptionHandler());
ch.pipeline().addLast("negotiator", negotiatorFactory.getSessionNegotiator(listenerFactory, ch, promise));
}
-
}
}
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.NetconfSession;
-import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
+import org.opendaylight.controller.netconf.api.AbstractNetconfSession;
import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession;
-import org.opendaylight.protocol.framework.SessionListener;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.DomainName;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.monitoring.extension.rev131210.NetconfTcp;
import com.google.common.base.Preconditions;
-public class NetconfServerSession extends NetconfSession implements NetconfManagementSession {
+public final class NetconfServerSession extends AbstractNetconfSession<NetconfServerSession, NetconfServerSessionListener> implements NetconfManagementSession {
private static final Logger logger = LoggerFactory.getLogger(NetconfServerSession.class);
private Date loginTime;
private long inRpcSuccess, inRpcFail, outRpcError;
- public NetconfServerSession(SessionListener<NetconfMessage, NetconfSession, NetconfTerminationReason> sessionListener, Channel channel, long sessionId,
+ public NetconfServerSession(NetconfServerSessionListener sessionListener, Channel channel, long sessionId,
NetconfServerSessionNegotiator.AdditionalHeader header) {
super(sessionListener, channel, sessionId);
this.header = header;
private Class<? extends Transport> getTransportForString(String transport) {
switch(transport) {
- case "ssh" : return NetconfSsh.class;
- case "tcp" : return NetconfTcp.class;
- default: throw new IllegalArgumentException("Unknown transport type " + transport);
+ case "ssh" : return NetconfSsh.class;
+ case "tcp" : return NetconfTcp.class;
+ default: throw new IllegalArgumentException("Unknown transport type " + transport);
}
}
return dateFormat.format(loginTime);
}
+ @Override
+ protected NetconfServerSession thisInstance() {
+ return this;
+ }
}
package org.opendaylight.controller.netconf.impl;
-import com.google.common.base.Optional;
import io.netty.channel.Channel;
import io.netty.util.Timer;
import io.netty.util.concurrent.Promise;
+
+import java.net.InetSocketAddress;
+
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfServerSessionPreferences;
import org.opendaylight.controller.netconf.impl.util.AdditionalHeaderUtil;
import org.opendaylight.controller.netconf.util.AbstractNetconfSessionNegotiator;
-import org.opendaylight.protocol.framework.SessionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
+import com.google.common.base.Optional;
public class NetconfServerSessionNegotiator extends
- AbstractNetconfSessionNegotiator<NetconfServerSessionPreferences, NetconfServerSession> {
+ AbstractNetconfSessionNegotiator<NetconfServerSessionPreferences, NetconfServerSession, NetconfServerSessionListener> {
static final Logger logger = LoggerFactory.getLogger(NetconfServerSessionNegotiator.class);
protected NetconfServerSessionNegotiator(NetconfServerSessionPreferences sessionPreferences,
- Promise<NetconfServerSession> promise, Channel channel, Timer timer, SessionListener sessionListener,
+ Promise<NetconfServerSession> promise, Channel channel, Timer timer, NetconfServerSessionListener sessionListener,
long connectionTimeoutMillis) {
super(sessionPreferences, promise, channel, timer, sessionListener, connectionTimeoutMillis);
}
@Override
- protected NetconfServerSession getSession(SessionListener sessionListener, Channel channel, NetconfMessage message) {
+ protected NetconfServerSession getSession(NetconfServerSessionListener sessionListener, Channel channel, NetconfMessage message) {
Optional<String> additionalHeader = message.getAdditionalHeader();
AdditionalHeader parsedHeader;
package org.opendaylight.controller.netconf.impl;
-import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.util.Timer;
import io.netty.util.concurrent.Promise;
+
+import java.io.InputStream;
+
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfServerSessionPreferences;
import org.opendaylight.controller.netconf.impl.mapping.CapabilityProvider;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpression;
-import java.io.InputStream;
+import com.google.common.base.Preconditions;
-public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorFactory {
+public class NetconfServerSessionNegotiatorFactory implements SessionNegotiatorFactory<NetconfMessage, NetconfServerSession, NetconfServerSessionListener> {
public static final String SERVER_HELLO_XML_LOCATION = "/server_hello.xml";
}
@Override
- public SessionNegotiator getSessionNegotiator(SessionListenerFactory sessionListenerFactory, Channel channel,
- Promise promise) {
+ public SessionNegotiator<NetconfServerSession> getSessionNegotiator(SessionListenerFactory<NetconfServerSessionListener> sessionListenerFactory, Channel channel,
+ Promise<NetconfServerSession> promise) {
long sessionId = idProvider.getNextSessionId();
NetconfServerSessionPreferences proposal = new NetconfServerSessionPreferences(createHelloMessage(sessionId),
}
private synchronized Document getHelloTemplateClone() {
- return (Document) this.helloMessageTemplate.cloneNode(true);
+ return (Document) helloMessageTemplate.cloneNode(true);
}
}
private class NetconfOperationExecution implements NetconfOperationFilterChain {
private final NetconfOperation operationWithHighestPriority;
- private NetconfOperationExecution(NetconfOperation operationWithHighestPriority) {
- this.operationWithHighestPriority = operationWithHighestPriority;
- }
-
public NetconfOperationExecution(TreeMap<HandlingPriority, Set<NetconfOperation>> sortedPriority,
HandlingPriority highestFoundPriority) {
operationWithHighestPriority = sortedPriority.get(highestFoundPriority).iterator().next();
import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
-public abstract class AbstractChannelInitializer {
+public abstract class AbstractChannelInitializer<S extends NetconfSession> {
- public void initialize(SocketChannel ch, Promise<? extends NetconfSession> promise){
+ public void initialize(SocketChannel ch, Promise<S> promise){
ch.pipeline().addLast("aggregator", new NetconfMessageAggregator(FramingMechanism.EOM));
ch.pipeline().addLast(new NetconfXMLToMessageDecoder());
initializeAfterDecoder(ch, promise);
ch.pipeline().addLast(new NetconfMessageToXMLEncoder());
}
- protected abstract void initializeAfterDecoder(SocketChannel ch, Promise<? extends NetconfSession> promise);
+ protected abstract void initializeAfterDecoder(SocketChannel ch, Promise<S> promise);
}
package org.opendaylight.controller.netconf.util;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
+
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.netconf.api.AbstractNetconfSession;
import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.NetconfSession;
+import org.opendaylight.controller.netconf.api.NetconfSessionListener;
import org.opendaylight.controller.netconf.api.NetconfSessionPreferences;
import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
-import org.opendaylight.protocol.framework.SessionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
-import java.util.concurrent.TimeUnit;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
-public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionPreferences, S extends NetconfSession>
- extends AbstractSessionNegotiator<NetconfMessage, S> {
+public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionPreferences, S extends AbstractNetconfSession<S, L>, L extends NetconfSessionListener<S>>
+extends AbstractSessionNegotiator<NetconfMessage, S> {
private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class);
public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
protected final P sessionPreferences;
- private final SessionListener sessionListener;
+ private final L sessionListener;
private Timeout timeout;
/**
private final long connectionTimeoutMillis;
protected AbstractNetconfSessionNegotiator(P sessionPreferences, Promise<S> promise, Channel channel, Timer timer,
- SessionListener sessionListener, long connectionTimeoutMillis) {
+ L sessionListener, long connectionTimeoutMillis) {
super(promise, channel);
this.sessionPreferences = sessionPreferences;
this.timer = timer;
}
}
- protected abstract S getSession(SessionListener sessionListener, Channel channel, NetconfMessage message);
+ protected abstract S getSession(L sessionListener, Channel channel, NetconfMessage message);
private boolean isHelloMessage(Document doc) {
try {