import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
* @param callback Callback to be invoked
* @return Optional duration with semantics described above.
*/
- @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
+ @Nullable Optional<FiniteDuration> enqueueRequest(final long sequence, final Request<?, ?> request,
+ final RequestCallback callback) {
+ checkNotClosed();
+
final long now = ticker.read();
- final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
+ final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now);
- // We could have check first, but argument checking needs to happen first
- checkNotClosed();
queue.add(e);
LOG.debug("Enqueued request {} to queue {}", request, this);
}
}
- ClientActorBehavior complete(final ClientActorBehavior current, final Response<?, ?> response) {
+ ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
// Responses to different targets may arrive out of order, hence we use an iterator
final Iterator<SequencedQueueEntry> it = queue.iterator();
while (it.hasNext()) {
lastProgress = ticker.read();
it.remove();
LOG.debug("Completing request {} with {}", e, response);
- return e.complete(response);
+ return e.complete(response.getMessage());
}
}