<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.catalina</artifactId>
- <version>7.0.53.v201406061610</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.catalina.ha</artifactId>
- <version>7.0.53.v201406070630</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.catalina.tribes</artifactId>
- <version>7.0.53.v201406070630</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.coyote</artifactId>
- <version>7.0.53.v201406070630</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.el</artifactId>
- <version>7.0.53.v201406060720</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.jasper</artifactId>
- <version>7.0.53.v201406070630</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.juli.extras</artifactId>
- <version>7.0.53.v201406060720</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.tomcat.api</artifactId>
- <version>7.0.53.v201406060720</version>
</dependency>
<dependency>
<groupId>orbit</groupId>
<artifactId>org.apache.tomcat.util</artifactId>
- <version>7.0.53.v201406070630</version>
</dependency>
<dependency>
<groupId>org.aopalliance</groupId>
<feature name="odl-base-tomcat" description="OpenDaylight Tomcat" version="7.0.53">
<feature>odl-base-gemini-web</feature>
<feature>odl-base-eclipselink-persistence</feature>
- <bundle start="true">mvn:orbit/org.apache.catalina/${commons.karaf.catalina}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.catalina/${commons.catalina}</bundle>
<bundle start="true">mvn:geminiweb/org.eclipse.gemini.web.tomcat/${geminiweb.version}</bundle>
- <bundle start="true">mvn:orbit/org.apache.catalina.ha/${commons.karaf.catalina.ha}</bundle>
- <bundle start="true">mvn:orbit/org.apache.catalina.tribes/${commons.karaf.catalina.tribes}</bundle>
- <bundle start="true">mvn:orbit/org.apache.coyote/${commons.karaf.coyote}</bundle>
- <bundle start="true">mvn:orbit/org.apache.el/${commons.karaf.el}</bundle>
- <bundle start="true">mvn:orbit/org.apache.jasper/${commons.karaf.jasper}</bundle>
- <bundle start="true">mvn:orbit/org.apache.juli.extras/${commons.karaf.juli.version}</bundle>
- <bundle start="true">mvn:orbit/org.apache.tomcat.api/${commons.karaf.tomcat.api}</bundle>
- <bundle start="true">mvn:orbit/org.apache.tomcat.util/${commons.karaf.tomcat.util}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.catalina.ha/${commons.catalina.ha}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.catalina.tribes/${commons.catalina.tribes}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.coyote/${commons.coyote}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.el/${commons.el}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.jasper/${commons.jasper}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.juli.extras/${commons.juli.version}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.tomcat.api/${commons.tomcat.api}</bundle>
+ <bundle start="true">mvn:orbit/org.apache.tomcat.util/${commons.tomcat.util}</bundle>
<bundle start="true" >mvn:org.opendaylight.controller/karaf-tomcat-security/${karaf.security.version}</bundle>
<bundle start="true">wrap:mvn:virgomirror/org.eclipse.jdt.core.compiler.batch/${eclipse.jdt.core.compiler.batch.version}</bundle>
</feature>
<commmons.northbound.version>0.4.2-SNAPSHOT</commmons.northbound.version>
<!-- Third Party Versions -->
<codahale.metrics.version>3.0.1</codahale.metrics.version>
- <commons.catalina>7.0.32.v201211201336</commons.catalina>
- <commons.catalina.ha>7.0.32.v201211201952</commons.catalina.ha>
- <commons.catalina.tribes>7.0.32.v201211201952</commons.catalina.tribes>
- <commons.coyote>7.0.32.v201211201952</commons.coyote>
- <commons.el>7.0.32.v201211081135</commons.el>
- <commons.jasper>7.0.32.v201211201952</commons.jasper>
- <commons.juli.version>7.0.32.v201211081135</commons.juli.version>
- <commons.tomcat.api>7.0.32.v201211081135</commons.tomcat.api>
- <commons.tomcat.util>7.0.32.v201211201952</commons.tomcat.util>
- <commons.karaf.catalina>7.0.53.v201406061610</commons.karaf.catalina>
- <commons.karaf.catalina.ha>7.0.53.v201406070630</commons.karaf.catalina.ha>
- <commons.karaf.catalina.tribes>7.0.53.v201406070630</commons.karaf.catalina.tribes>
- <commons.karaf.coyote>7.0.53.v201406070630</commons.karaf.coyote>
- <commons.karaf.el>7.0.53.v201406060720</commons.karaf.el>
- <commons.karaf.jasper>7.0.53.v201406070630</commons.karaf.jasper>
- <commons.karaf.juli.version>7.0.53.v201406060720</commons.karaf.juli.version>
- <commons.karaf.tomcat.api>7.0.53.v201406060720</commons.karaf.tomcat.api>
- <commons.karaf.tomcat.util>7.0.53.v201406070630</commons.karaf.tomcat.util>
+ <commons.catalina>7.0.53.v201406061610</commons.catalina>
+ <commons.catalina.ha>7.0.53.v201406070630</commons.catalina.ha>
+ <commons.catalina.tribes>7.0.53.v201406070630</commons.catalina.tribes>
+ <commons.coyote>7.0.53.v201406070630</commons.coyote>
+ <commons.el>7.0.53.v201406060720</commons.el>
+ <commons.jasper>7.0.53.v201406070630</commons.jasper>
+ <commons.juli.version>7.0.53.v201406060720</commons.juli.version>
+ <commons.tomcat.api>7.0.53.v201406060720</commons.tomcat.api>
+ <commons.tomcat.util>7.0.53.v201406070630</commons.tomcat.util>
<commons.checkstyle.version>0.0.3-SNAPSHOT</commons.checkstyle.version>
<commons.fileupload.version>1.2.2</commons.fileupload.version>
# default Openflow version = 1.0, we also support 1.3.
# ovsdb.of.version=1.3
+# ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
+# gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
+# specific arp/neighDiscovery lookup.
+# ovsdb.l3gateway.mac=00:00:5E:00:02:01
+
# TLS configuration
# To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files.
# The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate
<groupId>org.opendaylight.controller.thirdparty</groupId>
<artifactId>net.sf.jung2</artifactId>
</dependency>
- <dependency>
- <groupId>org.opendaylight.controller.thirdparty</groupId>
- <artifactId>org.apache.catalina.filters.CorsFilter</artifactId>
- </dependency>
<dependency>
<groupId>org.opendaylight.controller.thirdparty</groupId>
<artifactId>org.openflow.openflowj</artifactId>
<phase>generate-resources</phase>
<configuration>
<outputDirectory>${project.build.directory}/configuration</outputDirectory>
- <includeArtifactIds>sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config</includeArtifactIds>
- <includes>**\/*.xml</includes>
+ <includeArtifactIds>sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config,sal-clustering-config</includeArtifactIds>
+ <includes>**\/*.xml,**/*.conf</includes>
<excludeTransitive>true</excludeTransitive>
<ignorePermissions>false</ignorePermissions>
</configuration>
# default Openflow version = 1.3, we also support 1.0.
ovsdb.of.version=1.3
+# ovsdb can be configured with ml2 to perform l3 forwarding. When used in that scenario, the mac address of the default
+# gateway --on the external subnet-- is expected to be resolved from its inet address. The config below overrides that
+# specific arp/neighDiscovery lookup.
+# ovsdb.l3gateway.mac=00:00:5E:00:02:01
+
# TLS configuration
# To enable TLS, set secureChannelEnabled=true and specify the location of controller Java KeyStore and TrustStore files.
# The Java KeyStore contains controller's private key and certificate. The Java TrustStore contains the trusted certificate
-Xmx*) jvmMaxMemory="$1"; shift;;
-D*) extraJVMOpts="${extraJVMOpts} $1"; shift;;
-X*) extraJVMOpts="${extraJVMOpts} $1"; shift;;
+ -J*) extraJVMOpts="${extraJVMOpts} -$(echo "$1" | cut -d'J' -f2)"; shift;;
-agentpath:*) agentPath="$1"; shift;;
"") break ;;
*) echo "Unknown option $1"; unknown_option=1; break ;;
*/
package org.opendaylight.controller.md.sal.binding.impl;
-import java.util.Map;
-import java.util.WeakHashMap;
-
-import javax.annotation.concurrent.GuardedBy;
-
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.yangtools.concepts.Delegator;
-import com.google.common.base.Preconditions;
-
class BindingTranslatedTransactionChain implements BindingTransactionChain, Delegator<DOMTransactionChain> {
private final DOMTransactionChain delegate;
-
- @GuardedBy("this")
- private final Map<AsyncTransaction<?, ?>, AsyncTransaction<?, ?>> delegateTxToBindingTx = new WeakHashMap<>();
private final BindingToNormalizedNodeCodec codec;
+ private final DelegateChainListener delegatingListener;
+ private final TransactionChainListener listener;
public BindingTranslatedTransactionChain(final DOMDataBroker chainFactory,
final BindingToNormalizedNodeCodec codec, final TransactionChainListener listener) {
Preconditions.checkNotNull(chainFactory, "DOM Transaction chain factory must not be null");
- this.delegate = chainFactory.createTransactionChain(new ListenerInvoker(listener));
+ this.delegatingListener = new DelegateChainListener();
+ this.listener = listener;
+ this.delegate = chainFactory.createTransactionChain(listener);
this.codec = codec;
}
public ReadOnlyTransaction newReadOnlyTransaction() {
DOMDataReadOnlyTransaction delegateTx = delegate.newReadOnlyTransaction();
ReadOnlyTransaction bindingTx = new BindingDataReadTransactionImpl(delegateTx, codec);
- putDelegateToBinding(delegateTx, bindingTx);
return bindingTx;
}
@Override
public ReadWriteTransaction newReadWriteTransaction() {
DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
- ReadWriteTransaction bindingTx = new BindingDataReadWriteTransactionImpl(delegateTx, codec);
- putDelegateToBinding(delegateTx, bindingTx);
+ ReadWriteTransaction bindingTx = new BindingDataReadWriteTransactionImpl(delegateTx, codec) {
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ return listenForFailure(this,super.submit());
+ }
+
+ };
return bindingTx;
}
@Override
public WriteTransaction newWriteOnlyTransaction() {
- DOMDataWriteTransaction delegateTx = delegate.newWriteOnlyTransaction();
- WriteTransaction bindingTx = new BindingDataWriteTransactionImpl<>(delegateTx, codec);
- putDelegateToBinding(delegateTx, bindingTx);
+ final DOMDataWriteTransaction delegateTx = delegate.newWriteOnlyTransaction();
+ WriteTransaction bindingTx = new BindingDataWriteTransactionImpl<DOMDataWriteTransaction>(delegateTx, codec) {
+
+ @Override
+ public CheckedFuture<Void,TransactionCommitFailedException> submit() {
+ return listenForFailure(this,super.submit());
+ };
+
+ };
return bindingTx;
}
- @Override
- public void close() {
- delegate.close();
+ protected CheckedFuture<Void, TransactionCommitFailedException> listenForFailure(
+ final WriteTransaction tx, CheckedFuture<Void, TransactionCommitFailedException> future) {
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onFailure(Throwable t) {
+ failTransactionChain(tx,t);
+ }
+
+ @Override
+ public void onSuccess(Void result) {
+ // Intentionally NOOP
+ }
+ });
+
+ return future;
}
- private synchronized void putDelegateToBinding(final AsyncTransaction<?, ?> domTx,
- final AsyncTransaction<?, ?> bindingTx) {
- final Object previous = delegateTxToBindingTx.put(domTx, bindingTx);
- Preconditions.checkState(previous == null, "DOM Transaction %s has already associated binding transation %s",domTx,previous);
+ protected void failTransactionChain(WriteTransaction tx, Throwable t) {
+ // We asume correct state change for underlaying transaction
+ // chain, so we are not changing any of our internal state
+ // to mark that we failed.
+ this.delegatingListener.onTransactionChainFailed(this, tx, t);
}
- private synchronized AsyncTransaction<?, ?> getBindingTransaction(final AsyncTransaction<?, ?> transaction) {
- return delegateTxToBindingTx.get(transaction);
+ @Override
+ public void close() {
+ delegate.close();
}
- private final class ListenerInvoker implements TransactionChainListener {
-
- private final TransactionChainListener listener;
-
- public ListenerInvoker(final TransactionChainListener listener) {
- this.listener = Preconditions.checkNotNull(listener, "Listener must not be null.");
- }
+ private final class DelegateChainListener implements TransactionChainListener {
@Override
public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
final AsyncTransaction<?, ?> transaction, final Throwable cause) {
- Preconditions.checkState(delegate.equals(chain),
- "Illegal state - listener for %s was invoked for incorrect chain %s.", delegate, chain);
- AsyncTransaction<?, ?> bindingTx = getBindingTransaction(transaction);
- listener.onTransactionChainFailed(chain, bindingTx, cause);
+ /*
+ * Intentionally NOOP, callback for failure, since we
+ * are also listening on each transaction for failure.
+ *
+ * by listening on submit future for Binding transaction
+ * in order to provide Binding transaction (which was seen by client
+ * of this transaction chain, instead of
+ */
}
@Override
actorSystem, actorSystem.actorOf(
ShardManager.props(type, cluster, configuration, datastoreContext).
withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
+
+ actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds());
}
public DistributedDataStore(ActorContext actorContext) {
String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
Object result = actorContext.executeLocalShardOperation(shardName,
- new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
- ActorContext.ASK_DURATION);
+ new RegisterChangeListener(path, dataChangeListenerActor.path(), scope));
if (result != null) {
RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
private final int maxShardDataChangeExecutorQueueSize;
private final int maxShardDataChangeExecutorPoolSize;
private final int shardTransactionIdleTimeoutInMinutes;
+ private final int operationTimeoutInSeconds;
public DistributedDataStoreProperties() {
maxShardDataChangeListenerQueueSize = 1000;
maxShardDataChangeExecutorQueueSize = 1000;
maxShardDataChangeExecutorPoolSize = 20;
shardTransactionIdleTimeoutInMinutes = 10;
+ operationTimeoutInSeconds = 5;
}
public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
- int shardTransactionIdleTimeoutInMinutes) {
+ int shardTransactionIdleTimeoutInMinutes, int operationTimeoutInSeconds) {
this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
+ this.operationTimeoutInSeconds = operationTimeoutInSeconds;
}
public int getMaxShardDataChangeListenerQueueSize() {
public int getShardTransactionIdleTimeoutInMinutes() {
return shardTransactionIdleTimeoutInMinutes;
}
+
+ public int getOperationTimeoutInSeconds() {
+ return operationTimeoutInSeconds;
+ }
}
ActorSelection cohort = actorContext.actorSelection(actorPath);
- futureList.add(actorContext.executeRemoteOperationAsync(cohort, message,
- ActorContext.ASK_DURATION));
+ futureList.add(actorContext.executeRemoteOperationAsync(cohort, message));
}
return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
try {
Object response = actorContext.executeShardOperation(shardName,
- new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
- ActorContext.ASK_DURATION);
+ new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable());
if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
CreateTransactionReply reply =
CreateTransactionReply.fromSerializable(response);
// Send the ReadyTransaction message to the Tx actor.
final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
- new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
+ new ReadyTransaction().toSerializable());
// Combine all the previously recorded put/merge/delete operation reply Futures and the
// ReadyTransactionReply Future into one Future. If any one fails then the combined
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", identifier, path);
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
- new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
+ new DeleteData(path).toSerializable() ));
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", identifier, path);
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
- new MergeData(path, data, schemaContext).toSerializable(),
- ActorContext.ASK_DURATION));
+ new MergeData(path, data, schemaContext).toSerializable()));
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", identifier, path);
recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
- new WriteData(path, data, schemaContext).toSerializable(),
- ActorContext.ASK_DURATION));
+ new WriteData(path, data, schemaContext).toSerializable()));
}
@Override
};
Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
- new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
+ new ReadData(path).toSerializable());
readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
};
Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
- new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
+ new DataExists(path).toSerializable());
future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
}
private static final Logger
LOG = LoggerFactory.getLogger(ActorContext.class);
- public static final FiniteDuration ASK_DURATION =
- Duration.create(5, TimeUnit.SECONDS);
- public static final Duration AWAIT_DURATION =
- Duration.create(5, TimeUnit.SECONDS);
+ private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
public static final String MAILBOX = "bounded-mailbox";
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
private volatile SchemaContext schemaContext;
+ private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
+ private Timeout operationTimeout = new Timeout(operationDuration);
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper,
}
}
+ public void setOperationTimeout(int timeoutInSeconds) {
+ operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
+ operationTimeout = new Timeout(operationDuration);
+ }
+
public SchemaContext getSchemaContext() {
return schemaContext;
}
*/
public ActorRef findLocalShard(String shardName) {
Object result = executeLocalOperation(shardManager,
- new FindLocalShard(shardName), ASK_DURATION);
+ new FindLocalShard(shardName));
if (result instanceof LocalShardFound) {
LocalShardFound found = (LocalShardFound) result;
public String findPrimaryPath(String shardName) {
Object result = executeLocalOperation(shardManager,
- new FindPrimary(shardName).toSerializable(), ASK_DURATION);
+ new FindPrimary(shardName).toSerializable());
if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
PrimaryFound found = PrimaryFound.fromSerializable(result);
*
* @param actor
* @param message
- * @param duration
* @return The response of the operation
*/
- public Object executeLocalOperation(ActorRef actor, Object message,
- FiniteDuration duration) {
- Future<Object> future =
- ask(actor, message, new Timeout(duration));
+ public Object executeLocalOperation(ActorRef actor, Object message) {
+ Future<Object> future = ask(actor, message, operationTimeout);
try {
- return Await.result(future, AWAIT_DURATION);
+ return Await.result(future, operationDuration);
} catch (Exception e) {
throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
}
*
* @param actor
* @param message
- * @param duration
* @return
*/
- public Object executeRemoteOperation(ActorSelection actor, Object message,
- FiniteDuration duration) {
+ public Object executeRemoteOperation(ActorSelection actor, Object message) {
LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
- Future<Object> future =
- ask(actor, message, new Timeout(duration));
+ Future<Object> future = ask(actor, message, operationTimeout);
try {
- return Await.result(future, AWAIT_DURATION);
+ return Await.result(future, operationDuration);
} catch (Exception e) {
- throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
+ throw new TimeoutException("Sending message " + message.getClass().toString() +
+ " to actor " + actor.toString() + " failed" , e);
}
}
*
* @param actor the ActorSelection
* @param message the message to send
- * @param duration the maximum amount of time to send he message
* @return a Future containing the eventual result
*/
- public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
- FiniteDuration duration) {
+ public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
- return ask(actor, message, new Timeout(duration));
+ return ask(actor, message, operationTimeout);
}
/**
*
* @param shardName
* @param message
- * @param duration
* @return
* @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
* @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
*/
- public Object executeShardOperation(String shardName, Object message,
- FiniteDuration duration) {
+ public Object executeShardOperation(String shardName, Object message) {
ActorSelection primary = findPrimary(shardName);
- return executeRemoteOperation(primary, message, duration);
+ return executeRemoteOperation(primary, message);
}
/**
*
* @param shardName the name of the shard on which the operation needs to be executed
* @param message the message that needs to be sent to the shard
- * @param duration the time duration in which this operation should complete
* @return the message that was returned by the local actor on which the
* the operation was executed. If a local shard was not found then
* null is returned
* @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
* if the operation does not complete in a specified time duration
*/
- public Object executeLocalShardOperation(String shardName, Object message,
- FiniteDuration duration) {
+ public Object executeLocalShardOperation(String shardName, Object message) {
ActorRef local = findLocalShard(shardName);
if(local != null) {
- return executeLocalOperation(local, message, duration);
+ return executeLocalOperation(local, message);
}
return null;
}
return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
- new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
- props.getMaxShardDataChangeExecutorQueueSize(),
- props.getMaxShardDataChangeListenerQueueSize(),
- props.getShardTransactionIdleTimeoutInMinutes()));
+ new DistributedDataStoreProperties(
+ props.getMaxShardDataChangeExecutorPoolSize().getValue(),
+ props.getMaxShardDataChangeExecutorQueueSize().getValue(),
+ props.getMaxShardDataChangeListenerQueueSize().getValue(),
+ props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+ props.getOperationTimeoutInSeconds().getValue()));
}
}
return DistributedDataStoreFactory.createInstance("operational",
getOperationalSchemaServiceDependency(),
- new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
- props.getMaxShardDataChangeExecutorQueueSize(),
- props.getMaxShardDataChangeListenerQueueSize(),
- props.getShardTransactionIdleTimeoutInMinutes()));
+ new DistributedDataStoreProperties(
+ props.getMaxShardDataChangeExecutorPoolSize().getValue(),
+ props.getMaxShardDataChangeExecutorQueueSize().getValue(),
+ props.getMaxShardDataChangeListenerQueueSize().getValue(),
+ props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+ props.getOperationTimeoutInSeconds().getValue()));
}
}
config:java-name-prefix DistributedOperationalDataStoreProvider;
}
+ typedef non-zero-uint16-type {
+ type uint16 {
+ range "1..max";
+ }
+ }
+
+ typedef operation-timeout-type {
+ type uint16 {
+ range "5..max";
+ }
+ }
+
grouping data-store-properties {
leaf max-shard-data-change-executor-queue-size {
default 1000;
- type uint16;
+ type non-zero-uint16-type;
description "The maximum queue size for each shard's data store data change notification executor.";
}
leaf max-shard-data-change-executor-pool-size {
default 20;
- type uint16;
+ type non-zero-uint16-type;
description "The maximum thread pool size for each shard's data store data change notification executor.";
}
leaf max-shard-data-change-listener-queue-size {
default 1000;
- type uint16;
+ type non-zero-uint16-type;
description "The maximum queue size for each shard's data store data change listeners.";
}
leaf shard-transaction-idle-timeout-in-minutes {
default 10;
- type uint16;
+ type non-zero-uint16-type;
description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
}
+
+ leaf operation-timeout-in-seconds {
+ default 5;
+ type operation-timeout-type;
+ description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
+ }
}
// Augments the 'configuration' choice node under modules/module.
ActorContext
testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ .executeLocalOperation(actorRef, "messages");
Assert.assertNotNull(messages);
ActorContext
testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ .executeLocalOperation(actorRef, "messages");
Assert.assertNotNull(messages);
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
+import akka.util.Timeout;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
).build();
+ Timeout askTimeout = new Timeout(ASK_RESULT_DURATION);
+
//This is done so that Modification list is updated which is used during commit
- Future future =
- akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
+ Future<Object> future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout);
//ready transaction creates the cohort so that we get into the
//block where in commmit is done
ShardTransactionMessages.ReadyTransaction readyTransaction =
ShardTransactionMessages.ReadyTransaction.newBuilder().build();
- future =
- akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
+ future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout);
//but when the message is sent it will have the MockCommit object
//so that we can simulate throwing of exception
when(mockModification.toSerializable()).thenReturn(
PersistentMessages.CompositeModification.newBuilder().build());
- future =
- akka.pattern.Patterns.ask(subject,
- mockForwardCommitTransaction
- , 3000);
+ future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout);
Await.result(future, ASK_RESULT_DURATION);
}
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.util.List;
import java.util.concurrent.ExecutionException;
}
stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
- isA(requestType), any(FiniteDuration.class));
+ isA(requestType));
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
- any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
+ any(ActorSelection.class), isA(requestType));
}
private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.List;
import java.util.concurrent.TimeUnit;
return getSystem().actorSelection(actorRef.path());
}
- private FiniteDuration anyDuration() {
- return any(FiniteDuration.class);
- }
-
private CreateTransactionReply createTransactionReply(ActorRef actorRef){
return CreateTransactionReply.newBuilder()
.setTransactionActorPath(actorRef.path().toString())
when(mockActorContext).actorSelection(actorRef.path().toString());
doReturn(createTransactionReply(actorRef)).when(mockActorContext).
executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
- eqCreateTransaction(memberName, type), anyDuration());
+ eqCreateTransaction(memberName, type));
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
anyString(), eq(actorRef.path().toString()));
doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
READ_ONLY);
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ executeRemoteOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ executeRemoteOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
throws Throwable {
doThrow(exToThrow).when(mockActorContext).executeShardOperation(
- anyString(), any(), anyDuration());
+ anyString(), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
- anyDuration());
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
} finally {
verify(mockActorContext, times(0)).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
}
}
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(expectedNode));
doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
READ_ONLY);
doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
assertEquals("Exists response", false, exists);
doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ executeRemoteOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
+ executeRemoteOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(),
- anyDuration());
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
} finally {
verify(mockActorContext, times(0)).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
}
}
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
+ eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
verify(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
WriteDataReply.SERIALIZABLE_CLASS);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
verify(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
MergeDataReply.SERIALIZABLE_CLASS);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+ eq(actorSelection(actorRef)), eqDeleteData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
transactionProxy.delete(TestModel.TEST_PATH);
verify(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
+ eq(actorSelection(actorRef)), eqDeleteData());
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
DeleteDataReply.SERIALIZABLE_CLASS);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite),
- anyDuration());
+ executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeRemoteOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
public void testReadyWithInitialCreateTransactionFailure() throws Exception {
doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
- anyString(), any(), anyDuration());
+ anyString(), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.successful(new Object())).when(mockActorContext).
executeRemoteOperationAsync(eq(actorSelection(actorRef)),
- isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), anyDuration());
+ eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_WRITE);
new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
mock(Configuration.class));
- Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+ Object out = actorContext.executeLocalShardOperation("default", "hello");
assertEquals("hello", out);
new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
mock(Configuration.class));
- Object out = actorContext.executeLocalShardOperation("default", "hello", duration("1 seconds"));
+ Object out = actorContext.executeLocalShardOperation("default", "hello");
assertNull(out);
ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- Object out = actorContext.executeRemoteOperation(actor, "hello", duration("3 seconds"));
+ Object out = actorContext.executeRemoteOperation(actor, "hello");
assertEquals("hello", out);
ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello",
- Duration.create(3, TimeUnit.SECONDS));
+ Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello");
try {
Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
-import scala.concurrent.duration.FiniteDuration;
public class MockActorContext extends ActorContext {
@Override public Object executeShardOperation(String shardName,
- Object message, FiniteDuration duration) {
+ Object message) {
return executeShardOperationResponse;
}
@Override public Object executeRemoteOperation(ActorSelection actor,
- Object message, FiniteDuration duration) {
+ Object message) {
return executeRemoteOperationResponse;
}
@Override
public Object executeLocalOperation(ActorRef actor,
- Object message, FiniteDuration duration) {
+ Object message) {
return this.executeLocalOperationResponse;
}
@Override
public Object executeLocalShardOperation(String shardName,
- Object message, FiniteDuration duration) {
+ Object message) {
return this.executeLocalShardOperationResponse;
}
}
ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf(
Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages",
- ActorContext.ASK_DURATION);
+ .executeLocalOperation(actorRef, "messages");
Assert.assertNotNull(messages);
package org.opendaylight.controller.config.yang.md.sal.rest.connector;
-import org.opendaylight.controller.sal.rest.impl.RestconfProviderImpl;
+import org.opendaylight.controller.sal.restconf.impl.RestconfProviderImpl;
public class RestConnectorModule extends org.opendaylight.controller.config.yang.md.sal.rest.connector.AbstractRestConnectorModule {
instance.setWebsocketPort(getWebsocketPort());
// Register it with the Broker
getDomBrokerDependency().registerProvider(instance);
+
+
+ getRootRuntimeBeanRegistratorWrapper().register(instance);
+
return instance;
}
}
import org.opendaylight.controller.sal.restconf.impl.BrokerFacade;
import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
import org.opendaylight.controller.sal.restconf.impl.RestconfImpl;
+import org.opendaylight.controller.sal.restconf.impl.StatisticsRestconfServiceWrapper;
public class RestconfApplication extends Application {
restconfImpl.setControllerContext(controllerContext);
singletons.add(controllerContext);
singletons.add(brokerFacade);
- singletons.add(restconfImpl);
+ singletons.add(StatisticsRestconfServiceWrapper.getInstance());
singletons.add(StructuredDataToXmlProvider.INSTANCE);
singletons.add(StructuredDataToJsonProvider.INSTANCE);
singletons.add(JsonToCompositeNodeProvider.INSTANCE);
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import java.math.BigInteger;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.slf4j.LoggerFactory;
public class RestconfImpl implements RestconfService {
+
private enum UriParameters {
PRETTY_PRINT("prettyPrint"),
DEPTH("depth");
}
}
+
+
private final static RestconfImpl INSTANCE = new RestconfImpl();
private static final int NOTIFICATION_PORT = 8181;
}
return false;
}
+
+ public BigInteger getOperationalReceived() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.rest.impl;
+package org.opendaylight.controller.sal.restconf.impl;
+import java.math.BigInteger;
import java.util.Collection;
import java.util.Collections;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Config;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Get;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Operational;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Post;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Put;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.RestConnectorRuntimeMXBean;
+import org.opendaylight.controller.config.yang.md.sal.rest.connector.Rpcs;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.controller.sal.rest.api.RestConnector;
-import org.opendaylight.controller.sal.restconf.impl.BrokerFacade;
-import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
import org.opendaylight.controller.sal.streams.websockets.WebSocketServer;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
-public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnector {
+public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnector, RestConnectorRuntimeMXBean {
public final static String NOT_INITALIZED_MSG = "Restconf is not initialized yet. Please try again later";
+ private final StatisticsRestconfServiceWrapper stats = StatisticsRestconfServiceWrapper.getInstance();
private ListenerRegistration<SchemaContextListener> listenerRegistration;
private PortNumber port;
public void setWebsocketPort(PortNumber port) {
}
webSocketServerThread.interrupt();
}
+
+ @Override
+ public Config getConfig() {
+ Config config = new Config();
+ Get get = new Get();
+ get.setReceivedRequests(stats.getConfigGet());
+ config.setGet(get);
+ Post post = new Post();
+ post.setReceivedRequests(stats.getConfigPost());
+ config.setPost(post);
+ Put put = new Put();
+ put.setReceivedRequests(stats.getConfigPut());
+ config.setPut(put);
+ return config;
+ }
+
+ @Override
+ public Operational getOperational() {
+ BigInteger opGet = stats.getOperationalGet();
+ Operational operational = new Operational();
+ Get get = new Get();
+ get.setReceivedRequests(opGet);
+ operational.setGet(get);
+ return operational;
+ }
+
+ @Override
+ public Rpcs getRpcs() {
+ BigInteger rpcInvoke = stats.getRpc();
+ Rpcs rpcs = new Rpcs();
+ rpcs.setReceivedRequests(rpcInvoke);
+ return rpcs ;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.impl;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import org.opendaylight.controller.sal.rest.api.RestconfService;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.Node;
+
+public class StatisticsRestconfServiceWrapper implements RestconfService {
+
+ AtomicLong operationalGet = new AtomicLong();
+ AtomicLong configGet = new AtomicLong();
+ AtomicLong rpc = new AtomicLong();
+ AtomicLong configPost = new AtomicLong();
+ AtomicLong configPut = new AtomicLong();
+ AtomicLong configDelete = new AtomicLong();
+
+ private static final StatisticsRestconfServiceWrapper INSTANCE = new StatisticsRestconfServiceWrapper(RestconfImpl.getInstance());
+
+ final RestconfService delegate;
+
+ private StatisticsRestconfServiceWrapper(RestconfService delegate) {
+ this.delegate = delegate;
+ }
+
+ public static StatisticsRestconfServiceWrapper getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public Object getRoot() {
+ return delegate.getRoot();
+ }
+
+ @Override
+ public StructuredData getModules(UriInfo uriInfo) {
+ return delegate.getModules(uriInfo);
+ }
+
+ @Override
+ public StructuredData getModules(String identifier, UriInfo uriInfo) {
+ return delegate.getModules(identifier, uriInfo);
+ }
+
+ @Override
+ public StructuredData getModule(String identifier, UriInfo uriInfo) {
+ return delegate.getModule(identifier, uriInfo);
+ }
+
+ @Override
+ public StructuredData getOperations(UriInfo uriInfo) {
+ return delegate.getOperations(uriInfo);
+ }
+
+ @Override
+ public StructuredData getOperations(String identifier, UriInfo uriInfo) {
+ return delegate.getOperations(identifier, uriInfo);
+ }
+
+ @Override
+ public StructuredData invokeRpc(String identifier, CompositeNode payload, UriInfo uriInfo) {
+ rpc.incrementAndGet();
+ return delegate.invokeRpc(identifier, payload, uriInfo);
+ }
+
+ @Override
+ public StructuredData invokeRpc(String identifier, String noPayload, UriInfo uriInfo) {
+ rpc.incrementAndGet();
+ return delegate.invokeRpc(identifier, noPayload, uriInfo);
+ }
+
+ @Override
+ public NormalizedNodeContext readConfigurationData(String identifier, UriInfo uriInfo) {
+ configGet.incrementAndGet();
+ return delegate.readConfigurationData(identifier, uriInfo);
+ }
+
+ @Override
+ public NormalizedNodeContext readOperationalData(String identifier, UriInfo uriInfo) {
+ operationalGet.incrementAndGet();
+ return delegate.readOperationalData(identifier, uriInfo);
+ }
+
+ @Override
+ public Response updateConfigurationData(String identifier, Node<?> payload) {
+ configPut.incrementAndGet();
+ return delegate.updateConfigurationData(identifier, payload);
+ }
+
+ @Override
+ public Response createConfigurationData(String identifier, Node<?> payload) {
+ configPost.incrementAndGet();
+ return delegate.createConfigurationData(identifier, payload);
+ }
+
+ @Override
+ public Response createConfigurationData(Node<?> payload) {
+ configPost.incrementAndGet();
+ return delegate.createConfigurationData(payload);
+ }
+
+ @Override
+ public Response deleteConfigurationData(String identifier) {
+ return delegate.deleteConfigurationData(identifier);
+ }
+
+ @Override
+ public Response subscribeToStream(String identifier, UriInfo uriInfo) {
+ return delegate.subscribeToStream(identifier, uriInfo);
+ }
+
+ @Override
+ public StructuredData getAvailableStreams(UriInfo uriInfo) {
+ return delegate.getAvailableStreams(uriInfo);
+ }
+
+ public BigInteger getConfigDelete() {
+ return BigInteger.valueOf(configDelete.get());
+ }
+
+ public BigInteger getConfigGet() {
+ return BigInteger.valueOf(configGet.get());
+ }
+
+ public BigInteger getConfigPost() {
+ return BigInteger.valueOf(configPost.get());
+ }
+
+ public BigInteger getConfigPut() {
+ return BigInteger.valueOf(configPut.get());
+ }
+
+ public BigInteger getOperationalGet() {
+ return BigInteger.valueOf(operationalGet.get());
+ }
+
+ public BigInteger getRpc() {
+ return BigInteger.valueOf(rpc.get());
+ }
+
+}
config:java-name-prefix RestConnector;
}
+ grouping statistics {
+ leaf received-requests {
+ type uint64;
+ }
+ }
+
augment "/config:modules/config:module/config:configuration" {
case rest-connector-impl {
when "/config:modules/config:module/config:type = 'rest-connector-impl'";
}
}
}
+
+ augment "/config:modules/config:module/config:state" {
+ case rest-connector-impl {
+ when "/config:modules/config:module/config:type = 'rest-connector-impl'";
+ container rpcs {
+ uses statistics;
+ }
+
+ container config {
+ container get {
+ uses statistics;
+ }
+
+ container post {
+ uses statistics;
+ }
+
+ container put {
+ uses statistics;
+ }
+ }
+
+ container operational {
+ container get {
+ uses statistics;
+ }
+ }
+ }
+ }
}
\ No newline at end of file