70b5960a056d943ef8f927094e3b433fb85f3021
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static com.google.common.base.Verify.verify;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
14
15 import akka.actor.ActorRef;
16 import com.google.common.collect.ImmutableList;
17 import com.google.common.primitives.UnsignedLong;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Iterator;
22 import java.util.LinkedHashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29 import java.util.function.Consumer;
30 import org.checkerframework.checker.lock.qual.GuardedBy;
31 import org.checkerframework.checker.lock.qual.Holding;
32 import org.eclipse.jdt.annotation.NonNull;
33 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
34 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
35 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
36 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
37 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
38 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
39 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
40 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
41 import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest;
42 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
43 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
44 import org.opendaylight.controller.cluster.access.concepts.Request;
45 import org.opendaylight.controller.cluster.access.concepts.RequestException;
46 import org.opendaylight.controller.cluster.access.concepts.Response;
47 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
48 import org.opendaylight.yangtools.concepts.Identifiable;
49 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
50 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Per-connection representation of a local history. This class handles state replication across a single connection.
56  *
57  * @author Robert Varga
58  */
59 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
60     private abstract static class AbstractLocal extends ProxyHistory {
61         private final ReadOnlyDataTree dataTree;
62
63         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
64             final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
65             super(parent, connection, identifier);
66             this.dataTree = requireNonNull(dataTree);
67         }
68
69         final DataTreeSnapshot takeSnapshot() {
70             return dataTree.takeSnapshot();
71         }
72     }
73
74     private abstract static class AbstractRemote extends ProxyHistory {
75         AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
76             final LocalHistoryIdentifier identifier) {
77             super(parent, connection, identifier);
78         }
79     }
80
81     private static final class Local extends AbstractLocal {
82         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
83                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
84
85         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
86         // the open one and attempts to create a new transaction again.
87         private LocalReadWriteProxyTransaction lastOpen;
88
89         private volatile LocalReadWriteProxyTransaction lastSealed;
90
91         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
92             final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
93             super(parent, connection, identifier, dataTree);
94         }
95
96         @Override
97         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
98                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
99             checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
100
101             if (isDone) {
102                 // Done transactions do not register on our radar on should not have any state associated.
103                 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
104                         : new LocalReadWriteProxyTransaction(this, txId);
105             }
106
107             // onTransactionCompleted() runs concurrently
108             final LocalReadWriteProxyTransaction localSealed = lastSealed;
109             final DataTreeSnapshot baseSnapshot;
110             if (localSealed != null) {
111                 baseSnapshot = localSealed.getSnapshot();
112             } else {
113                 baseSnapshot = takeSnapshot();
114             }
115
116             if (snapshotOnly) {
117                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
118             }
119
120             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
121             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
122             return lastOpen;
123         }
124
125         @Override
126         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
127             return createClient(parent(), connection, getIdentifier());
128         }
129
130         @Override
131         void onTransactionAborted(final AbstractProxyTransaction tx) {
132             if (tx.equals(lastOpen)) {
133                 lastOpen = null;
134             }
135         }
136
137         @Override
138         void onTransactionCompleted(final AbstractProxyTransaction tx) {
139             verify(tx instanceof LocalProxyTransaction, "Unexpected transaction %s", tx);
140             if (tx instanceof LocalReadWriteProxyTransaction
141                     && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
142                 LOG.debug("Completed last sealed transaction {}", tx);
143             }
144         }
145
146         @Override
147         void onTransactionSealed(final AbstractProxyTransaction tx) {
148             checkState(tx.equals(lastOpen));
149             lastSealed = lastOpen;
150             lastOpen = null;
151         }
152     }
153
154     private static final class LocalSingle extends AbstractLocal {
155         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
156             final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
157             super(parent, connection, identifier, dataTree);
158         }
159
160         @Override
161         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
162                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
163             final DataTreeSnapshot snapshot = takeSnapshot();
164             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
165                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
166         }
167
168         @Override
169         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
170             return createSingle(parent(), connection, getIdentifier());
171         }
172     }
173
174     private static final class Remote extends AbstractRemote {
175         Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
176             final LocalHistoryIdentifier identifier) {
177             super(parent, connection, identifier);
178         }
179
180         @Override
181         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
182                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
183             return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
184         }
185
186         @Override
187         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
188             return createClient(parent(), connection, getIdentifier());
189         }
190     }
191
192     private static final class RemoteSingle extends AbstractRemote {
193         RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
194             final LocalHistoryIdentifier identifier) {
195             super(parent, connection, identifier);
196         }
197
198         @Override
199         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
200                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
201             return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
202         }
203
204         @Override
205         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
206             return createSingle(parent(), connection, getIdentifier());
207         }
208     }
209
210     private static final class RequestReplayException extends RequestException {
211         private static final long serialVersionUID = 1L;
212
213         RequestReplayException(final String format, final Object... args) {
214             super(String.format(format, args));
215         }
216
217         @Override
218         public boolean isRetriable() {
219             return false;
220         }
221     }
222
223     private final class ReconnectCohort extends ProxyReconnectCohort {
224         @Override
225         public LocalHistoryIdentifier getIdentifier() {
226             return identifier;
227         }
228
229         @Holding("lock")
230         @Override
231         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
232             // First look for our Create message
233             Iterator<ConnectionEntry> it = previousEntries.iterator();
234             while (it.hasNext()) {
235                 final ConnectionEntry e = it.next();
236                 final Request<?, ?> req = e.getRequest();
237                 if (identifier.equals(req.getTarget())) {
238                     verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req);
239                     if (req instanceof CreateLocalHistoryRequest) {
240                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
241                         it.remove();
242                         break;
243                     }
244                 }
245             }
246
247             for (AbstractProxyTransaction t : proxies.values()) {
248                 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
249                 t.replayMessages(successor, previousEntries);
250             }
251
252             // Forward any skipped transactions
253             final var local = skippedTransactions;
254             if (local != null) {
255                 LOG.debug("{} forwarding skipped transactions towards successor {}", identifier, successor);
256                 successor.skipTransactions(local);
257                 skippedTransactions = null;
258             }
259
260             // Now look for any finalizing messages
261             it = previousEntries.iterator();
262             while (it.hasNext()) {
263                 final ConnectionEntry e  = it.next();
264                 final Request<?, ?> req = e.getRequest();
265                 if (identifier.equals(req.getTarget())) {
266                     verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req);
267                     if (req instanceof DestroyLocalHistoryRequest) {
268                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
269                         it.remove();
270                         break;
271                     }
272                 }
273             }
274         }
275
276         @Holding("lock")
277         @Override
278         ProxyHistory finishReconnect() {
279             final ProxyHistory ret = verifyNotNull(successor);
280
281             for (AbstractProxyTransaction t : proxies.values()) {
282                 t.finishReconnect();
283             }
284
285             LOG.debug("Finished reconnecting proxy history {}", this);
286             lock.unlock();
287             return ret;
288         }
289
290         @Override
291         void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
292                 throws RequestException {
293             final Request<?, ?> request = entry.getRequest();
294             if (request instanceof TransactionRequest) {
295                 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
296                     entry.getEnqueuedTicks());
297             } else if (request instanceof LocalHistoryRequest) {
298                 replayTo.accept(entry);
299             } else {
300                 throw new IllegalArgumentException("Unhandled request " + request);
301             }
302         }
303
304         @Override
305         void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
306                 throws RequestException {
307             final Request<?, ?> request = entry.getRequest();
308             if (request instanceof TransactionRequest) {
309                 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
310             } else if (request instanceof LocalHistoryRequest) {
311                 forwardTo.accept(entry);
312             } else {
313                 throw new IllegalArgumentException("Unhandled request " + request);
314             }
315         }
316
317         private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
318                 throws RequestReplayException {
319             final AbstractProxyTransaction proxy;
320             lock.lock();
321             try {
322                 proxy = proxies.get(request.getTarget());
323             } finally {
324                 lock.unlock();
325             }
326             if (proxy != null) {
327                 return proxy;
328             }
329
330             throw new RequestReplayException("Failed to find proxy for %s", request);
331         }
332     }
333
    private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);

    // Guards proxies, successor and every update of skippedTransactions. startReconnect() acquires this lock and
    // leaves it held; it is released by the finishReconnect() method of the cohort startReconnect() returned.
    private final Lock lock = new ReentrantLock();
    private final @NonNull LocalHistoryIdentifier identifier;
    private final @NonNull AbstractClientConnection<ShardBackendInfo> connection;
    private final @NonNull AbstractClientHistory parent;

    // Transaction proxies created by this history and not purged yet, in creation order
    @GuardedBy("lock")
    private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
    // Non-null once startReconnect() has run; most operations are then delegated to it
    @GuardedBy("lock")
    private ProxyHistory successor;

    // Number of recorded skipped transaction identifiers at which skipIfNeeded() flushes them to the backend.
    //
    // FIXME: this is not tuneable, but perhaps should be
    // FIXME: default value deserves some explanation -- this affects depth of an RB Tree on the receiving end.
    private static final int PURGE_SKIPPED_TXID_THRESHOLD = 256;

    // List of transaction identifiers which were allocated by our parent history, but did not touch our shard. Each of
    // these represents a hole in otherwise-contiguous allocation of transactionIds. These holes are problematic, as
    // each of them prevents LeaderFrontendState.purgedHistories from coalescing, leading to a gradual heap exhaustion.
    //
    // We keep these in an ArrayList for fast insertion, as that happens when we are otherwise idle. We translate these
    // into a SkipTransactionsRequest when:
    // - we are about to enqueue or send a transaction request
    // - the list reaches PURGE_SKIPPED_TXID_THRESHOLD entries
    // When we get a successor proxy, the list is handed over to it during request replay.
    //
    // The field is only modified while holding the lock. It is volatile because skipTransactions() performs an
    // unlocked null check before deciding whether it needs to take the lock at all.
    //
    // TODO: we are tracking entire TransactionIdentifiers, but really only need to track the longs. Do that once we
    //       have a {@code List<long>}.
    @GuardedBy("lock")
    private volatile List<TransactionIdentifier> skippedTransactions;
