X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fclient%2FNetconfClientSessionListener.java;h=1ac2e7e26462c84a239757f93ee6b2d926f53e46;hp=d3c1b22c845a9c0a30888cbd933e935322cfedd8;hb=8ffb3e08bfa609405e883444480642b93c83242a;hpb=fdde6f06dc64a4bde7593625fa538ec3241fce37 diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java index d3c1b22c84..1ac2e7e264 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSessionListener.java @@ -8,68 +8,108 @@ package org.opendaylight.controller.netconf.client; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.Promise; + +import java.util.ArrayDeque; +import java.util.Queue; + +import javax.annotation.concurrent.GuardedBy; + import org.opendaylight.controller.netconf.api.NetconfMessage; +import org.opendaylight.controller.netconf.api.NetconfSessionListener; import org.opendaylight.controller.netconf.api.NetconfTerminationReason; -import org.opendaylight.protocol.framework.SessionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Preconditions; + +public class NetconfClientSessionListener implements NetconfSessionListener { + private static final class RequestEntry { + final Promise promise; + final NetconfMessage request; -public class NetconfClientSessionListener implements - SessionListener { + public RequestEntry(Promise future, NetconfMessage request) { + this.promise = Preconditions.checkNotNull(future); + this.request = Preconditions.checkNotNull(request); + } + } private static final Logger logger = LoggerFactory.getLogger(NetconfClientSessionListener.class); - private AtomicBoolean up = new AtomicBoolean(false); + + @GuardedBy("this") + private final Queue requests = new ArrayDeque<>(); + + @GuardedBy("this") + private NetconfClientSession clientSession; + + @GuardedBy("this") + private void dispatchRequest() { + while (!requests.isEmpty()) { + final RequestEntry e = requests.peek(); + if (e.promise.setUncancellable()) { + logger.debug("Sending message {}", e.request); + clientSession.sendMessage(e.request); + break; + } + + logger.debug("Message {} has been cancelled, skipping it", e.request); + requests.poll(); + } + } @Override - public void onSessionUp(NetconfClientSession clientSession) { - up.set(true); + public final synchronized void onSessionUp(NetconfClientSession clientSession) { + this.clientSession = Preconditions.checkNotNull(clientSession); + logger.debug("Client session {} went up", clientSession); + dispatchRequest(); + } + + private synchronized void tearDown(final Exception cause) { + final RequestEntry e = requests.poll(); + if (e != null) { + e.promise.setFailure(cause); + } + + this.clientSession = null; } @Override - public void onSessionDown(NetconfClientSession clientSession, Exception e) { - logger.debug("Client Session {} down, reason: {}", clientSession, e.getMessage()); - up.set(false); + public final void onSessionDown(NetconfClientSession clientSession, Exception e) { + logger.debug("Client Session {} went down unexpectedly", clientSession, e); + tearDown(e); } @Override - public void onSessionTerminated(NetconfClientSession clientSession, + public final void onSessionTerminated(NetconfClientSession clientSession, NetconfTerminationReason netconfTerminationReason) { logger.debug("Client Session {} terminated, reason: {}", clientSession, netconfTerminationReason.getErrorMessage()); - up.set(false); + tearDown(new RuntimeException(netconfTerminationReason.getErrorMessage())); } @Override public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) { - synchronized (messages) { - this.messages.add(message); + logger.debug("New message arrived: {}", message); + + final RequestEntry e = requests.poll(); + if (e != null) { + e.promise.setSuccess(message); + dispatchRequest(); + } else { + logger.info("Ignoring unsolicited message {}", message); } } - private int lastReadMessage = -1; - private List messages = Lists.newArrayList(); - - public NetconfMessage getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException { - Preconditions.checkState(up.get(), "Session was not up yet"); - - for (int i = 0; i < attempts; i++) { - synchronized (messages) { - if (messages.size() - 1 > lastReadMessage) { - lastReadMessage++; - return messages.get(lastReadMessage); - } - } + final synchronized Future sendRequest(NetconfMessage message) { + final RequestEntry req = new RequestEntry(GlobalEventExecutor.INSTANCE.newPromise(), message); - if (up.get() == false) - throw new IllegalStateException("Session ended while trying to read message"); - Thread.sleep(attemptMsDelay); + requests.add(req); + if (clientSession != null) { + dispatchRequest(); } - throw new IllegalStateException("No netconf message to read"); + return req.promise; } }