package org.opendaylight.controller.netconf.client;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.Promise;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import javax.annotation.concurrent.GuardedBy;
+
import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.api.NetconfSessionListener;
import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
-import org.opendaylight.protocol.framework.SessionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.base.Preconditions;
+
+public class NetconfClientSessionListener implements NetconfSessionListener<NetconfClientSession> {
+ private static final class RequestEntry {
+ final Promise<NetconfMessage> promise;
+ final NetconfMessage request;
-public class NetconfClientSessionListener implements
- SessionListener<NetconfMessage, NetconfClientSession, NetconfTerminationReason> {
+ public RequestEntry(Promise<NetconfMessage> future, NetconfMessage request) {
+ this.promise = Preconditions.checkNotNull(future);
+ this.request = Preconditions.checkNotNull(request);
+ }
+ }
private static final Logger logger = LoggerFactory.getLogger(NetconfClientSessionListener.class);
- private AtomicBoolean up = new AtomicBoolean(false);
+
+ @GuardedBy("this")
+ private final Queue<RequestEntry> requests = new ArrayDeque<>();
+
+ @GuardedBy("this")
+ private NetconfClientSession clientSession;
+
+ @GuardedBy("this")
+ private void dispatchRequest() {
+ while (!requests.isEmpty()) {
+ final RequestEntry e = requests.peek();
+ if (e.promise.setUncancellable()) {
+ logger.debug("Sending message {}", e.request);
+ clientSession.sendMessage(e.request);
+ break;
+ }
+
+ logger.debug("Message {} has been cancelled, skipping it", e.request);
+ requests.poll();
+ }
+ }
@Override
- public void onSessionUp(NetconfClientSession clientSession) {
- up.set(true);
+ public final synchronized void onSessionUp(NetconfClientSession clientSession) {
+ this.clientSession = Preconditions.checkNotNull(clientSession);
+ logger.debug("Client session {} went up", clientSession);
+ dispatchRequest();
+ }
+
+ private synchronized void tearDown(final Exception cause) {
+ final RequestEntry e = requests.poll();
+ if (e != null) {
+ e.promise.setFailure(cause);
+ }
+
+ this.clientSession = null;
}
@Override
- public void onSessionDown(NetconfClientSession clientSession, Exception e) {
- logger.debug("Client Session {} down, reason: {}", clientSession, e.getMessage());
- up.set(false);
+ public final void onSessionDown(NetconfClientSession clientSession, Exception e) {
+ logger.debug("Client Session {} went down unexpectedly", clientSession, e);
+ tearDown(e);
}
@Override
- public void onSessionTerminated(NetconfClientSession clientSession,
+ public final void onSessionTerminated(NetconfClientSession clientSession,
NetconfTerminationReason netconfTerminationReason) {
logger.debug("Client Session {} terminated, reason: {}", clientSession,
netconfTerminationReason.getErrorMessage());
- up.set(false);
+ tearDown(new RuntimeException(netconfTerminationReason.getErrorMessage()));
}
@Override
public synchronized void onMessage(NetconfClientSession session, NetconfMessage message) {
- synchronized (messages) {
- this.messages.add(message);
+ logger.debug("New message arrived: {}", message);
+
+ final RequestEntry e = requests.poll();
+ if (e != null) {
+ e.promise.setSuccess(message);
+ dispatchRequest();
+ } else {
+ logger.info("Ignoring unsolicited message {}", message);
}
}
- private int lastReadMessage = -1;
- private List<NetconfMessage> messages = Lists.newArrayList();
-
- public NetconfMessage getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException {
- Preconditions.checkState(up.get(), "Session was not up yet");
-
- for (int i = 0; i < attempts; i++) {
- synchronized (messages) {
- if (messages.size() - 1 > lastReadMessage) {
- lastReadMessage++;
- return messages.get(lastReadMessage);
- }
- }
+ final synchronized Future<NetconfMessage> sendRequest(NetconfMessage message) {
+ final RequestEntry req = new RequestEntry(GlobalEventExecutor.INSTANCE.<NetconfMessage>newPromise(), message);
- if (up.get() == false)
- throw new IllegalStateException("Session ended while trying to read message");
- Thread.sleep(attemptMsDelay);
+ requests.add(req);
+ if (clientSession != null) {
+ dispatchRequest();
}
- throw new IllegalStateException("No netconf message to read");
+ return req.promise;
}
}