365
    /**
     * Private constructor; instances are obtained through {@link #createClient} and {@link #createSingle}.
     *
     * @param parent parent client history, must not be null
     * @param connection connection this proxy talks through, must not be null
     * @param identifier identifier of this history, must not be null
     * @throws NullPointerException if any argument is null
     */
    private ProxyHistory(final AbstractClientHistory parent,
            final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
        this.parent = requireNonNull(parent);
        this.connection = requireNonNull(connection);
        this.identifier = requireNonNull(identifier);
    }
372
373     static ProxyHistory createClient(final AbstractClientHistory parent,
374             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
375         final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
376         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
377              : new Remote(parent, connection, identifier);
378     }
379
380     static ProxyHistory createSingle(final AbstractClientHistory parent,
381             final AbstractClientConnection<ShardBackendInfo> connection,
382             final LocalHistoryIdentifier identifier) {
383         final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
384         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
385              : new RemoteSingle(parent, connection, identifier);
386     }
387
    /**
     * Return the identifier of this proxy history.
     */
    @Override
    // Non-final for mocking
    public LocalHistoryIdentifier getIdentifier() {
        return identifier;
    }

    /**
     * Return the actor context of the underlying connection.
     */
    final ClientActorContext context() {
        return connection.context();
    }

    /**
     * Return the current time as reported by the underlying connection.
     */
    final long currentTime() {
        return connection.currentTime();
    }

    /**
     * Return the local actor reference of the underlying connection.
     */
    final ActorRef localActor() {
        return connection.localActor();
    }

    /**
     * Return the client history this proxy belongs to.
     */
    final AbstractClientHistory parent() {
        return parent;
    }
