X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FProxyHistory.java;h=d6aa3d3f3fc079db3ac4ed12e46c40173f1c8dc4;hb=823bd74f34ee1c651f1f90daeef386a35c68d431;hp=07fcbebad01a263d38a0bfabe839e8b02832ffe2;hpb=b5444f8c2c10ded63d6a9e890db61b0f3aa2095e;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 07fcbebad0..d6aa3d3f3f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -22,6 +22,8 @@ import java.util.function.Consumer; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; +import org.opendaylight.controller.cluster.access.client.ConnectionEntry; +import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; @@ -30,7 +32,6 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.slf4j.Logger; @@ -61,23 +62,17 @@ abstract class ProxyHistory implements Identifiable { final LocalHistoryIdentifier identifier) { super(connection, identifier); } - - @Override - final AbstractProxyTransaction doCreateTransactionProxy( - final AbstractClientConnection connection, final TransactionIdentifier txId) { - return new RemoteProxyTransaction(this, txId); - } } private static final class Local extends AbstractLocal { - private static final AtomicReferenceFieldUpdater LAST_SEALED_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed"); + private static final AtomicReferenceFieldUpdater LAST_SEALED_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed"); // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting // the open one and attempts to create a new transaction again. - private LocalProxyTransaction lastOpen; + private LocalReadWriteProxyTransaction lastOpen; - private volatile LocalProxyTransaction lastSealed; + private volatile LocalReadWriteProxyTransaction lastSealed; Local(final AbstractClientConnection connection, final LocalHistoryIdentifier identifier, final DataTree dataTree) { @@ -86,11 +81,11 @@ abstract class ProxyHistory implements Identifiable { @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId) { + final TransactionIdentifier txId, final boolean snapshotOnly) { Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen); // onTransactionCompleted() runs concurrently - final LocalProxyTransaction localSealed = lastSealed; + final LocalReadWriteProxyTransaction localSealed = lastSealed; final DataTreeSnapshot baseSnapshot; if (localSealed != null) { baseSnapshot = localSealed.getSnapshot(); @@ -98,8 +93,11 @@ abstract class ProxyHistory implements Identifiable { baseSnapshot = takeSnapshot(); } - lastOpen = new LocalProxyTransaction(this, txId, - (CursorAwareDataTreeModification) baseSnapshot.newModification()); + if (snapshotOnly) { + return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot); + } + + lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot); LOG.debug("Proxy {} open transaction {}", this, lastOpen); return lastOpen; } @@ -111,16 +109,18 @@ abstract class ProxyHistory implements Identifiable { @Override void onTransactionAborted(final AbstractProxyTransaction tx) { - Preconditions.checkState(tx.equals(lastOpen)); - lastOpen = null; + if (tx.equals(lastOpen)) { + lastOpen = null; + } } @Override void onTransactionCompleted(final AbstractProxyTransaction tx) { Verify.verify(tx instanceof LocalProxyTransaction); - - if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) { - LOG.debug("Completed last sealed transaction {}", tx); + if (tx instanceof LocalReadWriteProxyTransaction) { + if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) { + LOG.debug("Completed last sealed transaction {}", tx); + } } } @@ -140,9 +140,10 @@ abstract class ProxyHistory implements Identifiable { @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId) { - return new LocalProxyTransaction(this, txId, - (CursorAwareDataTreeModification) takeSnapshot().newModification()); + final TransactionIdentifier txId, final boolean snapshotOnly) { + final DataTreeSnapshot snapshot = takeSnapshot(); + return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) : + new LocalReadWriteProxyTransaction(this, txId, snapshot); } @Override @@ -156,6 +157,12 @@ abstract class ProxyHistory implements Identifiable { super(connection, identifier); } + @Override + AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, + final TransactionIdentifier txId, final boolean snapshotOnly) { + return new RemoteProxyTransaction(this, txId, snapshotOnly, true); + } + @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { return createClient(connection, getIdentifier()); @@ -168,6 +175,12 @@ abstract class ProxyHistory implements Identifiable { super(connection, identifier); } + @Override + AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, + final TransactionIdentifier txId, final boolean snapshotOnly) { + return new RemoteProxyTransaction(this, txId, snapshotOnly, false); + } + @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { return createSingle(connection, getIdentifier()); @@ -195,12 +208,34 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") @Override - void replaySuccessfulRequests() { + void replaySuccessfulRequests(final Iterable previousEntries) { + // First look for our Create message + for (ConnectionEntry e : previousEntries) { + final Request req = e.getRequest(); + if (identifier.equals(req.getTarget())) { + Verify.verify(req instanceof LocalHistoryRequest); + if (req instanceof CreateLocalHistoryRequest) { + successor.connection.sendRequest(req, e.getCallback()); + break; + } + } + } + for (AbstractProxyTransaction t : proxies.values()) { LOG.debug("{} creating successor transaction proxy for {}", identifier, t); - final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier()); + final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(), + t.isSnapshotOnly()); LOG.debug("{} created successor transaction proxy {}", identifier, newProxy); - t.startReconnect(newProxy); + t.replayMessages(newProxy, previousEntries); + } + + // Now look for any finalizing messages + for (ConnectionEntry e : previousEntries) { + final Request req = e.getRequest(); + if (identifier.equals(req.getTarget())) { + Verify.verify(req instanceof LocalHistoryRequest); + successor.connection.sendRequest(req, e.getCallback()); + } } } @@ -288,15 +323,16 @@ abstract class ProxyHistory implements Identifiable { return connection.localActor(); } - final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) { + final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, + final boolean snapshotOnly) { lock.lock(); try { if (successor != null) { - return successor.createTransactionProxy(txId); + return successor.createTransactionProxy(txId, snapshotOnly); } final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId()); - final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId); + final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly); proxies.put(proxyId, ret); LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId); return ret; @@ -333,7 +369,7 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection connection, - TransactionIdentifier txId); + TransactionIdentifier txId, boolean snapshotOnly); abstract ProxyHistory createSuccessor(AbstractClientConnection connection); @@ -347,6 +383,11 @@ abstract class ProxyHistory implements Identifiable { successor = createSuccessor(newConnection); LOG.debug("History {} instantiated successor {}", this, successor); + + for (AbstractProxyTransaction t : proxies.values()) { + t.startReconnect(); + } + return new ReconnectCohort(); }