2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import com.google.common.base.MoreObjects.ToStringHelper;
11 import com.google.common.base.Preconditions;
12 import java.util.Optional;
13 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
18 * Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two out of three of its
19 * subclasses. It allows us to share some code.
21 * @author Robert Varga
23 * @param <T> Concrete {@link BackendInfo} type
25 abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
// Class-wide SLF4J logger. In the visible code it is used only by receiveResponse(), which reports at debug
// level any response whose session ID does not match the backend's.
26 private static final Logger LOG = LoggerFactory.getLogger(AbstractReceivingClientConnection.class);
29 * Multiplication factor applied to the remote's advertised limit on outstanding messages. Our default
30 * rate-limiting strategy in {@link AveragingProgressTracker} does not penalize threads as long as we have not
31 * reached half of the target.
34 * By multiplying the advertised maximum by four, our queue steady-state should end up with:
35 * - the backend pipeline being full,
36 * - another full batch of messages being in the queue while not paying any throttling cost
37 * - another 2 full batches of messages with incremental throttling cost
39 private static final int MESSAGE_QUEUE_FACTOR = 4;
// Backend information this connection was created with. Assigned exactly once by either constructor; it is
// returned by getBackendInfo(), its session ID is compared against incoming responses in receiveResponse(),
// and it is reported as the "backend" attribute by addToStringAttributes().
41 private final T backend;
// Creates a connection for the given backend. The superclass is handed the context, the cookie and a new
// TransmitQueue.Transmitting built from targetQueueSize(backend) and the backend itself.
43 AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
44 super(context, cookie, new TransmitQueue.Transmitting(targetQueueSize(backend), backend));
// NOTE(review): targetQueueSize(backend) in the super(...) arguments already calls backend.getMaxMessages(),
// so a null backend appears to fail with a NullPointerException before this check is reached — confirm
// whether the explicit check is still wanted here.
45 this.backend = Preconditions.checkNotNull(backend);
// Creates a connection from an existing one. The superclass receives the old connection together with a
// target queue size recomputed from the old connection's backend; the same backend instance is then reused
// by this connection.
48 AbstractReceivingClientConnection(final AbstractReceivingClientConnection<T> oldConnection) {
49 super(oldConnection, targetQueueSize(oldConnection.backend));
50 this.backend = oldConnection.backend;
// Computes the target transmit queue size for a backend: the value of backend.getMaxMessages() multiplied by
// MESSAGE_QUEUE_FACTOR (4). Dereferences the argument, so a null backend results in a NullPointerException.
// NOTE(review): the multiplication is unchecked and would wrap for very large getMaxMessages() values —
// presumably the advertised limit is always small enough; confirm.
53 private static int targetQueueSize(final BackendInfo backend) {
54 return backend.getMaxMessages() * MESSAGE_QUEUE_FACTOR;
// Returns this connection's backend wrapped in an Optional. Optional.of() rejects null, so this relies on
// the backend field being non-null (the context/cookie/backend constructor null-checks it). The result is
// never empty.
58 public final Optional<T> getBackendInfo() {
59 return Optional.of(backend);
// Handles a response envelope. The envelope's session ID is compared with the backend's session ID; on a
// mismatch a debug message is logged stating that the response is being ignored.
63 final void receiveResponse(final ResponseEnvelope<?> envelope) {
64 if (envelope.getSessionId() != backend.getSessionId()) {
65 LOG.debug("Response {} does not match session ID {}, ignoring it", envelope, backend.getSessionId());
// NOTE(review): the line between the debug statement and the super call is not visible in this view. The
// log text ("ignoring it") suggests the superclass handler below runs only when the session IDs match
// (else-branch or early return) — confirm against the full file.
67 super.receiveResponse(envelope);
// Extends the superclass's toString attributes with a "backend" entry holding this connection's backend, and
// returns the resulting helper.
76 ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
77 return super.addToStringAttributes(toStringHelper).add("backend", backend);