2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import com.google.common.base.MoreObjects.ToStringHelper;
11 import java.util.Optional;
12 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
17 * Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its
18 * subclasses. It allows us to share some code.
20 * @author Robert Varga
22 * @param <T> Concrete {@link BackendInfo} type
24 abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
25 private static final Logger LOG = LoggerFactory.getLogger(AbstractReceivingClientConnection.class);
28 * Multiplication factor applied to remote's advertised limit on outstanding messages. Our default strategy
29 * rate-limiting strategy in {@link AveragingProgressTracker} does not penalize threads as long as we have not
30 * reached half of the target.
33 * By multiplying the advertised maximum by four, our queue steady-state should end up with:
34 * - the backend pipeline being full,
35 * - another full batch of messages being in the queue while not paying any throttling cost
36 * - another 2 full batches of messages with incremental throttling cost
38 private static final int MESSAGE_QUEUE_FACTOR = 4;
40 private final T backend;
42 // To be called by ConnectedClientConnection only.
43 AbstractReceivingClientConnection(final AbstractClientConnection<T> oldConnection, final T newBackend) {
44 super(oldConnection, newBackend, targetQueueSize(newBackend));
45 this.backend = newBackend;
48 // To be called by ReconnectingClientConnection only.
49 AbstractReceivingClientConnection(final AbstractReceivingClientConnection<T> oldConnection) {
51 this.backend = oldConnection.backend;
54 private static int targetQueueSize(final BackendInfo backend) {
55 return backend.getMaxMessages() * MESSAGE_QUEUE_FACTOR;
59 public final Optional<T> getBackendInfo() {
60 return Optional.of(backend);
64 final void receiveResponse(final ResponseEnvelope<?> envelope) {
65 if (envelope.getSessionId() != backend.getSessionId()) {
66 LOG.debug("Response {} does not match session ID {}, ignoring it", envelope, backend.getSessionId());
68 super.receiveResponse(envelope);
77 ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
78 return super.addToStringAttributes(toStringHelper).add("backend", backend);