Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / BouncingReconnectForwarder.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.Collections2;
13 import com.google.common.collect.Maps;
14 import java.util.Collection;
15 import java.util.Map;
16 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
17 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
18 import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
19 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
20 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
21 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
22 import org.opendaylight.controller.cluster.access.concepts.Request;
23 import org.opendaylight.controller.cluster.access.concepts.RequestException;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 // Cohort aware forwarder, which forwards the request to the cohort, giving it a reference to the successor
28 // connection
29 final class BouncingReconnectForwarder extends ReconnectForwarder {
30     private static final class CohortNotFoundException extends RequestException {
31         private static final long serialVersionUID = 1L;
32
33         CohortNotFoundException(final LocalHistoryIdentifier historyId) {
34             super("Cohort for " + historyId + " not found");
35         }
36
37         @Override
38         public boolean isRetriable() {
39             return false;
40         }
41     }
42
43     private static final Logger LOG = LoggerFactory.getLogger(BouncingReconnectForwarder.class);
44
45     private final Map<LocalHistoryIdentifier, ProxyReconnectCohort> cohorts;
46
47     private BouncingReconnectForwarder(final ConnectedClientConnection<?> successor,
48             final Map<LocalHistoryIdentifier, ProxyReconnectCohort> cohorts) {
49         super(successor);
50         this.cohorts = requireNonNull(cohorts);
51     }
52
53     static ReconnectForwarder forCohorts(final ConnectedClientConnection<?> successor,
54             final Collection<HistoryReconnectCohort> cohorts) {
55         return new BouncingReconnectForwarder(successor, Maps.uniqueIndex(Collections2.transform(cohorts,
56             HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier));
57     }
58
59     @Override
60     protected void forwardEntry(final ConnectionEntry entry, final long now) {
61         try {
62             findCohort(entry).forwardEntry(entry, this::sendToSuccessor);
63         } catch (RequestException e) {
64             entry.complete(entry.getRequest().toRequestFailure(e));
65         }
66     }
67
68     @Override
69     protected void replayEntry(final ConnectionEntry entry, final long now) {
70         try {
71             findCohort(entry).replayEntry(entry, this::replayToSuccessor);
72         } catch (RequestException e) {
73             entry.complete(entry.getRequest().toRequestFailure(e));
74         }
75     }
76
77     private ProxyReconnectCohort findCohort(final ConnectionEntry entry) throws CohortNotFoundException {
78         final Request<? , ?> request = entry.getRequest();
79
80         final LocalHistoryIdentifier historyId;
81         if (request instanceof TransactionRequest) {
82             historyId = ((TransactionRequest<?>) request).getTarget().getHistoryId();
83         } else if (request instanceof LocalHistoryRequest) {
84             historyId = ((LocalHistoryRequest<?>) request).getTarget();
85         } else {
86             throw new IllegalArgumentException("Unhandled request " + request);
87         }
88
89         final ProxyReconnectCohort cohort = cohorts.get(historyId);
90         if (cohort == null) {
91             LOG.warn("Cohort for request {} not found, aborting it", request);
92             throw new CohortNotFoundException(historyId);
93         }
94
95         return cohort;
96     }
97 }