409
    /**
     * Create a transaction proxy which is not done yet. Equivalent to calling
     * {@link #createTransactionProxy(TransactionIdentifier, boolean, boolean)} with {@code isDone} set to
     * {@code false}.
     *
     * @param txId identifier of the transaction
     * @param snapshotOnly passed through to the three-argument overload
     * @return the transaction proxy
     */
    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
            final boolean snapshotOnly) {
        return createTransactionProxy(txId, snapshotOnly, false);
    }
414
415     // Non-final for mocking
416     AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
417             final boolean isDone) {
418         lock.lock();
419         try {
420             if (successor != null) {
421                 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
422             }
423
424             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
425             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
426             proxies.put(proxyId, ret);
427             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
428             return ret;
429         } finally {
430             lock.unlock();
431         }
432     }
433
434     final void skipTransaction(final TransactionIdentifier txId) {
435         lock.lock();
436         try {
437             if (successor != null) {
438                 successor.skipTransaction(txId);
439                 return;
440             }
441
442             var local = skippedTransactions;
443             if (local == null) {
444                 skippedTransactions = local = new ArrayList<>();
445             }
446             local.add(txId);
447             LOG.debug("Recorded skipped transaction {}", txId);
448             skipIfNeeded(local);
449         } finally {
450             lock.unlock();
451         }
452     }
453
454     @Holding("lock")
455     private void skipIfNeeded(final List<TransactionIdentifier> current) {
456         if (current.size() >= PURGE_SKIPPED_TXID_THRESHOLD) {
457             skippedTransactions = null;
458             doSkipTransactions(current);
459         }
460     }
461
462     private void skipTransactions(final List<TransactionIdentifier> toSkip) {
463         lock.lock();
464         try {
465             if (successor != null) {
466                 successor.skipTransactions(toSkip);
467                 return;
468             }
469
470             var local = skippedTransactions;
471             if (local != null) {
472                 local.addAll(toSkip);
473             } else {
474                 skippedTransactions = local = toSkip;
475             }
476             skipIfNeeded(local);
477         } finally {
478             lock.unlock();
479         }
480     }
481
482     private void skipTransactions() {
483         var local = skippedTransactions;
484         if (local != null) {
485             lock.lock();
486             try {
487                 local = skippedTransactions;
488                 if (local != null && successor == null) {
489                     skippedTransactions = null;
490                     doSkipTransactions(local);
491                 }
492             } finally {
493                 lock.unlock();
494             }
495         }
496     }
497
498     @Holding("lock")
499     private void doSkipTransactions(final List<TransactionIdentifier> toSkip) {
500         final var txIds = toSkip.stream()
501             .mapToLong(TransactionIdentifier::getTransactionId)
502             .distinct()
503             .sorted()
504             .mapToObj(UnsignedLong::fromLongBits)
505             .collect(ImmutableList.toImmutableList());
506
507         LOG.debug("Proxy {} skipping transactions {}", this, txIds);
508         connection.enqueueRequest(new SkipTransactionsRequest(new TransactionIdentifier(identifier,
509             txIds.get(0).longValue()), 0, localActor(),txIds.subList(1, txIds.size())), resp -> {
510                 LOG.debug("Proxy {} confirmed transaction skip", this);
511             }, connection.currentTime());
512     }
513
    /**
     * Notify this history that a transaction has been aborted. Invokes {@link #onTransactionAborted} while holding
     * the lock; the proxy itself stays registered until {@link #purgeTransaction} is called.
     *
     * @param tx the aborted transaction proxy
     */
    final void abortTransaction(final AbstractProxyTransaction tx) {
        lock.lock();
        try {
            // Removal will be completed once purge completes
            LOG.debug("Proxy {} aborted transaction {}", this, tx);
            onTransactionAborted(tx);
        } finally {
            lock.unlock();
        }
    }
