.child(TopologyTypes.class)
.augmentation(TopologyTypes1.class);
- private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> topicListenerRegistrations =
+ private final Map<EventSourceTopic, ListenerRegistration<DataChangeListener>> topicListenerRegistrations =
new ConcurrentHashMap<>();
private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
- new ConcurrentHashMap<>();;
+ new ConcurrentHashMap<>();
private final DataBroker dataBroker;
private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
}
public void register(final EventSource eventSource){
- NodeKey nodeKey = eventSource.getSourceNodeKey();
+ NodeKey nodeKey = eventSource.getSourceNodeKey();
final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
reg.registerPath(NodeContext.class, sourcePath);
routedRpcRegistrations.put(nodeKey,reg);
insert(sourcePath);
+
+ for(EventSourceTopic est : topicListenerRegistrations.keySet()){
+ est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey));
+ }
}
public void unRegister(final EventSource eventSource){
<configuration>
<instructions>
<Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
- <Export-package>org.opendaylight.controller.sal.binding.spi.*,</Export-package>
+ <Export-Package>
+ org.opendaylight.controller.sal.binding.spi.*,
+ org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.binding.impl.rev131028.*,
+ org.opendaylight.controller.config.yang.md.sal.binding.impl
+ </Export-Package>
<Private-Package>
- org.opendaylight.controller.config.yang.md.sal.binding.impl,
org.opendaylight.controller.sal.binding.impl,
org.opendaylight.controller.sal.binding.impl.*,
org.opendaylight.controller.sal.binding.codegen,
org.opendaylight.controller.md.sal.binding.compat,
org.opendaylight.controller.md.sal.binding.spi,
<!--org.opendaylight.controller.sal.binding.dom.*,-->
- org.opendaylight.controller.sal.binding.osgi.*,
- org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.binding.impl.rev131028.*
+ org.opendaylight.controller.sal.binding.osgi.*
</Private-Package>
</instructions>
</configuration>
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.AbstractListeningExecutorService;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
/**
* This executor is used to execute Future listener callback Runnables async.
*/
- private final ExecutorService clientFutureCallbackExecutor;
+ private final Executor clientFutureCallbackExecutor;
- /**
- * This executor is re-used internally in calls to Futures#addCallback to avoid the overhead
- * of Futures#addCallback creating a MoreExecutors#sameThreadExecutor for each call.
- */
- private final ExecutorService internalFutureCallbackExecutor = new SimpleSameThreadExecutor();
-
- public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, ExecutorService listenableFutureExecutor) {
+ public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, Executor listenableFutureExecutor) {
super(datastores);
this.clientFutureCallbackExecutor = Preconditions.checkNotNull(listenableFutureExecutor);
}
}
@Override
- public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
+ protected CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
Preconditions.checkArgument(transaction != null, "Transaction must not be null.");
doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
} else {
ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
- Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor);
+ Futures.addCallback(canCommitFuture, this, MoreExecutors.directExecutor());
}
}
}
};
ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
- Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
+ Futures.addCallback(canCommitFuture, futureCallback, MoreExecutors.directExecutor());
}
private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
doCommit(startTime, clientSubmitFuture, transaction, cohorts);
} else {
ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
- Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor);
+ Futures.addCallback(preCommitFuture, this, MoreExecutors.directExecutor());
}
}
};
ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
- Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
+ Futures.addCallback(preCommitFuture, futureCallback, MoreExecutors.directExecutor());
}
private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
clientSubmitFuture.set();
} else {
ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
- Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor);
+ Futures.addCallback(commitFuture, this, MoreExecutors.directExecutor());
}
}
};
ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
- Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
+ Futures.addCallback(commitFuture, futureCallback, MoreExecutors.directExecutor());
}
- private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
+ private static void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
final String phase, final TransactionCommitFailedExceptionMapper exMapper,
final Throwable t) {
- if(clientSubmitFuture.isDone()) {
+ if (clientSubmitFuture.isDone()) {
// We must have had failures from multiple cohorts.
return;
}
LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
- Exception e;
- if(t instanceof Exception) {
+ final Exception e;
+ if (t instanceof Exception) {
e = (Exception)t;
} else {
e = new RuntimeException("Unexpected error occurred", t);
@SuppressWarnings("unchecked")
ListenableFuture<Void>[] canCommitFutures = new ListenableFuture[cohorts.size()];
int i = 0;
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
+ for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
canCommitFutures[i++] = cohort.abort();
}
// what's interesting to the client.
clientSubmitFuture.setException(clientException);
}
- }, internalFutureCallbackExecutor);
+ }, MoreExecutors.directExecutor());
}
/**
*/
private static final ThreadLocal<Boolean> ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<Boolean>();
- private final ExecutorService listenerExecutor;
+ private final Executor listenerExecutor;
- AsyncNotifyingSettableFuture(ExecutorService listenerExecutor) {
- this.listenerExecutor = listenerExecutor;
+ AsyncNotifyingSettableFuture(Executor listenerExecutor) {
+ this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
}
@Override
}
}
}
-
- /**
- * A simple same-thread executor without the internal locking overhead that
- * MoreExecutors#sameThreadExecutor has. The #execute method is the only one of concern - we
- * don't shutdown the executor so the other methods irrelevant.
- */
- private static class SimpleSameThreadExecutor extends AbstractListeningExecutorService {
-
- @Override
- public void execute(Runnable command) {
- command.run();
- }
-
- @Override
- public boolean awaitTermination(long arg0, TimeUnit arg1) throws InterruptedException {
- return true;
- }
-
- @Override
- public boolean isShutdown() {
- return false;
- }
-
- @Override
- public boolean isTerminated() {
- return false;
- }
-
- @Override
- public void shutdown() {
- }
-
- @Override
- public List<Runnable> shutdownNow() {
- return null;
- }
- }
}
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
+ return new TransactionProxy(actorContext, TransactionType.READ_ONLY);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
actorContext.acquireTxCreationPermit();
- return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
+ return new TransactionProxy(actorContext, TransactionType.WRITE_ONLY);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
actorContext.acquireTxCreationPermit();
- return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
+ return new TransactionProxy(actorContext, TransactionType.READ_WRITE);
}
@Override
ShardTransactionIdentifier transactionId, String transactionChainId,
short clientVersion ) {
- return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
+ return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
transactionId, transactionChainId, clientVersion);
}
"createSnapshot" + ++createSnapshotTransactionCounter);
ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
- TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION);
+ TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION);
createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
}
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import akka.actor.Props;
import akka.japi.Creator;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
this.shardActor = shardActor;
}
- ActorRef newShardTransaction(TransactionProxy.TransactionType type, ShardTransactionIdentifier transactionID,
+ ActorRef newShardTransaction(TransactionType type, ShardTransactionIdentifier transactionID,
String transactionChainID, short clientVersion) {
final AbstractShardDataTreeTransaction<?> transaction;
switch (type) {
State localState = currentState;
checkReadyState(localState);
- return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
+ return new ChainedTransactionProxy(actorContext, TransactionType.READ_ONLY,
transactionChainId, localState.getPreviousReadyFutures());
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
actorContext.acquireTxCreationPermit();
- return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
+ return allocateWriteTransaction(TransactionType.READ_WRITE);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
actorContext.acquireTxCreationPermit();
- return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
+ return allocateWriteTransaction(TransactionType.WRITE_ONLY);
}
@Override
actorContext.broadcast(new CloseTransactionChain(transactionChainId).toSerializable());
}
- private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) {
+ private ChainedTransactionProxy allocateWriteTransaction(TransactionType type) {
State localState = currentState;
checkReadyState(localState);
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
public class TransactionContextImpl extends AbstractTransactionContext {
private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
- private final String transactionChainId;
private final ActorContext actorContext;
private final ActorSelection actor;
private final boolean isTxActorLocal;
private int totalBatchedModificationsSent;
protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
- String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationCompleter operationCompleter) {
super(identifier);
this.actor = actor;
- this.transactionChainId = transactionChainId;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
}
private BatchedModifications newBatchedModifications() {
- return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId);
+ return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, getIdentifier().getChainId());
}
private void batchModification(Modification modification) {
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
*/
public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
- public static enum TransactionType {
- READ_ONLY,
- WRITE_ONLY,
- READ_WRITE;
-
- // Cache all values
- private static final TransactionType[] VALUES = values();
-
- public static TransactionType fromInt(final int type) {
- try {
- return VALUES[type];
- } catch (IndexOutOfBoundsException e) {
- throw new IllegalArgumentException("In TransactionType enum value " + type, e);
- }
- }
- }
-
private static enum TransactionState {
OPEN,
READY,
private final TransactionType transactionType;
final ActorContext actorContext;
- private final String transactionChainId;
private final SchemaContext schemaContext;
private TransactionState state = TransactionState.OPEN;
}
public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
- super(createIdentifier(actorContext));
+ super(createIdentifier(actorContext, transactionChainId));
this.actorContext = Preconditions.checkNotNull(actorContext,
"actorContext should not be null");
this.transactionType = Preconditions.checkNotNull(transactionType,
"transactionType should not be null");
this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
"schemaContext should not be null");
- this.transactionChainId = transactionChainId;
LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
}
- private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
+ private static TransactionIdentifier createIdentifier(ActorContext actorContext, String transactionChainId) {
String memberName = actorContext.getCurrentMemberName();
if (memberName == null) {
memberName = "UNKNOWN-MEMBER";
}
- return new TransactionIdentifier(memberName, counter.getAndIncrement());
+ return TransactionIdentifier.create(memberName, counter.getAndIncrement(), transactionChainId);
}
@VisibleForTesting
TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
- txFutureCallback.getShardName(), transactionChainId);
+ txFutureCallback.getShardName(), getTransactionChainId());
final OperationCallback.Reference operationCallbackRef =
new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
- txFutureCallback.getShardName(), transactionChainId);
+ txFutureCallback.getShardName(), getTransactionChainId());
final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
final Future<ActorSelection> future;
}
String getTransactionChainId() {
- return transactionChainId;
+ return getIdentifier().getChainId();
}
protected ActorContext getActorContext() {
if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
- transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+ actorContext, isTxActorLocal, remoteTransactionVersion,
operationCompleter);
} else {
- return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
- actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+ return new TransactionContextImpl(transactionActor, getIdentifier(),
+ actorContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+public enum TransactionType {
+ READ_ONLY,
+ WRITE_ONLY,
+ READ_WRITE;
+
+ // Cache all values
+ private static final TransactionType[] VALUES = values();
+
+ public static TransactionType fromInt(final int type) {
+ try {
+ return VALUES[type];
+ } catch (IndexOutOfBoundsException e) {
+ throw new IllegalArgumentException("In TransactionType enum value " + type, e);
+ }
+ }
+}
\ No newline at end of file
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
private final String transactionPath;
public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
- String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
+ ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationCompleter operationCompleter) {
- super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
- remoteTransactionVersion, operationCompleter);
+ super(actor, identifier, actorContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
this.transactionPath = transactionPath;
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A TransactionIdentifier which is tied to a backend transaction chain.
+ */
+public class ChainedTransactionIdentifier extends TransactionIdentifier {
+ private final String chainId;
+
+ public ChainedTransactionIdentifier(final String memberName, final long counter, final String chainId) {
+ super(memberName, counter);
+ this.chainId = Preconditions.checkNotNull(chainId);
+ }
+
+ @Override
+ public String getChainId() {
+ return chainId;
+ }
+}
package org.opendaylight.controller.cluster.datastore.identifiers;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
public class TransactionIdentifier {
private static final String TX_SEPARATOR = "-txn-";
this.counter = counter;
}
+ public String getChainId() {
+ return "";
+ }
+
+ public static TransactionIdentifier create(String memberName, long counter, String chainId) {
+ if (Strings.isNullOrEmpty(chainId)) {
+ return new TransactionIdentifier(memberName, counter);
+ } else {
+ return new ChainedTransactionIdentifier(memberName, counter, chainId);
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
shard.tell(new CreateTransaction("txn-1",
- TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
+ TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
CreateTransactionReply.class);
waitUntilLeader(shard);
shard.tell(new CreateTransaction("txn-1",
- TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
+ TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
getRef());
CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
// Create a read Tx on the same chain.
- shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
+ shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
transactionChainID).toSerializable(), getRef());
CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
-import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
import akka.actor.ActorRef;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
public void testGetIdentifier() {
setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- TransactionProxy.TransactionType.READ_ONLY);
+ TransactionType.READ_ONLY);
Object id = transactionProxy.getIdentifier();
assertNotNull("getIdentifier returned null", id);
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
import akka.actor.ActorRef;
import akka.dispatch.Futures;
import akka.util.Timeout;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardTest;
import org.opendaylight.controller.cluster.datastore.ShardTestKit;
-import org.opendaylight.controller.cluster.datastore.TransactionProxy;
+import org.opendaylight.controller.cluster.datastore.TransactionType;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
String transactionID = "txn-1";
shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
.setTransactionId(transactionID)
- .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
+ .setTransactionType(TransactionType.WRITE_ONLY.ordinal())
.setTransactionChainId("").build(), getRef());
final FiniteDuration duration = duration("5 seconds");
String transactionID = "txn-1";
shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
.setTransactionId(transactionID)
- .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
+ .setTransactionType(TransactionType.WRITE_ONLY.ordinal())
.setTransactionChainId("").build(), getRef());
final FiniteDuration duration = duration("5 seconds");
import java.util.Collection;
import java.util.Collections;
import org.opendaylight.controller.config.yang.md.sal.rest.connector.Config;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Delete;
import org.opendaylight.controller.config.yang.md.sal.rest.connector.Get;
import org.opendaylight.controller.config.yang.md.sal.rest.connector.Operational;
import org.opendaylight.controller.config.yang.md.sal.rest.connector.Post;
@Override
public Config getConfig() {
final Config config = new Config();
+
final Get get = new Get();
get.setReceivedRequests(stats.getConfigGet());
+ get.setSuccessfulResponses(stats.getSuccessGetConfig());
+ get.setFailedResponses(stats.getFailureGetConfig());
config.setGet(get);
+
final Post post = new Post();
post.setReceivedRequests(stats.getConfigPost());
+ post.setSuccessfulResponses(stats.getSuccessPost());
+ post.setFailedResponses(stats.getFailurePost());
config.setPost(post);
+
final Put put = new Put();
put.setReceivedRequests(stats.getConfigPut());
+ put.setSuccessfulResponses(stats.getSuccessPut());
+ put.setFailedResponses(stats.getFailurePut());
config.setPut(put);
+
+ final Delete delete = new Delete();
+ delete.setReceivedRequests(stats.getConfigDelete());
+ delete.setSuccessfulResponses(stats.getSuccessDelete());
+ delete.setFailedResponses(stats.getFailureDelete());
+ config.setDelete(delete);
+
return config;
}
final Operational operational = new Operational();
final Get get = new Get();
get.setReceivedRequests(opGet);
+ get.setSuccessfulResponses(stats.getSuccessGetOperational());
+ get.setFailedResponses(stats.getFailureGetOperational());
operational.setGet(get);
return operational;
}
final BigInteger rpcInvoke = stats.getRpc();
final Rpcs rpcs = new Rpcs();
rpcs.setReceivedRequests(rpcInvoke);
- return rpcs ;
+ return rpcs;
}
-}
+}
\ No newline at end of file
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
import org.opendaylight.controller.sal.rest.api.RestconfService;
AtomicLong configPost = new AtomicLong();
AtomicLong configPut = new AtomicLong();
AtomicLong configDelete = new AtomicLong();
+ AtomicLong successGetConfig = new AtomicLong();
+ AtomicLong successGetOperational = new AtomicLong();
+ AtomicLong successPost = new AtomicLong();
+ AtomicLong successPut = new AtomicLong();
+ AtomicLong successDelete = new AtomicLong();
+ AtomicLong failureGetConfig = new AtomicLong();
+ AtomicLong failureGetOperational = new AtomicLong();
+ AtomicLong failurePost = new AtomicLong();
+ AtomicLong failurePut = new AtomicLong();
+ AtomicLong failureDelete = new AtomicLong();
private static final StatisticsRestconfServiceWrapper INSTANCE = new StatisticsRestconfServiceWrapper(RestconfImpl.getInstance());
@Override
public NormalizedNodeContext readConfigurationData(final String identifier, final UriInfo uriInfo) {
configGet.incrementAndGet();
- return delegate.readConfigurationData(identifier, uriInfo);
+ NormalizedNodeContext normalizedNodeContext = null;
+ try {
+ normalizedNodeContext = delegate.readConfigurationData(identifier, uriInfo);
+ if (normalizedNodeContext.getData() != null) {
+ successGetConfig.incrementAndGet();
+ }
+ else {
+ failureGetConfig.incrementAndGet();
+ }
+ } catch (Exception e) {
+ failureGetConfig.incrementAndGet();
+ throw e;
+ }
+ return normalizedNodeContext;
}
@Override
public NormalizedNodeContext readOperationalData(final String identifier, final UriInfo uriInfo) {
operationalGet.incrementAndGet();
- return delegate.readOperationalData(identifier, uriInfo);
+ NormalizedNodeContext normalizedNodeContext = null;
+ try {
+ normalizedNodeContext = delegate.readOperationalData(identifier, uriInfo);
+ if (normalizedNodeContext.getData() != null) {
+ successGetOperational.incrementAndGet();
+ }
+ else {
+ failureGetOperational.incrementAndGet();
+ }
+ } catch (Exception e) {
+ failureGetOperational.incrementAndGet();
+ throw e;
+ }
+ return normalizedNodeContext;
}
@Override
public Response updateConfigurationData(final String identifier, final NormalizedNodeContext payload) {
configPut.incrementAndGet();
- return delegate.updateConfigurationData(identifier, payload);
+ Response response = null;
+ try {
+ response = delegate.updateConfigurationData(identifier, payload);
+ if (response.getStatus() == Status.OK.getStatusCode()) {
+ successPut.incrementAndGet();
+ }
+ else {
+ failurePut.incrementAndGet();
+ }
+ } catch (Exception e) {
+ failurePut.incrementAndGet();
+ throw e;
+ }
+ return response;
}
@Override
public Response createConfigurationData(final String identifier, final NormalizedNodeContext payload, final UriInfo uriInfo) {
configPost.incrementAndGet();
- return delegate.createConfigurationData(identifier, payload, uriInfo);
+ Response response = null;
+ try {
+ response = delegate.createConfigurationData(identifier, payload, uriInfo);
+ if (response.getStatus() == Status.OK.getStatusCode()) {
+ successPost.incrementAndGet();
+ }
+ else {
+ failurePost.incrementAndGet();
+ }
+ } catch (Exception e) {
+ failurePost.incrementAndGet();
+ throw e;
+ }
+ return response;
}
@Override
public Response createConfigurationData(final NormalizedNodeContext payload, final UriInfo uriInfo) {
configPost.incrementAndGet();
- return delegate.createConfigurationData(payload, uriInfo);
+ Response response = null;
+ try {
+ response = delegate.createConfigurationData(payload, uriInfo);
+ if (response.getStatus() == Status.OK.getStatusCode()) {
+ successPost.incrementAndGet();
+ }
+ else {
+ failurePost.incrementAndGet();
+ }
+ }catch (Exception e) {
+ failurePost.incrementAndGet();
+ throw e;
+ }
+ return response;
}
@Override
public Response deleteConfigurationData(final String identifier) {
- return delegate.deleteConfigurationData(identifier);
+ configDelete.incrementAndGet();
+ Response response = null;
+ try {
+ response = delegate.deleteConfigurationData(identifier);
+ if (response.getStatus() == Status.OK.getStatusCode()) {
+ successDelete.incrementAndGet();
+ }
+ else {
+ failureDelete.incrementAndGet();
+ }
+ } catch (Exception e) {
+ failureDelete.incrementAndGet();
+ throw e;
+ }
+ return response;
}
@Override
public BigInteger getRpc() {
return BigInteger.valueOf(rpc.get());
}
-}
+
+ public BigInteger getSuccessGetConfig() {
+ return BigInteger.valueOf(successGetConfig.get());
+ }
+
+ public BigInteger getSuccessGetOperational() {
+ return BigInteger.valueOf(successGetOperational.get());
+ }
+
+ public BigInteger getSuccessPost() {
+ return BigInteger.valueOf(successPost.get());
+ }
+
+ public BigInteger getSuccessPut() {
+ return BigInteger.valueOf(successPut.get());
+ }
+
+ public BigInteger getSuccessDelete() {
+ return BigInteger.valueOf(successDelete.get());
+ }
+
+ public BigInteger getFailureGetConfig() {
+ return BigInteger.valueOf(failureGetConfig.get());
+ }
+
+ public BigInteger getFailureGetOperational() {
+ return BigInteger.valueOf(failureGetOperational.get());
+ }
+
+ public BigInteger getFailurePost() {
+ return BigInteger.valueOf(failurePost.get());
+ }
+
+ public BigInteger getFailurePut() {
+ return BigInteger.valueOf(failurePut.get());
+ }
+
+ public BigInteger getFailureDelete() {
+ return BigInteger.valueOf(failureDelete.get());
+ }
+}
\ No newline at end of file
leaf received-requests {
type uint64;
}
+
+ leaf successful-responses {
+ type uint64;
+ }
+
+ leaf failed-responses {
+ type uint64;
+ }
}
augment "/config:modules/config:module/config:configuration" {
container put {
uses statistics;
}
+
+ container delete {
+ uses statistics;
+ }
}
container operational {