c6564b66fbf507a9f8d9c0d778284e18853c1527
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / BouncingReconnectForwarder.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.collect.Collections2;
12 import com.google.common.collect.Maps;
13 import java.util.Collection;
14 import java.util.Map;
15 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
16 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
17 import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
18 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
19 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
20 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.Request;
22 import org.opendaylight.controller.cluster.access.concepts.RequestException;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 // Cohort aware forwarder, which forwards the request to the cohort, giving it a reference to the successor
27 // connection
28 final class BouncingReconnectForwarder extends ReconnectForwarder {
29     private static final Logger LOG = LoggerFactory.getLogger(BouncingReconnectForwarder.class);
30
31     private static final RequestException FAILED_TO_REPLAY_EXCEPTION = new RequestException("Cohort not found") {
32         private static final long serialVersionUID = 1L;
33
34         @Override
35         public boolean isRetriable() {
36             return false;
37         }
38     };
39
40     private final Map<LocalHistoryIdentifier, ProxyReconnectCohort> cohorts;
41
42     private BouncingReconnectForwarder(final ConnectedClientConnection<?> successor,
43             final Map<LocalHistoryIdentifier, ProxyReconnectCohort> cohorts) {
44         super(successor);
45         this.cohorts = Preconditions.checkNotNull(cohorts);
46     }
47
48     static ReconnectForwarder forCohorts(final ConnectedClientConnection<?> successor,
49             final Collection<HistoryReconnectCohort> cohorts) {
50         return new BouncingReconnectForwarder(successor, Maps.uniqueIndex(Collections2.transform(cohorts,
51             HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier));
52     }
53
54
55     @Override
56     protected void forwardEntry(final ConnectionEntry entry) {
57         final Request<? , ?> request = entry.getRequest();
58
59         final LocalHistoryIdentifier historyId;
60         if (request instanceof TransactionRequest) {
61             historyId = ((TransactionRequest<?>) request).getTarget().getHistoryId();
62         } else if (request instanceof LocalHistoryRequest) {
63             historyId = ((LocalHistoryRequest<?>) request).getTarget();
64         } else {
65             throw new IllegalArgumentException("Unhandled request " + request);
66         }
67
68         try {
69             final ProxyReconnectCohort cohort = cohorts.get(historyId);
70             if (cohort == null) {
71                 LOG.warn("Cohort for request {} not found, aborting it", request);
72                 throw FAILED_TO_REPLAY_EXCEPTION;
73             }
74
75             // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
76             //        period required to get into the queue.
77             cohort.replayRequest(request, entry.getCallback(), this::sendToSuccessor);
78         } catch (RequestException e) {
79             entry.complete(request.toRequestFailure(e));
80         }
81     }
82 }