2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import com.google.common.base.Preconditions;
11 import com.google.common.collect.Collections2;
12 import com.google.common.collect.Maps;
13 import java.util.Collection;
15 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
16 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
17 import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
18 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
19 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
20 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.Request;
22 import org.opendaylight.controller.cluster.access.concepts.RequestException;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 // Cohort-aware forwarder: looks up the cohort responsible for each request's local history and forwards the request to that cohort, giving it a reference to the successor connection.
28 final class BouncingReconnectForwarder extends ReconnectForwarder {
// Request failure raised by findCohort() when no cohort is registered for the local history
// that a request targets.
29 private static final class CohortNotFoundException extends RequestException {
30 private static final long serialVersionUID = 1L;
// historyId: identifier of the history whose cohort was missing; it is embedded in the message.
32 CohortNotFoundException(final LocalHistoryIdentifier historyId) {
33 super("Cohort for " + historyId + " not found");
// NOTE(review): the body of isRetriable() is not visible in this excerpt - confirm the value it returns.
37 public boolean isRetriable() {
// Class logger; findCohort() uses it to warn about requests whose cohort cannot be found.
42 private static final Logger LOG = LoggerFactory.getLogger(BouncingReconnectForwarder.class);
// Reconnect cohorts keyed by local history identifier; consulted by findCohort() for every entry.
44 private final Map<LocalHistoryIdentifier, ProxyReconnectCohort> cohorts;
// Private constructor; instances are obtained through the static forCohorts() factory.
// successor: the connected client connection supplied by the factory.
//            NOTE(review): how it is used is not visible in this excerpt - presumably handed to the
//            ReconnectForwarder superclass; confirm.
// cohorts:   proxy cohorts keyed by history identifier; must not be null.
46 private BouncingReconnectForwarder(final ConnectedClientConnection<?> successor,
47 final Map<LocalHistoryIdentifier, ProxyReconnectCohort> cohorts) {
// checkNotNull rejects a null map with a NullPointerException before it is stored.
49 this.cohorts = Preconditions.checkNotNull(cohorts);
// Factory method: creates a forwarder for the given successor connection and history cohorts.
// successor: connection passed through to the private constructor.
// cohorts:   history cohorts; each one is converted to its proxy via getProxy().
// Returns the new forwarder, typed as ReconnectForwarder.
52 static ReconnectForwarder forCohorts(final ConnectedClientConnection<?> successor,
53 final Collection<HistoryReconnectCohort> cohorts) {
// Collections2.transform gives a view of the proxies; Maps.uniqueIndex then indexes them by
// getIdentifier() into an immutable map. uniqueIndex throws IllegalArgumentException when two
// proxies yield the same identifier.
54 return new BouncingReconnectForwarder(successor, Maps.uniqueIndex(Collections2.transform(cohorts,
55 HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier));
// Forwards a connection entry to the cohort resolved for its request.
// entry: the entry to forward; its request determines which cohort is used (see findCohort()).
// now:   not referenced on any line visible in this excerpt.
59 protected void forwardEntry(final ConnectionEntry entry, final long now) {
// The cohort receives the entry together with this::sendToSuccessor as its callback.
61 findCohort(entry).forwardEntry(entry, this::sendToSuccessor);
// A RequestException (for example a missing cohort) is not propagated: the entry is completed
// with the failure response built from its own request.
62 } catch (RequestException e) {
63 entry.complete(entry.getRequest().toRequestFailure(e));
// Replays a connection entry through the cohort resolved for its request.
// entry: the entry to replay; its request determines which cohort is used (see findCohort()).
// now:   not referenced on any line visible in this excerpt.
68 protected void replayEntry(final ConnectionEntry entry, final long now) {
// The cohort receives the entry together with this::replayToSuccessor as its callback.
70 findCohort(entry).replayEntry(entry, this::replayToSuccessor);
// A RequestException (for example a missing cohort) is not propagated: the entry is completed
// with the failure response built from its own request.
71 } catch (RequestException e) {
72 entry.complete(entry.getRequest().toRequestFailure(e));
// Resolves the cohort responsible for the request carried by an entry.
// entry: the connection entry whose request is inspected.
// Returns the cohort registered for the request's local history.
// Throws IllegalArgumentException for request types other than TransactionRequest and LocalHistoryRequest.
// Throws CohortNotFoundException when no cohort is registered for the history identifier.
76 private ProxyReconnectCohort findCohort(final ConnectionEntry entry) throws CohortNotFoundException {
77 final Request<? , ?> request = entry.getRequest();
// Derive the history identifier from the request target.
79 final LocalHistoryIdentifier historyId;
80 if (request instanceof TransactionRequest) {
// The target of a transaction request exposes its owning history through getHistoryId().
81 historyId = ((TransactionRequest<?>) request).getTarget().getHistoryId();
82 } else if (request instanceof LocalHistoryRequest) {
// The target of a history request is the history identifier itself.
83 historyId = ((LocalHistoryRequest<?>) request).getTarget();
// Any other request type cannot be mapped to a history and is rejected.
85 throw new IllegalArgumentException("Unhandled request " + request);
88 final ProxyReconnectCohort cohort = cohorts.get(historyId);
// NOTE(review): the guard around the following two lines and the method's final return are not
// visible in this excerpt - presumably a null check on the lookup result; confirm.
90 LOG.warn("Cohort for request {} not found, aborting it", request);
91 throw new CohortNotFoundException(historyId);