BUG-5280: handle TransactionPurgeRequest replay
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractDataStoreClientBehavior.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.base.Throwables;
13 import com.google.common.base.Verify;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.concurrent.locks.StampedLock;
20 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
21 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
22 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
23 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
24 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 /**
30  * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
31  * frontend.
32  *
33  * <p>
34  * This class is not visible outside of this package because it breaks the actor containment. Services provided to
35  * Java world outside of actor containment are captured in {@link DataStoreClient}.
36  *
37  * <p>
38  * IMPORTANT: this class breaks actor containment via methods implementing {@link DataStoreClient} contract.
39  *            When touching internal state, be mindful of the execution context from which execution context, Actor
40  *            or POJO, is the state being accessed or modified.
41  *
42  * <p>
43  * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
44  *                threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
45  *                doubt, feel free to synchronize on this object.
46  *
47  * <p>
48  * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
49  *              performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
50  *              for correctness and performance impact.
51  *
52  * <p>
53  * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
54  *             for performing work and charging applications for it. That has two positive effects:
55  *             - CPU usage is distributed across applications, minimizing work done in the actor thread
56  *             - CPU usage provides back-pressure towards the application.
57  *
58  * @author Robert Varga
59  */
60 abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<ShardBackendInfo>
61         implements DataStoreClient {
62     private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class);
63
64     private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
65     private final AtomicLong nextHistoryId = new AtomicLong(1);
66     private final StampedLock lock = new StampedLock();
67     private final SingleClientHistory singleHistory;
68
69     private volatile Throwable aborted;
70
71     AbstractDataStoreClientBehavior(final ClientActorContext context,
72             final BackendInfoResolver<ShardBackendInfo> resolver) {
73         super(context, resolver);
74         singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
75     }
76
77     //
78     //
79     // Methods below are invoked from the client actor thread
80     //
81     //
82
83     @Override
84     protected final void haltClient(final Throwable cause) {
85         // If we have encountered a previous problem there is no cleanup necessary, as we have already cleaned up
86         // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
87         // thread.
88         if (aborted != null) {
89             abortOperations(cause);
90         }
91     }
92
93     private void abortOperations(final Throwable cause) {
94         final long stamp = lock.writeLock();
95         try {
96             // This acts as a barrier, application threads check this after they have added an entry in the maps,
97             // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
98             aborted = cause;
99
100             for (ClientLocalHistory h : histories.values()) {
101                 h.localAbort(cause);
102             }
103             histories.clear();
104         } finally {
105             lock.unlockWrite(stamp);
106         }
107     }
108
109     private AbstractDataStoreClientBehavior shutdown(final ClientActorBehavior<ShardBackendInfo> currentBehavior) {
110         abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
111         return null;
112     }
113
114     @Override
115     protected final AbstractDataStoreClientBehavior onCommand(final Object command) {
116         if (command instanceof GetClientRequest) {
117             ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
118         } else {
119             LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
120         }
121
122         return this;
123     }
124
125     /*
126      * The connection has resolved, which means we have to potentially perform message adaptation. This is a bit more
127      * involved, as the messages need to be replayed to the individual proxies.
128      */
129     @Override
130     protected final ConnectionConnectCohort connectionUp(final ConnectedClientConnection<ShardBackendInfo> newConn) {
131         final long stamp = lock.writeLock();
132
133         // Step 1: Freeze all AbstractProxyHistory instances pointing to that shard. This indirectly means that no
134         //         further TransactionProxies can be created and we can safely traverse maps without risking
135         //         missing an entry
136         final Collection<HistoryReconnectCohort> cohorts = new ArrayList<>();
137         startReconnect(singleHistory, newConn, cohorts);
138         for (ClientLocalHistory h : histories.values()) {
139             startReconnect(h, newConn, cohorts);
140         }
141
142         return previousEntries -> {
143             try {
144                 // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
145                 //         the non-throttling interface to the connection, hence we use a wrapper consumer
146                 for (HistoryReconnectCohort c : cohorts) {
147                     c.replaySuccessfulRequests(previousEntries);
148                 }
149
150                 // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
151                 //         requests will be immediately sent to it and requests being sent concurrently will get
152                 //         forwarded once they hit the new connection.
153                 return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
154             } finally {
155                 try {
156                     // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
157                     for (HistoryReconnectCohort c : cohorts) {
158                         c.close();
159                     }
160                 } finally {
161                     lock.unlockWrite(stamp);
162                 }
163             }
164         };
165     }
166
167     private static void startReconnect(final AbstractClientHistory history,
168             final ConnectedClientConnection<ShardBackendInfo> newConn,
169             final Collection<HistoryReconnectCohort> cohorts) {
170         final HistoryReconnectCohort cohort = history.startReconnect(newConn);
171         if (cohort != null) {
172             cohorts.add(cohort);
173         }
174     }
175
176     //
177     //
178     // Methods below are invoked from application threads
179     //
180     //
181
182     @Override
183     public final ClientLocalHistory createLocalHistory() {
184         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
185             nextHistoryId.getAndIncrement());
186
187         final long stamp = lock.readLock();
188         try {
189             if (aborted != null) {
190                 throw Throwables.propagate(aborted);
191             }
192
193             final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
194             LOG.debug("{}: creating a new local history {}", persistenceId(), history);
195
196             Verify.verify(histories.put(historyId, history) == null);
197             return history;
198         } finally {
199             lock.unlockRead(stamp);
200         }
201     }
202
203     @Override
204     public final ClientTransaction createTransaction() {
205         return singleHistory.createTransaction();
206     }
207
208     @Override
209     public final ClientSnapshot createSnapshot() {
210         return singleHistory.doCreateSnapshot();
211     }
212
213     @Override
214     public final void close() {
215         context().executeInActor(this::shutdown);
216     }
217
218     abstract Long resolveShardForPath(YangInstanceIdentifier path);
219 }