524
    /**
     * Notify this history that a transaction has completed. Invokes {@link #onTransactionCompleted} while holding
     * the lock; the proxy itself stays registered until {@link #purgeTransaction} is called.
     *
     * @param tx the completed transaction proxy
     */
    final void completeTransaction(final AbstractProxyTransaction tx) {
        lock.lock();
        try {
            // Removal will be completed once purge completes
            LOG.debug("Proxy {} completing transaction {}", this, tx);
            onTransactionCompleted(tx);
        } finally {
            lock.unlock();
        }
    }
535
536     final void purgeTransaction(final AbstractProxyTransaction tx) {
537         lock.lock();
538         try {
539             proxies.remove(tx.getIdentifier());
540             LOG.debug("Proxy {} purged transaction {}", this, tx);
541         } finally {
542             lock.unlock();
543         }
544     }
545
546     final void close() {
547         lock.lock();
548         try {
549             if (successor != null) {
550                 successor.close();
551                 return;
552             }
553
554             LOG.debug("Proxy {} invoking destroy", this);
555             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
556                 this::onDestroyComplete);
557         } finally {
558             lock.unlock();
559         }
560     }
561
    /**
     * Enqueue a transaction request on the underlying connection. Any pending skipped transactions are flushed
     * first, so that the skip request is enqueued ahead of the transaction request.
     *
     * @param request request to enqueue
     * @param callback callback to invoke with the response
     * @param enqueuedTicks passed through to the connection
     */
    final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
            final long enqueuedTicks) {
        skipTransactions();
        connection.enqueueRequest(request, callback, enqueuedTicks);
    }

    /**
     * Send a transaction request through the underlying connection. Any pending skipped transactions are flushed
     * first.
     *
     * @param request request to send
     * @param callback callback to invoke with the response
     */
    final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
        skipTransactions();
        connection.sendRequest(request, callback);
    }
