X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FProxyHistory.java;h=827c19e526fcbd5f6b3cc1c33dfdc40698af11e9;hp=b3b604b7f08ad7e42abb4a224626673b97b8b8e3;hb=047566574ea74d1dfe24fa8075f8ba137faa698c;hpb=b4d95acff78952020e9fbde4372d13b461fd7469 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index b3b604b7f0..827c19e526 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -24,7 +24,9 @@ import org.opendaylight.controller.cluster.access.client.AbstractClientConnectio import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Request; @@ -32,7 +34,6 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.slf4j.Logger; @@ -47,9 +48,9 @@ abstract class ProxyHistory implements Identifiable { private abstract static class AbstractLocal extends ProxyHistory { private final DataTree dataTree; - AbstractLocal(final AbstractClientConnection connection, + AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier, final DataTree dataTree) { - super(connection, identifier); + super(parent, connection, identifier); this.dataTree = Preconditions.checkNotNull(dataTree); } @@ -59,40 +60,34 @@ abstract class ProxyHistory implements Identifiable { } private abstract static class AbstractRemote extends ProxyHistory { - AbstractRemote(final AbstractClientConnection connection, + AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - super(connection, identifier); - } - - @Override - final AbstractProxyTransaction doCreateTransactionProxy( - final AbstractClientConnection connection, final TransactionIdentifier txId) { - return new RemoteProxyTransaction(this, txId); + super(parent, connection, identifier); } } private static final class Local extends AbstractLocal { - private static final AtomicReferenceFieldUpdater LAST_SEALED_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed"); + private static final AtomicReferenceFieldUpdater LAST_SEALED_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed"); // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting // the open one and attempts to create a new transaction again. - private LocalProxyTransaction lastOpen; + private LocalReadWriteProxyTransaction lastOpen; - private volatile LocalProxyTransaction lastSealed; + private volatile LocalReadWriteProxyTransaction lastSealed; - Local(final AbstractClientConnection connection, final LocalHistoryIdentifier identifier, - final DataTree dataTree) { - super(connection, identifier, dataTree); + Local(final AbstractClientHistory parent, final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier, final DataTree dataTree) { + super(parent, connection, identifier, dataTree); } @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId) { + final TransactionIdentifier txId, final boolean snapshotOnly) { Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen); // onTransactionCompleted() runs concurrently - final LocalProxyTransaction localSealed = lastSealed; + final LocalReadWriteProxyTransaction localSealed = lastSealed; final DataTreeSnapshot baseSnapshot; if (localSealed != null) { baseSnapshot = localSealed.getSnapshot(); @@ -100,29 +95,34 @@ abstract class ProxyHistory implements Identifiable { baseSnapshot = takeSnapshot(); } - lastOpen = new LocalProxyTransaction(this, txId, - (CursorAwareDataTreeModification) baseSnapshot.newModification()); + if (snapshotOnly) { + return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot); + } + + lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot); LOG.debug("Proxy {} open transaction {}", this, lastOpen); return lastOpen; } @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { - return createClient(connection, getIdentifier()); + return createClient(parent(), connection, getIdentifier()); } @Override void onTransactionAborted(final AbstractProxyTransaction tx) { - Preconditions.checkState(tx.equals(lastOpen)); - lastOpen = null; + if (tx.equals(lastOpen)) { + lastOpen = null; + } } @Override void onTransactionCompleted(final AbstractProxyTransaction tx) { Verify.verify(tx instanceof LocalProxyTransaction); - - if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) { - LOG.debug("Completed last sealed transaction {}", tx); + if (tx instanceof LocalReadWriteProxyTransaction) { + if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) { + LOG.debug("Completed last sealed transaction {}", tx); + } } } @@ -135,44 +135,58 @@ abstract class ProxyHistory implements Identifiable { } private static final class LocalSingle extends AbstractLocal { - LocalSingle(final AbstractClientConnection connection, + LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier, final DataTree dataTree) { - super(connection, identifier, dataTree); + super(parent, connection, identifier, dataTree); } @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId) { - return new LocalProxyTransaction(this, txId, - (CursorAwareDataTreeModification) takeSnapshot().newModification()); + final TransactionIdentifier txId, final boolean snapshotOnly) { + final DataTreeSnapshot snapshot = takeSnapshot(); + return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) : + new LocalReadWriteProxyTransaction(this, txId, snapshot); } @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { - return createSingle(connection, getIdentifier()); + return createSingle(parent(), connection, getIdentifier()); } } private static final class Remote extends AbstractRemote { - Remote(final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - super(connection, identifier); + Remote(final AbstractClientHistory parent, final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier) { + super(parent, connection, identifier); + } + + @Override + AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, + final TransactionIdentifier txId, final boolean snapshotOnly) { + return new RemoteProxyTransaction(this, txId, snapshotOnly, true); } @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { - return createClient(connection, getIdentifier()); + return createClient(parent(), connection, getIdentifier()); } } private static final class RemoteSingle extends AbstractRemote { - RemoteSingle(final AbstractClientConnection connection, + RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - super(connection, identifier); + super(parent, connection, identifier); + } + + @Override + AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, + final TransactionIdentifier txId, final boolean snapshotOnly) { + return new RemoteProxyTransaction(this, txId, snapshotOnly, false); } @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { - return createSingle(connection, getIdentifier()); + return createSingle(parent(), connection, getIdentifier()); } } @@ -197,7 +211,7 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") @Override - void replaySuccessfulRequests(final Iterable previousEntries) { + void replayRequests(final Iterable previousEntries) { // First look for our Create message for (ConnectionEntry e : previousEntries) { final Request req = e.getRequest(); @@ -212,7 +226,8 @@ abstract class ProxyHistory implements Identifiable { for (AbstractProxyTransaction t : proxies.values()) { LOG.debug("{} creating successor transaction proxy for {}", identifier, t); - final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier()); + final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(), + t.isSnapshotOnly()); LOG.debug("{} created successor transaction proxy {}", identifier, newProxy); t.replayMessages(newProxy, previousEntries); } @@ -242,18 +257,18 @@ abstract class ProxyHistory implements Identifiable { } @Override - void replayRequest(final Request request, final Consumer> callback, - final BiConsumer, Consumer>> replayTo) throws RequestException { + void forwardRequest(final Request request, final Consumer> callback, + final BiConsumer, Consumer>> forwardTo) throws RequestException { if (request instanceof TransactionRequest) { - replayTransactionRequest((TransactionRequest) request, callback); + forwardTransactionRequest((TransactionRequest) request, callback); } else if (request instanceof LocalHistoryRequest) { - replayTo.accept(request, callback); + forwardTo.accept(request, callback); } else { throw new IllegalArgumentException("Unhandled request " + request); } } - private void replayTransactionRequest(final TransactionRequest request, + private void forwardTransactionRequest(final TransactionRequest request, final Consumer> callback) throws RequestException { final AbstractProxyTransaction proxy; @@ -267,7 +282,7 @@ abstract class ProxyHistory implements Identifiable { throw new RequestReplayException("Failed to find proxy for %s", request); } - proxy.replayRequest(request, callback); + proxy.forwardRequest(request, callback); } } @@ -276,30 +291,33 @@ abstract class ProxyHistory implements Identifiable { private final Lock lock = new ReentrantLock(); private final LocalHistoryIdentifier identifier; private final AbstractClientConnection connection; + private final AbstractClientHistory parent; @GuardedBy("lock") private final Map proxies = new LinkedHashMap<>(); @GuardedBy("lock") private ProxyHistory successor; - private ProxyHistory(final AbstractClientConnection connection, - final LocalHistoryIdentifier identifier) { + private ProxyHistory(final AbstractClientHistory parent, + final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { + this.parent = Preconditions.checkNotNull(parent); this.connection = Preconditions.checkNotNull(connection); this.identifier = Preconditions.checkNotNull(identifier); } - static ProxyHistory createClient(final AbstractClientConnection connection, - final LocalHistoryIdentifier identifier) { + static ProxyHistory createClient(final AbstractClientHistory parent, + final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); - return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get()) - : new Remote(connection, identifier); + return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get()) + : new Remote(parent, connection, identifier); } - static ProxyHistory createSingle(final AbstractClientConnection connection, + static ProxyHistory createSingle(final AbstractClientHistory parent, + final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); - return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get()) - : new RemoteSingle(connection, identifier); + return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get()) + : new RemoteSingle(parent, connection, identifier); } @Override @@ -311,15 +329,20 @@ abstract class ProxyHistory implements Identifiable { return connection.localActor(); } - final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) { + final AbstractClientHistory parent() { + return parent; + } + + final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, + final boolean snapshotOnly) { lock.lock(); try { if (successor != null) { - return successor.createTransactionProxy(txId); + return successor.createTransactionProxy(txId, snapshotOnly); } final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId()); - final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId); + final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly); proxies.put(proxyId, ret); LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId); return ret; @@ -350,13 +373,29 @@ abstract class ProxyHistory implements Identifiable { } } + final void close() { + lock.lock(); + try { + if (successor != null) { + successor.close(); + return; + } + + LOG.debug("Proxy {} invoking destroy", this); + connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()), + this::onDestroyComplete); + } finally { + lock.unlock(); + } + } + final void sendRequest(final TransactionRequest request, final Consumer> callback) { connection.sendRequest(request, callback); } @GuardedBy("lock") abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection connection, - TransactionIdentifier txId); + TransactionIdentifier txId, boolean snapshotOnly); abstract ProxyHistory createSuccessor(AbstractClientConnection connection); @@ -378,6 +417,23 @@ abstract class ProxyHistory implements Identifiable { return new ReconnectCohort(); } + private void onDestroyComplete(final Response response) { + LOG.debug("Proxy {} destroy completed with {}", this, response); + + lock.lock(); + try { + parent.onProxyDestroyed(this); + connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()), + this::onPurgeComplete); + } finally { + lock.unlock(); + } + } + + private void onPurgeComplete(final Response response) { + LOG.debug("Proxy {} purge completed with {}", this, response); + } + @GuardedBy("lock") void onTransactionAborted(final AbstractProxyTransaction tx) { // No-op for most implementations