BUG-5280: add READY protocol
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
15 import java.util.Map;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
26 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
27 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
28 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
29 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
30 import org.opendaylight.controller.cluster.access.concepts.Request;
31 import org.opendaylight.controller.cluster.access.concepts.RequestException;
32 import org.opendaylight.controller.cluster.access.concepts.Response;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.yangtools.concepts.Identifiable;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Per-connection representation of a local history. This class handles state replication across a single connection.
42  *
43  * @author Robert Varga
44  */
45 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
46     private abstract static class AbstractLocal extends ProxyHistory {
47         private final DataTree dataTree;
48
49         AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
50             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
51             super(connection, identifier);
52             this.dataTree = Preconditions.checkNotNull(dataTree);
53         }
54
55         final DataTreeSnapshot takeSnapshot() {
56             return dataTree.takeSnapshot();
57         }
58     }
59
60     private abstract static class AbstractRemote extends ProxyHistory {
61         AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
62             final LocalHistoryIdentifier identifier) {
63             super(connection, identifier);
64         }
65     }
66
67     private static final class Local extends AbstractLocal {
68         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
69                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
70
71         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
72         // the open one and attempts to create a new transaction again.
73         private LocalReadWriteProxyTransaction lastOpen;
74
75         private volatile LocalReadWriteProxyTransaction lastSealed;
76
77         Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
78             final DataTree dataTree) {
79             super(connection, identifier, dataTree);
80         }
81
82         @Override
83         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
84                 final TransactionIdentifier txId, final boolean snapshotOnly) {
85             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
86
87             // onTransactionCompleted() runs concurrently
88             final LocalReadWriteProxyTransaction localSealed = lastSealed;
89             final DataTreeSnapshot baseSnapshot;
90             if (localSealed != null) {
91                 baseSnapshot = localSealed.getSnapshot();
92             } else {
93                 baseSnapshot = takeSnapshot();
94             }
95
96             if (snapshotOnly) {
97                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
98             }
99
100             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
101             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
102             return lastOpen;
103         }
104
105         @Override
106         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
107             return createClient(connection, getIdentifier());
108         }
109
110         @Override
111         void onTransactionAborted(final AbstractProxyTransaction tx) {
112             if (tx.equals(lastOpen)) {
113                 lastOpen = null;
114             }
115         }
116
117         @Override
118         void onTransactionCompleted(final AbstractProxyTransaction tx) {
119             Verify.verify(tx instanceof LocalProxyTransaction);
120             if (tx instanceof LocalReadWriteProxyTransaction) {
121                 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
122                     LOG.debug("Completed last sealed transaction {}", tx);
123                 }
124             }
125         }
126
127         @Override
128         void onTransactionSealed(final AbstractProxyTransaction tx) {
129             Preconditions.checkState(tx.equals(lastOpen));
130             lastSealed = lastOpen;
131             lastOpen = null;
132         }
133     }
134
135     private static final class LocalSingle extends AbstractLocal {
136         LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
137             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
138             super(connection, identifier, dataTree);
139         }
140
141         @Override
142         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
143                 final TransactionIdentifier txId, final boolean snapshotOnly) {
144             final DataTreeSnapshot snapshot = takeSnapshot();
145             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
146                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
147         }
148
149         @Override
150         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
151             return createSingle(connection, getIdentifier());
152         }
153     }
154
155     private static final class Remote extends AbstractRemote {
156         Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
157             super(connection, identifier);
158         }
159
160         @Override
161         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
162                 final TransactionIdentifier txId, final boolean snapshotOnly) {
163             return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
164         }
165
166         @Override
167         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
168             return createClient(connection, getIdentifier());
169         }
170     }
171
172     private static final class RemoteSingle extends AbstractRemote {
173         RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
174             final LocalHistoryIdentifier identifier) {
175             super(connection, identifier);
176         }
177
178         @Override
179         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
180                 final TransactionIdentifier txId, final boolean snapshotOnly) {
181             return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
182         }
183
184         @Override
185         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
186             return createSingle(connection, getIdentifier());
187         }
188     }
189
190     private static final class RequestReplayException extends RequestException {
191         private static final long serialVersionUID = 1L;
192
193         RequestReplayException(final String format, final Object... args) {
194             super(String.format(format, args));
195         }
196
197         @Override
198         public boolean isRetriable() {
199             return false;
200         }
201     }
202
203     private final class ReconnectCohort extends ProxyReconnectCohort {
204         @Override
205         public LocalHistoryIdentifier getIdentifier() {
206             return identifier;
207         }
208
209         @GuardedBy("lock")
210         @Override
211         void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
212             // First look for our Create message
213             for (ConnectionEntry e : previousEntries) {
214                 final Request<?, ?> req = e.getRequest();
215                 if (identifier.equals(req.getTarget())) {
216                     Verify.verify(req instanceof LocalHistoryRequest);
217                     if (req instanceof CreateLocalHistoryRequest) {
218                         successor.connection.sendRequest(req, e.getCallback());
219                         break;
220                     }
221                 }
222             }
223
224             for (AbstractProxyTransaction t : proxies.values()) {
225                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
226                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
227                     t.isSnapshotOnly());
228                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
229                 t.replayMessages(newProxy, previousEntries);
230             }
231
232             // Now look for any finalizing messages
233             for (ConnectionEntry e : previousEntries) {
234                 final Request<?, ?> req = e.getRequest();
235                 if (identifier.equals(req.getTarget())) {
236                     Verify.verify(req instanceof LocalHistoryRequest);
237                     successor.connection.sendRequest(req, e.getCallback());
238                 }
239             }
240         }
241
242         @GuardedBy("lock")
243         @Override
244         ProxyHistory finishReconnect() {
245             final ProxyHistory ret = Verify.verifyNotNull(successor);
246
247             for (AbstractProxyTransaction t : proxies.values()) {
248                 t.finishReconnect();
249             }
250
251             LOG.debug("Finished reconnecting proxy history {}", this);
252             lock.unlock();
253             return ret;
254         }
255
256         @Override
257         void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
258                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
259             if (request instanceof TransactionRequest) {
260                 replayTransactionRequest((TransactionRequest<?>) request, callback);
261             } else if (request instanceof LocalHistoryRequest) {
262                 replayTo.accept(request, callback);
263             } else {
264                 throw new IllegalArgumentException("Unhandled request " + request);
265             }
266         }
267
268         private void replayTransactionRequest(final TransactionRequest<?> request,
269                 final Consumer<Response<?, ?>> callback) throws RequestException {
270
271             final AbstractProxyTransaction proxy;
272             lock.lock();
273             try {
274                 proxy = proxies.get(request.getTarget());
275             } finally {
276                 lock.unlock();
277             }
278             if (proxy == null) {
279                 throw new RequestReplayException("Failed to find proxy for %s", request);
280             }
281
282             proxy.replayRequest(request, callback);
283         }
284     }
285
286     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
287
288     private final Lock lock = new ReentrantLock();
289     private final LocalHistoryIdentifier identifier;
290     private final AbstractClientConnection<ShardBackendInfo> connection;
291
292     @GuardedBy("lock")
293     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
294     @GuardedBy("lock")
295     private ProxyHistory successor;
296
297     private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
298             final LocalHistoryIdentifier identifier) {
299         this.connection = Preconditions.checkNotNull(connection);
300         this.identifier = Preconditions.checkNotNull(identifier);
301     }
302
303     static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
304             final LocalHistoryIdentifier identifier) {
305         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
306         return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
307              : new Remote(connection, identifier);
308     }
309
310     static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
311             final LocalHistoryIdentifier identifier) {
312         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
313         return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
314              : new RemoteSingle(connection, identifier);
315     }
316
317     @Override
318     public LocalHistoryIdentifier getIdentifier() {
319         return identifier;
320     }
321
322     final ActorRef localActor() {
323         return connection.localActor();
324     }
325
326     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
327             final boolean snapshotOnly) {
328         lock.lock();
329         try {
330             if (successor != null) {
331                 return successor.createTransactionProxy(txId, snapshotOnly);
332             }
333
334             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
335             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
336             proxies.put(proxyId, ret);
337             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
338             return ret;
339         } finally {
340             lock.unlock();
341         }
342     }
343
344     final void abortTransaction(final AbstractProxyTransaction tx) {
345         lock.lock();
346         try {
347             proxies.remove(tx.getIdentifier());
348             LOG.debug("Proxy {} aborting transaction {}", this, tx);
349             onTransactionAborted(tx);
350         } finally {
351             lock.unlock();
352         }
353     }
354
355     final void completeTransaction(final AbstractProxyTransaction tx) {
356         lock.lock();
357         try {
358             proxies.remove(tx.getIdentifier());
359             LOG.debug("Proxy {} completing transaction {}", this, tx);
360             onTransactionCompleted(tx);
361         } finally {
362             lock.unlock();
363         }
364     }
365
366     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
367         connection.sendRequest(request, callback);
368     }
369
370     @GuardedBy("lock")
371     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
372             TransactionIdentifier txId, boolean snapshotOnly);
373
374     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
375
376     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
377     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
378         lock.lock();
379         if (successor != null) {
380             lock.unlock();
381             throw new IllegalStateException("Proxy history " + this + " already has a successor");
382         }
383
384         successor = createSuccessor(newConnection);
385         LOG.debug("History {} instantiated successor {}", this, successor);
386
387         for (AbstractProxyTransaction t : proxies.values()) {
388             t.startReconnect();
389         }
390
391         return new ReconnectCohort();
392     }
393
394     @GuardedBy("lock")
395     void onTransactionAborted(final AbstractProxyTransaction tx) {
396         // No-op for most implementations
397     }
398
399     @GuardedBy("lock")
400     void onTransactionCompleted(final AbstractProxyTransaction tx) {
401         // No-op for most implementations
402     }
403
404     void onTransactionSealed(final AbstractProxyTransaction tx) {
405         // No-op on most implementations
406     }
407 }