34e8ba37a379669de09013099ad83a8e9ceaa38c
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.function.BiConsumer;
23 import java.util.function.Consumer;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
26 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
27 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
28 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
29 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
33 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
34 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
35 import org.opendaylight.controller.cluster.access.concepts.Request;
36 import org.opendaylight.controller.cluster.access.concepts.RequestException;
37 import org.opendaylight.controller.cluster.access.concepts.Response;
38 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
39 import org.opendaylight.yangtools.concepts.Identifiable;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Per-connection representation of a local history. This class handles state replication across a single connection.
47  *
48  * @author Robert Varga
49  */
50 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
51     private abstract static class AbstractLocal extends ProxyHistory {
52         private final DataTree dataTree;
53
54         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
55             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
56             super(parent, connection, identifier);
57             this.dataTree = Preconditions.checkNotNull(dataTree);
58         }
59
60         final DataTreeSnapshot takeSnapshot() {
61             return dataTree.takeSnapshot();
62         }
63     }
64
65     private abstract static class AbstractRemote extends ProxyHistory {
66         AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
67             final LocalHistoryIdentifier identifier) {
68             super(parent, connection, identifier);
69         }
70     }
71
72     private static final class Local extends AbstractLocal {
73         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
74                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
75
76         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
77         // the open one and attempts to create a new transaction again.
78         private LocalReadWriteProxyTransaction lastOpen;
79
80         private volatile LocalReadWriteProxyTransaction lastSealed;
81
82         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
83             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
84             super(parent, connection, identifier, dataTree);
85         }
86
87         @Override
88         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
89                 final TransactionIdentifier txId, final boolean snapshotOnly) {
90             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
91
92             // onTransactionCompleted() runs concurrently
93             final LocalReadWriteProxyTransaction localSealed = lastSealed;
94             final DataTreeSnapshot baseSnapshot;
95             if (localSealed != null) {
96                 baseSnapshot = localSealed.getSnapshot();
97             } else {
98                 baseSnapshot = takeSnapshot();
99             }
100
101             if (snapshotOnly) {
102                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
103             }
104
105             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
106             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
107             return lastOpen;
108         }
109
110         @Override
111         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
112             return createClient(parent(), connection, getIdentifier());
113         }
114
115         @Override
116         void onTransactionAborted(final AbstractProxyTransaction tx) {
117             if (tx.equals(lastOpen)) {
118                 lastOpen = null;
119             }
120         }
121
122         @Override
123         void onTransactionCompleted(final AbstractProxyTransaction tx) {
124             Verify.verify(tx instanceof LocalProxyTransaction);
125             if (tx instanceof LocalReadWriteProxyTransaction) {
126                 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
127                     LOG.debug("Completed last sealed transaction {}", tx);
128                 }
129             }
130         }
131
132         @Override
133         void onTransactionSealed(final AbstractProxyTransaction tx) {
134             Preconditions.checkState(tx.equals(lastOpen));
135             lastSealed = lastOpen;
136             lastOpen = null;
137         }
138     }
139
140     private static final class LocalSingle extends AbstractLocal {
141         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
142             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
143             super(parent, connection, identifier, dataTree);
144         }
145
146         @Override
147         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
148                 final TransactionIdentifier txId, final boolean snapshotOnly) {
149             final DataTreeSnapshot snapshot = takeSnapshot();
150             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
151                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
152         }
153
154         @Override
155         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
156             return createSingle(parent(), connection, getIdentifier());
157         }
158     }
159
160     private static final class Remote extends AbstractRemote {
161         Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
162             final LocalHistoryIdentifier identifier) {
163             super(parent, connection, identifier);
164         }
165
166         @Override
167         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
168                 final TransactionIdentifier txId, final boolean snapshotOnly) {
169             return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
170         }
171
172         @Override
173         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
174             return createClient(parent(), connection, getIdentifier());
175         }
176     }
177
178     private static final class RemoteSingle extends AbstractRemote {
179         RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
180             final LocalHistoryIdentifier identifier) {
181             super(parent, connection, identifier);
182         }
183
184         @Override
185         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
186                 final TransactionIdentifier txId, final boolean snapshotOnly) {
187             return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
188         }
189
190         @Override
191         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
192             return createSingle(parent(), connection, getIdentifier());
193         }
194     }
195
196     private static final class RequestReplayException extends RequestException {
197         private static final long serialVersionUID = 1L;
198
199         RequestReplayException(final String format, final Object... args) {
200             super(String.format(format, args));
201         }
202
203         @Override
204         public boolean isRetriable() {
205             return false;
206         }
207     }
208
209     private final class ReconnectCohort extends ProxyReconnectCohort {
210         @Override
211         public LocalHistoryIdentifier getIdentifier() {
212             return identifier;
213         }
214
215         @GuardedBy("lock")
216         @Override
217         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
218             // First look for our Create message
219             Iterator<ConnectionEntry> it = previousEntries.iterator();
220             while (it.hasNext()) {
221                 final ConnectionEntry e = it.next();
222                 final Request<?, ?> req = e.getRequest();
223                 if (identifier.equals(req.getTarget())) {
224                     Verify.verify(req instanceof LocalHistoryRequest);
225                     if (req instanceof CreateLocalHistoryRequest) {
226                         successor.connection.sendRequest(req, e.getCallback());
227                         it.remove();
228                         break;
229                     }
230                 }
231             }
232
233             for (AbstractProxyTransaction t : proxies.values()) {
234                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
235                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
236                     t.isSnapshotOnly());
237                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
238                 t.replayMessages(newProxy, previousEntries);
239             }
240
241             // Now look for any finalizing messages
242             it = previousEntries.iterator();
243             while (it.hasNext()) {
244                 final ConnectionEntry e  = it.next();
245                 final Request<?, ?> req = e.getRequest();
246                 if (identifier.equals(req.getTarget())) {
247                     Verify.verify(req instanceof LocalHistoryRequest);
248                     if (req instanceof DestroyLocalHistoryRequest) {
249                         successor.connection.sendRequest(req, e.getCallback());
250                         it.remove();
251                         break;
252                     }
253                 }
254             }
255         }
256
257         @GuardedBy("lock")
258         @Override
259         ProxyHistory finishReconnect() {
260             final ProxyHistory ret = Verify.verifyNotNull(successor);
261
262             for (AbstractProxyTransaction t : proxies.values()) {
263                 t.finishReconnect();
264             }
265
266             LOG.debug("Finished reconnecting proxy history {}", this);
267             lock.unlock();
268             return ret;
269         }
270
271         @Override
272         void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
273                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
274             // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
275             //        period required to get into the queue.
276             if (request instanceof TransactionRequest) {
277                 forwardTransactionRequest((TransactionRequest<?>) request, callback);
278             } else if (request instanceof LocalHistoryRequest) {
279                 forwardTo.accept(request, callback);
280             } else {
281                 throw new IllegalArgumentException("Unhandled request " + request);
282             }
283         }
284
285         private void forwardTransactionRequest(final TransactionRequest<?> request,
286                 final Consumer<Response<?, ?>> callback) throws RequestException {
287
288             final AbstractProxyTransaction proxy;
289             lock.lock();
290             try {
291                 proxy = proxies.get(request.getTarget());
292             } finally {
293                 lock.unlock();
294             }
295             if (proxy == null) {
296                 throw new RequestReplayException("Failed to find proxy for %s", request);
297             }
298
299             proxy.forwardRequest(request, callback);
300         }
301     }
302
303     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
304
305     private final Lock lock = new ReentrantLock();
306     private final LocalHistoryIdentifier identifier;
307     private final AbstractClientConnection<ShardBackendInfo> connection;
308     private final AbstractClientHistory parent;
309
310     @GuardedBy("lock")
311     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
312     @GuardedBy("lock")
313     private ProxyHistory successor;
314
315     private ProxyHistory(final AbstractClientHistory parent,
316             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
317         this.parent = Preconditions.checkNotNull(parent);
318         this.connection = Preconditions.checkNotNull(connection);
319         this.identifier = Preconditions.checkNotNull(identifier);
320     }
321
322     static ProxyHistory createClient(final AbstractClientHistory parent,
323             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
324         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
325         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
326              : new Remote(parent, connection, identifier);
327     }
328
329     static ProxyHistory createSingle(final AbstractClientHistory parent,
330             final AbstractClientConnection<ShardBackendInfo> connection,
331             final LocalHistoryIdentifier identifier) {
332         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
333         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
334              : new RemoteSingle(parent, connection, identifier);
335     }
336
337     @Override
338     public LocalHistoryIdentifier getIdentifier() {
339         return identifier;
340     }
341
342     final ClientActorContext context() {
343         return connection.context();
344     }
345
346     final long currentTime() {
347         return connection.currentTime();
348     }
349
350     final ActorRef localActor() {
351         return connection.localActor();
352     }
353
354     final AbstractClientHistory parent() {
355         return parent;
356     }
357
358     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
359             final boolean snapshotOnly) {
360         lock.lock();
361         try {
362             if (successor != null) {
363                 return successor.createTransactionProxy(txId, snapshotOnly);
364             }
365
366             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
367             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
368             proxies.put(proxyId, ret);
369             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
370             return ret;
371         } finally {
372             lock.unlock();
373         }
374     }
375
376     final void abortTransaction(final AbstractProxyTransaction tx) {
377         lock.lock();
378         try {
379             // Removal will be completed once purge completes
380             LOG.debug("Proxy {} aborted transaction {}", this, tx);
381             onTransactionAborted(tx);
382         } finally {
383             lock.unlock();
384         }
385     }
386
387     final void completeTransaction(final AbstractProxyTransaction tx) {
388         lock.lock();
389         try {
390             proxies.remove(tx.getIdentifier());
391             LOG.debug("Proxy {} completing transaction {}", this, tx);
392             onTransactionCompleted(tx);
393         } finally {
394             lock.unlock();
395         }
396     }
397
398     final void close() {
399         lock.lock();
400         try {
401             if (successor != null) {
402                 successor.close();
403                 return;
404             }
405
406             LOG.debug("Proxy {} invoking destroy", this);
407             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
408                 this::onDestroyComplete);
409         } finally {
410             lock.unlock();
411         }
412     }
413
414     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
415             final long enqueuedTicks) {
416         connection.enqueueRequest(request, callback, enqueuedTicks);
417     }
418
419     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
420         connection.sendRequest(request, callback);
421     }
422
423     @GuardedBy("lock")
424     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
425             TransactionIdentifier txId, boolean snapshotOnly);
426
427     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
428
429     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
430     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
431         lock.lock();
432         if (successor != null) {
433             lock.unlock();
434             throw new IllegalStateException("Proxy history " + this + " already has a successor");
435         }
436
437         successor = createSuccessor(newConnection);
438         LOG.debug("History {} instantiated successor {}", this, successor);
439
440         for (AbstractProxyTransaction t : proxies.values()) {
441             t.startReconnect();
442         }
443
444         return new ReconnectCohort();
445     }
446
447     private void onDestroyComplete(final Response<?, ?> response) {
448         LOG.debug("Proxy {} destroy completed with {}", this, response);
449
450         lock.lock();
451         try {
452             parent.onProxyDestroyed(this);
453             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
454                 this::onPurgeComplete);
455         } finally {
456             lock.unlock();
457         }
458     }
459
460     private void onPurgeComplete(final Response<?, ?> response) {
461         LOG.debug("Proxy {} purge completed with {}", this, response);
462     }
463
464     @GuardedBy("lock")
465     void onTransactionAborted(final AbstractProxyTransaction tx) {
466         // No-op for most implementations
467     }
468
469     @GuardedBy("lock")
470     void onTransactionCompleted(final AbstractProxyTransaction tx) {
471         // No-op for most implementations
472     }
473
474     void onTransactionSealed(final AbstractProxyTransaction tx) {
475         // No-op on most implementations
476     }
477 }