import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
private abstract static class AbstractLocal extends ProxyHistory {
private final DataTree dataTree;
- AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
+ AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
final LocalHistoryIdentifier identifier, final DataTree dataTree) {
- super(connection, identifier);
+ super(parent, connection, identifier);
this.dataTree = Preconditions.checkNotNull(dataTree);
}
}
private abstract static class AbstractRemote extends ProxyHistory {
- AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
+ AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
final LocalHistoryIdentifier identifier) {
- super(connection, identifier);
- }
-
- @Override
- final AbstractProxyTransaction doCreateTransactionProxy(
- final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
- return new RemoteProxyTransaction(this, txId);
+ super(parent, connection, identifier);
}
}
private static final class Local extends AbstractLocal {
- private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
+ private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
// Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
// the open one and attempts to create a new transaction again.
- private LocalProxyTransaction lastOpen;
+ private LocalReadWriteProxyTransaction lastOpen;
- private volatile LocalProxyTransaction lastSealed;
+ private volatile LocalReadWriteProxyTransaction lastSealed;
- Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
- final DataTree dataTree) {
- super(connection, identifier, dataTree);
+ Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
+ final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+ super(parent, connection, identifier, dataTree);
}
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId) {
+ final TransactionIdentifier txId, final boolean snapshotOnly) {
Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
// onTransactionCompleted() runs concurrently
- final LocalProxyTransaction localSealed = lastSealed;
+ final LocalReadWriteProxyTransaction localSealed = lastSealed;
final DataTreeSnapshot baseSnapshot;
if (localSealed != null) {
baseSnapshot = localSealed.getSnapshot();
baseSnapshot = takeSnapshot();
}
- lastOpen = new LocalProxyTransaction(this, txId,
- (CursorAwareDataTreeModification) baseSnapshot.newModification());
+ if (snapshotOnly) {
+ return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
+ }
+
+ lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
LOG.debug("Proxy {} open transaction {}", this, lastOpen);
return lastOpen;
}
@Override
ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
- return createClient(connection, getIdentifier());
+ return createClient(parent(), connection, getIdentifier());
}
@Override
void onTransactionAborted(final AbstractProxyTransaction tx) {
- Preconditions.checkState(tx.equals(lastOpen));
- lastOpen = null;
+ if (tx.equals(lastOpen)) {
+ lastOpen = null;
+ }
}
@Override
void onTransactionCompleted(final AbstractProxyTransaction tx) {
Verify.verify(tx instanceof LocalProxyTransaction);
-
- if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
- LOG.debug("Completed last sealed transaction {}", tx);
+ if (tx instanceof LocalReadWriteProxyTransaction) {
+ if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
+ LOG.debug("Completed last sealed transaction {}", tx);
+ }
}
}
}
private static final class LocalSingle extends AbstractLocal {
- LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+ LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
final LocalHistoryIdentifier identifier, final DataTree dataTree) {
- super(connection, identifier, dataTree);
+ super(parent, connection, identifier, dataTree);
}
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId) {
- return new LocalProxyTransaction(this, txId,
- (CursorAwareDataTreeModification) takeSnapshot().newModification());
+ final TransactionIdentifier txId, final boolean snapshotOnly) {
+ final DataTreeSnapshot snapshot = takeSnapshot();
+ return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
+ new LocalReadWriteProxyTransaction(this, txId, snapshot);
}
@Override
ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
- return createSingle(connection, getIdentifier());
+ return createSingle(parent(), connection, getIdentifier());
}
}
private static final class Remote extends AbstractRemote {
- Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
- super(connection, identifier);
+ Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
+ final LocalHistoryIdentifier identifier) {
+ super(parent, connection, identifier);
+ }
+
+ @Override
+ AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+ final TransactionIdentifier txId, final boolean snapshotOnly) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
}
@Override
ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
- return createClient(connection, getIdentifier());
+ return createClient(parent(), connection, getIdentifier());
}
}
private static final class RemoteSingle extends AbstractRemote {
- RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+ RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
final LocalHistoryIdentifier identifier) {
- super(connection, identifier);
+ super(parent, connection, identifier);
+ }
+
+ @Override
+ AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+ final TransactionIdentifier txId, final boolean snapshotOnly) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
}
@Override
ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
- return createSingle(connection, getIdentifier());
+ return createSingle(parent(), connection, getIdentifier());
}
}
for (AbstractProxyTransaction t : proxies.values()) {
LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
- final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
+ final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
+ t.isSnapshotOnly());
LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
t.replayMessages(newProxy, previousEntries);
}
private final Lock lock = new ReentrantLock();
private final LocalHistoryIdentifier identifier;
private final AbstractClientConnection<ShardBackendInfo> connection;
+ private final AbstractClientHistory parent;
@GuardedBy("lock")
private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
@GuardedBy("lock")
private ProxyHistory successor;
- private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
- final LocalHistoryIdentifier identifier) {
+ private ProxyHistory(final AbstractClientHistory parent,
+ final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
+ this.parent = Preconditions.checkNotNull(parent);
this.connection = Preconditions.checkNotNull(connection);
this.identifier = Preconditions.checkNotNull(identifier);
}
- static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
- final LocalHistoryIdentifier identifier) {
+ static ProxyHistory createClient(final AbstractClientHistory parent,
+ final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
- return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
- : new Remote(connection, identifier);
+ return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
+ : new Remote(parent, connection, identifier);
}
- static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
+ static ProxyHistory createSingle(final AbstractClientHistory parent,
+ final AbstractClientConnection<ShardBackendInfo> connection,
final LocalHistoryIdentifier identifier) {
final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
- return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
- : new RemoteSingle(connection, identifier);
+ return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
+ : new RemoteSingle(parent, connection, identifier);
}
@Override
return connection.localActor();
}
- final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
+ final AbstractClientHistory parent() {
+ return parent;
+ }
+
+ final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+ final boolean snapshotOnly) {
lock.lock();
try {
if (successor != null) {
- return successor.createTransactionProxy(txId);
+ return successor.createTransactionProxy(txId, snapshotOnly);
}
final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
- final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
+ final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
proxies.put(proxyId, ret);
LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
return ret;
}
}
+ final void close() {
+ lock.lock();
+ try {
+ if (successor != null) {
+ successor.close();
+ return;
+ }
+
+ LOG.debug("Proxy {} invoking destroy", this);
+ connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
+ this::onDestroyComplete);
+ } finally {
+ lock.unlock();
+ }
+ }
+
final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
connection.sendRequest(request, callback);
}
@GuardedBy("lock")
abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
- TransactionIdentifier txId);
+ TransactionIdentifier txId, boolean snapshotOnly);
abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
return new ReconnectCohort();
}
+ private void onDestroyComplete(final Response<?, ?> response) {
+ LOG.debug("Proxy {} destroy completed with {}", this, response);
+
+ lock.lock();
+ try {
+ parent.onProxyDestroyed(this);
+ connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
+ this::onPurgeComplete);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void onPurgeComplete(final Response<?, ?> response) {
+ LOG.debug("Proxy {} purge completed with {}", this, response);
+ }
+
@GuardedBy("lock")
void onTransactionAborted(final AbstractProxyTransaction tx) {
// No-op for most implementations