BUG-5280: add AbstractClientConnection
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractDataStoreClientBehavior.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.base.Throwables;
13 import com.google.common.base.Verify;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.atomic.AtomicLong;
19 import javax.annotation.concurrent.GuardedBy;
20 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
21 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
22 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
23 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
32  * frontend.
33  *
34  * <p>
35  * This class is not visible outside of this package because it breaks the actor containment. Services provided to
36  * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
37  *
38  * <p>
39  * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
40  *            When touching internal state, be mindful of the execution context from which execution context, Actor
41  *            or POJO, is the state being accessed or modified.
42  *
43  * <p>
44  * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
45  *                threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
46  *                doubt, feel free to synchronize on this object.
47  *
48  * <p>
49  * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
50  *              performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
51  *              for correctness and performance impact.
52  *
53  * <p>
54  * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
55  *             for performing work and charging applications for it. That has two positive effects:
56  *             - CPU usage is distributed across applications, minimizing work done in the actor thread
57  *             - CPU usage provides back-pressure towards the application.
58  *
59  * @author Robert Varga
60  */
61 abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<ShardBackendInfo>
62         implements DistributedDataStoreClient {
63     private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class);
64
65     private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
66     private final AtomicLong nextHistoryId = new AtomicLong(1);
67     private final SingleClientHistory singleHistory;
68
69     private volatile Throwable aborted;
70
71     AbstractDataStoreClientBehavior(final ClientActorContext context,
72             final BackendInfoResolver<ShardBackendInfo> resolver) {
73         super(context, resolver);
74         singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
75     }
76
77     //
78     //
79     // Methods below are invoked from the client actor thread
80     //
81     //
82
83     @Override
84     protected final void haltClient(final Throwable cause) {
85         // If we have encountered a previous problem there is no cleanup necessary, as we have already cleaned up
86         // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
87         // thread.
88         if (aborted != null) {
89             abortOperations(cause);
90         }
91     }
92
93     private void abortOperations(final Throwable cause) {
94         // This acts as a barrier, application threads check this after they have added an entry in the maps,
95         // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
96         aborted = cause;
97
98         for (ClientLocalHistory h : histories.values()) {
99             h.localAbort(cause);
100         }
101         histories.clear();
102     }
103
104     private AbstractDataStoreClientBehavior shutdown(final ClientActorBehavior<ShardBackendInfo> currentBehavior) {
105         abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
106         return null;
107     }
108
109     @Override
110     protected final AbstractDataStoreClientBehavior onCommand(final Object command) {
111         if (command instanceof GetClientRequest) {
112             ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
113         } else {
114             LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
115         }
116
117         return this;
118     }
119
120     /*
121      * The connection has resolved, which means we have to potentially perform message adaptation. This is a bit more
122      * involved, as the messages need to be replayed to the individual proxies.
123      */
124     @Override
125     @GuardedBy("connectionsLock")
126     protected final ConnectedClientConnection<ShardBackendInfo> connectionUp(
127             final AbstractClientConnection<ShardBackendInfo> conn, final ShardBackendInfo backend) {
128
129         // Step 0: create a new connected connection
130         final ConnectedClientConnection<ShardBackendInfo> newConn = new ConnectedClientConnection<>(conn.context(),
131                 conn.cookie(), backend);
132
133         LOG.debug("{}: resolving connection {} to {}", persistenceId(), conn, newConn);
134
135         final Collection<HistoryReconnectCohort> cohorts = new ArrayList<>();
136         try {
137             // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
138             //         further TransactionProxies can be created and we can safely traverse maps without risking
139             //         missing an entry
140             startReconnect(singleHistory, newConn, cohorts);
141             for (ClientLocalHistory h : histories.values()) {
142                 startReconnect(h, newConn, cohorts);
143             }
144
145             // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
146             //         the non-throttling interface to the connection, hence we use a wrapper consumer
147             for (HistoryReconnectCohort c : cohorts) {
148                 c.replaySuccessfulRequests();
149             }
150
151             // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
152             //         requests will be immediately sent to it and requests being sent concurrently will get forwarded
153             //         once they hit the new connection.
154             conn.setForwarder(BouncingReconnectForwarder.forCohorts(newConn, cohorts));
155         } finally {
156             // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
157             for (HistoryReconnectCohort c : cohorts) {
158                 c.close();
159             }
160         }
161
162         return newConn;
163     }
164
165     private static void startReconnect(final AbstractClientHistory history,
166             final ConnectedClientConnection<ShardBackendInfo> newConn,
167             final Collection<HistoryReconnectCohort> cohorts) {
168         final HistoryReconnectCohort cohort = history.startReconnect(newConn);
169         if (cohort != null) {
170             cohorts.add(cohort);
171         }
172     }
173
174     //
175     //
176     // Methods below are invoked from application threads
177     //
178     //
179
180     @Override
181     public final ClientLocalHistory createLocalHistory() {
182         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
183             nextHistoryId.getAndIncrement());
184         final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
185         LOG.debug("{}: creating a new local history {}", persistenceId(), history);
186
187         Verify.verify(histories.put(historyId, history) == null);
188
189         final Throwable a = aborted;
190         if (a != null) {
191             history.localAbort(a);
192             histories.remove(historyId, history);
193             throw Throwables.propagate(a);
194         }
195
196         return history;
197     }
198
199     @Override
200     public final ClientTransaction createTransaction() {
201         return singleHistory.createTransaction();
202     }
203
204     @Override
205     public final void close() {
206         context().executeInActor(this::shutdown);
207     }
208
209     abstract Long resolveShardForPath(final YangInstanceIdentifier path);
210 }