*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
import java.util.function.Consumer;
-import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
-import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
/**
- * A read-only specialization of {@link LocalProxyTransaction}.
+ * A read-only specialization of {@link LocalProxyTransaction}. This class is NOT thread-safe.
*
* @author Robert Varga
*/
-@NotThreadSafe
final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
- private static final Logger LOG = LoggerFactory.getLogger(LocalReadOnlyProxyTransaction.class);
-
private final DataTreeSnapshot snapshot;
LocalReadOnlyProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
final DataTreeSnapshot snapshot) {
- super(parent, identifier);
- this.snapshot = Preconditions.checkNotNull(snapshot);
+ super(parent, identifier, false);
+ this.snapshot = requireNonNull(snapshot);
+ }
+
+ LocalReadOnlyProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
+ super(parent, identifier, true);
+ // It is an error to touch snapshot once we are DONE
+ snapshot = null;
}
@Override
@Override
DataTreeSnapshot readOnlyView() {
- return snapshot;
+ return verifyNotNull(snapshot, "Transaction %s is DONE", getIdentifier());
}
@Override
void doDelete(final YangInstanceIdentifier path) {
- throw new UnsupportedOperationException("Read-only snapshot");
+ throw new UnsupportedOperationException("doDelete");
}
@Override
- void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- throw new UnsupportedOperationException("Read-only snapshot");
+ void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) {
+ throw new UnsupportedOperationException("doMerge");
}
@Override
- void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- throw new UnsupportedOperationException("Read-only snapshot");
+ void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) {
+ throw new UnsupportedOperationException("doWrite");
}
@Override
CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
- throw new UnsupportedOperationException("Read-only snapshot");
+ throw new UnsupportedOperationException("commitRequest");
}
@Override
- void doSeal() {
+ Optional<ModifyTransactionRequest> flushState() {
// No-op
+ return Optional.empty();
}
@Override
- void flushState(final AbstractProxyTransaction successor) {
- // No-op
- }
-
- @Override
- void applyModifyTransactionRequest(final ModifyTransactionRequest request,
+ void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
final Consumer<Response<?, ?>> callback) {
- Verify.verify(request.getModifications().isEmpty());
-
- final PersistenceProtocol protocol = request.getPersistenceProtocol().get();
- Verify.verify(protocol == PersistenceProtocol.ABORT);
+ commonModifyTransactionRequest(request);
abort();
}
@Override
- void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
- final Consumer<Response<?, ?>> callback) {
- if (request instanceof CommitLocalTransactionRequest) {
- final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
- final DataTreeModification mod = req.getModification();
-
- LOG.debug("Applying modification {} to successor {}", mod, successor);
- mod.applyToCursor(new AbstractDataTreeModificationCursor() {
- @Override
- public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
- successor.write(current().node(child), data);
- }
-
- @Override
- public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
- successor.merge(current().node(child), data);
- }
-
- @Override
- public void delete(final PathArgument child) {
- successor.delete(current().node(child));
- }
- });
-
- successor.seal();
-
- final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
- successor.sendRequest(successorReq, callback);
- } else if (request instanceof AbortLocalTransactionRequest) {
- LOG.debug("Forwarding abort {} to successor {}", request, successor);
- successor.abort();
- } else {
- throw new IllegalArgumentException("Unhandled request" + request);
- }
+ void replayModifyTransactionRequest(final ModifyTransactionRequest request,
+ final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ commonModifyTransactionRequest(request);
+ enqueueAbort(callback, enqueuedTicks);
+ }
+
+ private static void commonModifyTransactionRequest(final ModifyTransactionRequest request) {
+ verify(request.getModifications().isEmpty());
+
+ final PersistenceProtocol protocol = request.getPersistenceProtocol().orElseThrow();
+ verify(protocol == PersistenceProtocol.ABORT);
}
}