import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.Future;
/**
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public void executeModification(final AbstractModification modification, final Boolean havePermit) {
+ public void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
incrementModificationCount();
if (operationError == null) {
try {
- modification.apply(getWriteDelegate());
+ getWriteDelegate().delete(path);
+ } catch (Exception e) {
+ operationError = e;
+ }
+ }
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void executeMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+ final Boolean havePermit) {
+ incrementModificationCount();
+ if (operationError == null) {
+ try {
+ getWriteDelegate().merge(path, data);
+ } catch (Exception e) {
+ operationError = e;
+ }
+ }
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void executeWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+ final Boolean havePermit) {
+ incrementModificationCount();
+ if (operationError == null) {
+ try {
+ getWriteDelegate().write(path, data);
} catch (Exception e) {
operationError = e;
}
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
return akka.dispatch.Futures.failed(failure);
}
- @Override
- public void executeModification(final AbstractModification modification, final Boolean havePermit) {
- LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
- modification.getClass().getSimpleName(), modification.getPath());
- }
-
@Override
public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
final Boolean havePermit) {
proxyFuture.setException(new ReadFailedException("Error executeRead " + readCmd.getClass().getSimpleName()
+ " for path " + readCmd.getPath(), t));
}
+
+ @Override
+ public void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
+ LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path);
+ }
+
+ @Override
+ public void executeMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+ final Boolean havePermit) {
+ LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path);
+ }
+
+ @Override
+ public void executeWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+ final Boolean havePermit) {
+ LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path);
+ }
}
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
sent = actorUtils.executeOperationAsync(getActor(), toSend.toSerializable(),
actorUtils.getTransactionCommitOperationTimeout());
- sent.onComplete(new OnComplete<Object>() {
+ sent.onComplete(new OnComplete<>() {
@Override
public void onComplete(final Throwable failure, final Object success) {
if (failure != null) {
}
@Override
- public void executeModification(final AbstractModification modification, final Boolean havePermit) {
- LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
- modification.getClass().getSimpleName(), modification.getPath());
+ public void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
+ LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path);
+ executeModification(new DeleteModification(path), havePermit);
+ }
+
+ @Override
+ public void executeMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+ final Boolean havePermit) {
+ LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path);
+ executeModification(new MergeModification(path, data), havePermit);
+ }
+
+ @Override
+ public void executeWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+ final Boolean havePermit) {
+ LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path);
+ executeModification(new WriteModification(path, data), havePermit);
+ }
+ private void executeModification(final AbstractModification modification, final Boolean havePermit) {
final boolean permitToRelease;
if (havePermit == null) {
permitToRelease = failedModification == null && acquireOperation();
final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit.booleanValue();
sendBatchedModifications();
- OnComplete<Object> onComplete = new OnComplete<Object>() {
+ OnComplete<Object> onComplete = new OnComplete<>() {
@Override
public void onComplete(final Throwable failure, final Object response) {
// We have previously acquired an operation, now release it, no matter what happened
import java.util.Optional;
import java.util.SortedSet;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.Future;
/*
Future<ActorSelection> readyTransaction(Boolean havePermit, Optional<SortedSet<String>> participatingShardNames);
- void executeModification(AbstractModification modification, Boolean havePermit);
-
<T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> promise, Boolean havePermit);
+ void executeDelete(YangInstanceIdentifier path, Boolean havePermit);
+
+ void executeMerge(YangInstanceIdentifier path, NormalizedNode<?, ?> data, Boolean havePermit);
+
+ void executeWrite(YangInstanceIdentifier path, NormalizedNode<?, ?> data, Boolean havePermit);
+
Future<Object> directCommit(Boolean havePermit);
/**
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static java.util.Objects.requireNonNull;
+
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * A TransactionOperation to apply a specific modification. Subclasses provide type capture of required data, so that
+ * we instantiate AbstractModification subclasses for the bare minimum time required.
+ */
+abstract class TransactionModificationOperation extends TransactionOperation {
+ private abstract static class AbstractDataOperation extends TransactionModificationOperation {
+ private final NormalizedNode<?, ?> data;
+
+ AbstractDataOperation(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ super(path);
+ this.data = requireNonNull(data);
+ }
+
+ final NormalizedNode<?, ?> data() {
+ return data;
+ }
+ }
+
+ static final class DeleteOperation extends TransactionModificationOperation {
+ DeleteOperation(final YangInstanceIdentifier path) {
+ super(path);
+ }
+
+ @Override
+ protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
+ transactionContext.executeDelete(path(), havePermit);
+ }
+ }
+
+ static final class MergeOperation extends AbstractDataOperation {
+ MergeOperation(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ super(path, data);
+ }
+
+ @Override
+ protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
+ transactionContext.executeMerge(path(), data(), havePermit);
+ }
+ }
+
+ static final class WriteOperation extends AbstractDataOperation {
+ WriteOperation(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ super(path, data);
+ }
+
+ @Override
+ protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
+ transactionContext.executeWrite(path(), data(), havePermit);
+ }
+ }
+
+ private final YangInstanceIdentifier path;
+
+ TransactionModificationOperation(final YangInstanceIdentifier path) {
+ this.path = requireNonNull(path);
+ }
+
+ final YangInstanceIdentifier path() {
+ return path;
+ }
+}
import java.util.TreeMap;
import java.util.TreeSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.DeleteOperation;
+import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.MergeOperation;
+import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.WriteOperation;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction;
public void delete(final YangInstanceIdentifier path) {
checkModificationState("delete", path);
- executeModification(new DeleteModification(path));
+ executeModification(new DeleteOperation(path));
}
@Override
public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
checkModificationState("merge", path);
- executeModification(new MergeModification(path, data));
+ executeModification(new MergeOperation(path, data));
}
@Override
public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
checkModificationState("write", path);
- executeModification(new WriteModification(path, data));
+ executeModification(new WriteOperation(path, data));
}
- private void executeModification(final AbstractModification modification) {
- final TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath());
- contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
- @Override
- protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
- transactionContext.executeModification(modification, havePermit);
- }
- });
+ private void executeModification(final TransactionModificationOperation operation) {
+ getContextWrapper(operation.path()).maybeExecuteTransactionOperation(operation);
}
private void checkModificationState(final String opName, final YangInstanceIdentifier path) {
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
public void testWrite() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.empty();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
- localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
- null);
+ localTransactionContext.executeWrite(yangInstanceIdentifier, normalizedNode, null);
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
}
public void testMerge() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.empty();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
- localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
- null);
+ localTransactionContext.executeMerge(yangInstanceIdentifier, normalizedNode, null);
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
}
@Test
public void testDelete() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.empty();
- localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
+ localTransactionContext.executeDelete(yangInstanceIdentifier, null);
verify(readWriteTransaction).delete(yangInstanceIdentifier);
}
-
@Test
public void testRead() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.empty();
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
- localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
- null);
- localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
- null);
+ localTransactionContext.executeWrite(yangInstanceIdentifier, normalizedNode, null);
+ localTransactionContext.executeWrite(yangInstanceIdentifier, normalizedNode, null);
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
- localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
- null);
- localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
- null);
+ localTransactionContext.executeMerge(yangInstanceIdentifier, normalizedNode, null);
+ localTransactionContext.executeMerge(yangInstanceIdentifier, normalizedNode, null);
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier);
- localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
- localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
+ localTransactionContext.executeDelete(yangInstanceIdentifier, null);
+ localTransactionContext.executeDelete(yangInstanceIdentifier, null);
verify(readWriteTransaction).delete(yangInstanceIdentifier);
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import scala.concurrent.Await;
import scala.concurrent.Future;
private static final TransactionIdentifier TX_ID = new TransactionIdentifier(new LocalHistoryIdentifier(
ClientIdentifier.create(FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("test")), 0),
0), 0);
- private static final DeleteModification DELETE = new DeleteModification(DataStoreVersions.CURRENT_VERSION);
private OperationLimiter limiter;
private RemoteTransactionContext txContext;
*/
@Test
public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
- txContext.executeModification(DELETE, null);
- txContext.executeModification(DELETE, null);
+ txContext.executeDelete(null, null);
+ txContext.executeDelete(null, null);
assertEquals(2, limiter.availablePermits());
final Future<Object> sendFuture = txContext.sendBatchedModifications();
assertEquals(2, msg.getModifications().size());
assertEquals(1, msg.getTotalMessagesSent());
sendReply(new Failure(new NullPointerException()));
- assertFuture(sendFuture, new OnComplete<Object>() {
+ assertFuture(sendFuture, new OnComplete<>() {
@Override
public void onComplete(final Throwable failure, final Object success) {
assertTrue(failure instanceof NullPointerException);
assertEquals(4, limiter.availablePermits());
// The transaction has failed, no throttling should occur
- txContext.executeModification(DELETE, null);
+ txContext.executeDelete(null, null);
assertEquals(4, limiter.availablePermits());
// Executing a read should result in immediate failure
assertTrue(msg.isReady());
assertEquals(2, msg.getTotalMessagesSent());
sendReply(new Failure(new IllegalStateException()));
- assertFuture(commitFuture, new OnComplete<Object>() {
+ assertFuture(commitFuture, new OnComplete<>() {
@Override
public void onComplete(final Throwable failure, final Object success) {
assertTrue(failure instanceof IllegalStateException);
*/
@Test
public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
- txContext.executeModification(DELETE, null);
- txContext.executeModification(DELETE, null);
- txContext.executeModification(DELETE, null);
- txContext.executeModification(DELETE, null);
+ txContext.executeDelete(null, null);
+ txContext.executeDelete(null, null);
+ txContext.executeDelete(null, null);
+ txContext.executeDelete(null, null);
assertEquals(0, limiter.availablePermits());
- txContext.executeModification(DELETE, null);
+ txContext.executeDelete(null, null);
// Last acquire should have failed ...
assertEquals(0, limiter.availablePermits());
assertEquals(1, msg.getTotalMessagesSent());
sendReply(new Failure(new NullPointerException()));
- assertFuture(future, new OnComplete<Object>() {
+ assertFuture(future, new OnComplete<>() {
@Override
public void onComplete(final Throwable failure, final Object success) {
assertTrue(failure instanceof NullPointerException);