572
    /**
     * Instantiate a transaction proxy of the flavor appropriate for this history. Invoked with the lock held by
     * {@link #createTransactionProxy(TransactionIdentifier, boolean, boolean)}, which also takes care of registering
     * the result.
     *
     * @param connection connection of this history
     * @param txId identifier under which the proxy is allocated
     * @param snapshotOnly flag forwarded from {@code createTransactionProxy()}
     * @param isDone flag forwarded from {@code createTransactionProxy()}
     * @return a new transaction proxy
     */
    @Holding("lock")
    @SuppressWarnings("checkstyle:hiddenField")
    abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
            TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);

    /**
     * Create the history which takes over from this one on a new connection. Invoked with the lock held by
     * {@link #startReconnect(ConnectedClientConnection)}.
     *
     * @param connection the new connection
     * @return successor history
     */
    @Holding("lock")
    @SuppressWarnings("checkstyle:hiddenField")
    abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
581
582     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
583     final ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
584         lock.lock();
585         if (successor != null) {
586             lock.unlock();
587             throw new IllegalStateException("Proxy history " + this + " already has a successor");
588         }
589
590         successor = createSuccessor(newConnection);
591         LOG.debug("History {} instantiated successor {}", this, successor);
592
593         for (AbstractProxyTransaction t : proxies.values()) {
594             t.startReconnect();
595         }
596
597         return new ReconnectCohort();
598     }
599
600     private void onDestroyComplete(final Response<?, ?> response) {
601         LOG.debug("Proxy {} destroy completed with {}", this, response);
602
603         lock.lock();
604         try {
605             parent.onProxyDestroyed(this);
606             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
607                 this::onPurgeComplete);
608         } finally {
609             lock.unlock();
610         }
611     }
612
    /**
     * Callback invoked with the response to the purge request sent by {@link #onDestroyComplete}. Only logs.
     *
     * @param response response to the purge request
     */
    private void onPurgeComplete(final Response<?, ?> response) {
        LOG.debug("Proxy {} purge completed with {}", this, response);
    }
616
    /**
     * Hook invoked by {@link #abortTransaction} with the lock held.
     *
     * @param tx the aborted transaction proxy
     */
    @Holding("lock")
    void onTransactionAborted(final AbstractProxyTransaction tx) {
        // No-op for most implementations
    }

    /**
     * Hook invoked by {@link #completeTransaction} with the lock held.
     *
     * @param tx the completed transaction proxy
     */
    @Holding("lock")
    void onTransactionCompleted(final AbstractProxyTransaction tx) {
        // No-op for most implementations
    }

    /**
     * Hook invoked when a transaction has been sealed. The caller is not part of this class; per the annotation it
     * is expected to hold the lock.
     *
     * @param tx the sealed transaction proxy
     */
    @Holding("lock")
    void onTransactionSealed(final AbstractProxyTransaction tx) {
        // No-op on most implementations
    }
631 }