return currentBehavior.state();
}
+ protected ReplicatedLogEntry getLastLogEntry() {
+ return replicatedLog.last();
+ }
+
+ protected Long getCurrentTerm(){
+ return context.getTermInformation().getCurrentTerm();
+ }
+
+ protected Long getCommitIndex(){
+ return context.getCommitIndex();
+ }
+
+ protected Long getLastApplied(){
+ return context.getLastApplied();
+ }
+
/**
* setPeerAddress sets the address of a known peer at a later time.
* <p>
*
* @param index a log index that is known to be committed
*/
- protected void applyLogToStateMachine(long index) {
+ protected void applyLogToStateMachine(final long index) {
// Now maybe we apply to the state machine
for (long i = context.getLastApplied() + 1;
i < index + 1; i++) {
}
// Send a local message to the local RaftActor (it's derived class to be
// specific to apply the log to it's index)
+ context.getLogger().info("Setting last applied to {}", index);
context.setLastApplied(index);
}
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
context.getLogger().debug("Replicate message " + logIndex);
+ // Create a tracker entry we will use this later to notify the
+ // client actor
+ trackerList.add(
+ new ClientRequestTrackerImpl(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ logIndex)
+ );
+
if (followers.size() == 0) {
- context.setCommitIndex(
- replicate.getReplicatedLogEntry().getIndex());
-
- context.getActor()
- .tell(new ApplyState(replicate.getClientActor(),
- replicate.getIdentifier(),
- replicate.getReplicatedLogEntry()),
- context.getActor()
- );
+ context.setCommitIndex(logIndex);
+ applyLogToStateMachine(logIndex);
} else {
-
- // Create a tracker entry we will use this later to notify the
- // client actor
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
-
sendAppendEntries();
}
}
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
MockRaftActorContext actorContext =
new MockRaftActorContext("test", getSystem(), raftActor);
+ actorContext.getReplicatedLog().removeFrom(0);
+
+ actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(0, 1,
+ new MockRaftActorContext.MockPayload("foo")));
+
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(1, 1,
+ new MockRaftActorContext.MockPayload("foo"));
+
+ actorContext.getReplicatedLog().append(entry);
+
Leader leader = new Leader(actorContext);
RaftState raftState = leader
- .handleMessage(senderActor, new Replicate(null, "state-id",
- new MockRaftActorContext.MockReplicatedLogEntry(1,
- 100,
- new MockRaftActorContext.MockPayload("foo"))
- ));
+ .handleMessage(senderActor, new Replicate(null, "state-id",entry));
// State should not change
assertEquals(RaftState.Leader, raftState);
- assertEquals(100, actorContext.getCommitIndex());
+ assertEquals(1, actorContext.getCommitIndex());
final String out =
new ExpectMsg<String>(duration("1 seconds"),
}
public DOMStore createConfigurationDatastore() {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
schemaService.registerSchemaContextListener(store);
return store;
}
public DOMStore createOperationalDatastore() {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
schemaService.registerSchemaContextListener(store);
return store;
}
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.MutableClassToInstanceMap;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
@Beta
public class BindingTestContext implements AutoCloseable {
public void startNewDomDataBroker() {
checkState(executor != null, "Executor needs to be set");
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", executor);
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", executor);
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", executor,
+ MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", executor,
+ MoreExecutors.sameThreadExecutor());
newDatastores = ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
.put(LogicalDatastoreType.OPERATIONAL, operStore)
.put(LogicalDatastoreType.CONFIGURATION, configStore)
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider">prefix:distributed-operational-datastore-provider</type>
<name>distributed-operational-store-module</name>
- <schema-service>
+ <operational-schema-service>
<type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
<name>yang-schema-service</name>
- </schema-service>
+ </operational-schema-service>
</module>
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider">prefix:distributed-config-datastore-provider</type>
<name>distributed-config-store-module</name>
- <schema-service>
+ <config-schema-service>
<type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
<name>yang-schema-service</name>
- </schema-service>
+ </config-schema-service>
</module>
<module>
netty.tcp {
hostname = "<CHANGE_ME>"
port = 2550
- maximum-frame-size = 2097152
- send-buffer-size = 52428800
- receive-buffer-size = 52428800
+ maximum-frame-size = 419430400
+ send-buffer-size = 52428800
+ receive-buffer-size = 52428800
}
}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.PropertyUtils;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Executors;
-
/**
*
*/
private static final Logger
LOG = LoggerFactory.getLogger(DistributedDataStore.class);
- private static final int DEFAULT_EXECUTOR_POOL_SIZE = 10;
+ private static final String EXECUTOR_MAX_POOL_SIZE_PROP =
+ "mdsal.dist-datastore-executor-pool.size";
+ private static final int DEFAULT_EXECUTOR_MAX_POOL_SIZE = 10;
+
+ private static final String EXECUTOR_MAX_QUEUE_SIZE_PROP =
+ "mdsal.dist-datastore-executor-queue.size";
+ private static final int DEFAULT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
private final String type;
private final ActorContext actorContext;
private SchemaContext schemaContext;
-
-
/**
* Executor used to run FutureTask's
*
* This is typically used when we need to make a request to an actor and
* wait for it's response and the consumer needs to be provided a Future.
- *
- * FIXME : Make the thread pool size configurable.
*/
private final ListeningExecutorService executor =
- MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_EXECUTOR_POOL_SIZE));
+ MoreExecutors.listeningDecorator(
+ SpecialExecutors.newBlockingBoundedFastThreadPool(
+ PropertyUtils.getIntSystemProperty(
+ EXECUTOR_MAX_POOL_SIZE_PROP,
+ DEFAULT_EXECUTOR_MAX_POOL_SIZE),
+ PropertyUtils.getIntSystemProperty(
+ EXECUTOR_MAX_QUEUE_SIZE_PROP,
+ DEFAULT_EXECUTOR_MAX_QUEUE_SIZE), "DistDataStore"));
public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
this(new ActorContext(actorSystem, actorSystem
import akka.serialization.Serialization;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
public static final String DEFAULT_NAME = "default";
- private final ListeningExecutorService storeExecutor =
- MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
-
private final InMemoryDOMDataStore store;
private final Map<Object, DOMStoreThreePhaseCommitCohort>
LOG.info("Creating shard : {} persistent : {}", name, persistent);
- store = new InMemoryDOMDataStore(name, storeExecutor);
+ store = InMemoryDOMDataStoreFactory.create(name, null);
shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
}
- public static Props props(final String name, final Map<String, String> peerAddresses) {
+ public static Props props(final String name,
+ final Map<String, String> peerAddresses) {
return Props.create(new Creator<Shard>() {
@Override
}
- @Override public void onReceiveCommand(Object message){
- LOG.debug("Received message {} from {}", message.getClass().toString(), getSender());
+ @Override public void onReceiveCommand(Object message) {
+ LOG.debug("Received message {} from {}", message.getClass().toString(),
+ getSender());
- if (message.getClass().equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
- if(isLeader()) {
+ if (message.getClass()
+ .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
+ if (isLeader()) {
createTransactionChain();
- } else if(getLeader() != null){
+ } else if (getLeader() != null) {
getLeader().forward(message, getContext());
}
} else if (message instanceof RegisterChangeListener) {
updateSchemaContext((UpdateSchemaContext) message);
} else if (message instanceof ForwardedCommitTransaction) {
handleForwardedCommit((ForwardedCommitTransaction) message);
- } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
- if(isLeader()) {
+ } else if (message.getClass()
+ .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+ if (isLeader()) {
createTransaction(CreateTransaction.fromSerializable(message));
- } else if(getLeader() != null){
+ } else if (getLeader() != null) {
getLeader().forward(message, getContext());
}
- } else if (message instanceof PeerAddressResolved){
+ } else if (message instanceof PeerAddressResolved) {
PeerAddressResolved resolved = (PeerAddressResolved) message;
setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
- } else {
- super.onReceiveCommand(message);
+ } else{
+ super.onReceiveCommand(message);
}
}
- private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){
- if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){
- return getContext().actorOf(
- ShardTransaction.props( store.newReadOnlyTransaction(), getSelf(), schemaContext), transactionId);
-
- }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){
- return getContext().actorOf(
- ShardTransaction.props( store.newReadWriteTransaction(), getSelf(), schemaContext), transactionId);
-
-
- }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){
- return getContext().actorOf(
- ShardTransaction.props( store.newWriteOnlyTransaction(), getSelf(), schemaContext), transactionId);
- }else{
- throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ;
- }
- }
+ private ActorRef createTypedTransactionActor(
+ CreateTransaction createTransaction, String transactionId) {
+ if (createTransaction.getTransactionType()
+ == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+ shardMBean.incrementReadOnlyTransactionCount();
+ return getContext().actorOf(
+ ShardTransaction
+ .props(store.newReadOnlyTransaction(), getSelf(),
+ schemaContext), transactionId);
+
+ } else if (createTransaction.getTransactionType()
+ == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
+ shardMBean.incrementReadWriteTransactionCount();
+ return getContext().actorOf(
+ ShardTransaction
+ .props(store.newReadWriteTransaction(), getSelf(),
+ schemaContext), transactionId);
+
+
+ } else if (createTransaction.getTransactionType()
+ == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+ shardMBean.incrementWriteOnlyTransactionCount();
+ return getContext().actorOf(
+ ShardTransaction
+ .props(store.newWriteOnlyTransaction(), getSelf(),
+ schemaContext), transactionId);
+ } else {
+ throw new IllegalArgumentException(
+ "CreateTransaction message has unidentified transaction type="
+ + createTransaction.getTransactionType());
+ }
+ }
private void createTransaction(CreateTransaction createTransaction) {
String transactionId = "shard-" + createTransaction.getTransactionId();
- LOG.info("Creating transaction : {} " , transactionId);
- ActorRef transactionActor = createTypedTransactionActor(createTransaction,transactionId);
+ LOG.info("Creating transaction : {} ", transactionId);
+ ActorRef transactionActor =
+ createTypedTransactionActor(createTransaction, transactionId);
getSender()
- .tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), createTransaction.getTransactionId()).toSerializable(),
+ .tell(new CreateTransactionReply(
+ Serialization.serializedActorPath(transactionActor),
+ createTransaction.getTransactionId()).toSerializable(),
getSelf());
}
private void commit(final ActorRef sender, Object serialized) {
- Modification modification = MutableCompositeModification.fromSerializable(serialized, schemaContext);
+ Modification modification = MutableCompositeModification
+ .fromSerializable(serialized, schemaContext);
DOMStoreThreePhaseCommitCohort cohort =
modificationToCohort.remove(serialized);
if (cohort == null) {
future.get();
future = commitCohort.commit();
future.get();
- } catch (InterruptedException e) {
- LOG.error("Failed to commit", e);
- } catch (ExecutionException e) {
+ } catch (InterruptedException | ExecutionException e) {
+ shardMBean.incrementFailedTransactionsCount();
LOG.error("Failed to commit", e);
+ return;
}
+ //we want to just apply the recovery commit and return
+ shardMBean.incrementCommittedTransactionCount();
+ return;
}
final ListenableFuture<Void> future = cohort.commit();
- shardMBean.incrementCommittedTransactionCount();
final ActorRef self = getSelf();
future.addListener(new Runnable() {
@Override
public void run() {
try {
future.get();
-
- if(sender != null) {
sender
.tell(new CommitTransactionReply().toSerializable(),
self);
- } else {
- LOG.error("sender is null ???");
- }
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(new Date());
+
} catch (InterruptedException | ExecutionException e) {
- // FIXME : Handle this properly
- LOG.error(e, "An exception happened when committing");
+ shardMBean.incrementFailedTransactionsCount();
+ sender.tell(new akka.actor.Status.Failure(e),self);
}
}
}, getContext().dispatcher());
}
private void handleForwardedCommit(ForwardedCommitTransaction message) {
- Object serializedModification = message.getModification().toSerializable();
+ Object serializedModification =
+ message.getModification().toSerializable();
modificationToCohort
- .put(serializedModification , message.getCohort());
+ .put(serializedModification, message.getCohort());
- if(persistent) {
- this.persistData(getSender(), "identifier", new CompositeModificationPayload(serializedModification));
+ if (persistent) {
+ this.persistData(getSender(), "identifier",
+ new CompositeModificationPayload(serializedModification));
} else {
this.commit(getSender(), serializedModification);
}
private void registerChangeListener(
RegisterChangeListener registerChangeListener) {
- LOG.debug("registerDataChangeListener for " + registerChangeListener.getPath());
+ LOG.debug("registerDataChangeListener for " + registerChangeListener
+ .getPath());
ActorSelection dataChangeListenerPath = getContext()
// Notify the listener if notifications should be enabled or not
// If this shard is the leader then it will enable notifications else
// it will not
- dataChangeListenerPath.tell(new EnableNotification(isLeader()), getSelf());
+ dataChangeListenerPath
+ .tell(new EnableNotification(isLeader()), getSelf());
// Now store a reference to the data change listener so it can be notified
// at a later point if notifications should be enabled or disabled
dataChangeListeners.add(dataChangeListenerPath);
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
- listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
+ listener =
+ new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
registration =
getContext().actorOf(
DataChangeListenerRegistration.props(registration));
- LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString());
+ LOG.debug(
+ "registerDataChangeListener sending reply, listenerRegistrationPath = "
+ + listenerRegistration.path().toString());
getSender()
.tell(new RegisterChangeListenerReply(listenerRegistration.path()),
ShardTransactionChain.props(chain, schemaContext));
getSender()
.tell(new CreateTransactionChainReply(transactionChain.path())
- .toSerializable(),
+ .toSerializable(),
getSelf());
}
@Override protected void applyState(ActorRef clientActor, String identifier,
Object data) {
- if(data instanceof CompositeModificationPayload){
+ if (data instanceof CompositeModificationPayload) {
Object modification =
((CompositeModificationPayload) data).getModification();
- if(modification != null){
+ if (modification != null) {
commit(clientActor, modification);
} else {
LOG.error("modification is null - this is very unexpected");
LOG.error("Unknown state received {}", data);
}
+ ReplicatedLogEntry lastLogEntry = getLastLogEntry();
+
+ if(lastLogEntry != null){
+ shardMBean.setLastLogIndex(lastLogEntry.getIndex());
+ shardMBean.setLastLogTerm(lastLogEntry.getTerm());
+ }
+
+ shardMBean.setCommitIndex(getCommitIndex());
+ shardMBean.setLastApplied(getLastApplied());
+
}
@Override protected Object createSnapshot() {
}
@Override protected void onStateChanged() {
- for(ActorSelection dataChangeListener : dataChangeListeners){
- dataChangeListener.tell(new EnableNotification(isLeader()), getSelf());
+ for (ActorSelection dataChangeListener : dataChangeListeners) {
+ dataChangeListener
+ .tell(new EnableNotification(isLeader()), getSelf());
}
- if(getLeaderId() != null){
+ if (getLeaderId() != null) {
shardMBean.setLeader(getLeaderId());
}
shardMBean.setRaftState(getRaftState().name());
+ shardMBean.setCurrentTerm(getCurrentTerm());
}
@Override public String persistenceId() {
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
+ //default scope test method to check if we get correct exception
+ void forUnitTestOnlyExplicitTransactionClose(){
+ transaction.close();
+ }
+
}
getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
+
+ /**
+ * The following method is used in unit testing only
+ * hence the default scope.
+ * This is done to test out failure cases.
+ */
+ public void forUnitTestOnlyExplicitTransactionClose() {
+ transaction.close();
+ }
}
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.concurrent.ExecutionException;
-
/**
* The ShardTransaction Actor represents a remote transaction
* <p>
} else {
sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
}
- } catch (InterruptedException | ExecutionException e) {
- log.error(e,
- "An exception happened when reading data from path : "
- + path.toString());
+ } catch (Exception e) {
+ sender.tell(new akka.actor.Status.Failure(new ReadFailedException( "An Exception occurred when reading data from path : "
+ + path.toString(),e)),self);
}
}
modification.addModification(
new WriteModification(message.getPath(), message.getData(),schemaContext));
LOG.debug("writeData at path : " + message.getPath().toString());
- transaction.write(message.getPath(), message.getData());
- getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+
+ try {
+ transaction.write(message.getPath(), message.getData());
+ getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+ }catch(Exception e){
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
}
protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
modification.addModification(
new MergeModification(message.getPath(), message.getData(), schemaContext));
LOG.debug("mergeData at path : " + message.getPath().toString());
- transaction.merge(message.getPath(), message.getData());
- getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+ try {
+ transaction.merge(message.getPath(), message.getData());
+ getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+ }catch(Exception e){
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
}
protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
modification.addModification(new DeleteModification(message.getPath()));
- transaction.delete(message.getPath());
- getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+ try {
+ transaction.delete(message.getPath());
+ getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+ }catch(Exception e){
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
}
protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
+
+ /**
+ * The following method is used in unit testing only
+ * hence the default scope.
+ * This is done to test out failure cases.
+ */
+ public void forUnitTestOnlyExplicitTransactionClose() {
+ transaction.close();
+ }
}
* Date: 7/16/14
*/
public class ShardMBeanFactory {
- private static Map<String,ShardStats> shardMBeans= new HashMap<String,ShardStats>();
+ private static Map<String, ShardStats> shardMBeans =
+ new HashMap<String, ShardStats>();
- public static ShardStats getShardStatsMBean(String shardName){
- if(shardMBeans.containsKey(shardName)){
+ public static ShardStats getShardStatsMBean(String shardName) {
+ if (shardMBeans.containsKey(shardName)) {
return shardMBeans.get(shardName);
- }else {
- ShardStats shardStatsMBeanImpl = new ShardStats(shardName);
+ } else {
+ ShardStats shardStatsMBeanImpl = new ShardStats(shardName);
- if(shardStatsMBeanImpl.registerMBean()) {
- shardMBeans.put(shardName, shardStatsMBeanImpl);
- }
- return shardStatsMBeanImpl;
- }
- }
+ if (shardStatsMBeanImpl.registerMBean()) {
+ shardMBeans.put(shardName, shardStatsMBeanImpl);
+ }
+ return shardStatsMBeanImpl;
+ }
+ }
}
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
/**
* @author: syedbahm
*/
public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
- private Long committedTransactionsCount;
- private Long journalMessagesCount;
- final private String shardName;
- private String leader;
- private String raftState;
- ShardStats(String shardName){
- this.shardName = shardName;
- committedTransactionsCount =0L;
- journalMessagesCount = 0L;
- };
+ private final String shardName;
+
+ private Long committedTransactionsCount = 0L;
+
+ private Long readOnlyTransactionCount = 0L;
+
+ private Long writeOnlyTransactionCount = 0L;
+
+ private Long readWriteTransactionCount = 0L;
+
+ private String leader;
+
+ private String raftState;
+
+ private Long lastLogTerm = -1L;
+
+ private Long lastLogIndex = -1L;
+
+ private Long currentTerm = -1L;
+
+ private Long commitIndex = -1L;
+
+ private Long lastApplied = -1L;
+
+ private Date lastCommittedTransactionTime = new Date(0L);
+
+ private Long failedTransactionsCount = 0L;
+
+ private SimpleDateFormat sdf =
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+
+ ShardStats(String shardName) {
+ this.shardName = shardName;
+ }
+
+
+ @Override
+ public String getShardName() {
+ return shardName;
+ }
+
+ @Override
+ public Long getCommittedTransactionsCount() {
+ return committedTransactionsCount;
+ }
+
+ @Override public String getLeader() {
+ return leader;
+ }
+
+ @Override public String getRaftState() {
+ return raftState;
+ }
+
+ @Override public Long getReadOnlyTransactionCount() {
+ return readOnlyTransactionCount;
+ }
+
+ @Override public Long getWriteOnlyTransactionCount() {
+ return writeOnlyTransactionCount;
+ }
+
+ @Override public Long getReadWriteTransactionCount() {
+ return readWriteTransactionCount;
+ }
+
+ @Override public Long getLastLogIndex() {
+ return lastLogIndex;
+ }
+
+ @Override public Long getLastLogTerm() {
+ return lastLogTerm;
+ }
+
+ @Override public Long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ @Override public Long getCommitIndex() {
+ return commitIndex;
+ }
+
+ @Override public Long getLastApplied() {
+ return lastApplied;
+ }
+
+ @Override
+ public String getLastCommittedTransactionTime() {
+ return sdf.format(lastCommittedTransactionTime);
+ }
- @Override
- public String getShardName() {
- return shardName;
- }
+ @Override public Long getFailedTransactionsCount() {
+ return failedTransactionsCount;
+ }
- @Override
- public Long getCommittedTransactionsCount() {
- return committedTransactionsCount;
- }
+ public Long incrementCommittedTransactionCount() {
+ return committedTransactionsCount++;
+ }
- @Override
- public Long getJournalMessagesCount() {
- //FIXME: this will be populated once after integration with Raft stuff
- return journalMessagesCount;
- }
+ public Long incrementReadOnlyTransactionCount() {
+ return readOnlyTransactionCount++;
+ }
- @Override public String getLeader() {
- return leader;
- }
+ public Long incrementWriteOnlyTransactionCount() {
+ return writeOnlyTransactionCount++;
+ }
- @Override public String getRaftState() {
- return raftState;
- }
+ public Long incrementReadWriteTransactionCount() {
+ return readWriteTransactionCount++;
+ }
- public Long incrementCommittedTransactionCount() {
- return committedTransactionsCount++;
- }
+ public void setLeader(String leader) {
+ this.leader = leader;
+ }
+ public void setRaftState(String raftState) {
+ this.raftState = raftState;
+ }
- public void updateCommittedTransactionsCount(long currentCount){
- committedTransactionsCount = currentCount;
+ public void setLastLogTerm(Long lastLogTerm) {
+ this.lastLogTerm = lastLogTerm;
+ }
- }
+ public void setLastLogIndex(Long lastLogIndex) {
+ this.lastLogIndex = lastLogIndex;
+ }
- public void updateJournalMessagesCount(long currentCount){
- journalMessagesCount = currentCount;
+ public void setCurrentTerm(Long currentTerm) {
+ this.currentTerm = currentTerm;
+ }
- }
+ public void setCommitIndex(Long commitIndex) {
+ this.commitIndex = commitIndex;
+ }
- public void setLeader(String leader){
- this.leader = leader;
- }
+ public void setLastApplied(Long lastApplied) {
+ this.lastApplied = lastApplied;
+ }
- public void setRaftState(String raftState){
- this.raftState = raftState;
- }
+ public void setLastCommittedTransactionTime(
+ Date lastCommittedTransactionTime) {
+ this.lastCommittedTransactionTime = lastCommittedTransactionTime;
+ }
- @Override
- protected String getMBeanName() {
- return shardName;
- }
+ @Override
+ protected String getMBeanName() {
+ return shardName;
+ }
- @Override
- protected String getMBeanType() {
- return JMX_TYPE_DISTRIBUTED_DATASTORE;
- }
+ @Override
+ protected String getMBeanType() {
+ return JMX_TYPE_DISTRIBUTED_DATASTORE;
+ }
- @Override
- protected String getMBeanCategory() {
- return JMX_CATEGORY_SHARD;
- }
+ @Override
+ protected String getMBeanCategory() {
+ return JMX_CATEGORY_SHARD;
+ }
+ public void incrementFailedTransactionsCount() {
+ this.failedTransactionsCount++;
+ }
}
* @author: syedbahm
*/
public interface ShardStatsMBean {
- String getShardName();
- Long getCommittedTransactionsCount();
- Long getJournalMessagesCount();
- String getLeader();
- String getRaftState();
+ String getShardName();
+
+ Long getCommittedTransactionsCount();
+
+ String getLeader();
+
+ String getRaftState();
+
+ Long getReadOnlyTransactionCount();
+
+ Long getWriteOnlyTransactionCount();
+
+ Long getReadWriteTransactionCount();
+
+ Long getLastLogIndex();
+
+ Long getLastLogTerm();
+
+ Long getCurrentTerm();
+
+ Long getCommitIndex();
+
+ Long getLastApplied();
+
+ String getLastCommittedTransactionTime();
+
+ Long getFailedTransactionsCount();
+
}
netty.tcp {
hostname = "127.0.0.1"
port = 2550
- maximum-frame-size = 2097152
+ maximum-frame-size = 419430400
send-buffer-size = 52428800
receive-buffer-size = 52428800
}
public class DataChangeListenerRegistrationTest extends AbstractActorTest {
private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
- private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+ private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
+ MoreExecutors.sameThreadExecutor());
static {
store.onGlobalContextUpdated(TestModel.createTestContext());
final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new CloseDataChangeListenerRegistration().toSerializable(), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.SERIALIZABLE_CLASS)) {
return "match";
private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
- private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+ private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
+ MoreExecutors.sameThreadExecutor());
static {
store.onGlobalContextUpdated(TestModel.createTestContext());
final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
return CreateTransactionReply.fromSerializable(in).getTransactionPath();
final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new CloseTransactionChain().toSerializable(), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
return "match";
--- /dev/null
+/*
+ *
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Covers negative test cases
+ * @author Basheeruddin Ahmed <syedbahm@cisco.com>
+ */
+public class ShardTransactionFailureTest extends AbstractActorTest {
+ private static ListeningExecutorService storeExecutor =
+ MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+ private static final InMemoryDOMDataStore store =
+ new InMemoryDOMDataStore("OPER", storeExecutor,
+ MoreExecutors.sameThreadExecutor());
+
+ private static final SchemaContext testSchemaContext =
+ TestModel.createTestContext();
+
+ static {
+ store.onGlobalContextUpdated(testSchemaContext);
+ }
+
+
+ @Test
+ public void testNegativePerformingWriteOperationOnReadTransaction()
+ throws Exception {
+ try {
+
+ final ActorRef
+ shard = getSystem()
+ .actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final Props props =
+ ShardTransaction
+ .props(store.newReadOnlyTransaction(), shard, TestModel
+ .createTestContext());
+ final TestActorRef subject = TestActorRef.apply(props, getSystem());
+
+ subject
+ .receive(new DeleteData(TestModel.TEST_PATH).toSerializable(),
+ ActorRef.noSender());
+ Assert.assertFalse(true);
+
+
+ } catch (Exception cs) {
+ assertEquals(cs.getClass().getSimpleName(),
+ Exception.class.getSimpleName());
+ assertTrue(cs.getMessage().startsWith(
+ "ShardTransaction:handleRecieve received an unknown message"));
+ }
+ }
+
+ @Test(expected = ReadFailedException.class)
+ public void testNegativeReadWithReadOnlyTransactionClosed()
+ throws Throwable {
+
+ final ActorRef shard =
+ getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final Props props =
+ ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+ TestModel.createTestContext());
+
+ final TestActorRef<ShardTransaction> subject = TestActorRef
+ .create(getSystem(), props,
+ "testNegativeReadWithReadOnlyTransactionClosed");
+
+ ShardTransactionMessages.ReadData readData =
+ ShardTransactionMessages.ReadData.newBuilder()
+ .setInstanceIdentifierPathArguments(
+ NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+ .build()
+ ).build();
+ Future<Object> future =
+ akka.pattern.Patterns.ask(subject, readData, 3000);
+ assertTrue(future.isCompleted());
+ Await.result(future, Duration.Zero());
+
+ ((ShardReadTransaction) subject.underlyingActor())
+ .forUnitTestOnlyExplicitTransactionClose();
+
+ future = akka.pattern.Patterns.ask(subject, readData, 3000);
+ Await.result(future, Duration.Zero());
+
+
+ }
+
+
+ @Test(expected = ReadFailedException.class)
+ public void testNegativeReadWithReadWriteOnlyTransactionClosed()
+ throws Throwable {
+
+ final ActorRef shard =
+ getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final Props props =
+ ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ TestModel.createTestContext());
+
+ final TestActorRef<ShardTransaction> subject = TestActorRef
+ .create(getSystem(), props,
+ "testNegativeReadWithReadWriteOnlyTransactionClosed");
+
+ ShardTransactionMessages.ReadData readData =
+ ShardTransactionMessages.ReadData.newBuilder()
+ .setInstanceIdentifierPathArguments(
+ NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+ .build()
+ ).build();
+ Future<Object> future =
+ akka.pattern.Patterns.ask(subject, readData, 3000);
+ assertTrue(future.isCompleted());
+ Await.result(future, Duration.Zero());
+
+ ((ShardReadWriteTransaction) subject.underlyingActor())
+ .forUnitTestOnlyExplicitTransactionClose();
+
+ future = akka.pattern.Patterns.ask(subject, readData, 3000);
+ Await.result(future, Duration.Zero());
+
+
+ }
+
+
+ @Test(expected = IllegalStateException.class)
+ public void testNegativeWriteWithTransactionReady() throws Exception {
+
+
+ final ActorRef shard =
+ getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final Props props =
+ ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+ TestModel.createTestContext());
+
+ final TestActorRef<ShardTransaction> subject = TestActorRef
+ .create(getSystem(), props,
+ "testNegativeWriteWithTransactionReady");
+
+ ShardTransactionMessages.ReadyTransaction readyTransaction =
+ ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+
+ Future<Object> future =
+ akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
+ assertTrue(future.isCompleted());
+ Await.result(future, Duration.Zero());
+
+ ShardTransactionMessages.WriteData writeData =
+ ShardTransactionMessages.WriteData.newBuilder()
+ .setInstanceIdentifierPathArguments(
+ NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+ .build()).setNormalizedNode(
+ NormalizedNodeMessages.Node.newBuilder().build()
+
+ ).build();
+
+ future = akka.pattern.Patterns.ask(subject, writeData, 3000);
+ assertTrue(future.isCompleted());
+ Await.result(future, Duration.Zero());
+
+
+ }
+
+
+ @Test(expected = IllegalStateException.class)
+ public void testNegativeReadWriteWithTransactionReady() throws Exception {
+
+
+ final ActorRef shard =
+ getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final Props props =
+ ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ TestModel.createTestContext());
+
+ final TestActorRef<ShardTransaction> subject = TestActorRef
+ .create(getSystem(), props,
+ "testNegativeReadWriteWithTransactionReady");
+
+ ShardTransactionMessages.ReadyTransaction readyTransaction =
+ ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+
+ Future<Object> future =
+ akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
+ assertTrue(future.isCompleted());
+ Await.result(future, Duration.Zero());
+
+ ShardTransactionMessages.WriteData writeData =
+ ShardTransactionMessages.WriteData.newBuilder()
+ .setInstanceIdentifierPathArguments(
+ NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+ .build()).setNormalizedNode(
+ NormalizedNodeMessages.Node.newBuilder().build()
+
+ ).build();
+
+ future = akka.pattern.Patterns.ask(subject, writeData, 3000);
+ assertTrue(future.isCompleted());
+ Await.result(future, Duration.Zero());
+
+
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testNegativeMergeTransactionReady() throws Exception {
+
+
+ final ActorRef shard =
+ getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final Props props =
+ ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ TestModel.createTestContext());
+
+ final TestActorRef<ShardTransaction> subject = TestActorRef
+ .create(getSystem(), props, "testNegativeMergeTransactionReady");
+
+ ShardTransactionMessages.ReadyTransaction readyTransaction =
+ ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+
+ Future<Object> future =
+ akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
+ assertTrue(future.isCompleted());
+ Await.result(future, Duration.Zero());
+
+ ShardTransactionMessages.MergeData mergeData =
+ ShardTransactionMessages.MergeData.newBuilder()
+ .setInstanceIdentifierPathArguments(
+ NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+ .build()).setNormalizedNode(
+ NormalizedNodeMessages.Node.newBuilder().build()
+
+ ).build();
+
+ future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
+ assertTrue(future.isCompleted());
+ Await.result(future, Duration.Zero());
+
+
+ }
+
+
+ @Test(expected = IllegalStateException.class)
+ public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
+
+
+ final ActorRef shard =
+ getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+ final Props props =
+ ShardTransaction.props(store.newReadWriteTransaction(), shard,
+ TestModel.createTestContext());
+
+ final TestActorRef<ShardTransaction> subject = TestActorRef
+ .create(getSystem(), props,
+ "testNegativeDeleteDataWhenTransactionReady");
+
+ ShardTransactionMessages.ReadyTransaction readyTransaction =
+ ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+
+ Future<Object> future =
+ akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
+ assertTrue(future.isCompleted());
+ Await.result(future, Duration.Zero());
+
+ ShardTransactionMessages.DeleteData deleteData =
+ ShardTransactionMessages.DeleteData.newBuilder()
+ .setInstanceIdentifierPathArguments(
+ NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+ .build()).build();
+
+ future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
+ assertTrue(future.isCompleted());
+ Await.result(future, Duration.Zero());
+
+
+ }
+}
MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
private static final InMemoryDOMDataStore store =
- new InMemoryDOMDataStore("OPER", storeExecutor);
+ new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
private static final SchemaContext testSchemaContext = TestModel.createTestContext();
final ActorRef subject = getSystem().actorOf(props, "testReadData");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in)
final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
if (ReadDataReply.fromSerializable(testSchemaContext,TestModel.TEST_PATH, in)
final Class<? extends Modification> modificationType) {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject
.tell(new ShardTransaction.GetCompositedModification(),
final CompositeModification compositeModification =
new ExpectMsg<CompositeModification>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected CompositeModification match(Object in) {
if (in instanceof ShardTransaction.GetCompositeModificationReply) {
return ((ShardTransaction.GetCompositeModificationReply) in)
getSystem().actorOf(props, "testWriteData");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new WriteData(TestModel.TEST_PATH,
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
return "match";
getSystem().actorOf(props, "testMergeData");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new MergeData(TestModel.TEST_PATH,
final String out = new ExpectMsg<String>(duration("500 milliseconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) {
return "match";
getSystem().actorOf(props, "testDeleteData");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
return "match";
getSystem().actorOf(props, "testReadyTransaction");
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
subject.tell(new ReadyTransaction().toSerializable(), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
return "match";
watch(subject);
new Within(duration("2 seconds")) {
+ @Override
protected void run() {
subject.tell(new CloseTransaction().toSerializable(), getRef());
final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
return "match";
final String termination = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in instanceof Terminated) {
return "match";
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import java.text.SimpleDateFormat;
+import java.util.Date;
public class ShardStatsTest {
- private MBeanServer mbeanServer;
- private ShardStats shardStats;
- private ObjectName testMBeanName;
+ private MBeanServer mbeanServer;
+ private ShardStats shardStats;
+ private ObjectName testMBeanName;
- @Before
- public void setUp() throws Exception {
+ @Before
+ public void setUp() throws Exception {
- shardStats = new ShardStats("shard-1");
- shardStats.registerMBean();
- mbeanServer= shardStats.getMBeanServer();
- String objectName = AbstractBaseMBean.BASE_JMX_PREFIX + "type="+shardStats.getMBeanType()+",Category="+
- shardStats.getMBeanCategory() + ",name="+
- shardStats.getMBeanName();
- testMBeanName = new ObjectName(objectName);
- }
+ shardStats = new ShardStats("shard-1");
+ shardStats.registerMBean();
+ mbeanServer = shardStats.getMBeanServer();
+ String objectName =
+ AbstractBaseMBean.BASE_JMX_PREFIX + "type=" + shardStats
+ .getMBeanType() + ",Category=" +
+ shardStats.getMBeanCategory() + ",name=" +
+ shardStats.getMBeanName();
+ testMBeanName = new ObjectName(objectName);
+ }
- @After
- public void tearDown() throws Exception {
- shardStats.unregisterMBean();
- }
+ @After
+ public void tearDown() throws Exception {
+ shardStats.unregisterMBean();
+ }
- @Test
- public void testGetShardName() throws Exception {
+ @Test
+ public void testGetShardName() throws Exception {
- Object attribute = mbeanServer.getAttribute(testMBeanName,"ShardName");
- Assert.assertEquals((String) attribute, "shard-1");
+ Object attribute = mbeanServer.getAttribute(testMBeanName, "ShardName");
+ Assert.assertEquals((String) attribute, "shard-1");
- }
+ }
- @Test
- public void testGetCommittedTransactionsCount() throws Exception {
- //let us increment some transactions count and then check
- shardStats.incrementCommittedTransactionCount();
- shardStats.incrementCommittedTransactionCount();
- shardStats.incrementCommittedTransactionCount();
+ @Test
+ public void testGetCommittedTransactionsCount() throws Exception {
+ //let us increment some transactions count and then check
+ shardStats.incrementCommittedTransactionCount();
+ shardStats.incrementCommittedTransactionCount();
+ shardStats.incrementCommittedTransactionCount();
- //now let us get from MBeanServer what is the transaction count.
- Object attribute = mbeanServer.getAttribute(testMBeanName,"CommittedTransactionsCount");
- Assert.assertEquals((Long) attribute, (Long)3L);
+ //now let us get from MBeanServer what is the transaction count.
+ Object attribute = mbeanServer.getAttribute(testMBeanName,
+ "CommittedTransactionsCount");
+ Assert.assertEquals((Long) attribute, (Long) 3L);
- }
-}
\ No newline at end of file
+ }
+
+ @Test
+ public void testGetLastCommittedTransactionTime() throws Exception {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ Assert.assertEquals(shardStats.getLastCommittedTransactionTime(),
+ sdf.format(new Date(0L)));
+ long millis = System.currentTimeMillis();
+ shardStats.setLastCommittedTransactionTime(new Date(millis));
+
+ //now let us get from MBeanServer what is the transaction count.
+ Object attribute = mbeanServer.getAttribute(testMBeanName,
+ "LastCommittedTransactionTime");
+ Assert.assertEquals((String) attribute, sdf.format(new Date(millis)));
+ Assert.assertNotEquals((String) attribute,
+ sdf.format(new Date(millis - 1)));
+
+ }
+
+ @Test
+ public void testGetFailedTransactionsCount() throws Exception {
+ //let us increment some transactions count and then check
+ shardStats.incrementFailedTransactionsCount();
+ shardStats.incrementFailedTransactionsCount();
+
+
+ //now let us get from MBeanServer what is the transaction count.
+ Object attribute =
+ mbeanServer.getAttribute(testMBeanName, "FailedTransactionsCount");
+ Assert.assertEquals((Long) attribute, (Long) 2L);
+
+
+
+ }
+}
@Before
public void setUp(){
- store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
+ store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor(),
+ MoreExecutors.sameThreadExecutor());
store.onGlobalContextUpdated(TestModel.createTestContext());
}
*/
package org.opendaylight.controller.config.yang.md.sal.dom.impl;
-import java.util.concurrent.Executors;
-
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.opendaylight.yangtools.util.PropertyUtils;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
/**
*
public final class DomInmemoryDataBrokerModule extends
org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractDomInmemoryDataBrokerModule {
+ private static final String FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP =
+ "mdsal.datastore-future-callback-queue.size";
+ private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE = 1000;
+
+ private static final String FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP =
+ "mdsal.datastore-future-callback-pool.size";
+ private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE = 20;
+ private static final String COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP =
+ "mdsal.datastore-commit-queue.size";
+ private static final int DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
+
public DomInmemoryDataBrokerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
@Override
public java.lang.AutoCloseable createInstance() {
- ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
//Initializing Operational DOM DataStore defaulting to InMemoryDOMDataStore if one is not configured
DOMStore operStore = getOperationalDataStoreDependency();
if(operStore == null){
//we will default to InMemoryDOMDataStore creation
- operStore = new InMemoryDOMDataStore("DOM-OPER", storeExecutor);
- //here we will register the SchemaContext listener
- getSchemaServiceDependency().registerSchemaContextListener((InMemoryDOMDataStore)operStore);
+ operStore = InMemoryDOMDataStoreFactory.create("DOM-OPER", getSchemaServiceDependency());
}
DOMStore configStore = getConfigDataStoreDependency();
if(configStore == null){
//we will default to InMemoryDOMDataStore creation
- configStore = new InMemoryDOMDataStore("DOM-CFG", storeExecutor);
- //here we will register the SchemaContext listener
- getSchemaServiceDependency().registerSchemaContextListener((InMemoryDOMDataStore)configStore);
+ configStore = InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency());
}
ImmutableMap<LogicalDatastoreType, DOMStore> datastores = ImmutableMap
.<LogicalDatastoreType, DOMStore> builder().put(LogicalDatastoreType.OPERATIONAL, operStore)
.put(LogicalDatastoreType.CONFIGURATION, configStore).build();
+ /*
+ * We use a single-threaded executor for commits with a bounded queue capacity. If the
+ * queue capacity is reached, subsequent commit tasks will be rejected and the commits will
+ * fail. This is done to relieve back pressure. This should be an extreme scenario - either
+ * there's deadlock(s) somewhere and the controller is unstable or some rogue component is
+ * continuously hammering commits too fast or the controller is just over-capacity for the
+ * system it's running on.
+ */
+ ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
+ PropertyUtils.getIntSystemProperty(
+ COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP,
+ DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE), "WriteTxCommit");
+
+ /*
+ * We use an executor for commit ListenableFuture callbacks that favors reusing available
+ * threads over creating new threads at the expense of execution time. The assumption is
+ * that most ListenableFuture callbacks won't execute a lot of business logic where we want
+ * it to run quicker - many callbacks will likely just handle error conditions and do
+ * nothing on success. The executor queue capacity is bounded and, if the capacity is
+ * reached, subsequent submitted tasks will block the caller.
+ */
+ Executor listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(
+ PropertyUtils.getIntSystemProperty(
+ FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP,
+ DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE),
+ PropertyUtils.getIntSystemProperty(
+ FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP,
+ DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE), "CommitFutures");
+
DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores,
- new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
- TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION));
+ new DeadlockDetectingListeningExecutorService(commitExecutor,
+ TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION,
+ listenableFutureExecutor));
return newDataBroker;
}
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
import javax.annotation.concurrent.GuardedBy;
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
Preconditions.checkArgument(listener != null, "Listener must not be null");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
- ListenableFuture<Void> commitFuture = executor.submit(new CommitCoordinationTask(
- transaction, cohorts, listener));
+
+ ListenableFuture<Void> commitFuture = null;
+ try {
+ commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, listener));
+ } catch(RejectedExecutionException e) {
+ LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
+ executor, e);
+ return Futures.immediateFailedCheckedFuture(
+ new TransactionCommitFailedException(
+ "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
+ }
+
if (listener.isPresent()) {
Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
}
@Before
public void setupStore() {
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ForwardingExecutorService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
private SchemaContext schemaContext;
private DOMDataBrokerImpl domBroker;
private ListeningExecutorService executor;
+ private ExecutorService futureExecutor;
+ private CommitExecutorService commitExecutor;
@Before
public void setupStore() {
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
.put(OPERATIONAL, operStore) //
.build();
- executor = new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
- TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION);
+ commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
+ futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
+ executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
+ TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION, futureExecutor);
domBroker = new DOMDataBrokerImpl(stores, executor);
}
if( executor != null ) {
executor.shutdownNow();
}
+
+ if(futureExecutor != null) {
+ futureExecutor.shutdownNow();
+ }
}
@Test(timeout=10000)
assertTrue(afterCommitRead.isPresent());
}
+ @Test(expected=TransactionCommitFailedException.class)
+ public void testRejectedCommit() throws Exception {
+
+ commitExecutor.delegate = Mockito.mock( ExecutorService.class );
+ Mockito.doThrow( new RejectedExecutionException( "mock" ) )
+ .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
+ Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
+ Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
+ Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
+ Mockito.doReturn( true ).when( commitExecutor.delegate )
+ .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
+
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
+
+ writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
+ }
+
/**
* Tests a simple DataChangeListener notification after a write.
*/
assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
}
}
+
+ static class CommitExecutorService extends ForwardingExecutorService {
+
+ ExecutorService delegate;
+
+ public CommitExecutorService( ExecutorService delegate ) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ protected ExecutorService delegate() {
+ return delegate;
+ }
+ }
}
@Before
public void setupStore() {
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+ MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
-import java.util.concurrent.Executors;
-
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-
-import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryConfigDataStoreProviderModule {
+
public InMemoryConfigDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
@Override
public java.lang.AutoCloseable createInstance() {
- InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-CFG", MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
- getSchemaServiceDependency().registerSchemaContextListener(ids);
- return ids;
+ return InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency());
}
}
package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
-import java.util.concurrent.Executors;
-
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-
-import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryOperationalDataStoreProviderModule {
+
public InMemoryOperationalDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
@Override
public java.lang.AutoCloseable createInstance() {
- InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-OPER", MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
- getOperationalSchemaServiceDependency().registerSchemaContextListener(ids);
- return ids;
+ return InMemoryDOMDataStoreFactory.create("DOM-OPER", getOperationalSchemaServiceDependency());
}
}
package org.opendaylight.controller.md.sal.dom.store.impl;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
class ChangeListenerNotifyTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class);
+
private final Iterable<? extends DataChangeListenerRegistration<?>> listeners;
private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
+ @SuppressWarnings("rawtypes")
+ private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent>
+ notificationMgr;
+
+ @SuppressWarnings("rawtypes")
public ChangeListenerNotifyTask(final Iterable<? extends DataChangeListenerRegistration<?>> listeners,
- final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event) {
+ final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event,
+ final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent> notificationMgr) {
this.listeners = listeners;
this.event = event;
+ this.notificationMgr = notificationMgr;
}
@Override
public void run() {
for (DataChangeListenerRegistration<?> listener : listeners) {
- try {
- listener.getInstance().onDataChanged(event);
- } catch (Exception e) {
- LOG.error("Unhandled exception during invoking listener {} with event {}", listener, event, e);
- }
+ notificationMgr.submitNotification(listener.getInstance(), event);
}
-
}
@Override
public String toString() {
return "ChangeListenerNotifyTask [listeners=" + listeners + ", event=" + event + "]";
}
-
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.util.ExecutorServiceUtil;
+import org.opendaylight.yangtools.util.PropertyUtils;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
+
import java.util.Collections;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.base.Preconditions.checkState;
public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
TransactionReadyPrototype,AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
+
+ @SuppressWarnings("rawtypes")
+ private static final QueuedNotificationManager.Invoker<AsyncDataChangeListener,
+ AsyncDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
+ new QueuedNotificationManager.Invoker<AsyncDataChangeListener,
+ AsyncDataChangeEvent>() {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void invokeListener( AsyncDataChangeListener listener,
+ AsyncDataChangeEvent notification ) {
+ listener.onDataChanged(notification);
+ }
+ };
+
+ private static final String DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP =
+ "mdsal.datastore-dcl-notification-queue.size";
+
+ private static final int DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE = 1000;
+
private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
private final ListenerTree listenerTree = ListenerTree.create();
private final AtomicLong txCounter = new AtomicLong(0);
- private final ListeningExecutorService executor;
+ private final ListeningExecutorService listeningExecutor;
+
+ @SuppressWarnings("rawtypes")
+ private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent>
+ dataChangeListenerNotificationManager;
+ private final ExecutorService dataChangeListenerExecutor;
private final String name;
- public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
+ public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
+ final ExecutorService dataChangeListenerExecutor) {
this.name = Preconditions.checkNotNull(name);
- this.executor = Preconditions.checkNotNull(executor);
+ this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor);
+
+ this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
+
+ int maxDCLQueueSize = PropertyUtils.getIntSystemProperty(
+ DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE );
+
+ dataChangeListenerNotificationManager =
+ new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
+ DCL_NOTIFICATION_MGR_INVOKER, maxDCLQueueSize, "DataChangeListenerQueueMgr");
}
@Override
}
@Override
- public void close(){
- executor.shutdownNow();
+ public void close() {
+ ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS);
+ ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
}
@Override
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
.setAfter(data) //
.addCreated(path, data) //
.build();
- executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
+
+ new ChangeListenerNotifyTask(Collections.singletonList(reg), event,
+ dataChangeListenerNotificationManager).run();
}
}
@Override
public void close() {
- executor.shutdownNow();
-
+ // FIXME: this call doesn't look right here - listeningExecutor is shared and owned
+ // by the outer class.
+ //listeningExecutor.shutdownNow();
}
protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
@Override
public ListenableFuture<Boolean> canCommit() {
- return executor.submit(new Callable<Boolean>() {
+ return listeningExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws TransactionCommitFailedException {
try {
@Override
public ListenableFuture<Void> preCommit() {
- return executor.submit(new Callable<Void>() {
+ return listeningExecutor.submit(new Callable<Void>() {
@Override
public Void call() {
candidate = dataTree.prepare(modification);
- listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
+ listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree,
+ dataChangeListenerNotificationManager);
return null;
}
});
for (ChangeListenerNotifyTask task : listenerResolver.call()) {
LOG.trace("Scheduling invocation of listeners: {}", task);
- executor.submit(task);
+ task.run();
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.annotation.Nullable;
+
+import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.opendaylight.yangtools.util.PropertyUtils;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * A factory for creating InMemoryDOMDataStore instances.
+ *
+ * @author Thomas Pantelis
+ */
+public final class InMemoryDOMDataStoreFactory {
+
+ private static final String DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP =
+ "mdsal.datastore-dcl-notification-queue.size";
+ private static final int DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE = 1000;
+
+ private static final String DCL_EXECUTOR_MAX_POOL_SIZE_PROP =
+ "mdsal.datastore-dcl-notification-pool.size";
+ private static final int DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE = 20;
+
+ private InMemoryDOMDataStoreFactory() {
+ }
+
+ /**
+ * Creates an InMemoryDOMDataStore instance.
+ *
+ * @param name the name of the data store
+ * @param schemaService the SchemaService to which to register the data store.
+ * @return an InMemoryDOMDataStore instance
+ */
+ public static InMemoryDOMDataStore create(final String name,
+ @Nullable final SchemaService schemaService) {
+
+ // For DataChangeListener notifications we use an executor that provides the fastest
+ // task execution time to get higher throughput as DataChangeListeners typically provide
+ // much of the business logic for a data model. If the executor queue size limit is reached,
+ // subsequent submitted notifications will block the calling thread.
+
+ int dclExecutorMaxQueueSize = PropertyUtils.getIntSystemProperty(
+ DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE);
+ int dclExecutorMaxPoolSize = PropertyUtils.getIntSystemProperty(
+ DCL_EXECUTOR_MAX_POOL_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE);
+
+ ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
+ dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
+
+ InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
+ MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()),
+ dataChangeListenerExecutor);
+
+ if(schemaService != null) {
+ schemaService.registerSchemaContextListener(dataStore);
+ }
+
+ return dataStore;
+ }
+}
import java.util.Set;
import java.util.concurrent.Callable;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.SimpleEventFactory;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Node;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Walker;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
private final DataTreeCandidate candidate;
private final ListenerTree listenerRoot;
- public ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree) {
+ @SuppressWarnings("rawtypes")
+ private final NotificationManager<AsyncDataChangeListener, AsyncDataChangeEvent> notificationMgr;
+
+ @SuppressWarnings("rawtypes")
+ public ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree,
+ final NotificationManager<AsyncDataChangeListener, AsyncDataChangeEvent> notificationMgr) {
this.candidate = Preconditions.checkNotNull(candidate);
this.listenerRoot = Preconditions.checkNotNull(listenerTree);
+ this.notificationMgr = Preconditions.checkNotNull(notificationMgr);
}
/**
* @param listeners
* @param entries
*/
- private static void addNotificationTask(final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder,
+ private void addNotificationTask(final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder,
final ListenerTree.Node listeners, final Collection<DOMImmutableDataChangeEvent> entries) {
if (!entries.isEmpty()) {
* @param listeners
* @param event
*/
- private static void addNotificationTaskByScope(
+ private void addNotificationTaskByScope(
final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final ListenerTree.Node listeners,
final DOMImmutableDataChangeEvent event) {
DataChangeScope eventScope = event.getScope();
List<DataChangeListenerRegistration<?>> listenerSet = Collections
.<DataChangeListenerRegistration<?>> singletonList(listenerReg);
if (eventScope == DataChangeScope.BASE) {
- taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+ taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
} else if (eventScope == DataChangeScope.ONE && listenerScope != DataChangeScope.BASE) {
- taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+ taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
} else if (eventScope == DataChangeScope.SUBTREE && listenerScope == DataChangeScope.SUBTREE) {
- taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+ taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
}
}
}
* @param listeners
* @param entries
*/
- private static void addNotificationTasksAndMergeEvents(
+ private void addNotificationTasksAndMergeEvents(
final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final ListenerTree.Node listeners,
final Collection<DOMImmutableDataChangeEvent> entries) {
}
}
- private static void addNotificationTaskExclusively(
+ private void addNotificationTaskExclusively(
final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final Node listeners,
final DOMImmutableDataChangeEvent event) {
for (DataChangeListenerRegistration<?> listener : listeners.getListeners()) {
if (listener.getScope() == event.getScope()) {
Set<DataChangeListenerRegistration<?>> listenerSet = Collections
.<DataChangeListenerRegistration<?>> singleton(listener);
- taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+ taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
}
}
}
}
}
- public static ResolveDataChangeEventsTask create(final DataTreeCandidate candidate, final ListenerTree listenerTree) {
- return new ResolveDataChangeEventsTask(candidate, listenerTree);
+ @SuppressWarnings("rawtypes")
+ public static ResolveDataChangeEventsTask create(final DataTreeCandidate candidate,
+ final ListenerTree listenerTree,
+ final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent> notificationMgr) {
+ return new ResolveDataChangeEventsTask(candidate, listenerTree, notificationMgr);
}
}
/**
* A walking context, pretty much equivalent to an iterator, but it
- * exposes the undelying tree structure.
+ * exposes the underlying tree structure.
+ */
+ /*
+ * FIXME: BUG-1511: split this class out as ListenerWalker.
*/
public static final class Walker implements AutoCloseable {
private final Lock lock;
* only as long as the {@link org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Walker} instance through which it is reached remains
* unclosed.
*/
+ /*
+ * FIXME: BUG-1511: split this class out as ListenerNode.
+ */
public static final class Node implements StoreTreeNode<Node>, Identifiable<PathArgument> {
private final Collection<DataChangeListenerRegistration<?>> listeners = new ArrayList<>();
private final Map<PathArgument, Node> children = new HashMap<>();
import java.util.Collection;
import java.util.Map;
-
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.opendaylight.controller.md.sal.dom.store.impl.DatastoreTestTask.WriteTransactionCustomizer;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.top.level.list.NestedList;
import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
import org.opendaylight.yangtools.yang.common.QName;
private InMemoryDOMDataStore datastore;
private SchemaContext schemaContext;
+ private TestDCLExecutorService dclExecutorService;
@Before
public final void setup() throws Exception {
ModuleInfoBackedContext context = ModuleInfoBackedContext.create();
context.registerModuleInfo(moduleInfo);
schemaContext = context.tryToCreateSchemaContext().get();
+
+ dclExecutorService = new TestDCLExecutorService(
+ SpecialExecutors.newBlockingBoundedFastThreadPool(1, 10, "DCL" ));
+
datastore = new InMemoryDOMDataStore("TEST",
- MoreExecutors.sameThreadExecutor());
+ MoreExecutors.sameThreadExecutor(), dclExecutorService );
datastore.onGlobalContextUpdated(schemaContext);
}
+ @After
+ public void tearDown() {
+ if( dclExecutorService != null ) {
+ dclExecutorService.shutdownNow();
+ }
+ }
+
public final DatastoreTestTask newTestTask() {
- return new DatastoreTestTask(datastore).cleanup(DatastoreTestTask
+ return new DatastoreTestTask(datastore, dclExecutorService).cleanup(DatastoreTestTask
.simpleDelete(TOP_LEVEL));
}
package org.opendaylight.controller.md.sal.dom.store.impl;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
private WriteTransactionCustomizer cleanup;
private YangInstanceIdentifier changePath;
private DataChangeScope changeScope;
- private boolean postSetup = false;
+ private volatile boolean postSetup = false;
private final ChangeEventListener internalListener;
+ private final TestDCLExecutorService dclExecutorService;
- public DatastoreTestTask(final DOMStore datastore) {
+ public DatastoreTestTask(final DOMStore datastore, final TestDCLExecutorService dclExecutorService) {
this.store = datastore;
+ this.dclExecutorService = dclExecutorService;
internalListener = new ChangeEventListener();
}
return this;
}
- public void run() throws InterruptedException, ExecutionException {
+ public void run() throws InterruptedException, ExecutionException, TimeoutException {
if (setup != null) {
execute(setup);
}
}
Preconditions.checkState(write != null, "Write Transaction must be set.");
+
postSetup = true;
+ dclExecutorService.afterTestSetup();
+
execute(write);
if (registration != null) {
registration.close();
}
+
if (changeListener != null) {
- changeListener.onDataChanged(internalListener.receivedChange.get());
+ changeListener.onDataChanged(getChangeEvent());
}
if (read != null) {
read.verify(store.newReadOnlyTransaction());
}
}
- public Future<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>> getChangeEvent() {
- return internalListener.receivedChange;
+ public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChangeEvent() {
+ try {
+ return internalListener.receivedChange.get(10, TimeUnit.SECONDS);
+ } catch( Exception e ) {
+ fail( "Error getting the AsyncDataChangeEvent from the Future: " + e );
+ }
+
+ // won't get here
+ return null;
+ }
+
+ public void verifyNoChangeEvent() {
+ try {
+ Object unexpected = internalListener.receivedChange.get(500, TimeUnit.MILLISECONDS);
+ fail( "Got unexpected AsyncDataChangeEvent from the Future: " + unexpected );
+ } catch( TimeoutException e ) {
+ // Expected
+ } catch( Exception e ) {
+ fail( "Error getting the AsyncDataChangeEvent from the Future: " + e );
+ }
}
private void execute(final WriteTransactionCustomizer writeCustomizer) throws InterruptedException,
abstract protected void customizeTask(DatastoreTestTask task);
@Test
- public final void putTopLevelOneNested() throws InterruptedException, ExecutionException {
+ public final void putTopLevelOneNested() throws Exception {
DatastoreTestTask task = newTestTask().test(writeOneTopMultipleNested(FOO, BAR));
customizeTask(task);
}
@Test
- public final void existingTopWriteSibling() throws InterruptedException, ExecutionException {
+ public final void existingTopWriteSibling() throws Exception {
DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO)).test(
new WriteTransactionCustomizer() {
@Override
@Test
- public final void existingTopWriteTwoNested() throws InterruptedException, ExecutionException {
+ public final void existingTopWriteTwoNested() throws Exception {
DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO)).test(
new WriteTransactionCustomizer() {
@Override
@Test
- public final void existingOneNestedWriteAdditionalNested() throws InterruptedException, ExecutionException {
+ public final void existingOneNestedWriteAdditionalNested() throws Exception {
DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO, BAR)).test(
new WriteTransactionCustomizer() {
@Override
protected abstract void existingOneNestedWriteAdditionalNested(DatastoreTestTask task) throws InterruptedException, ExecutionException;
- protected abstract void putTopLevelOneNested(DatastoreTestTask task) throws InterruptedException,
- ExecutionException;
+ protected abstract void putTopLevelOneNested(DatastoreTestTask task) throws Exception;
@Test
- public final void replaceTopLevelNestedChanged() throws InterruptedException, ExecutionException {
+ public final void replaceTopLevelNestedChanged() throws Exception {
DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO, BAR)).test(
writeOneTopMultipleNested(FOO, BAZ));
customizeTask(task);
ExecutionException;
@Test
- public final void putTopLevelWithTwoNested() throws InterruptedException, ExecutionException {
+ public final void putTopLevelWithTwoNested() throws Exception {
DatastoreTestTask task = newTestTask().test(writeOneTopMultipleNested(FOO, BAR, BAZ));
customizeTask(task);
ExecutionException;
@Test
- public final void twoNestedExistsOneIsDeleted() throws InterruptedException, ExecutionException {
+ public final void twoNestedExistsOneIsDeleted() throws Exception {
DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO, BAR, BAZ)).test(
deleteNested(FOO, BAZ));
ExecutionException;
@Test
- public final void nestedListExistsRootDeleted() throws InterruptedException, ExecutionException {
+ public final void nestedListExistsRootDeleted() throws Exception {
DatastoreTestTask task = newTestTask().cleanup(null).setup(writeOneTopMultipleNested(FOO, BAR, BAZ))
.test(DatastoreTestTask.simpleDelete(TOP_LEVEL));
@Before
public void setupStore() {
- domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
+ domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(),
+ MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
domStore.onGlobalContextUpdated(schemaContext);
}
@Override
public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), TOP_LEVEL, path(FOO), path(FOO, BAR));
assertEmpty(change.getUpdatedData());
public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO, BAZ));
assertContains(change.getUpdatedData(), TOP_LEVEL, path(FOO));
protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), TOP_LEVEL, path(FOO), path(FOO, BAR), path(FOO, BAZ));
assertEmpty(change.getUpdatedData());
protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertEmpty(change.getCreatedData());
assertContains(change.getUpdatedData(), TOP_LEVEL, path(FOO));
protected void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertEmpty(change.getCreatedData());
assertEmpty(change.getUpdatedData());
@Override
protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO,BAZ));
assertNotContains(change.getCreatedData(), path(FOO,BAR));
@Override
protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO,BAR),path(FOO,BAZ));
assertContains(change.getUpdatedData(), TOP_LEVEL, path(FOO));
@Override
protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO_SIBLING));
assertContains(change.getUpdatedData(), TOP_LEVEL);
@Before
public void setupStore() {
- domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
+ domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(),
+ MoreExecutors.sameThreadExecutor());
loadSchemas(RockTheHouseInput.class);
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * A forwarding Executor used by unit tests for DataChangeListener notifications
+ *
+ * @author Thomas Pantelis
+ */
+public class TestDCLExecutorService extends ForwardingExecutorService {
+
+ // Start with a same thread executor to avoid timing issues during test setup.
+ private volatile ExecutorService currentExecutor = MoreExecutors.sameThreadExecutor();
+
+ // The real executor to use when test setup is complete.
+ private final ExecutorService postSetupExecutor;
+
+
+ public TestDCLExecutorService( ExecutorService postSetupExecutor ) {
+ this.postSetupExecutor = postSetupExecutor;
+ }
+
+ @Override
+ protected ExecutorService delegate() {
+ return currentExecutor;
+ }
+
+ public void afterTestSetup() {
+ // Test setup complete - switch to the real executor.
+ currentExecutor = postSetupExecutor;
+ }
+}
\ No newline at end of file
import static org.junit.Assert.assertNotNull;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
@Override
public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertContains(change.getCreatedData(), path(FOO, BAZ));
protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertFalse(change.getCreatedData().isEmpty());
protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- Future<?> future = task.getChangeEvent();
/*
* Base listener should be notified only and only if actual node changed its state,
* since deletion of child, did not result in change of node we are listening
* and this means settable future containing receivedDataChangeEvent is not done.
*
*/
- assertFalse(future.isDone());
+ task.verifyNoChangeEvent();
}
@Override
public void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertEmpty(change.getCreatedData());
assertEmpty(change.getUpdatedData());
@Override
protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) {
- Future<?> future = task.getChangeEvent();
/*
* One listener should be notified only and only if actual node changed its state,
* since deletion of nested child (in this case /nested-list/nested-list[foo],
* and this means settable future containing receivedDataChangeEvent is not done.
*
*/
- assertFalse(future.isDone());
+ task.verifyNoChangeEvent();
}
@Override
protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- Future<?> future = task.getChangeEvent();
/*
* One listener should be notified only and only if actual node changed its state,
* since deletion of nested child (in this case /nested-list/nested-list[foo],
* and this means settable future containing receivedDataChangeEvent is not done.
*
*/
- assertFalse(future.isDone());
+ task.verifyNoChangeEvent();
}
@Override
protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO_SIBLING));
assertNotContains(change.getUpdatedData(), path(FOO), TOP_LEVEL);
import static org.junit.Assert.assertNotNull;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
@Override
public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertContains(change.getCreatedData(), path(FOO, BAZ));
protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertFalse(change.getCreatedData().isEmpty());
protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- Future<?> future = task.getChangeEvent();
/*
* One listener should be notified only and only if actual node changed its state,
* since deletion of nested child (in this case /nested-list/nested-list[foo],
* and this means settable future containing receivedDataChangeEvent is not done.
*
*/
- assertFalse(future.isDone());
+ task.verifyNoChangeEvent();
}
@Override
public void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertEmpty(change.getCreatedData());
assertEmpty(change.getUpdatedData());
@Override
protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) {
- Future<?> future = task.getChangeEvent();
/*
* One listener should be notified only and only if actual node changed its state,
* since deletion of nested child (in this case /nested-list/nested-list[foo],
* and this means settable future containing receivedDataChangeEvent is not done.
*
*/
- assertFalse(future.isDone());
+ task.verifyNoChangeEvent();
}
@Override
protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- Future<?> future = task.getChangeEvent();
/*
* One listener should be notified only and only if actual node changed its state,
* since deletion of nested child (in this case /nested-list/nested-list[foo],
* and this means settable future containing receivedDataChangeEvent is not done.
*
*/
- assertFalse(future.isDone());
+ task.verifyNoChangeEvent();
}
@Override
protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO_SIBLING));
assertNotContains(change.getUpdatedData(),path(FOO), TOP_LEVEL);
@Override
public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotContains(change.getCreatedData(), TOP_LEVEL);
assertContains(change.getCreatedData(), path(FOO), path(FOO, BAR));
public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertContains(change.getCreatedData(), path(FOO, BAZ));
protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertFalse(change.getCreatedData().isEmpty());
protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
assertTrue(change.getCreatedData().isEmpty());
assertContains(change.getUpdatedData(), path(FOO));
public void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException,
ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertEmpty(change.getCreatedData());
assertEmpty(change.getUpdatedData());
@Override
protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO,BAZ));
assertNotContains(change.getCreatedData(), path(FOO,BAR));
@Override
protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO,BAR),path(FOO,BAZ));
assertContains(change.getUpdatedData(), path(FOO));
@Override
protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+ AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertContains(change.getCreatedData(), path(FOO_SIBLING));
assertNotContains(change.getUpdatedData(), path(FOO), TOP_LEVEL);
return table.containsKey(routeId);
}
+ public Boolean isEmpty(){
+ return table.isEmpty();
+ }
///
/// Getter, Setters
///
import java.util.List;
import java.util.Map;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoute;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
/**
* Registry to look up cluster nodes that have registered for a given rpc.
- * <p>
+ * <p/>
* It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
* cluster wide information.
- *
*/
public class RpcRegistry extends UntypedActor {
if (message instanceof SetLocalRouter)
receiveSetLocalRouter((SetLocalRouter) message);
- if (message instanceof AddOrUpdateRoute)
- receiveAddRoute((AddOrUpdateRoute) message);
+ if (message instanceof AddOrUpdateRoutes)
+ receiveAddRoutes((AddOrUpdateRoutes) message);
- else if (message instanceof RemoveRoute)
- receiveRemoveRoute((RemoveRoute) message);
+ else if (message instanceof RemoveRoutes)
+ receiveRemoveRoutes((RemoveRoutes) message);
else if (message instanceof Messages.FindRouters)
- receiveGetRouter((Messages.FindRouters) message);
+ receiveGetRouter((FindRouters) message);
else
unhandled(message);
* @param message contains {@link akka.actor.ActorRef} for rpc broker
*/
private void receiveSetLocalRouter(SetLocalRouter message) {
- if (message == null || message.getRouter() == null)
- return;//ignore
-
localRouter = message.getRouter();
}
/**
- * //TODO: update this to accept multiple route registration
* @param msg
*/
- private void receiveAddRoute(AddOrUpdateRoute msg) {
- if (msg.getRouteIdentifier() == null)
- return;//ignore
+ private void receiveAddRoutes(AddOrUpdateRoutes msg) {
Preconditions.checkState(localRouter != null, "Router must be set first");
Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
- futureReply.map(getMapperToAddRoute(msg.getRouteIdentifier()), getContext().dispatcher());
+ futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
}
/**
- * //TODO: update this to accept multiple routes
- * @param msg
+ * @param msg contains list of route ids to remove
*/
- private void receiveRemoveRoute(RemoveRoute msg) {
- if (msg.getRouteIdentifier() == null)
- return;//ignore
+ private void receiveRemoveRoutes(RemoveRoutes msg) {
Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
- futureReply.map(getMapperToRemoveRoute(msg.getRouteIdentifier()), getContext().dispatcher());
+ futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
}
/**
* Finds routers for the given rpc.
+ *
* @param msg
*/
- private void receiveGetRouter(Messages.FindRouters msg) {
+ private void receiveGetRouter(FindRouters msg) {
final ActorRef sender = getSender();
- //if empty message, return empty list
- if (msg.getRouteIdentifier() == null) {
- sender.tell(createEmptyReply(), getSelf());
- return;
- }
-
Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
-
}
/**
/**
* Helper to create a reply when routers are found for the given rpc
+ *
* @param buckets
* @param routeId
* @return
private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
List<Pair<ActorRef, Long>> routers = new ArrayList<>();
-
Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
for (Bucket bucket : buckets.values()) {
RoutingTable table = (RoutingTable) bucket.getData();
-
if (table == null)
continue;
routerWithUpdateTime = table.getRouterFor(routeId);
-
if (routerWithUpdateTime.isEmpty())
continue;
///
/**
- * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
+ * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
*
* @param routeId the rpc
* @param sender client who asked to find the routers.
* Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
* it updates the local bucket in bucket store.
*
- * @param routeId rpc to remote
+ * @param routeIds rpc to remote
* @return
*/
- private Mapper<Object, Void> getMapperToRemoveRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+ private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
return new Mapper<Object, Void>() {
@Override
public Void apply(Object replyMessage) {
table = new RoutingTable();
table.setRouter(localRouter);
- table.removeRoute(routeId);
+ if (!table.isEmpty()) {
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+ table.removeRoute(routeId);
+ }
+ }
bucket.setData(table);
UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
* Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
* it updates the local bucket in bucket store.
*
- * @param routeId rpc to add
+ * @param routeIds rpc to add
* @return
*/
- private Mapper<Object, Void> getMapperToAddRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+ private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
return new Mapper<Object, Void>() {
@Override
table = new RoutingTable();
table.setRouter(localRouter);
- table.addRoute(routeId);
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
+ table.addRoute(routeId);
+ }
bucket.setData(table);
public static class ContainsRoute {
- final RpcRouter.RouteIdentifier<?,?,?> routeIdentifier;
+ final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
- public ContainsRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
- Preconditions.checkArgument(routeIdentifier != null);
- this.routeIdentifier = routeIdentifier;
+ public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ Preconditions.checkArgument(routeIdentifiers != null &&
+ !routeIdentifiers.isEmpty(),
+ "Route Identifiers must be supplied");
+ this.routeIdentifiers = routeIdentifiers;
}
- public RpcRouter.RouteIdentifier<?,?,?> getRouteIdentifier(){
- return this.routeIdentifier;
+ public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+ return this.routeIdentifiers;
}
@Override
public String toString() {
- return this.getClass().getSimpleName() + "{" +
- "routeIdentifier=" + routeIdentifier +
+ return "ContainsRoute{" +
+ "routeIdentifiers=" + routeIdentifiers +
'}';
}
}
- public static class AddOrUpdateRoute extends ContainsRoute{
+ public static class AddOrUpdateRoutes extends ContainsRoute {
- public AddOrUpdateRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
- super(routeIdentifier);
+ public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ super(routeIdentifiers);
}
}
- public static class RemoveRoute extends ContainsRoute {
+ public static class RemoveRoutes extends ContainsRoute {
- public RemoveRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
- super(routeIdentifier);
+ public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+ super(routeIdentifiers);
}
}
- public static class SetLocalRouter{
+ public static class SetLocalRouter {
private final ActorRef router;
public SetLocalRouter(ActorRef router) {
+ Preconditions.checkArgument(router != null, "Router must not be null");
this.router = router;
}
- public ActorRef getRouter(){
+ public ActorRef getRouter() {
return this.router;
}
}
}
- public static class FindRouters extends ContainsRoute {
+ public static class FindRouters {
+ private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
+
public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
- super(routeIdentifier);
+ Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
+ this.routeIdentifier = routeIdentifier;
+ }
+
+ public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
+ return routeIdentifier;
+ }
+
+ @Override
+ public String toString() {
+ return "FindRouters{" +
+ "routeIdentifier=" + routeIdentifier +
+ '}';
}
}
final List<Pair<ActorRef, Long>> routerWithUpdateTime;
public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
+ Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
this.routerWithUpdateTime = routerWithUpdateTime;
}
- public List<Pair<ActorRef, Long>> getRouterWithUpdateTime(){
+ public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
return routerWithUpdateTime;
}
/**
* Gossiper that syncs bucket store across nodes in the cluster.
- * <p>
- * It keeps a local scheduler that periodically sends Gossip ticks to itself to send bucket store's bucket versions
- * to a randomly selected remote gossiper.
- * <p>
- * When bucket versions are received from a remote gossiper, it is compared with bucket store's bucket versions.
- * Which ever buckets are newer locally, are sent to remote gossiper. If any bucket is older in bucket store, a
- * gossip status is sent to remote gossiper so that it can send the newer buckets.
- * <p>
- * When a bucket is received from a remote gossiper, its sent to the bucket store for update.
+ * <p/>
+ * It keeps a local scheduler that periodically sends Gossip ticks to
+ * itself to send bucket store's bucket versions to a randomly selected remote
+ * gossiper.
+ * <p/>
+ * When bucket versions are received from a remote gossiper, it is compared
+ * with bucket store's bucket versions. Which ever buckets are newer
+ * locally, are sent to remote gossiper. If any bucket is older in bucket store,
+ * a gossip status is sent to remote gossiper so that it can send the newer buckets.
+ * <p/>
+ * When a bucket is received from a remote gossiper, its sent to the bucket store
+ * for update.
*
*/
/**
* Helpful for testing
- * @param autoStartGossipTicks used for turning off gossip ticks during testing. Gossip tick can be manually sent.
+ * @param autoStartGossipTicks used for turning off gossip ticks during testing.
+ * Gossip tick can be manually sent.
*/
public Gossiper(Boolean autoStartGossipTicks){
this.autoStartGossipTicks = autoStartGossipTicks;
if (autoStartGossipTicks) {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
- new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
+ new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
getSelf(), //target
new Messages.GossiperMessages.GossipTick(), //message
getContext().dispatcher(), //execution context
* @param status bucket versions from a remote member
*/
void receiveGossipStatus(GossipStatus status){
- //Dont want to accept messages from non-members
+ //Don't accept messages from non-members
if (!clusterMembers.contains(status.from()))
return;
final ActorRef sender = getSender();
-
Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
-
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
}
void receiveGossip(GossipEnvelope envelope){
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- log.info("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
return;
}
- if (envelope.getBuckets() == null)
- return; //nothing to do
updateRemoteBuckets(envelope.getBuckets());
*/
void updateRemoteBuckets(Map<Address, Bucket> buckets) {
- if (buckets == null || buckets.isEmpty())
- return; //nothing to merge
-
UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
-
getContext().parent().tell(updateRemoteBuckets, getSelf());
}
void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
-
futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
-
}
/**
//Get local status from bucket store and send to remote
Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
-
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
}
/**
- * Process bucket versions received from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
+ * Process bucket versions received from
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
* Then this method compares remote bucket versions with local bucket versions.
* <ul>
* <li>The buckets that are newer locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} to remote
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
+ * to remote
* <li>The buckets that are older locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} to remote so that
- * remote sends GossipEnvelop.
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
+ * to remote so that remote sends GossipEnvelop.
* </ul>
*
* @param sender the remote member
}
/**
- * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} that contains
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. These buckets are sent to a remote member encapsulated
- * in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
+ * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
+ * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
+ * These buckets are sent to a remote member encapsulated in
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
*
* @param sender the remote member that sent
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
public Void apply(Object msg) {
if (msg instanceof GetBucketsByMembersReply) {
Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- log.info("Buckets to send from {}: {}", selfAddress, buckets);
+ log.debug("Buckets to send from {}: {}", selfAddress, buckets);
GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
sender.tell(envelope, getSelf());
}
import java.util.Map;
import java.util.Set;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
+
+
/**
* These messages are used by {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} and
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} actors.
Map<Address, Long> versions;
public ContainsBucketVersions(Map<Address, Long> versions) {
- Preconditions.checkArgument(versions != null, "versions can not be null");
+ Preconditions.checkArgument(versions != null, "versions can not be null or empty");
+
this.versions = versions;
}
public static final class GossipTick extends Tick {}
- public static final class GossipStatus extends BucketStoreMessages.ContainsBucketVersions implements Serializable{
+ public static final class GossipStatus extends ContainsBucketVersions implements Serializable{
private Address from;
public GossipStatus(Address from, Map<Address, Long> versions) {
}
}
- public static final class GossipEnvelope extends BucketStoreMessages.ContainsBuckets implements Serializable {
+ public static final class GossipEnvelope extends ContainsBuckets implements Serializable {
private final Address from;
private final Address to;
public GossipEnvelope(Address from, Address to, Map<Address, Bucket> buckets) {
super(buckets);
+ Preconditions.checkArgument(to != null, "Recipient of message must not be null");
this.to = to;
this.from = from;
}
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
return resolved;
}
- private AddOrUpdateRoute getAddRouteMessage() throws URISyntaxException {
- return new AddOrUpdateRoute(createRouteId());
+ private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
+ return new AddOrUpdateRoutes(createRouteIds());
+ }
+
+ private List<RpcRouter.RouteIdentifier<?,?,?>> createRouteIds() throws URISyntaxException {
+ QName type = new QName(new URI("/mockrpc"), "mockrpc");
+ List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
+ routeIds.add(new RouteIdentifierImpl(null, type, null));
+ return routeIds;
}
private RpcRouter.RouteIdentifier<?,?,?> createRouteId() throws URISyntaxException {
verify(mockGossiper, times(0)).updateRemoteBuckets(anyMap());
}
- @Test
- public void testUpdateRemoteBuckets_WhenNoBucketShouldIgnore(){
-
- mockGossiper.updateRemoteBuckets(null);
- verify(mockGossiper, times(0)).getContext();
-
- Map<Address, Bucket> empty = Collections.emptyMap();
- mockGossiper.updateRemoteBuckets(empty);
- verify(mockGossiper, times(0)).getContext();
- }
-
/**
* Create Gossiper actor and return the underlying instance of Gossiper class.
*
</services>
</data>
</configuration>
+ <required-capabilities>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:rest:connector?module=opendaylight-rest-connector&revision=2014-07-24</capability>
+ </required-capabilities>
</snapshot>
NormalizedNode<?, ?> data = null;
YangInstanceIdentifier normalizedII;
if (mountPoint != null) {
- normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData.getInstanceIdentifier());
+ normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData
+ .getInstanceIdentifier());
data = broker.readConfigurationData(mountPoint, normalizedII);
} else {
normalizedII = controllerContext.toNormalized(iiWithData.getInstanceIdentifier());
NormalizedNode<?, ?> data = null;
YangInstanceIdentifier normalizedII;
if (mountPoint != null) {
- normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData.getInstanceIdentifier());
+ normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData
+ .getInstanceIdentifier());
data = broker.readOperationalData(mountPoint, normalizedII);
} else {
normalizedII = controllerContext.toNormalized(iiWithData.getInstanceIdentifier());
try {
if (mountPoint != null) {
- normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData.getInstanceIdentifier());
+ normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData
+ .getInstanceIdentifier());
broker.commitConfigurationDataPut(mountPoint, normalizedII, datastoreNormalizedNode).get();
} else {
normalizedII = controllerContext.toNormalized(iiWithData.getInstanceIdentifier());
try {
if (mountPoint != null) {
- normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData.getInstanceIdentifier());
+ normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData
+ .getInstanceIdentifier());
broker.commitConfigurationDataPost(mountPoint, normalizedII, datastoreNormalizedData);
} else {
normalizedII = controllerContext.toNormalized(iiWithData.getInstanceIdentifier());
try {
if (mountPoint != null) {
- normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData.getInstanceIdentifier());
+ normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData
+ .getInstanceIdentifier());
broker.commitConfigurationDataPost(mountPoint, normalizedII, datastoreNormalizedData);
} else {
try {
if (mountPoint != null) {
- normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData.getInstanceIdentifier());
+ normalizedII = new DataNormalizer(mountPoint.getSchemaContext()).toNormalized(iiWithData
+ .getInstanceIdentifier());
broker.commitConfigurationDataDelete(mountPoint, normalizedII);
} else {
normalizedII = controllerContext.toNormalized(iiWithData.getInstanceIdentifier());
iiBuilder = YangInstanceIdentifier.builder(iiOriginal);
}
- iiBuilder.node(schemaOfData.getQName());
+ if ((schemaOfData instanceof ListSchemaNode)) {
+ HashMap<QName, Object> keys = this.resolveKeysFromData(((ListSchemaNode) schemaOfData), data);
+ iiBuilder.nodeWithKey(schemaOfData.getQName(), keys);
+ } else {
+ iiBuilder.node(schemaOfData.getQName());
+ }
YangInstanceIdentifier instance = iiBuilder.toInstance();
DOMMountPoint mountPoint = null;
return new InstanceIdWithSchemaNode(instance, schemaOfData, mountPoint);
}
+ private HashMap<QName, Object> resolveKeysFromData(final ListSchemaNode listNode, final CompositeNode dataNode) {
+ final HashMap<QName, Object> keyValues = new HashMap<QName, Object>();
+ List<QName> _keyDefinition = listNode.getKeyDefinition();
+ for (final QName key : _keyDefinition) {
+ SimpleNode<? extends Object> head = null;
+ String localName = key.getLocalName();
+ List<SimpleNode<? extends Object>> simpleNodesByName = dataNode.getSimpleNodesByName(localName);
+ if (simpleNodesByName != null) {
+ head = Iterables.getFirst(simpleNodesByName, null);
+ }
+
+ Object dataNodeKeyValueObject = null;
+ if (head != null) {
+ dataNodeKeyValueObject = head.getValue();
+ }
+
+ if (dataNodeKeyValueObject == null) {
+ throw new RestconfDocumentedException("Data contains list \"" + dataNode.getNodeType().getLocalName()
+ + "\" which does not contain key: \"" + key.getLocalName() + "\"", ErrorType.PROTOCOL,
+ ErrorTag.INVALID_VALUE);
+ }
+
+ keyValues.put(key, dataNodeKeyValueObject);
+ }
+
+ return keyValues;
+ }
+
private boolean endsWithMountPoint(final String identifier) {
return identifier.endsWith(ControllerContext.MOUNT) || identifier.endsWith(ControllerContext.MOUNT + "/");
}
"It wasn't possible to correctly interpret data."));
}
- private NormalizedNode<?, ?> compositeNodeToDatastoreNormalizedNode(final CompositeNode compNode, final DataSchemaNode schema) {
+ private NormalizedNode<?, ?> compositeNodeToDatastoreNormalizedNode(final CompositeNode compNode,
+ final DataSchemaNode schema) {
List<Node<?>> lst = new ArrayList<Node<?>>();
lst.add(compNode);
if (schema instanceof ContainerSchemaNode) {
"It wasn't possible to translate specified data to datastore readable form."));
}
- private InstanceIdWithSchemaNode normalizeInstanceIdentifierWithSchemaNode(final InstanceIdWithSchemaNode iiWithSchemaNode) {
+ private InstanceIdWithSchemaNode normalizeInstanceIdentifierWithSchemaNode(
+ final InstanceIdWithSchemaNode iiWithSchemaNode) {
return normalizeInstanceIdentifierWithSchemaNode(iiWithSchemaNode, false);
}
iiWithSchemaNode.getMountPoint());
}
- private YangInstanceIdentifier instanceIdentifierToReadableFormForNormalizeNode(final YangInstanceIdentifier instIdentifier,
- final boolean unwrapLastListNode) {
+ private YangInstanceIdentifier instanceIdentifierToReadableFormForNormalizeNode(
+ final YangInstanceIdentifier instIdentifier, final boolean unwrapLastListNode) {
Preconditions.checkNotNull(instIdentifier, "Instance identifier can't be null");
final List<PathArgument> result = new ArrayList<PathArgument>();
final Iterator<PathArgument> iter = instIdentifier.getPathArguments().iterator();
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-api</artifactId>
+ </dependency>
</dependencies>
<build>
*/
package org.opendaylight.controller.sal.rest.doc.impl;
+import com.google.common.base.Preconditions;
import javax.ws.rs.core.UriInfo;
-
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.controller.sal.rest.doc.swagger.ApiDeclaration;
import org.opendaylight.controller.sal.rest.doc.swagger.ResourceList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
/**
- * This class gathers all yang defined {@link Module}s and generates Swagger
- * compliant documentation.
+ * This class gathers all yang defined {@link Module}s and generates Swagger compliant documentation.
*/
public class ApiDocGenerator extends BaseYangSwaggerGenerator {
*/
package org.opendaylight.controller.sal.rest.doc.impl;
+import static org.opendaylight.controller.sal.rest.doc.util.RestDocgenUtil.resolvePathArgumentsName;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URI;
import java.text.DateFormat;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-
import javax.ws.rs.core.UriInfo;
-
import org.json.JSONException;
import org.json.JSONObject;
import org.opendaylight.controller.sal.rest.doc.model.builder.OperationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
-import com.google.common.base.Preconditions;
-
public class BaseYangSwaggerGenerator {
private static Logger _logger = LoggerFactory.getLogger(BaseYangSwaggerGenerator.class);
* @param operType
* @return list of modules converted to swagger compliant resource list.
*/
- public ResourceList getResourceListing(UriInfo uriInfo, SchemaContext schemaContext,
- String context) {
+ public ResourceList getResourceListing(UriInfo uriInfo, SchemaContext schemaContext, String context) {
ResourceList resourceList = createResourceList();
for (Module module : modules) {
String revisionString = SIMPLE_DATE_FORMAT.format(module.getRevision());
-
Resource resource = new Resource();
_logger.debug("Working on [{},{}]...", module.getName(), revisionString);
- ApiDeclaration doc = getApiDeclaration(module.getName(), revisionString, uriInfo,
- schemaContext, context);
+ ApiDeclaration doc = getApiDeclaration(module.getName(), revisionString, uriInfo, schemaContext, context);
if (doc != null) {
resource.setPath(generatePath(uriInfo, module.getName(), revisionString));
return uri.toASCIIString();
}
- public ApiDeclaration getApiDeclaration(String module, String revision, UriInfo uriInfo,
- SchemaContext schemaContext, String context) {
+ public ApiDeclaration getApiDeclaration(String module, String revision, UriInfo uriInfo, SchemaContext schemaContext, String context) {
Date rev = null;
try {
rev = SIMPLE_DATE_FORMAT.parse(revision);
throw new IllegalArgumentException(e);
}
Module m = schemaContext.findModuleByName(module, rev);
- Preconditions.checkArgument(m != null, "Could not find module by name,revision: " + module
- + "," + revision);
+ Preconditions.checkArgument(m != null, "Could not find module by name,revision: " + module + "," + revision);
- return getApiDeclaration(m, rev, uriInfo, schemaContext, context);
+ return getApiDeclaration(m, rev, uriInfo, context, schemaContext);
}
- public ApiDeclaration getApiDeclaration(Module module, Date revision, UriInfo uriInfo,
- SchemaContext schemaContext, String context) {
+ public ApiDeclaration getApiDeclaration(Module module, Date revision, UriInfo uriInfo, String context, SchemaContext schemaContext) {
String basePath = createBasePathFromUriInfo(uriInfo);
- ApiDeclaration doc = getSwaggerDocSpec(module, basePath, context);
+ ApiDeclaration doc = getSwaggerDocSpec(module, basePath, context, schemaContext);
if (doc != null) {
return doc;
}
portPart = ":" + port;
}
String basePath = new StringBuilder(uriInfo.getBaseUri().getScheme()).append("://")
- .append(uriInfo.getBaseUri().getHost()).append(portPart).append("/")
- .append(RESTCONF_CONTEXT_ROOT).toString();
+ .append(uriInfo.getBaseUri().getHost()).append(portPart).append("/").append(RESTCONF_CONTEXT_ROOT)
+ .toString();
return basePath;
}
- public ApiDeclaration getSwaggerDocSpec(Module m, String basePath, String context) {
+ public ApiDeclaration getSwaggerDocSpec(Module m, String basePath, String context, SchemaContext schemaContext) {
ApiDeclaration doc = createApiDeclaration(basePath);
List<Api> apis = new ArrayList<Api>();
for (DataSchemaNode node : dataSchemaNodes) {
if ((node instanceof ListSchemaNode) || (node instanceof ContainerSchemaNode)) {
- _logger.debug("Is Configuration node [{}] [{}]", node.isConfiguration(), node
- .getQName().getLocalName());
+ _logger.debug("Is Configuration node [{}] [{}]", node.isConfiguration(), node.getQName().getLocalName());
List<Parameter> pathParams = new ArrayList<Parameter>();
- String resourcePath = getDataStorePath("/config/", context) + m.getName() + ":";
- addApis(node, apis, resourcePath, pathParams, true);
+ String resourcePath = getDataStorePath("/config/", context);
+ addApis(node, apis, resourcePath, pathParams, schemaContext, true);
pathParams = new ArrayList<Parameter>();
- resourcePath = getDataStorePath("/operational/", context) + m.getName() + ":";
- addApis(node, apis, resourcePath, pathParams, false);
+ resourcePath = getDataStorePath("/operational/", context);
+ addApis(node, apis, resourcePath, pathParams, schemaContext, false);
}
Set<RpcDefinition> rpcs = m.getRpcs();
for (RpcDefinition rpcDefinition : rpcs) {
- String resourcePath = getDataStorePath("/operations/", context) + m.getName() + ":";
- addRpcs(rpcDefinition, apis, resourcePath);
+ String resourcePath = getDataStorePath("/operations/", context);
+ addRpcs(rpcDefinition, apis, resourcePath, schemaContext);
}
}
JSONObject models = null;
try {
- models = jsonConverter.convertToJsonSchema(m);
+ models = jsonConverter.convertToJsonSchema(m, schemaContext);
doc.setModels(models);
if (_logger.isDebugEnabled()) {
_logger.debug(mapper.writeValueAsString(doc));
return module + "(" + revision + ")";
}
- private void addApis(DataSchemaNode node, List<Api> apis, String parentPath,
- List<Parameter> parentPathParams, boolean addConfigApi) {
+ private void addApis(DataSchemaNode node, List<Api> apis, String parentPath, List<Parameter> parentPathParams, SchemaContext schemaContext,
+ boolean addConfigApi) {
Api api = new Api();
List<Parameter> pathParams = new ArrayList<Parameter>(parentPathParams);
- String resourcePath = parentPath + createPath(node, pathParams) + "/";
+ String resourcePath = parentPath + createPath(node, pathParams, schemaContext) + "/";
_logger.debug("Adding path: [{}]", resourcePath);
api.setPath(resourcePath);
api.setOperations(operations(node, pathParams, addConfigApi));
if (childNode instanceof ListSchemaNode || childNode instanceof ContainerSchemaNode) {
// keep config and operation attributes separate.
if (childNode.isConfiguration() == addConfigApi) {
- addApis(childNode, apis, resourcePath, pathParams, addConfigApi);
+ addApis(childNode, apis, resourcePath, pathParams, schemaContext, addConfigApi);
}
}
}
* @param pathParams
* @return
*/
- private List<Operation> operations(DataSchemaNode node, List<Parameter> pathParams,
- boolean isConfig) {
+ private List<Operation> operations(DataSchemaNode node, List<Parameter> pathParams, boolean isConfig) {
List<Operation> operations = new ArrayList<>();
OperationBuilder.Get getBuilder = new OperationBuilder.Get(node, isConfig);
return operations;
}
- private String createPath(final DataSchemaNode schemaNode, List<Parameter> pathParams) {
+ private String createPath(final DataSchemaNode schemaNode, List<Parameter> pathParams, SchemaContext schemaContext) {
ArrayList<LeafSchemaNode> pathListParams = new ArrayList<LeafSchemaNode>();
StringBuilder path = new StringBuilder();
- QName _qName = schemaNode.getQName();
- String localName = _qName.getLocalName();
+ String localName = resolvePathArgumentsName(schemaNode, schemaContext);
path.append(localName);
if ((schemaNode instanceof ListSchemaNode)) {
final List<QName> listKeys = ((ListSchemaNode) schemaNode).getKeyDefinition();
for (final QName listKey : listKeys) {
- {
- DataSchemaNode _dataChildByName = ((DataNodeContainer) schemaNode)
- .getDataChildByName(listKey);
- pathListParams.add(((LeafSchemaNode) _dataChildByName));
-
- String pathParamIdentifier = new StringBuilder("/{")
- .append(listKey.getLocalName()).append("}").toString();
- path.append(pathParamIdentifier);
-
- Parameter pathParam = new Parameter();
- pathParam.setName(listKey.getLocalName());
- pathParam.setDescription(_dataChildByName.getDescription());
- pathParam.setType("string");
- pathParam.setParamType("path");
-
- pathParams.add(pathParam);
- }
+ DataSchemaNode _dataChildByName = ((DataNodeContainer) schemaNode).getDataChildByName(listKey);
+ pathListParams.add(((LeafSchemaNode) _dataChildByName));
+
+ String pathParamIdentifier = new StringBuilder("/{").append(listKey.getLocalName()).append("}")
+ .toString();
+ path.append(pathParamIdentifier);
+
+ Parameter pathParam = new Parameter();
+ pathParam.setName(listKey.getLocalName());
+ pathParam.setDescription(_dataChildByName.getDescription());
+ pathParam.setType("string");
+ pathParam.setParamType("path");
+
+ pathParams.add(pathParam);
}
}
return path.toString();
}
- protected void addRpcs(RpcDefinition rpcDefn, List<Api> apis, String parentPath) {
+ protected void addRpcs(RpcDefinition rpcDefn, List<Api> apis, String parentPath, SchemaContext schemaContext) {
Api rpc = new Api();
- String resourcePath = parentPath + rpcDefn.getQName().getLocalName();
+ String resourcePath = parentPath + resolvePathArgumentsName(rpcDefn, schemaContext);
rpc.setPath(resourcePath);
Operation operationSpec = new Operation();
}
return sortedModules;
}
+
}
*/
package org.opendaylight.controller.sal.rest.doc.impl;
+import static org.opendaylight.controller.sal.rest.doc.util.RestDocgenUtil.resolveNodesName;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import org.json.JSONException;
import org.json.JSONObject;
import org.opendaylight.controller.sal.rest.doc.model.builder.OperationBuilder;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.model.api.AnyXmlSchemaNode;
import org.opendaylight.yangtools.yang.model.api.ChoiceCaseNode;
import org.opendaylight.yangtools.yang.model.api.ChoiceNode;
import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.TypeDefinition;
import org.opendaylight.yangtools.yang.model.api.type.BinaryTypeDefinition;
import org.opendaylight.yangtools.yang.model.api.type.BitsTypeDefinition;
private static final String NUMBER = "number";
private static final String BOOLEAN = "boolean";
private static final String STRING = "string";
- private static final String ID_KEY = "id";
- private static final String SUB_TYPES_KEY = "subTypes";
+ private static final String ID_KEY = "id";
+ private static final String SUB_TYPES_KEY = "subTypes";
private static final Map<Class<? extends TypeDefinition<?>>, String> YANG_TYPE_TO_JSON_TYPE_MAPPING;
YANG_TYPE_TO_JSON_TYPE_MAPPING = Collections.unmodifiableMap(tempMap1);
}
+ private Module topLevelModule;
+
public ModelGenerator() {
}
- public JSONObject convertToJsonSchema(Module module) throws IOException, JSONException {
+ public JSONObject convertToJsonSchema(Module module, SchemaContext schemaContext) throws IOException, JSONException {
JSONObject models = new JSONObject();
- processContainers(module, models);
- processRPCs(module, models);
- processIdentities(module, models);
+ topLevelModule = module;
+ processContainers(module, models, schemaContext);
+ processRPCs(module, models, schemaContext);
+ processIdentities(module, models);
return models;
}
- private void processContainers(Module module, JSONObject models) throws IOException,
+ private void processContainers(Module module, JSONObject models, SchemaContext schemaContext) throws IOException,
JSONException {
String moduleName = module.getName();
* For every container in the module
*/
if (childNode instanceof ContainerSchemaNode) {
- configModuleJSON = processContainer((ContainerSchemaNode) childNode, moduleName,
- true, models, true);
- operationalModuleJSON = processContainer((ContainerSchemaNode) childNode,
- moduleName, true, models, false);
+ configModuleJSON = processContainer((ContainerSchemaNode) childNode, moduleName, true, models, true,
+ schemaContext);
+ operationalModuleJSON = processContainer((ContainerSchemaNode) childNode, moduleName, true, models,
+ false, schemaContext);
}
if (configModuleJSON != null) {
}
/**
- * Process the RPCs for a Module Spits out a file each of the name
- * <rpcName>-input.json and <rpcName>-output.json for each RPC that contains
- * input & output elements
+ * Process the RPCs for a Module Spits out a file each of the name <rpcName>-input.json and <rpcName>-output.json
+ * for each RPC that contains input & output elements
*
* @param module
* @throws JSONException
* @throws IOException
*/
- private void processRPCs(Module module, JSONObject models) throws JSONException, IOException {
+ private void processRPCs(Module module, JSONObject models, SchemaContext schemaContext) throws JSONException,
+ IOException {
Set<RpcDefinition> rpcs = module.getRpcs();
String moduleName = module.getName();
ContainerSchemaNode input = rpc.getInput();
if (input != null) {
- JSONObject inputJSON = processContainer(input, moduleName, true, models);
+ JSONObject inputJSON = processContainer(input, moduleName, true, models, schemaContext);
String filename = "(" + rpc.getQName().getLocalName() + ")input";
inputJSON.put("id", filename);
// writeToFile(filename, inputJSON.toString(2), moduleName);
ContainerSchemaNode output = rpc.getOutput();
if (output != null) {
- JSONObject outputJSON = processContainer(output, moduleName, true, models);
+ JSONObject outputJSON = processContainer(output, moduleName, true, models, schemaContext);
String filename = "(" + rpc.getQName().getLocalName() + ")output";
outputJSON.put("id", filename);
models.put(filename, outputJSON);
}
}
- /**
- * Processes the 'identity' statement in a yang model
- * and maps it to a 'model' in the Swagger JSON spec.
- *
- * @param module The module from which the identity stmt will be processed
- * @param models The JSONObject in which the parsed identity will be put as a 'model' obj
- * @throws JSONException
- */
- private void processIdentities(Module module, JSONObject models) throws JSONException {
-
- String moduleName = module.getName();
- Set<IdentitySchemaNode> idNodes = module.getIdentities();
- _logger.debug("Processing Identities for module {} . Found {} identity statements", moduleName, idNodes.size());
-
- for(IdentitySchemaNode idNode : idNodes){
- JSONObject identityObj=new JSONObject();
- String identityName = idNode.getQName().getLocalName();
- _logger.debug("Processing Identity: {}", identityName);
-
- identityObj.put(ID_KEY, identityName);
- identityObj.put(DESCRIPTION_KEY, idNode.getDescription());
-
- JSONObject props = new JSONObject();
- IdentitySchemaNode baseId = idNode.getBaseIdentity();
+ /**
+ * Processes the 'identity' statement in a yang model and maps it to a 'model' in the Swagger JSON spec.
+ *
+ * @param module
+ * The module from which the identity stmt will be processed
+ * @param models
+ * The JSONObject in which the parsed identity will be put as a 'model' obj
+ * @throws JSONException
+ */
+ private void processIdentities(Module module, JSONObject models) throws JSONException {
+ String moduleName = module.getName();
+ Set<IdentitySchemaNode> idNodes = module.getIdentities();
+ _logger.debug("Processing Identities for module {} . Found {} identity statements", moduleName, idNodes.size());
+
+ for (IdentitySchemaNode idNode : idNodes) {
+ JSONObject identityObj = new JSONObject();
+ String identityName = idNode.getQName().getLocalName();
+ _logger.debug("Processing Identity: {}", identityName);
+
+ identityObj.put(ID_KEY, identityName);
+ identityObj.put(DESCRIPTION_KEY, idNode.getDescription());
+
+ JSONObject props = new JSONObject();
+ IdentitySchemaNode baseId = idNode.getBaseIdentity();
+
+ if (baseId == null) {
+ /**
+ * This is a base identity. So lets see if it has sub types. If it does, then add them to the model
+ * definition.
+ */
+ Set<IdentitySchemaNode> derivedIds = idNode.getDerivedIdentities();
+
+ if (derivedIds != null) {
+ JSONArray subTypes = new JSONArray();
+ for (IdentitySchemaNode derivedId : derivedIds) {
+ subTypes.put(derivedId.getQName().getLocalName());
+ }
+ identityObj.put(SUB_TYPES_KEY, subTypes);
+ }
+ } else {
+ /**
+ * This is a derived entity. Add it's base type & move on.
+ */
+ props.put(TYPE_KEY, baseId.getQName().getLocalName());
+ }
- if(baseId==null) {
- /**
- * This is a base identity. So lets see if
- * it has sub types. If it does, then add them to the model definition.
- */
- Set<IdentitySchemaNode> derivedIds = idNode.getDerivedIdentities();
-
- if(derivedIds != null) {
- JSONArray subTypes = new JSONArray();
- for(IdentitySchemaNode derivedId : derivedIds){
- subTypes.put(derivedId.getQName().getLocalName());
- }
- identityObj.put(SUB_TYPES_KEY, subTypes);
+ // Add the properties. For a base type, this will be an empty object as required by the Swagger spec.
+ identityObj.put(PROPERTIES_KEY, props);
+ models.put(identityName, identityObj);
}
- } else {
- /**
- * This is a derived entity. Add it's base type & move on.
- */
- props.put(TYPE_KEY, baseId.getQName().getLocalName());
- }
-
- //Add the properties. For a base type, this will be an empty object as required by the Swagger spec.
- identityObj.put(PROPERTIES_KEY, props);
- models.put(identityName, identityObj);
}
- }
+
/**
* Processes the container node and populates the moduleJSON
*
* @throws JSONException
* @throws IOException
*/
- private JSONObject processContainer(ContainerSchemaNode container, String moduleName,
- boolean addSchemaStmt, JSONObject models) throws JSONException, IOException {
- return processContainer(container, moduleName, addSchemaStmt, models, (Boolean) null);
+ private JSONObject processContainer(ContainerSchemaNode container, String moduleName, boolean addSchemaStmt,
+ JSONObject models, SchemaContext schemaContext) throws JSONException, IOException {
+ return processContainer(container, moduleName, addSchemaStmt, models, (Boolean) null, schemaContext);
}
- private JSONObject processContainer(ContainerSchemaNode container, String moduleName,
- boolean addSchemaStmt, JSONObject models, Boolean isConfig) throws JSONException,
- IOException {
+ private JSONObject processContainer(ContainerSchemaNode container, String moduleName, boolean addSchemaStmt,
+ JSONObject models, Boolean isConfig, SchemaContext schemaContext) throws JSONException, IOException {
JSONObject moduleJSON = getSchemaTemplate();
if (addSchemaStmt) {
moduleJSON = getSchemaTemplate();
String containerDescription = container.getDescription();
moduleJSON.put(DESCRIPTION_KEY, containerDescription);
- JSONObject properties = processChildren(container.getChildNodes(), moduleName, models, isConfig);
+ JSONObject properties = processChildren(container.getChildNodes(), container.getQName(), moduleName, models,
+ isConfig, schemaContext);
moduleJSON.put(PROPERTIES_KEY, properties);
return moduleJSON;
}
- private JSONObject processChildren(Iterable<DataSchemaNode> nodes, String moduleName,
- JSONObject models) throws JSONException, IOException {
- return processChildren(nodes, moduleName, models, null);
+ private JSONObject processChildren(Iterable<DataSchemaNode> nodes, QName parentQName, String moduleName,
+ JSONObject models, SchemaContext schemaContext) throws JSONException, IOException {
+ return processChildren(nodes, parentQName, moduleName, models, null, schemaContext);
}
/**
* Processes the nodes
*
* @param nodes
+ * @param parentQName
* @param moduleName
* @param isConfig
* @return
* @throws JSONException
* @throws IOException
*/
- private JSONObject processChildren(Iterable<DataSchemaNode> nodes, String moduleName,
- JSONObject models, Boolean isConfig) throws JSONException, IOException {
+ private JSONObject processChildren(Iterable<DataSchemaNode> nodes, QName parentQName, String moduleName,
+ JSONObject models, Boolean isConfig, SchemaContext schemaContext) throws JSONException, IOException {
JSONObject properties = new JSONObject();
for (DataSchemaNode node : nodes) {
if (isConfig == null || node.isConfiguration() == isConfig) {
- String name = node.getQName().getLocalName();
+ String name = resolveNodesName(node, topLevelModule, schemaContext);
JSONObject property = null;
if (node instanceof LeafSchemaNode) {
property = processLeafNode((LeafSchemaNode) node);
} else if (node instanceof ListSchemaNode) {
- property = processListSchemaNode((ListSchemaNode) node, moduleName, models, isConfig);
+ property = processListSchemaNode((ListSchemaNode) node, moduleName, models, isConfig, schemaContext);
} else if (node instanceof LeafListSchemaNode) {
property = processLeafListNode((LeafListSchemaNode) node);
} else if (node instanceof ChoiceNode) {
- property = processChoiceNode((ChoiceNode) node, moduleName, models);
+ property = processChoiceNode((ChoiceNode) node, moduleName, models, schemaContext);
} else if (node instanceof AnyXmlSchemaNode) {
property = processAnyXMLNode((AnyXmlSchemaNode) node);
} else if (node instanceof ContainerSchemaNode) {
- property = processContainer((ContainerSchemaNode) node, moduleName, false,
- models, isConfig);
+ property = processContainer((ContainerSchemaNode) node, moduleName, false, models, isConfig,
+ schemaContext);
} else {
- throw new IllegalArgumentException("Unknown DataSchemaNode type: "
- + node.getClass());
+ throw new IllegalArgumentException("Unknown DataSchemaNode type: " + node.getClass());
}
property.putOpt(DESCRIPTION_KEY, node.getDescription());
* @throws JSONException
* @throws IOException
*/
- private JSONObject processChoiceNode(ChoiceNode choiceNode, String moduleName, JSONObject models)
- throws JSONException, IOException {
+ private JSONObject processChoiceNode(ChoiceNode choiceNode, String moduleName, JSONObject models,
+ SchemaContext schemaContext) throws JSONException, IOException {
Set<ChoiceCaseNode> cases = choiceNode.getCases();
JSONArray choiceProps = new JSONArray();
for (ChoiceCaseNode choiceCase : cases) {
String choiceName = choiceCase.getQName().getLocalName();
- JSONObject choiceProp = processChildren(choiceCase.getChildNodes(), moduleName, models);
+ JSONObject choiceProp = processChildren(choiceCase.getChildNodes(), choiceCase.getQName(), moduleName,
+ models, schemaContext);
JSONObject choiceObj = new JSONObject();
choiceObj.put(choiceName, choiceProp);
choiceObj.put(TYPE_KEY, OBJECT_TYPE);
* @param props
* @throws JSONException
*/
- private void processConstraints(ConstraintDefinition constraints, JSONObject props)
- throws JSONException {
+ private void processConstraints(ConstraintDefinition constraints, JSONObject props) throws JSONException {
boolean isMandatory = constraints.isMandatory();
props.put(REQUIRED_KEY, isMandatory);
/**
* Parses a ListSchema node.
*
- * Due to a limitation of the RAML--->JAX-RS tool, sub-properties must be in
- * a separate JSON schema file. Hence, we have to write some properties to a
- * new file, while continuing to process the rest.
+ * Due to a limitation of the RAML--->JAX-RS tool, sub-properties must be in a separate JSON schema file. Hence, we
+ * have to write some properties to a new file, while continuing to process the rest.
*
* @param listNode
* @param moduleName
* @throws JSONException
* @throws IOException
*/
- private JSONObject processListSchemaNode(ListSchemaNode listNode, String moduleName,
- JSONObject models, Boolean isConfig) throws JSONException, IOException {
+ private JSONObject processListSchemaNode(ListSchemaNode listNode, String moduleName, JSONObject models,
+ Boolean isConfig, SchemaContext schemaContext) throws JSONException, IOException {
- String fileName = (BooleanUtils.isNotFalse(isConfig)?OperationBuilder.CONFIG:OperationBuilder.OPERATIONAL) +
- listNode.getQName().getLocalName();
+ String fileName = (BooleanUtils.isNotFalse(isConfig) ? OperationBuilder.CONFIG : OperationBuilder.OPERATIONAL)
+ + listNode.getQName().getLocalName();
- JSONObject childSchemaProperties = processChildren(listNode.getChildNodes(), moduleName, models);
+ JSONObject childSchemaProperties = processChildren(listNode.getChildNodes(), listNode.getQName(), moduleName,
+ models, schemaContext);
JSONObject childSchema = getSchemaTemplate();
childSchema.put(TYPE_KEY, OBJECT_TYPE);
childSchema.put(PROPERTIES_KEY, childSchemaProperties);
/*
- * Due to a limitation of the RAML--->JAX-RS tool, sub-properties must
- * be in a separate JSON schema file. Hence, we have to write some
- * properties to a new file, while continuing to process the rest.
+ * Due to a limitation of the RAML--->JAX-RS tool, sub-properties must be in a separate JSON schema file. Hence,
+ * we have to write some properties to a new file, while continuing to process the rest.
*/
// writeToFile(fileName, childSchema.toString(2), moduleName);
childSchema.put("id", fileName);
* @param property
* @throws JSONException
*/
- private void processTypeDef(TypeDefinition<?> leafTypeDef, JSONObject property)
- throws JSONException {
+ private void processTypeDef(TypeDefinition<?> leafTypeDef, JSONObject property) throws JSONException {
if (leafTypeDef instanceof ExtendedType) {
processExtendedType(leafTypeDef, property);
processUnionType((UnionTypeDefinition) leafTypeDef, property);
} else if (leafTypeDef instanceof IdentityrefTypeDefinition) {
- property.putOpt(TYPE_KEY, ((IdentityrefTypeDefinition) leafTypeDef).getIdentity().getQName().getLocalName());
+ property.putOpt(TYPE_KEY, ((IdentityrefTypeDefinition) leafTypeDef).getIdentity().getQName().getLocalName());
} else if (leafTypeDef instanceof BinaryTypeDefinition) {
processBinaryType((BinaryTypeDefinition) leafTypeDef, property);
} else {
* @param property
* @throws JSONException
*/
- private void processExtendedType(TypeDefinition<?> leafTypeDef, JSONObject property)
- throws JSONException {
+ private void processExtendedType(TypeDefinition<?> leafTypeDef, JSONObject property) throws JSONException {
Object leafBaseType = leafTypeDef.getBaseType();
if (leafBaseType instanceof ExtendedType) {
// recursively process an extended type until we hit a base type
processExtendedType((TypeDefinition<?>) leafBaseType, property);
} else {
- List<LengthConstraint> lengthConstraints = ((ExtendedType) leafTypeDef)
- .getLengthConstraints();
+ List<LengthConstraint> lengthConstraints = ((ExtendedType) leafTypeDef).getLengthConstraints();
for (LengthConstraint lengthConstraint : lengthConstraints) {
Number min = lengthConstraint.getMin();
Number max = lengthConstraint.getMax();
/*
*
*/
- private void processBinaryType(BinaryTypeDefinition binaryType, JSONObject property)
- throws JSONException {
+ private void processBinaryType(BinaryTypeDefinition binaryType, JSONObject property) throws JSONException {
property.put(TYPE_KEY, STRING);
JSONObject media = new JSONObject();
media.put(BINARY_ENCODING_KEY, BASE_64);
* @param property
* @throws JSONException
*/
- private void processEnumType(EnumerationType enumLeafType, JSONObject property)
- throws JSONException {
+ private void processEnumType(EnumerationType enumLeafType, JSONObject property) throws JSONException {
List<EnumPair> enumPairs = enumLeafType.getValues();
List<String> enumNames = new ArrayList<String>();
for (EnumPair enumPair : enumPairs) {
* @param property
* @throws JSONException
*/
- private void processBitsType(BitsTypeDefinition bitsType, JSONObject property)
- throws JSONException {
+ private void processBitsType(BitsTypeDefinition bitsType, JSONObject property) throws JSONException {
property.put(TYPE_KEY, ARRAY_TYPE);
property.put(MIN_ITEMS, 0);
property.put(UNIQUE_ITEMS_KEY, true);
* @param property
* @throws JSONException
*/
- private void processUnionType(UnionTypeDefinition unionType, JSONObject property)
- throws JSONException {
+ private void processUnionType(UnionTypeDefinition unionType, JSONObject property) throws JSONException {
StringBuilder type = new StringBuilder();
- for (TypeDefinition<?> typeDef : unionType.getTypes() ) {
- if( type.length() > 0 ){
- type.append( " or " );
+ for (TypeDefinition<?> typeDef : unionType.getTypes()) {
+ if (type.length() > 0) {
+ type.append(" or ");
}
type.append(YANG_TYPE_TO_JSON_TYPE_MAPPING.get(typeDef.getClass()));
}
- property.put(TYPE_KEY, type );
+ property.put(TYPE_KEY, type);
}
/**
return schemaJSON;
}
+
}
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-
import javax.ws.rs.core.UriInfo;
-
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.rest.doc.util;
+
+import java.net.URI;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaNode;
+
+public class RestDocgenUtil {
+
+ private RestDocgenUtil() {
+ }
+
+ private static Map<URI, Map<Date, Module>> namespaceAndRevisionToModule = new HashMap<URI, Map<Date, Module>>();
+
+ /**
+ * Resolve path argument name for {@code node}.
+ *
+ * The name can contain also prefix which consists of module name followed by colon. The module prefix is presented
+ * if namespace of {@code node} and its parent is different. In other cases only name of {@code node} is returned.
+ *
+ * @return name of {@code node}
+ */
+ public static String resolvePathArgumentsName(final SchemaNode node, final SchemaContext schemaContext) {
+ Iterable<QName> schemaPath = node.getPath().getPathTowardsRoot();
+ Iterator<QName> it = schemaPath.iterator();
+ QName nodeQName = it.next();
+
+ QName parentQName = null;
+ if (it.hasNext()) {
+ parentQName = it.next();
+ }
+ if (isEqualNamespaceAndRevision(parentQName, nodeQName)) {
+ return node.getQName().getLocalName();
+ } else {
+ return resolveFullNameFromNode(node, schemaContext);
+ }
+ }
+
+ private synchronized static String resolveFullNameFromNode(final SchemaNode node, final SchemaContext schemaContext) {
+ final URI namespace = node.getQName().getNamespace();
+ final Date revision = node.getQName().getRevision();
+
+ Map<Date, Module> revisionToModule = namespaceAndRevisionToModule.get(namespace);
+ if (revisionToModule == null) {
+ revisionToModule = new HashMap<>();
+ namespaceAndRevisionToModule.put(namespace, revisionToModule);
+ }
+ Module module = revisionToModule.get(revision);
+ if (module == null) {
+ module = schemaContext.findModuleByNamespaceAndRevision(namespace, revision);
+ revisionToModule.put(revision, module);
+ }
+ if (module != null) {
+ return module.getName() + ":" + node.getQName().getLocalName();
+ }
+ return node.getQName().getLocalName();
+ }
+
+ public static String resolveNodesName(final SchemaNode node, final Module module, final SchemaContext schemaContext) {
+ if (node.getQName().getNamespace().equals(module.getQNameModule().getNamespace())
+ && node.getQName().getRevision().equals(module.getQNameModule().getRevision())) {
+ return node.getQName().getLocalName();
+ } else {
+ return resolveFullNameFromNode(node, schemaContext);
+ }
+ }
+
+ private static boolean isEqualNamespaceAndRevision(QName parentQName, QName nodeQName) {
+ if (parentQName == null) {
+ if (nodeQName == null) {
+ return true;
+ }
+ return false;
+ }
+ return parentQName.getNamespace().equals(nodeQName.getNamespace())
+ && parentQName.getRevision().equals(nodeQName.getRevision());
+ }
+}
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.google.common.base.Preconditions;
import java.io.File;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
-
import javax.ws.rs.core.UriInfo;
-
import junit.framework.Assert;
-
+import org.json.JSONException;
+import org.json.JSONObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.sal.rest.doc.swagger.Resource;
import org.opendaylight.controller.sal.rest.doc.swagger.ResourceList;
import org.opendaylight.yangtools.yang.model.api.Module;
-
-import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
/**
*
public static final String HTTP_HOST = "http://host";
private ApiDocGenerator generator;
private DocGenTestHelper helper;
+ private SchemaContext schemaContext;
@Before
public void setUp() throws Exception {
generator = new ApiDocGenerator();
helper = new DocGenTestHelper();
helper.setUp();
+ schemaContext = new YangParserImpl().resolveSchemaContext(new HashSet<Module>(helper.getModules().values()));
}
@After
for (Entry<File, Module> m : helper.getModules().entrySet()) {
if (m.getKey().getAbsolutePath().endsWith("toaster_short.yang")) {
ApiDeclaration doc = generator.getSwaggerDocSpec(m.getValue(),
- "http://localhost:8080/restconf", "");
+ "http://localhost:8080/restconf", "",schemaContext);
validateToaster(doc);
+ validateTosterDocContainsModulePrefixes(doc);
Assert.assertNotNull(doc);
}
}
for (Entry<File, Module> m : helper.getModules().entrySet()) {
if (m.getKey().getAbsolutePath().endsWith("toaster.yang")) {
ApiDeclaration doc = generator.getSwaggerDocSpec(m.getValue(),
- "http://localhost:8080/restconf", "");
+ "http://localhost:8080/restconf", "",schemaContext);
Assert.assertNotNull(doc);
//testing bugs.opendaylight.org bug 1290. UnionType model type.
}
}
+ /**
+ * Tests whether from yang files are generated all required paths for HTTP operations (GET, DELETE, PUT, POST)
+ *
+ * If container | list is augmented then in path there should be specified module name followed with collon (e. g.
+ * "/config/module1:element1/element2/module2:element3")
+ *
+ * @param doc
+ * @throws Exception
+ */
private void validateToaster(ApiDeclaration doc) throws Exception {
Set<String> expectedUrls = new TreeSet<>(Arrays.asList(new String[] {
"/config/toaster2:toaster/", "/operational/toaster2:toaster/",
"/operations/toaster2:cancel-toast", "/operations/toaster2:make-toast",
- "/operations/toaster2:restock-toaster" }));
+ "/operations/toaster2:restock-toaster",
+ "/config/toaster2:toaster/toasterSlot/{slotId}/toaster-augmented:slotInfo/" }));
Set<String> actualUrls = new TreeSet<>();
@Test
public void testGetResourceListing() throws Exception {
UriInfo info = helper.createMockUriInfo(HTTP_HOST);
- SchemaService mockSchemaService = helper.createMockSchemaService();
+ SchemaService mockSchemaService = helper.createMockSchemaService(schemaContext);
generator.setSchemaService(mockSchemaService);
assertEquals(HTTP_HOST + "/toaster2(2009-11-20)", toaster2.getPath());
}
+ private void validateTosterDocContainsModulePrefixes(ApiDeclaration doc) {
+ JSONObject topLevelJson = doc.getModels();
+ try {
+ JSONObject configToaster = topLevelJson.getJSONObject("(config)toaster");
+ assertNotNull("(config)toaster JSON object missing", configToaster);
+ //without module prefix
+ containsProperties(configToaster, "toasterSlot");
+
+ JSONObject toasterSlot = topLevelJson.getJSONObject("(config)toasterSlot");
+ assertNotNull("(config)toasterSlot JSON object missing", toasterSlot);
+ //with module prefix
+ containsProperties(toasterSlot, "toaster-augmented:slotInfo");
+
+ } catch (JSONException e) {
+ fail("Json exception while reading JSON object. Original message "+e.getMessage());
+ }
+ }
+
+ private void containsProperties(final JSONObject jsonObject,final String...properties) throws JSONException {
+ for (String property : properties) {
+ JSONObject propertiesObject = jsonObject.getJSONObject("properties");
+ assertNotNull("Properties object missing in ", propertiesObject);
+ JSONObject concretePropertyObject = propertiesObject.getJSONObject(property);
+ assertNotNull(property + " is missing",concretePropertyObject);
+ }
+ }
}
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
import java.io.File;
import java.io.FileNotFoundException;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
-
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.parser.api.YangModelParser;
+import org.opendaylight.yangtools.yang.model.parser.api.YangContextParser;
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
-
public class DocGenTestHelper {
private Map<File, Module> modules;
URISyntaxException {
URI resourceDirUri = getClass().getResource(resourceDirectory).toURI();
- final YangModelParser parser = new YangParserImpl();
+ final YangContextParser parser = new YangParserImpl();
final File testDir = new File(resourceDirUri);
final String[] fileList = testDir.list();
final List<File> testFiles = new ArrayList<>();
final ArgumentCaptor<String> moduleCapture = ArgumentCaptor.forClass(String.class);
final ArgumentCaptor<Date> dateCapture = ArgumentCaptor.forClass(Date.class);
+ final ArgumentCaptor<URI> namespaceCapture = ArgumentCaptor.forClass(URI.class);
when(mockContext.findModuleByName(moduleCapture.capture(), dateCapture.capture())).then(
new Answer<Module>() {
@Override
return null;
}
});
+ when(mockContext.findModuleByNamespaceAndRevision(namespaceCapture.capture(), dateCapture.capture())).then(
+ new Answer<Module>() {
+ @Override
+ public Module answer(InvocationOnMock invocation) throws Throwable {
+ URI namespace = namespaceCapture.getValue();
+ Date date = dateCapture.getValue();
+ for (Module m : modules.values()) {
+ if (m.getNamespace().equals(namespace) && m.getRevision().equals(date)) {
+ return m;
+ }
+ }
+ return null;
+ }
+ });
return mockContext;
}
import java.net.URISyntaxException;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-
import javax.ws.rs.core.UriInfo;
-
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.controller.sal.rest.doc.swagger.ResourceList;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
public class MountPointSwaggerTest {
private static final String INSTANCE_URL = "nodes/node/123/";
private MountPointSwagger swagger;
private DocGenTestHelper helper;
+ private SchemaContext schemaContext;
@Before
public void setUp() throws Exception {
swagger = new MountPointSwagger();
helper = new DocGenTestHelper();
helper.setUp();
+ schemaContext = new YangParserImpl().resolveSchemaContext(new HashSet<Module>(helper.getModules().values()));
}
@Test()
--- /dev/null
+module toaster-augmented {
+
+ yang-version 1;
+
+ namespace
+ "http://netconfcentral.org/ns/toaster/augmented";
+
+ prefix toast;
+ import toaster2 {prefix tst; revision-date 2009-11-20;}
+
+ revision "2014-7-14" {
+ }
+
+ augment "/tst:toaster/tst:toasterSlot" {
+ container slotInfo {
+ leaf numberOfToastPrepared {
+ type uint32;
+ }
+ }
+ }
+}
\ No newline at end of file
Microsoft Toaster.";
}
+ list toasterSlot {
+ key "slotId";
+ leaf slotId {
+ type string;
+ }
+ }
+
leaf toasterModelNumber {
type DisplayString;
config false;