BUG-8445: check sessionId before propagating failures
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / AbstractReceivingClientConnection.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import com.google.common.base.MoreObjects.ToStringHelper;
11 import com.google.common.base.Preconditions;
12 import java.util.Optional;
13 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 /**
18  * Implementation-internal intermediate subclass between {@link AbstractClientConnection} and two-out of three of its
19  * subclasses. It allows us to share some code.
20  *
21  * @author Robert Varga
22  *
23  * @param <T> Concrete {@link BackendInfo} type
24  */
25 abstract class AbstractReceivingClientConnection<T extends BackendInfo> extends AbstractClientConnection<T> {
26     private static final Logger LOG = LoggerFactory.getLogger(AbstractReceivingClientConnection.class);
27
28     /**
29      * Multiplication factor applied to remote's advertised limit on outstanding messages. Our default strategy
30      * rate-limiting strategy in {@link AveragingProgressTracker} does not penalize threads as long as we have not
31      * reached half of the target.
32      *
33      * <p>
34      * By multiplying the advertised maximum by four, our queue steady-state should end up with:
35      * - the backend pipeline being full,
36      * - another full batch of messages being in the queue while not paying any throttling cost
37      * - another 2 full batches of messages with incremental throttling cost
38      */
39     private static final int MESSAGE_QUEUE_FACTOR = 4;
40
41     private final T backend;
42
43     AbstractReceivingClientConnection(final ClientActorContext context, final Long cookie, final T backend) {
44         super(context, cookie, new TransmitQueue.Transmitting(targetQueueSize(backend), backend));
45         this.backend = Preconditions.checkNotNull(backend);
46     }
47
48     AbstractReceivingClientConnection(final AbstractReceivingClientConnection<T> oldConnection) {
49         super(oldConnection, targetQueueSize(oldConnection.backend));
50         this.backend = oldConnection.backend;
51     }
52
53     private static int targetQueueSize(final BackendInfo backend) {
54         return backend.getMaxMessages() * MESSAGE_QUEUE_FACTOR;
55     }
56
57     @Override
58     public final Optional<T> getBackendInfo() {
59         return Optional.of(backend);
60     }
61
62     @Override
63     final void receiveResponse(final ResponseEnvelope<?> envelope) {
64         if (envelope.getSessionId() != backend.getSessionId()) {
65             LOG.debug("Response {} does not match session ID {}, ignoring it", envelope, backend.getSessionId());
66         } else {
67             super.receiveResponse(envelope);
68         }
69     }
70
71     final T backend() {
72         return backend;
73     }
74
75     @Override
76     ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
77         return super.addToStringAttributes(toStringHelper).add("backend", backend);
78     }
79 }