<type>xml</type>
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>features-netconf-connector</artifactId>
+ <version>${mdsal.version}</version>
+ <classifier>features</classifier>
+ <type>xml</type>
+ <scope>runtime</scope>
+ </dependency>
<!-- JMH Benchmark dependencies -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<type>xml</type>
<scope>runtime</scope>
</dependency>
+ <!-- Netconf connector features. When this is included, users can test the netconf connector using netconf-testtool -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>features-netconf-connector</artifactId>
+ <classifier>features</classifier>
+ <type>xml</type>
+ </dependency>
+
</dependencies>
<build>
import akka.actor.Props;
import akka.japi.Creator;
-
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
public class DataChangeListener extends AbstractUntypedActor {
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
- private volatile boolean notificationsEnabled = false;
+ private boolean notificationsEnabled = false;
public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>> listener) {
change = reply.getChange();
this.listener.onDataChanged(change);
- if(getSender() != null){
+ // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
+ // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
+ if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
getSender().tell(new DataChangedReply(), getSelf());
}
}
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-
import com.google.common.base.Preconditions;
-
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@Override public void onDataChanged(
AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- dataChangeListenerActor.tell(new DataChanged(schemaContext, change), null);
+ dataChangeListenerActor.tell(new DataChanged(schemaContext, change), ActorRef.noSender());
}
}
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.japi.Creator;
-
+import akka.serialization.Serialization;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-
-
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
ActorRef cohortActor = getContext().actorOf(
ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardStats), "cohort");
- getSender()
- .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
+ getSender().tell(new ReadyTransactionReply(
+ Serialization.serializedActorPath(cohortActor)).toSerializable(), getSelf());
}
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;
private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
private final ActorContext actorContext;
- private final List<Future<ActorPath>> cohortPathFutures;
- private volatile List<ActorPath> cohortPaths;
+ private final List<Future<ActorSelection>> cohortFutures;
+ private volatile List<ActorSelection> cohorts;
private final String transactionId;
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
- List<Future<ActorPath>> cohortPathFutures, String transactionId) {
+ List<Future<ActorSelection>> cohortFutures, String transactionId) {
this.actorContext = actorContext;
- this.cohortPathFutures = cohortPathFutures;
+ this.cohortFutures = cohortFutures;
this.transactionId = transactionId;
}
- private Future<Void> buildCohortPathsList() {
+ private Future<Void> buildCohortList() {
- Future<Iterable<ActorPath>> combinedFutures = Futures.sequence(cohortPathFutures,
+ Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
actorContext.getActorSystem().dispatcher());
- return combinedFutures.transform(new AbstractFunction1<Iterable<ActorPath>, Void>() {
+ return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
@Override
- public Void apply(Iterable<ActorPath> paths) {
- cohortPaths = Lists.newArrayList(paths);
+ public Void apply(Iterable<ActorSelection> actorSelections) {
+ cohorts = Lists.newArrayList(actorSelections);
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} successfully built cohort path list: {}",
- transactionId, cohortPaths);
+ transactionId, cohorts);
}
return null;
}
// extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
// and passed to us from upstream processing. If any one fails then we'll fail canCommit.
- buildCohortPathsList().onComplete(new OnComplete<Void>() {
+ buildCohortList().onComplete(new OnComplete<Void>() {
@Override
public void onComplete(Throwable failure, Void notUsed) throws Throwable {
if(failure != null) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+ LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
}
returnFuture.setException(failure);
} else {
}
private Future<Iterable<Object>> invokeCohorts(Object message) {
- List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
- for(ActorPath actorPath : cohortPaths) {
+ List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
+ for(ActorSelection cohort : cohorts) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
+ LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
}
- ActorSelection cohort = actorContext.actorSelection(actorPath);
futureList.add(actorContext.executeOperationAsync(cohort, message));
}
// The cohort actor list should already be built at this point by the canCommit phase but,
// if not for some reason, we'll try to build it here.
- if(cohortPaths != null) {
+ if(cohorts != null) {
finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
returnFuture);
} else {
- buildCohortPathsList().onComplete(new OnComplete<Void>() {
+ buildCohortList().onComplete(new OnComplete<Void>() {
@Override
public void onComplete(Throwable failure, Void notUsed) throws Throwable {
if(failure != null) {
}
@VisibleForTesting
- List<Future<ActorPath>> getCohortPathFutures() {
- return Collections.unmodifiableList(cohortPathFutures);
+ List<Future<ActorSelection>> getCohortFutures() {
+ return Collections.unmodifiableList(cohortFutures);
}
}
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
public class TransactionChainProxy implements DOMStoreTransactionChain{
private final ActorContext actorContext;
private final String transactionChainId;
- private volatile List<Future<ActorPath>> cohortPathFutures = Collections.emptyList();
+ private volatile List<Future<ActorSelection>> cohortFutures = Collections.emptyList();
public TransactionChainProxy(ActorContext actorContext) {
this.actorContext = actorContext;
return transactionChainId;
}
- public void onTransactionReady(List<Future<ActorPath>> cohortPathFutures){
- this.cohortPathFutures = cohortPathFutures;
+ public void onTransactionReady(List<Future<ActorSelection>> cohortFutures){
+ this.cohortFutures = cohortFutures;
}
public void waitTillCurrentTransactionReady(){
try {
Await.result(Futures
- .sequence(this.cohortPathFutures, actorContext.getActorSystem().dispatcher()),
+ .sequence(this.cohortFutures, actorContext.getActorSystem().dispatcher()),
actorContext.getOperationDuration());
} catch (Exception e) {
throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e);
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
remoteTransactionPaths.size());
}
- List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+ List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
LOG.debug("Tx {} Readying transaction for shard {}", identifier,
transactionContext.getShardName());
}
- cohortPathFutures.add(transactionContext.readyTransaction());
+ cohortFutures.add(transactionContext.readyTransaction());
}
if(transactionChainProxy != null){
- transactionChainProxy.onTransactionReady(cohortPathFutures);
+ transactionChainProxy.onTransactionReady(cohortFutures);
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
identifier.toString());
}
void closeTransaction();
- Future<ActorPath> readyTransaction();
+ Future<ActorSelection> readyTransaction();
void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
return actor;
}
- private String getResolvedCohortPath(String cohortPath) {
- return actorContext.resolvePath(actorPath, cohortPath);
- }
-
@Override
public void closeTransaction() {
if(LOG.isDebugEnabled()) {
}
@Override
- public Future<ActorPath> readyTransaction() {
+ public Future<ActorSelection> readyTransaction() {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
identifier, recordedOperationFutures.size());
// Transform the combined Future into a Future that returns the cohort actor path from
// the ReadyTransactionReply. That's the end result of the ready operation.
- return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
+ return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorSelection>() {
@Override
- public ActorPath apply(Iterable<Object> notUsed) {
+ public ActorSelection apply(Iterable<Object> notUsed) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
identifier);
if(serializedReadyReply.getClass().equals(
ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
- actorContext.getActorSystem(), serializedReadyReply);
-
- String resolvedCohortPath = getResolvedCohortPath(
- reply.getCohortPath().toString());
+ serializedReadyReply);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
- identifier, resolvedCohortPath);
- }
- return actorContext.actorFor(resolvedCohortPath);
+ return actorContext.actorSelection(reply.getCohortPath());
} else {
// Throwing an exception here will fail the Future.
}
@Override
- public Future<ActorPath> readyTransaction() {
+ public Future<ActorSelection> readyTransaction() {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction called", identifier);
}
package org.opendaylight.controller.cluster.datastore.messages;
-import akka.actor.ActorPath;
-import akka.actor.ActorSystem;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public class ReadyTransactionReply implements SerializableMessage {
public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadyTransactionReply.class;
- private final ActorPath cohortPath;
+ private final String cohortPath;
- public ReadyTransactionReply(ActorPath cohortPath) {
+ public ReadyTransactionReply(String cohortPath) {
this.cohortPath = cohortPath;
}
- public ActorPath getCohortPath() {
+ public String getCohortPath() {
return cohortPath;
}
@Override
public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
- .setActorPath(cohortPath.toString()).build();
+ .setActorPath(cohortPath).build();
}
- public static ReadyTransactionReply fromSerializable(ActorSystem actorSystem,Object serializable){
- ShardTransactionMessages.ReadyTransactionReply o = (ShardTransactionMessages.ReadyTransactionReply) serializable;
- return new ReadyTransactionReply(
- actorSystem.actorFor(o.getActorPath()).path());
+ public static ReadyTransactionReply fromSerializable(Object serializable) {
+ ShardTransactionMessages.ReadyTransactionReply o =
+ (ShardTransactionMessages.ReadyTransactionReply) serializable;
+
+ return new ReadyTransactionReply(o.getActorPath());
+
}
}
actorSystem.shutdown();
}
- /**
- * @deprecated Need to stop using this method. There are ways to send a
- * remote ActorRef as a string which should be used instead of this hack
- *
- * @param primaryPath
- * @param localPathOfRemoteActor
- * @return
- */
- @Deprecated
- public String resolvePath(final String primaryPath,
- final String localPathOfRemoteActor) {
- StringBuilder builder = new StringBuilder();
- String[] primaryPathElements = primaryPath.split("/");
- builder.append(primaryPathElements[0]).append("//")
- .append(primaryPathElements[1]).append(primaryPathElements[2]);
- String[] remotePathElements = localPathOfRemoteActor.split("/");
- for (int i = 3; i < remotePathElements.length; i++) {
- builder.append("/").append(remotePathElements[i]);
- }
-
- return builder.toString();
-
- }
-
- public ActorPath actorFor(String path){
- return actorSystem.actorFor(path).path();
- }
-
public String getCurrentMemberName(){
return clusterWrapper.getCurrentMemberName();
}
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
@Override
protected ActorSelection match(Object in) {
if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
- ActorPath cohortPath =
- ReadyTransactionReply.fromSerializable(getSystem(),in)
+ String cohortPath =
+ ReadyTransactionReply.fromSerializable(in)
.getCohortPath();
- return getSystem()
- .actorSelection(cohortPath);
+ return getSystem().actorSelection(cohortPath);
} else {
throw noMatch();
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
+import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
public class DataChangeListenerTest extends AbstractActorTest {
- private static class MockDataChangedEvent implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
- Map<YangInstanceIdentifier,NormalizedNode<?,?>> createdData = new HashMap<>();
- Map<YangInstanceIdentifier,NormalizedNode<?,?>> updatedData = new HashMap<>();
- Map<YangInstanceIdentifier,NormalizedNode<?,?>> originalData = new HashMap<>();
-
-
-
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
- createdData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
- return createdData;
- }
-
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
- updatedData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
- return updatedData;
-
- }
-
- @Override
- public Set<YangInstanceIdentifier> getRemovedPaths() {
- Set<YangInstanceIdentifier>ids = new HashSet();
- ids.add( CompositeModel.TEST_PATH);
- return ids;
- }
-
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
- originalData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
- return originalData;
- }
-
- @Override public NormalizedNode<?, ?> getOriginalSubtree() {
-
-
- return originalData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
- }
-
- @Override public NormalizedNode<?, ?> getUpdatedSubtree() {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testDataChangedWhenNotificationsAreEnabled(){
+ new JavaTestKit(getSystem()) {{
+ final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
+ final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
+ final Props props = DataChangeListener.props(mockListener);
+ final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
- //fixme: need to have some valid data here
- return originalData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
- }
- }
+ // Let the DataChangeListener know that notifications should be enabled
+ subject.tell(new EnableNotification(true), getRef());
- private class MockDataChangeListener implements AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
- private boolean gotIt = false;
- private AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
+ subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+ getRef());
- @Override public void onDataChanged(
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- gotIt = true;this.change=change;
- }
+ expectMsgClass(DataChangedReply.class);
- public boolean gotIt() {
- return gotIt;
- }
- public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChange(){
- return change;
- }
+ Mockito.verify(mockListener).onDataChanged(mockChangeEvent);
+ }};
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
- public void testDataChangedWhenNotificationsAreEnabled(){
+ public void testDataChangedWhenNotificationsAreDisabled(){
new JavaTestKit(getSystem()) {{
- final MockDataChangeListener listener = new MockDataChangeListener();
- final Props props = DataChangeListener.props(listener);
+ final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
+ final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
+ final Props props = DataChangeListener.props(mockListener);
final ActorRef subject =
- getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
+ getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
+
+ subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+ getRef());
new Within(duration("1 seconds")) {
@Override
protected void run() {
-
- // Let the DataChangeListener know that notifications should
- // be enabled
- subject.tell(new EnableNotification(true), getRef());
-
- subject.tell(
- new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()),
- getRef());
-
- final Boolean out = new ExpectMsg<Boolean>(duration("800 millis"), "dataChanged") {
- // do not put code outside this method, will run afterwards
- @Override
- protected Boolean match(Object in) {
- if (in != null && in.getClass().equals(DataChangedReply.class)) {
-
- return true;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertTrue(out);
- assertTrue(listener.gotIt());
- assertNotNull(listener.getChange().getCreatedData());
-
expectNoMsg();
+
+ Mockito.verify(mockListener, Mockito.never()).onDataChanged(
+ Mockito.any(AsyncDataChangeEvent.class));
}
};
}};
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
- public void testDataChangedWhenNotificationsAreDisabled(){
+ public void testDataChangedWithNoSender(){
new JavaTestKit(getSystem()) {{
- final MockDataChangeListener listener = new MockDataChangeListener();
- final Props props = DataChangeListener.props(listener);
- final ActorRef subject =
- getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
+ final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
+ final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
+ final Props props = DataChangeListener.props(mockListener);
+ final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender");
+ // Let the DataChangeListener know that notifications should be enabled
+ subject.tell(new EnableNotification(true), ActorRef.noSender());
+
+ subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
+ ActorRef.noSender());
+
+ getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
new Within(duration("1 seconds")) {
@Override
protected void run() {
-
- subject.tell(
- new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()),
- getRef());
-
expectNoMsg();
}
};
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
-
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
-
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-
import scala.concurrent.Future;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
@SuppressWarnings("serial")
doReturn(getSystem()).when(actorContext).getActorSystem();
}
- private Future<ActorPath> newCohortPath() {
+ private Future<ActorSelection> newCohort() {
ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
- doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
- return Futures.successful(path);
+ ActorSelection actorSelection = getSystem().actorSelection(path);
+ return Futures.successful(actorSelection);
}
private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
- List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+ List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
for(int i = 1; i <= nCohorts; i++) {
- cohortPathFutures.add(newCohortPath());
+ cohortFutures.add(newCohort());
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
}
private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
throws Exception {
- List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
- cohortPathFutures.add(newCohortPath());
- cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
+ List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
+ cohortFutures.add(newCohort());
+ cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
}
private void setupMockActorContext(Class<?> requestType, Object... responses) {
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
+import com.google.common.util.concurrent.CheckedFuture;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
return argThat(matcher);
}
- private Future<Object> readyTxReply(ActorPath path) {
+ private Future<Object> readyTxReply(String path) {
return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
}
doReturn(createTransactionReply(actorRef)).when(mockActorContext).
executeOperation(eq(getSystem().actorSelection(actorRef.path())),
eqCreateTransaction(memberName, type));
-
- doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
- anyString(), eq(actorRef.path().toString()));
-
- doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
-
return actorRef;
}
DeleteDataReply.SERIALIZABLE_CLASS);
}
- private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
- Object... expReplies) throws Exception {
+ private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
+ Object... expReplies) throws Exception {
assertEquals("getReadyOperationFutures size", expReplies.length,
- proxy.getCohortPathFutures().size());
+ proxy.getCohortFutures().size());
int i = 0;
- for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
+ for( Future<ActorSelection> future: proxy.getCohortFutures()) {
assertNotNull("Ready operation Future is null", future);
Object expReply = expReplies[i++];
- if(expReply instanceof ActorPath) {
- ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
- assertEquals("Cohort actor path", expReply, actual);
+ if(expReply instanceof ActorSelection) {
+ ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ assertEquals("Cohort actor path", (ActorSelection) expReply, actual);
} else {
// Expecting exception.
try {
doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
- doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
+ doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
WriteDataReply.SERIALIZABLE_CLASS);
- verifyCohortPathFutures(proxy, actorRef.path());
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
}
@SuppressWarnings("unchecked")
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
- doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
+ doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
- verifyCohortPathFutures(proxy, TestException.class);
+ verifyCohortFutures(proxy, TestException.class);
}
@SuppressWarnings("unchecked")
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
MergeDataReply.SERIALIZABLE_CLASS);
- verifyCohortPathFutures(proxy, TestException.class);
+ verifyCohortFutures(proxy, TestException.class);
}
@Test
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
+ verifyCohortFutures(proxy, PrimaryNotFoundException.class);
}
@SuppressWarnings("unchecked")
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyCohortPathFutures(proxy, IllegalArgumentException.class);
+ verifyCohortFutures(proxy, IllegalArgumentException.class);
}
@Test
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import static org.mockito.Mockito.mock;
public class ActorContextTest extends AbstractActorTest{
- @Test
- public void testResolvePathForRemoteActor(){
- ActorContext actorContext =
- new ActorContext(mock(ActorSystem.class), mock(ActorRef.class),mock(
- ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
-
- String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void testResolvePathForLocalActor(){
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka://system/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
-
- String expected = "akka://system/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
-
- System.out.println(actorContext
- .actorFor("akka://system/user/shardmanager/shard/transaction"));
- }
-
private static class MockShardManager extends UntypedActor {
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+final class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
+ private final SnapshotBackedWriteTransaction transaction;
+ private final DOMStoreThreePhaseCommitCohort delegate;
+ private final DOMStoreTransactionChainImpl txChain;
+
+ protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
+ final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
+ this.transaction = Preconditions.checkNotNull(transaction);
+ this.delegate = Preconditions.checkNotNull(delegate);
+ this.txChain = Preconditions.checkNotNull(txChain);
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ return delegate.canCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return delegate.preCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return delegate.abort();
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ ListenableFuture<Void> commitFuture = delegate.commit();
+ Futures.addCallback(commitFuture, new FutureCallback<Void>() {
+ @Override
+ public void onFailure(final Throwable t) {
+ txChain.onTransactionFailed(transaction, t);
+ }
+
+ @Override
+ public void onSuccess(final Void result) {
+ txChain.onTransactionCommited(transaction);
+ }
+ });
+ return commitFuture;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
+ private static abstract class State {
+ /**
+ * Allocate a new snapshot.
+ *
+ * @return A new snapshot
+ */
+ protected abstract DataTreeSnapshot getSnapshot();
+ }
+
+ private static final class Idle extends State {
+ private final InMemoryDOMDataStore store;
+
+ Idle(final InMemoryDOMDataStore store) {
+ this.store = Preconditions.checkNotNull(store);
+ }
+
+ @Override
+ protected DataTreeSnapshot getSnapshot() {
+ return store.takeSnapshot();
+ }
+ }
+
+ /**
+ * We have a transaction out there.
+ */
+ private static final class Allocated extends State {
+ private static final AtomicReferenceFieldUpdater<Allocated, DataTreeSnapshot> SNAPSHOT_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot");
+ private final DOMStoreWriteTransaction transaction;
+ private volatile DataTreeSnapshot snapshot;
+
+ Allocated(final DOMStoreWriteTransaction transaction) {
+ this.transaction = Preconditions.checkNotNull(transaction);
+ }
+
+ public DOMStoreWriteTransaction getTransaction() {
+ return transaction;
+ }
+
+ @Override
+ protected DataTreeSnapshot getSnapshot() {
+ final DataTreeSnapshot ret = snapshot;
+ Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier());
+ return ret;
+ }
+
+ void setSnapshot(final DataTreeSnapshot snapshot) {
+ final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot);
+ Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier());
+ }
+ }
+
+ /**
+ * Chain is logically shut down, no further allocation allowed.
+ */
+ private static final class Shutdown extends State {
+ private final String message;
+
+ Shutdown(final String message) {
+ this.message = Preconditions.checkNotNull(message);
+ }
+
+ @Override
+ protected DataTreeSnapshot getSnapshot() {
+ throw new IllegalStateException(message);
+ }
+ }
+
+ private static final AtomicReferenceFieldUpdater<DOMStoreTransactionChainImpl, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(DOMStoreTransactionChainImpl.class, State.class, "state");
+ private static final Logger LOG = LoggerFactory.getLogger(DOMStoreTransactionChainImpl.class);
+ private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed");
+ private static final Shutdown FAILED = new Shutdown("Transaction chain has failed");
+ private final InMemoryDOMDataStore store;
+ private final Idle idleState;
+ private volatile State state;
+
+ DOMStoreTransactionChainImpl(final InMemoryDOMDataStore store) {
+ this.store = Preconditions.checkNotNull(store);
+ idleState = new Idle(store);
+ state = idleState;
+ }
+
+ private Entry<State, DataTreeSnapshot> getSnapshot() {
+ final State localState = state;
+ return new SimpleEntry<>(localState, localState.getSnapshot());
+ }
+
+ private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) {
+ final State state = new Allocated(transaction);
+ return STATE_UPDATER.compareAndSet(this, expected, state);
+ }
+
+ @Override
+ public DOMStoreReadTransaction newReadOnlyTransaction() {
+ final Entry<State, DataTreeSnapshot> entry = getSnapshot();
+ return new SnapshotBackedReadTransaction(store.nextIdentifier(), store.getDebugTransactions(), entry.getValue());
+ }
+
+ @Override
+ public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ Entry<State, DataTreeSnapshot> entry;
+ DOMStoreReadWriteTransaction ret;
+
+ do {
+ entry = getSnapshot();
+ ret = new SnapshotBackedReadWriteTransaction(store.nextIdentifier(),
+ store.getDebugTransactions(), entry.getValue(), this);
+ } while (!recordTransaction(entry.getKey(), ret));
+
+ return ret;
+ }
+
+ @Override
+ public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ Entry<State, DataTreeSnapshot> entry;
+ DOMStoreWriteTransaction ret;
+
+ do {
+ entry = getSnapshot();
+ ret = new SnapshotBackedWriteTransaction(store.nextIdentifier(),
+ store.getDebugTransactions(), entry.getValue(), this);
+ } while (!recordTransaction(entry.getKey(), ret));
+
+ return ret;
+ }
+
+ @Override
+ protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+ final State localState = state;
+ if (localState instanceof Allocated) {
+ final Allocated allocated = (Allocated)localState;
+ if (allocated.getTransaction().equals(tx)) {
+ final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState);
+ if (!success) {
+ LOG.info("State already transitioned from {} to {}", localState, state);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
+ final State localState = state;
+
+ if (localState instanceof Allocated) {
+ final Allocated allocated = (Allocated)localState;
+ final DOMStoreWriteTransaction transaction = allocated.getTransaction();
+ Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction);
+ allocated.setSnapshot(tree);
+ } else {
+ LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState);
+ }
+
+ return new ChainedTransactionCommitImpl(tx, store.transactionReady(tx, tree), this);
+ }
+
+ @Override
+ public void close() {
+ final State localState = state;
+
+ do {
+ Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this);
+
+ if (FAILED.equals(localState)) {
+ LOG.debug("Ignoring user close in failed state");
+ return;
+ }
+ } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED));
+ }
+
+ void onTransactionFailed(final SnapshotBackedWriteTransaction transaction, final Throwable t) {
+ LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, t);
+ state = FAILED;
+ }
+
+ void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
+ // If the committed transaction was the one we allocated last,
+ // we clear it and the ready snapshot, so the next transaction
+ // allocated refers to the data tree directly.
+ final State localState = state;
+
+ if (!(localState instanceof Allocated)) {
+ LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState);
+ return;
+ }
+
+ final Allocated allocated = (Allocated)localState;
+ final DOMStoreWriteTransaction tx = allocated.getTransaction();
+ if (!tx.equals(transaction)) {
+ LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated);
+ return;
+ }
+
+ if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) {
+ LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state);
+ }
+ }
+}
\ No newline at end of file
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
@Override
public DOMStoreTransactionChain createTransactionChain() {
- return new DOMStoreTransactionChainImpl();
+ return new DOMStoreTransactionChainImpl(this);
}
@Override
}
}
- boolean getDebugTransactions() {
+ public final boolean getDebugTransactions() {
return debugTransactions;
}
+ final DataTreeSnapshot takeSnapshot() {
+ return dataTree.takeSnapshot();
+ }
+
@Override
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
final YangInstanceIdentifier path, final L listener, final DataChangeScope scope) {
return new ThreePhaseCommitImpl(tx, tree);
}
- private Object nextIdentifier() {
+ Object nextIdentifier() {
return name + "-" + txCounter.getAndIncrement();
}
- private class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
- @GuardedBy("this")
- private SnapshotBackedWriteTransaction allocatedTransaction;
- @GuardedBy("this")
- private DataTreeSnapshot readySnapshot;
- @GuardedBy("this")
- private boolean chainFailed = false;
-
- @GuardedBy("this")
- private void checkFailed() {
- Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
- }
-
- @GuardedBy("this")
- private DataTreeSnapshot getSnapshot() {
- checkFailed();
-
- if (allocatedTransaction != null) {
- Preconditions.checkState(readySnapshot != null, "Previous transaction %s is not ready yet", allocatedTransaction.getIdentifier());
- return readySnapshot;
- } else {
- return dataTree.takeSnapshot();
- }
- }
-
- @GuardedBy("this")
- private <T extends SnapshotBackedWriteTransaction> T recordTransaction(final T transaction) {
- allocatedTransaction = transaction;
- readySnapshot = null;
- return transaction;
- }
-
- @Override
- public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
- final DataTreeSnapshot snapshot = getSnapshot();
- return new SnapshotBackedReadTransaction(nextIdentifier(), getDebugTransactions(), snapshot);
- }
-
- @Override
- public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
- final DataTreeSnapshot snapshot = getSnapshot();
- return recordTransaction(new SnapshotBackedReadWriteTransaction(nextIdentifier(),
- getDebugTransactions(), snapshot, this));
- }
-
- @Override
- public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
- final DataTreeSnapshot snapshot = getSnapshot();
- return recordTransaction(new SnapshotBackedWriteTransaction(nextIdentifier(),
- getDebugTransactions(), snapshot, this));
- }
-
- @Override
- protected synchronized void transactionAborted(final SnapshotBackedWriteTransaction tx) {
- if (tx.equals(allocatedTransaction)) {
- Preconditions.checkState(readySnapshot == null, "Unexpected abort of transaction %s with ready snapshot %s", tx, readySnapshot);
- allocatedTransaction = null;
- }
- }
-
- @Override
- protected synchronized DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
- Preconditions.checkState(tx.equals(allocatedTransaction), "Mis-ordered ready transaction %s last allocated was %s", tx, allocatedTransaction);
- if (readySnapshot != null) {
- // The snapshot should have been cleared
- LOG.warn("Uncleared snapshot {} encountered, overwritten with transaction {} snapshot {}", readySnapshot, tx, tree);
- }
-
- final DOMStoreThreePhaseCommitCohort cohort = InMemoryDOMDataStore.this.transactionReady(tx, tree);
- readySnapshot = tree;
- return new ChainedTransactionCommitImpl(tx, cohort, this);
- }
-
- @Override
- public void close() {
- // FIXME: this call doesn't look right here - listeningExecutor is shared and owned
- // by the outer class.
- //listeningExecutor.shutdownNow();
- }
-
- protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
- final Throwable t) {
- chainFailed = true;
- }
-
- public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
- // If the committed transaction was the one we allocated last,
- // we clear it and the ready snapshot, so the next transaction
- // allocated refers to the data tree directly.
- if (transaction.equals(allocatedTransaction)) {
- if (readySnapshot == null) {
- LOG.warn("Transaction {} committed while no ready snapshot present", transaction);
- }
-
- allocatedTransaction = null;
- readySnapshot = null;
- }
- }
- }
-
- private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
- private final SnapshotBackedWriteTransaction transaction;
- private final DOMStoreThreePhaseCommitCohort delegate;
- private final DOMStoreTransactionChainImpl txChain;
-
- protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
- final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
- this.transaction = transaction;
- this.delegate = delegate;
- this.txChain = txChain;
- }
-
- @Override
- public ListenableFuture<Boolean> canCommit() {
- return delegate.canCommit();
- }
-
- @Override
- public ListenableFuture<Void> preCommit() {
- return delegate.preCommit();
- }
-
- @Override
- public ListenableFuture<Void> abort() {
- return delegate.abort();
- }
-
- @Override
- public ListenableFuture<Void> commit() {
- ListenableFuture<Void> commitFuture = delegate.commit();
- Futures.addCallback(commitFuture, new FutureCallback<Void>() {
- @Override
- public void onFailure(final Throwable t) {
- txChain.onTransactionFailed(transaction, t);
- }
-
- @Override
- public void onSuccess(final Void result) {
- txChain.onTransactionCommited(transaction);
- }
- });
- return commitFuture;
- }
- }
-
- private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
+ private final class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
private final SnapshotBackedWriteTransaction transaction;
private final DataTreeModification modification;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.ext.Provider;
import javax.xml.stream.FactoryConfigurationError;
import org.opendaylight.controller.sal.restconf.impl.InstanceIdentifierContext;
import org.opendaylight.controller.sal.restconf.impl.NormalizedNodeContext;
import org.opendaylight.controller.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorTag;
+import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
WebApplicationException {
InstanceIdentifierContext pathContext = t.getInstanceIdentifierContext();
if (t.getData() == null) {
- throw new RestconfDocumentedException(Response.Status.NOT_FOUND);
+ throw new RestconfDocumentedException(
+ "Request could not be completed because the relevant data model content does not exist.",
+ ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
}
XMLStreamWriter xmlWriter;