<repository>mvn:org.opendaylight.controller/features-akka/${commons.opendaylight.version}/xml/features</repository>
<feature name='odl-mdsal-all' version='${project.version}' description="OpenDaylight :: MDSAL :: All">
<feature version='${project.version}'>odl-mdsal-broker</feature>
- <feature version='${project.version}'>odl-mdsal-clustering</feature>
+ <feature version='${project.version}'>odl-mdsal-broker-local</feature>
<feature version='${project.version}'>odl-mdsal-xsql</feature>
<feature version='${project.version}'>odl-toaster</feature>
</feature>
<configfile finalname='${config.configfile.directory}/${config.netconf.mdsal.configfile}'>mvn:org.opendaylight.controller/netconf-mdsal-config/${netconf.version}/xml/config</configfile>
</feature>
- <feature name='odl-mdsal-broker' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
+ <feature name='odl-mdsal-broker-local' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
<feature version='${yangtools.version}'>odl-yangtools-common</feature>
<feature version='${yangtools.version}'>odl-yangtools-binding</feature>
<feature version='${yangtools.version}'>odl-yangtools-models</feature>
<configfile finalname="${config.configfile.directory}/${config.xsql.configfile}">mvn:org.opendaylight.controller/sal-dom-xsql-config/${project.version}/xml/config</configfile>
</feature>
<feature name ='odl-mdsal-clustering-commons' version='${project.version}'>
- <feature version='${project.version}'>odl-mdsal-broker</feature>
+ <feature version='${project.version}'>odl-mdsal-broker-local</feature>
<feature version='${akka.version}'>odl-akka-system</feature>
<feature version='${akka.version}'>odl-akka-persistence</feature>
<bundle>mvn:org.opendaylight.controller/sal-clustering-commons/${project.version}</bundle>
<bundle>mvn:com.codahale.metrics/metrics-core/3.0.1</bundle>
</feature>
<feature name ='odl-mdsal-distributed-datastore' version='${project.version}'>
- <feature version='${project.version}'>odl-mdsal-broker</feature>
+ <feature version='${project.version}'>odl-mdsal-broker-local</feature>
<feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
<feature version='${akka.version}'>odl-akka-clustering</feature>
<bundle>mvn:org.opendaylight.controller/sal-distributed-datastore/${project.version}</bundle>
</feature>
<feature name ='odl-mdsal-remoterpc-connector' version='${project.version}'>
- <feature version='${project.version}'>odl-mdsal-broker</feature>
+ <feature version='${project.version}'>odl-mdsal-broker-local</feature>
<feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
<feature version='${akka.version}'>odl-akka-clustering</feature>
<feature version='0.7'>odl-akka-leveldb</feature>
<bundle>mvn:org.opendaylight.controller/sal-remoterpc-connector/${project.version}</bundle>
</feature>
- <feature name ='odl-mdsal-clustering' version='${project.version}'>
+ <feature name ='odl-mdsal-broker' version='${project.version}'>
<feature version='${project.version}'>odl-mdsal-remoterpc-connector</feature>
<feature version='${project.version}'>odl-mdsal-distributed-datastore</feature>
<configfile finalname="${config.configfile.directory}/${config.clustering.configfile}">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/config</configfile>
<configfile finalname="configuration/initial/modules.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleconf</configfile>
<configfile finalname="etc/org.opendaylight.controller.cluster.datastore.cfg">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/cfg/datastore</configfile>
</feature>
-
+ <feature name ='odl-mdsal-clustering' version='${project.version}'>
+ <feature version='${project.version}'>odl-mdsal-broker</feature>
+ </feature>
<feature name='odl-clustering-test-app' version='${project.version}'>
<feature version='${yangtools.version}'>odl-yangtools-models</feature>
- <feature version='${project.version}'>odl-mdsal-broker</feature>
+ <feature version='${project.version}'>odl-mdsal-broker-local</feature>
<bundle>mvn:org.opendaylight.controller.samples/clustering-it-model/${project.version}</bundle>
<bundle>mvn:org.opendaylight.controller.samples/clustering-it-provider/${project.version}</bundle>
<configfile finalname="${config.configfile.directory}/20-clustering-test-app.xml">mvn:org.opendaylight.controller.samples/clustering-it-config/${project.version}/xml/config</configfile>
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
-import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
* The current state determines the current behavior of a RaftActor
* A Raft Actor always starts off in the Follower State
*/
- private RaftActorBehavior currentBehavior;
+ private final DelegatingRaftActorBehavior currentBehavior = new DelegatingRaftActorBehavior();
/**
* This context should NOT be passed directly to any other actor it is
private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
- /**
- * The in-memory journal
- */
- private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
-
private Stopwatch recoveryTimer;
private int currentRecoveryBatchCount;
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
- -1, -1, replicatedLog, peerAddresses,
- (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
- LOG);
+ -1, -1, peerAddresses,
+ (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), LOG);
+
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
}
private void initRecoveryTimer() {
} else if (message instanceof ApplyJournalEntries) {
onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
} else if (message instanceof DeleteEntries) {
- replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+ replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
} else if (message instanceof UpdateElectionTerm) {
context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
((UpdateElectionTerm) message).getVotedFor());
// Create a replicated log with the snapshot information
// The replicated log can be used later on to retrieve this snapshot
// when we need to install it on a peer
- replicatedLog = new ReplicatedLogImpl(snapshot);
- context.setReplicatedLog(replicatedLog);
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
+ currentBehavior));
context.setLastApplied(snapshot.getLastAppliedIndex());
context.setCommitIndex(snapshot.getLastAppliedIndex());
timer.stop();
LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
- replicatedLog.size(), persistenceId(), timer.toString(),
- replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
+ replicatedLog().size(), persistenceId(), timer.toString(),
+ replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
}
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
}
- replicatedLog.append(logEntry);
+ replicatedLog().append(logEntry);
}
private void onRecoveredApplyLogEntries(long toIndex) {
}
for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
- batchRecoveredLogEntry(replicatedLog.get(i));
+ batchRecoveredLogEntry(replicatedLog().get(i));
}
context.setLastApplied(toIndex);
"Persistence Id = " + persistenceId() +
" Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
"journal-size={}",
- replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
- replicatedLog.getSnapshotTerm(), replicatedLog.size());
+ replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
+ replicatedLog().getSnapshotTerm(), replicatedLog().size());
initializeBehavior();
}
}
protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
- reusableBehaviorStateHolder.init(currentBehavior);
- currentBehavior = newBehavior;
- handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
+ reusableBehaviorStateHolder.init(getCurrentBehavior());
+ setCurrentBehavior(newBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
}
@Override public void handleCommand(Object message) {
applySnapshot(snapshot.getState());
//clears the followers log, sets the snapshot index to ensure adjusted-index works
- replicatedLog = new ReplicatedLogImpl(snapshot);
- context.setReplicatedLog(replicatedLog);
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
+ currentBehavior));
context.setLastApplied(snapshot.getLastAppliedIndex());
} else if (message instanceof FindLeader) {
} else if (message.equals(COMMIT_SNAPSHOT)) {
commitSnapshot(-1);
} else {
- reusableBehaviorStateHolder.init(currentBehavior);
+ reusableBehaviorStateHolder.init(getCurrentBehavior());
- currentBehavior = currentBehavior.handleMessage(getSender(), message);
+ setCurrentBehavior(currentBehavior.handleMessage(getSender(), message));
- handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
+ handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
}
}
OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
- .inMemoryJournalDataSize(replicatedLog.dataSize())
- .inMemoryJournalLogSize(replicatedLog.size())
+ .inMemoryJournalDataSize(replicatedLog().dataSize())
+ .inMemoryJournalLogSize(replicatedLog().size())
.isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
.lastApplied(context.getLastApplied())
- .lastIndex(replicatedLog.lastIndex())
- .lastTerm(replicatedLog.lastTerm())
+ .lastIndex(replicatedLog().lastIndex())
+ .lastTerm(replicatedLog().lastTerm())
.leader(getLeaderId())
.raftState(currentBehavior.state().toString())
.replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
- .snapshotIndex(replicatedLog.getSnapshotIndex())
- .snapshotTerm(replicatedLog.getSnapshotTerm())
+ .snapshotIndex(replicatedLog().getSnapshotIndex())
+ .snapshotTerm(replicatedLog().getSnapshotTerm())
.votedFor(context.getTermInformation().getVotedFor())
.peerAddresses(ImmutableMap.copyOf(context.getPeerAddresses()));
builder.lastLogTerm(lastLogEntry.getTerm());
}
- if(currentBehavior instanceof AbstractLeader) {
- AbstractLeader leader = (AbstractLeader)currentBehavior;
+ if(getCurrentBehavior() instanceof AbstractLeader) {
+ AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
Collection<String> followerIds = leader.getFollowerIds();
List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
for(String id: followerIds) {
final RaftActorContext raftContext = getRaftActorContext();
- replicatedLog
- .appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
- @Override
- public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
- if(!hasFollowers()){
- // Increment the Commit Index and the Last Applied values
- raftContext.setCommitIndex(replicatedLogEntry.getIndex());
- raftContext.setLastApplied(replicatedLogEntry.getIndex());
+ replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
+ @Override
+ public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
+ if(!hasFollowers()){
+ // Increment the Commit Index and the Last Applied values
+ raftContext.setCommitIndex(replicatedLogEntry.getIndex());
+ raftContext.setLastApplied(replicatedLogEntry.getIndex());
- // Apply the state immediately
- applyState(clientActor, identifier, data);
+ // Apply the state immediately
+ applyState(clientActor, identifier, data);
- // Send a ApplyJournalEntries message so that we write the fact that we applied
- // the state to durable storage
- self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
+ // Send a ApplyJournalEntries message so that we write the fact that we applied
+ // the state to durable storage
+ self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
- context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+ context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
- } else if (clientActor != null) {
- // Send message for replication
- currentBehavior.handleMessage(getSelf(),
- new Replicate(clientActor, identifier,
- replicatedLogEntry)
- );
- }
+ } else if (clientActor != null) {
+ // Send message for replication
+ currentBehavior.handleMessage(getSelf(),
+ new Replicate(clientActor, identifier, replicatedLogEntry));
+ }
+ }
+ });
+ }
- }
- }); }
+ private ReplicatedLog replicatedLog() {
+ return context.getReplicatedLog();
+ }
protected String getId() {
return context.getId();
}
+ @VisibleForTesting
+ void setCurrentBehavior(RaftActorBehavior behavior) {
+ currentBehavior.setDelegate(behavior);
+ }
+
+ protected RaftActorBehavior getCurrentBehavior() {
+ return currentBehavior.getDelegate();
+ }
+
/**
* Derived actors can call the isLeader method to check if the current
* RaftActor is the Leader or not
}
protected ReplicatedLogEntry getLastLogEntry() {
- return replicatedLog.last();
+ return replicatedLog().last();
}
protected Long getCurrentTerm(){
private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
- context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
- }
-
- protected long getTotalMemory() {
- return Runtime.getRuntime().totalMemory();
+ context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory());
}
protected boolean hasFollowers(){
- return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
- }
-
- private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
- private static final int DATA_SIZE_DIVIDER = 5;
- private long dataSizeSinceLastSnapshot = 0L;
-
-
- public ReplicatedLogImpl(Snapshot snapshot) {
- super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
- snapshot.getUnAppliedEntries());
- }
-
- public ReplicatedLogImpl() {
- super();
- }
-
- @Override public void removeFromAndPersist(long logEntryIndex) {
- int adjustedIndex = adjustedIndex(logEntryIndex);
-
- if (adjustedIndex < 0) {
- return;
- }
-
- // FIXME: Maybe this should be done after the command is saved
- journal.subList(adjustedIndex , journal.size()).clear();
-
- persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
-
- @Override
- public void apply(DeleteEntries param)
- throws Exception {
- //FIXME : Doing nothing for now
- dataSize = 0;
- for (ReplicatedLogEntry entry : journal) {
- dataSize += entry.size();
- }
- }
- });
- }
-
- @Override public void appendAndPersist(
- final ReplicatedLogEntry replicatedLogEntry) {
- appendAndPersist(replicatedLogEntry, null);
- }
-
- public void appendAndPersist(
- final ReplicatedLogEntry replicatedLogEntry,
- final Procedure<ReplicatedLogEntry> callback) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
- }
-
- // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
- journal.add(replicatedLogEntry);
-
- // When persisting events with persist it is guaranteed that the
- // persistent actor will not receive further commands between the
- // persist call and the execution(s) of the associated event
- // handler. This also holds for multiple persist calls in context
- // of a single command.
- persistence().persist(replicatedLogEntry,
- new Procedure<ReplicatedLogEntry>() {
- @Override
- public void apply(ReplicatedLogEntry evt) throws Exception {
- int logEntrySize = replicatedLogEntry.size();
-
- dataSize += logEntrySize;
- long dataSizeForCheck = dataSize;
-
- dataSizeSinceLastSnapshot += logEntrySize;
-
- if (!hasFollowers()) {
- // When we do not have followers we do not maintain an in-memory log
- // due to this the journalSize will never become anything close to the
- // snapshot batch count. In fact will mostly be 1.
- // Similarly since the journal's dataSize depends on the entries in the
- // journal the journal's dataSize will never reach a value close to the
- // memory threshold.
- // By maintaining the dataSize outside the journal we are tracking essentially
- // what we have written to the disk however since we no longer are in
- // need of doing a snapshot just for the sake of freeing up memory we adjust
- // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
- // as if we were maintaining a real snapshot
- dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
- }
- long journalSize = replicatedLogEntry.getIndex() + 1;
- long dataThreshold = getTotalMemory() *
- context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
- if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
- || dataSizeForCheck > dataThreshold)) {
-
- boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
- currentBehavior.getReplicatedToAllIndex());
-
- if(started){
- dataSizeSinceLastSnapshot = 0;
- }
-
- }
-
- if (callback != null){
- callback.apply(replicatedLogEntry);
- }
- }
- }
- );
- }
-
+ return getRaftActorContext().hasFollowers();
}
static class DeleteEntries implements Serializable {
}
}
- @VisibleForTesting
- void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
- currentBehavior = behavior;
- }
-
- protected RaftActorBehavior getCurrentBehavior() {
- return currentBehavior;
- }
-
private static class BehaviorStateHolder {
private RaftActorBehavior behavior;
private String leaderId;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
import java.util.Map;
import org.slf4j.Logger;
SnapshotManager getSnapshotManager();
+ boolean hasFollowers();
+
+ long getTotalMemory();
+
+ @VisibleForTesting
+ void setTotalMemoryRetriever(Supplier<Long> retriever);
+
}
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActorContext;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
import java.util.Map;
import org.slf4j.Logger;
private ConfigParams configParams;
- private boolean snapshotCaptureInitiated;
+ @VisibleForTesting
+ private Supplier<Long> totalMemoryRetriever;
// Snapshot manager will need to be created on demand as it needs raft actor context which cannot
// be passed to it in the constructor
private SnapshotManager snapshotManager;
- public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
- String id,
- ElectionTerm termInformation, long commitIndex,
- long lastApplied, ReplicatedLog replicatedLog,
- Map<String, String> peerAddresses, ConfigParams configParams,
- Logger logger) {
+ public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id,
+ ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
+ ConfigParams configParams, Logger logger) {
this.actor = actor;
this.context = context;
this.id = id;
this.termInformation = termInformation;
this.commitIndex = commitIndex;
this.lastApplied = lastApplied;
- this.replicatedLog = replicatedLog;
this.peerAddresses = peerAddresses;
this.configParams = configParams;
this.LOG = logger;
peerAddresses.put(peerId, peerAddress);
}
+ @Override
public SnapshotManager getSnapshotManager() {
if(snapshotManager == null){
snapshotManager = new SnapshotManager(this, LOG);
}
return snapshotManager;
}
+
+ @Override
+ public long getTotalMemory() {
+ return totalMemoryRetriever != null ? totalMemoryRetriever.get() : Runtime.getRuntime().totalMemory();
+ }
+
+ @Override
+ public void setTotalMemoryRetriever(Supplier<Long> retriever) {
+ totalMemoryRetriever = retriever;
+ }
+
+ @Override
+ public boolean hasFollowers() {
+ return getPeerAddresses().keySet().size() > 0;
+ }
}
package org.opendaylight.controller.cluster.raft;
+import akka.japi.Procedure;
import java.util.List;
/**
*/
void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
+ void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback);
+
/**
*
* @param index the index of the log entry
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import java.util.Collections;
+import java.util.List;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+/**
+ * Implementation of ReplicatedLog used by the RaftActor.
+ */
+class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
+ private static final int DATA_SIZE_DIVIDER = 5;
+
+ private long dataSizeSinceLastSnapshot = 0L;
+ private final RaftActorContext context;
+ private final DataPersistenceProvider persistence;
+ private final RaftActorBehavior currentBehavior;
+
+ private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
+ @Override
+ public void apply(DeleteEntries param) {
+ dataSize = 0;
+ for (ReplicatedLogEntry entry : journal) {
+ dataSize += entry.size();
+ }
+ }
+ };
+
+ static ReplicatedLog newInstance(Snapshot snapshot, RaftActorContext context,
+ DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+ return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+ snapshot.getUnAppliedEntries(), context, persistence, currentBehavior);
+ }
+
+ static ReplicatedLog newInstance(RaftActorContext context,
+ DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+ return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context,
+ persistence, currentBehavior);
+ }
+
+ private ReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries,
+ RaftActorContext context, DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+ super(snapshotIndex, snapshotTerm, unAppliedEntries);
+ this.context = context;
+ this.persistence = persistence;
+ this.currentBehavior = currentBehavior;
+ }
+
+ @Override
+ public void removeFromAndPersist(long logEntryIndex) {
+ int adjustedIndex = adjustedIndex(logEntryIndex);
+
+ if (adjustedIndex < 0) {
+ return;
+ }
+
+ // FIXME: Maybe this should be done after the command is saved
+ journal.subList(adjustedIndex , journal.size()).clear();
+
+ persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+ }
+
+ @Override
+ public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
+ appendAndPersist(replicatedLogEntry, null);
+ }
+
+ @Override
+ public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
+ final Procedure<ReplicatedLogEntry> callback) {
+
+ if(context.getLogger().isDebugEnabled()) {
+ context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
+ }
+
+ // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
+ journal.add(replicatedLogEntry);
+
+ // When persisting events with persist it is guaranteed that the
+ // persistent actor will not receive further commands between the
+ // persist call and the execution(s) of the associated event
+ // handler. This also holds for multiple persist calls in context
+ // of a single command.
+ persistence.persist(replicatedLogEntry,
+ new Procedure<ReplicatedLogEntry>() {
+ @Override
+ public void apply(ReplicatedLogEntry evt) throws Exception {
+ int logEntrySize = replicatedLogEntry.size();
+
+ dataSize += logEntrySize;
+ long dataSizeForCheck = dataSize;
+
+ dataSizeSinceLastSnapshot += logEntrySize;
+
+ if (!context.hasFollowers()) {
+ // When we do not have followers we do not maintain an in-memory log
+ // due to this the journalSize will never become anything close to the
+ // snapshot batch count. In fact will mostly be 1.
+ // Similarly since the journal's dataSize depends on the entries in the
+ // journal the journal's dataSize will never reach a value close to the
+ // memory threshold.
+ // By maintaining the dataSize outside the journal we are tracking essentially
+ // what we have written to the disk however since we no longer are in
+ // need of doing a snapshot just for the sake of freeing up memory we adjust
+ // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
+ // as if we were maintaining a real snapshot
+ dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
+ }
+ long journalSize = replicatedLogEntry.getIndex() + 1;
+ long dataThreshold = context.getTotalMemory() *
+ context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
+ if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+ || dataSizeForCheck > dataThreshold)) {
+
+ boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+ currentBehavior.getReplicatedToAllIndex());
+
+ if(started){
+ dataSizeSinceLastSnapshot = 0;
+ }
+ }
+
+ if (callback != null){
+ callback.apply(replicatedLogEntry);
+ }
+ }
+ }
+ );
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftState;
+
+/**
+ * A RaftActorBehavior implementation that delegates to another implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class DelegatingRaftActorBehavior implements RaftActorBehavior {
+ private RaftActorBehavior delegate;
+
+ public RaftActorBehavior getDelegate() {
+ return delegate;
+ }
+
+ public void setDelegate(RaftActorBehavior delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void close() throws Exception {
+ delegate.close();
+ }
+
+ @Override
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ return delegate.handleMessage(sender, message);
+ }
+
+ @Override
+ public RaftState state() {
+ return delegate.state();
+ }
+
+ @Override
+ public String getLeaderId() {
+ return delegate.getLeaderId();
+ }
+
+ @Override
+ public void setReplicatedToAllIndex(long replicatedToAllIndex) {
+ delegate.setReplicatedToAllIndex(replicatedToAllIndex);
+ }
+
+ @Override
+ public long getReplicatedToAllIndex() {
+ return delegate.getReplicatedToAllIndex();
+ }
+}
import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.List;
private final TestActorRef<MessageCollectorActor> collectorActor;
private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
private volatile byte[] snapshot;
- private volatile long mockTotalMemory;
private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
TestActorRef<MessageCollectorActor> collectorActor) {
dropMessages.remove(msgClass);
}
- void setMockTotalMemory(long mockTotalMemory) {
- this.mockTotalMemory = mockTotalMemory;
- }
+ void setMockTotalMemory(final long mockTotalMemory) {
+ if(mockTotalMemory > 0) {
+ getRaftActorContext().setTotalMemoryRetriever(new Supplier<Long>() {
+ @Override
+ public Long get() {
+ return mockTotalMemory;
+ }
- @Override
- protected long getTotalMemory() {
- return mockTotalMemory > 0 ? mockTotalMemory : super.getTotalMemory();
+ });
+ } else {
+ getRaftActorContext().setTotalMemoryRetriever(null);
+ }
}
@Override
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import akka.japi.Procedure;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public List<ReplicatedLogEntry> getEntriesTill(final int index) {
return journal.subList(0, index);
}
+
+ @Override
+ public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
+ }
}
}
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
+import akka.japi.Procedure;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import com.google.protobuf.GeneratedMessage;
import java.io.Serializable;
import java.util.HashMap;
this.configParams = configParams;
}
+ @Override
+ public long getTotalMemory() {
+ return Runtime.getRuntime().totalMemory();
+ }
+
+ @Override
+ public void setTotalMemoryRetriever(Supplier<Long> retriever) {
+ }
+
+ @Override
+ public boolean hasFollowers() {
+ return getPeerAddresses().keySet().size() > 0;
+ }
+
public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
@Override public void appendAndPersist(
ReplicatedLogEntry replicatedLogEntry) {
@Override public void removeFromAndPersist(long index) {
removeFrom(index);
}
+
+ @Override
+ public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
+ append(replicatedLogEntry);
+
+ if(callback != null) {
+ try {
+ callback.apply(replicatedLogEntry);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
}
public static class MockPayload extends Payload implements Serializable {
import java.util.Collection;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.binding.Augmentation;
+import org.opendaylight.yangtools.yang.binding.ChildOf;
import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.Identifiable;
+import org.opendaylight.yangtools.yang.binding.Identifier;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
/**
* Represents modification of Data Object.
*
*/
-public interface DataObjectModification<T extends DataObject> extends Identifiable<PathArgument> {
+public interface DataObjectModification<T extends DataObject> extends org.opendaylight.yangtools.concepts.Identifiable<PathArgument> {
enum ModificationType {
/**
*/
@Nonnull Collection<DataObjectModification<? extends DataObject>> getModifiedChildren();
+ /**
+ * Returns container child modification if {@code child} was modified by this
+ * modification.
+ *
+ * For accessing all modified list items consider iterating over {@link #getModifiedChildren()}.
+ *
+ * @param child Type of child - must be only container
+ * @return Modification of {@code child} if {@code child} was modified, null otherwise.
+ * @throws IllegalArgumentException If supplied {@code child} class is not valid child according
+ * to generated model.
+ */
+ @Nullable <C extends ChildOf<? super T>> DataObjectModification<C> getModifiedChildContainer(@Nonnull Class<C> child);
+
+ /**
+ * Returns augmentation child modification if {@code augmentation} was modified by this
+ * modification.
+ *
+ * For accessing all modified list items consider iterating over {@link #getModifiedChildren()}.
+ *
+ * @param augmentation Type of augmentation - must be only container
+ * @return Modification of {@code augmentation} if {@code augmentation} was modified, null otherwise.
+ * @throws IllegalArgumentException If supplied {@code augmentation} class is not valid augmentation
+ * according to generated model.
+ */
+ @Nullable <C extends Augmentation<T> & DataObject> DataObjectModification<C> getModifiedAugmentation(@Nonnull Class<C> augmentation);
+
+
+ /**
+ * Returns child list item modification if {@code child} was modified by this modification.
+ *
+ * @param listItem Type of list item - must be list item with key
+ * @param listKey List item key
+ * @return Modification of {@code child} if {@code child} was modified, null otherwise.
+ * @throws IllegalArgumentException If supplied {@code listItem} class is not valid child according
+ * to generated model.
+ */
+ <C extends Identifiable<K> & ChildOf<? super T>, K extends Identifier<C>> DataObjectModification<C> getModifiedChildListItem(
+ @Nonnull Class<C> listItem,@Nonnull K listKey);
+
+ /**
+ * Returns a child modification if a node identified by {@code childArgument} was modified by
+ * this modification.
+ *
+ * @param childArgument Path Argument of child node
+ * @return Modification of child identified by {@code childArgument} if {@code childArgument}
+ * was modified, null otherwise.
+ * @throws IllegalArgumentException If supplied path argument is not valid child according to
+ * generated model.
+ *
+ */
+ @Nullable DataObjectModification<? extends DataObject> getModifiedChild(PathArgument childArgument);
}
import java.util.Collection;
import java.util.EventListener;
import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.binding.DataObject;
/**
* Interface implemented by classes interested in receiving notifications about
* in that it provides a cursor-based view of the change, which has potentially
* lower overhead and allow more flexible consumption of change event.
*/
-public interface DataTreeChangeListener extends EventListener {
+public interface DataTreeChangeListener<T extends DataObject> extends EventListener {
/**
* Invoked when there was data change for the supplied path, which was used
* to register this listener.
*
* @param changes Collection of change events, may not be null or empty.
*/
- void onDataTreeChanged(@Nonnull Collection<DataTreeModification> changes);
+ void onDataTreeChanged(@Nonnull Collection<DataTreeModification<T>> changes);
}
import javax.annotation.Nonnull;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
/**
* A {@link DOMService} which allows users to register for changes to a
* your listener using {@link ListenerRegistration#close()} to stop
* delivery of change events.
*/
- @Nonnull <L extends DataTreeChangeListener> ListenerRegistration<L> registerDataTreeChangeListener(@Nonnull DataTreeIdentifier treeId, @Nonnull L listener);
+ @Nonnull <T extends DataObject,L extends DataTreeChangeListener<T>> ListenerRegistration<L> registerDataTreeChangeListener(@Nonnull DataTreeIdentifier<T> treeId, @Nonnull L listener);
}
\ No newline at end of file
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yangtools.concepts.Immutable;
import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
/**
* A unique identifier for a particular subtree. It is composed of the logical
* data store type and the instance identifier of the root node.
*/
-public final class DataTreeIdentifier implements Immutable, Path<DataTreeIdentifier>, Serializable {
+public final class DataTreeIdentifier<T extends DataObject> implements Immutable, Path<DataTreeIdentifier<?>>, Serializable {
private static final long serialVersionUID = 1L;
- private final InstanceIdentifier<?> rootIdentifier;
+ private final InstanceIdentifier<T> rootIdentifier;
private final LogicalDatastoreType datastoreType;
- public DataTreeIdentifier(final LogicalDatastoreType datastoreType, final InstanceIdentifier<?> rootIdentifier) {
+ public DataTreeIdentifier(final LogicalDatastoreType datastoreType, final InstanceIdentifier<T> rootIdentifier) {
this.datastoreType = Preconditions.checkNotNull(datastoreType);
this.rootIdentifier = Preconditions.checkNotNull(rootIdentifier);
}
}
@Override
- public boolean contains(final DataTreeIdentifier other) {
+ public boolean contains(final DataTreeIdentifier<?> other) {
return datastoreType == other.datastoreType && rootIdentifier.contains(other.rootIdentifier);
}
if (!(obj instanceof DataTreeIdentifier)) {
return false;
}
- DataTreeIdentifier other = (DataTreeIdentifier) obj;
+ final DataTreeIdentifier<?> other = (DataTreeIdentifier<?>) obj;
if (datastoreType != other.datastoreType) {
return false;
}
* @author Tony Tkacik <ttkacik@cisco.com>
*
*/
-public interface DataTreeModification {
+public interface DataTreeModification<T extends DataObject> {
/**
* Get the modification root path. This is the path of the root node
*
* @return absolute path of the root node
*/
- @Nonnull DataTreeIdentifier getRootPath();
+ @Nonnull DataTreeIdentifier<T> getRootPath();
/**
* Get the modification root node.
*
* @return modification root node
*/
- @Nonnull DataObjectModification<? extends DataObject> getRootNode();
+ @Nonnull DataObjectModification<T> getRootNode();
}
import java.util.Set;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeService;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMAdapterBuilder.Factory;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.controller.md.sal.dom.api.DOMService;
-import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
/**
* The DataBrokerImpl simply defers to the DOMDataBroker for all its operations.
*
*/
-public class BindingDOMDataBrokerAdapter extends AbstractForwardedDataBroker implements DataBroker {
+public class BindingDOMDataBrokerAdapter extends AbstractForwardedDataBroker implements DataBroker, DataTreeChangeService {
static final Factory<DataBroker> BUILDER_FACTORY = new BindingDOMAdapterBuilder.Factory<DataBroker>() {
}
};
+ private final DataTreeChangeService treeChangeService;
public BindingDOMDataBrokerAdapter(final DOMDataBroker domDataBroker, final BindingToNormalizedNodeCodec codec) {
super(domDataBroker, codec);
- }
-
- @Deprecated
- public BindingDOMDataBrokerAdapter(final DOMDataBroker domDataBroker, final BindingToNormalizedNodeCodec codec, final SchemaService schemaService) {
- super(domDataBroker, codec,schemaService);
+ final DOMDataTreeChangeService domTreeChange = (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
+ if(domTreeChange != null) {
+ treeChangeService = BindingDOMDataTreeChangeServiceAdapter.create(codec, domTreeChange);
+ } else {
+ treeChangeService = null;
+ }
}
@Override
}
@Override
- protected DataBroker createInstance(BindingToNormalizedNodeCodec codec,
- ClassToInstanceMap<DOMService> delegates) {
- DOMDataBroker domDataBroker = delegates.getInstance(DOMDataBroker.class);
+ protected DataBroker createInstance(final BindingToNormalizedNodeCodec codec,
+ final ClassToInstanceMap<DOMService> delegates) {
+ final DOMDataBroker domDataBroker = delegates.getInstance(DOMDataBroker.class);
return new BindingDOMDataBrokerAdapter(domDataBroker, codec);
}
+ }
-
+ @Override
+ public <T extends DataObject, L extends DataTreeChangeListener<T>> ListenerRegistration<L> registerDataTreeChangeListener(
+ final DataTreeIdentifier<T> treeId, final L listener) {
+ if(treeChangeService == null) {
+ throw new UnsupportedOperationException("Underlying data broker does not expose DOMDataTreeChangeService.");
+ }
+ return treeChangeService.registerDataTreeChangeListener(treeId, listener);
}
+
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Adapter wrapping Binding {@link DataTreeChangeListener} and exposing
+ * it as {@link DOMDataTreeChangeListener} and translated DOM events
+ * to their Binding equivalent.
+ *
+ */
+final class BindingDOMDataTreeChangeListenerAdapter<T extends DataObject> implements DOMDataTreeChangeListener {
+
+ private final BindingToNormalizedNodeCodec codec;
+ private final DataTreeChangeListener<T> listener;
+ private final LogicalDatastoreType store;
+
+ BindingDOMDataTreeChangeListenerAdapter(final BindingToNormalizedNodeCodec codec, final DataTreeChangeListener<T> listener,
+ final LogicalDatastoreType store) {
+ this.codec = Preconditions.checkNotNull(codec);
+ this.listener = Preconditions.checkNotNull(listener);
+ this.store = Preconditions.checkNotNull(store);
+ }
+
+ @Override
+ public void onDataTreeChanged(final Collection<DataTreeCandidate> domChanges) {
+ final Collection<DataTreeModification<T>> bindingChanges = LazyDataTreeModification.from(codec, domChanges, store);
+ listener.onDataTreeChanged(bindingChanges);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeService;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+
+/**
+ *
+ * Adapter exposing Binding {@link DataTreeChangeService} and wrapping
+ * {@link DOMDataTreeChangeService} and is responsible for translation
+ * and instantiation of {@link BindingDOMDataTreeChangeListenerAdapter}
+ * adapters.
+ *
+ * Each registered {@link DataTreeChangeListener} is wrapped using
+ * adapter and registered directly to DOM service.
+ */
+final class BindingDOMDataTreeChangeServiceAdapter implements DataTreeChangeService {
+
+ private final BindingToNormalizedNodeCodec codec;
+ private final DOMDataTreeChangeService dataTreeChangeService;
+
+ private BindingDOMDataTreeChangeServiceAdapter(final BindingToNormalizedNodeCodec codec,
+ final DOMDataTreeChangeService dataTreeChangeService) {
+ this.codec = Preconditions.checkNotNull(codec);
+ this.dataTreeChangeService = Preconditions.checkNotNull(dataTreeChangeService);
+ }
+
+ static DataTreeChangeService create(final BindingToNormalizedNodeCodec codec,
+ final DOMDataTreeChangeService dataTreeChangeService) {
+ return new BindingDOMDataTreeChangeServiceAdapter(codec, dataTreeChangeService);
+ }
+
+ @Override
+ public <T extends DataObject, L extends DataTreeChangeListener<T>> ListenerRegistration<L> registerDataTreeChangeListener(
+ final DataTreeIdentifier<T> treeId, final L listener) {
+ final DOMDataTreeIdentifier domIdentifier = toDomTreeIdentifier(treeId);
+ final BindingDOMDataTreeChangeListenerAdapter<T> domListener = new BindingDOMDataTreeChangeListenerAdapter<>(codec,listener, treeId.getDatastoreType());
+ final ListenerRegistration<BindingDOMDataTreeChangeListenerAdapter<T>> domReg = dataTreeChangeService.registerDataTreeChangeListener(domIdentifier, domListener);
+ return new BindingDataTreeChangeListenerRegistration<>(listener,domReg);
+ }
+
+ private DOMDataTreeIdentifier toDomTreeIdentifier(final DataTreeIdentifier<?> treeId) {
+ final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifier(treeId.getRootIdentifier());
+ return new DOMDataTreeIdentifier(treeId.getDatastoreType(), domPath);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+class BindingDataTreeChangeListenerRegistration<L extends DataTreeChangeListener<?>> extends AbstractListenerRegistration<L> {
+
+ private final ListenerRegistration<?> domReg;
+
+ BindingDataTreeChangeListenerRegistration(final L listener, final ListenerRegistration<?> domReg) {
+ super(listener);
+ this.domReg = Preconditions.checkNotNull(domReg);
+ }
+
+ @Override
+ protected void removeRegistration() {
+ domReg.close();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import com.google.common.base.Optional;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.AugmentationIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
+import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+
+/**
+ *
+ * Defines structural mapping of Normalized Node to Binding data
+ * addressable by Instance Identifier.
+ *
+ * Not all binding data are addressable by instance identifier
+ * and there are some differences.
+ *
+ * See {@link #NOT_ADDRESSABLE},{@link #INVISIBLE_CONTAINER},{@link #VISIBLE_CONTAINER}
+ * for more details.
+ *
+ *
+ */
+enum BindingStructuralType {
+
+ /**
+ * DOM Item is not addressable in Binding Instance Identifier,
+ * data is not lost, but are available only via parent object.
+ *
+ * Such types of data are leaf-lists, leafs, list without keys
+ * or anyxml.
+ *
+ */
+ NOT_ADDRESSABLE,
+ /**
+ * Data container is addressable in NormalizedNode format,
+ * but in Binding it is not represented in Instance Identifier.
+ *
+ * This are choice / case nodes.
+ *
+ * This data is still accessible using parent object and their
+ * children are addressable.
+ *
+ */
+ INVISIBLE_CONTAINER,
+ /**
+ * Data container is addressable in NormalizedNode format,
+ * but in Binding it is not represented in Instance Identifier.
+ *
+ * This are list nodes.
+ *
+ * This data is still accessible using parent object and their
+ * children are addressable.
+ *
+ */
+ INVISIBLE_LIST,
+ /**
+ * Data container is addressable in Binding Instance Identifier format
+ * and also YangInstanceIdentifier format.
+ *
+ */
+ VISIBLE_CONTAINER,
+ /**
+ * Mapping algorithm was unable to detect type or was not updated after introduction
+ * of new NormalizedNode type.
+ */
+ UNKNOWN;
+
+ static BindingStructuralType from(final DataTreeCandidateNode domChildNode) {
+ final Optional<NormalizedNode<?, ?>> dataBased = domChildNode.getDataAfter().or(domChildNode.getDataBefore());
+ if(dataBased.isPresent()) {
+ return from(dataBased.get());
+ }
+ return from(domChildNode.getIdentifier());
+ }
+
+ private static BindingStructuralType from(final PathArgument identifier) {
+ if(identifier instanceof NodeIdentifierWithPredicates || identifier instanceof AugmentationIdentifier) {
+ return VISIBLE_CONTAINER;
+ }
+ if(identifier instanceof NodeWithValue) {
+ return NOT_ADDRESSABLE;
+ }
+ return UNKNOWN;
+ }
+
+ static BindingStructuralType from(final NormalizedNode<?, ?> data) {
+ if(isNotAddressable(data)) {
+ return NOT_ADDRESSABLE;
+ }
+ if(data instanceof MapNode) {
+ return INVISIBLE_LIST;
+ }
+ if(data instanceof ChoiceNode) {
+ return INVISIBLE_CONTAINER;
+ }
+ if(isVisibleContainer(data)) {
+ return VISIBLE_CONTAINER;
+ }
+ return UNKNOWN;
+ }
+
+ private static boolean isVisibleContainer(final NormalizedNode<?, ?> data) {
+ return data instanceof MapEntryNode || data instanceof ContainerNode || data instanceof AugmentationNode;
+ }
+
+ private static boolean isNotAddressable(final NormalizedNode<?, ?> d) {
+ return d instanceof LeafNode
+ || d instanceof AnyXmlNode
+ || d instanceof LeafSetNode
+ || d instanceof LeafSetEntryNode;
+ }
+
+}
import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableBiMap;
import java.lang.reflect.Method;
+import java.util.AbstractMap.SimpleEntry;
import java.util.Iterator;
+import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTree;
import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeFactory;
+import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeNode;
import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
}
@Override
- public YangInstanceIdentifier toYangInstanceIdentifier(InstanceIdentifier<?> binding) {
+ public YangInstanceIdentifier toYangInstanceIdentifier(final InstanceIdentifier<?> binding) {
return codecRegistry.toYangInstanceIdentifier(binding);
}
@Override
public <T extends DataObject> Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
- InstanceIdentifier<T> path, T data) {
+ final InstanceIdentifier<T> path, final T data) {
return codecRegistry.toNormalizedNode(path, data);
}
}
@Override
- public Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode(YangInstanceIdentifier path,
- NormalizedNode<?, ?> data) {
+ public Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode(final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
return codecRegistry.fromNormalizedNode(path, data);
}
@Override
- public Notification fromNormalizedNodeNotification(SchemaPath path, ContainerNode data) {
+ public Notification fromNormalizedNodeNotification(final SchemaPath path, final ContainerNode data) {
return codecRegistry.fromNormalizedNodeNotification(path, data);
}
@Override
- public DataObject fromNormalizedNodeRpcData(SchemaPath path, ContainerNode data) {
+ public DataObject fromNormalizedNodeRpcData(final SchemaPath path, final ContainerNode data) {
return codecRegistry.fromNormalizedNodeRpcData(path, data);
}
@Override
- public InstanceIdentifier<?> fromYangInstanceIdentifier(YangInstanceIdentifier dom) {
+ public InstanceIdentifier<?> fromYangInstanceIdentifier(final YangInstanceIdentifier dom) {
return codecRegistry.fromYangInstanceIdentifier(dom);
}
@Override
- public ContainerNode toNormalizedNodeNotification(Notification data) {
+ public ContainerNode toNormalizedNodeNotification(final Notification data) {
return codecRegistry.toNormalizedNodeNotification(data);
}
@Override
- public ContainerNode toNormalizedNodeRpcData(DataContainer data) {
+ public ContainerNode toNormalizedNodeRpcData(final DataContainer data) {
return codecRegistry.toNormalizedNodeRpcData(data);
}
}
@Override
- public BindingCodecTree create(BindingRuntimeContext context) {
+ public BindingCodecTree create(final BindingRuntimeContext context) {
return codecRegistry.create(context);
}
@Override
- public BindingCodecTree create(SchemaContext context, Class<?>... bindingClasses) {
+ public BindingCodecTree create(final SchemaContext context, final Class<?>... bindingClasses) {
return codecRegistry.create(context, bindingClasses);
}
+ @Nonnull protected Map.Entry<InstanceIdentifier<?>, BindingCodecTreeNode<?>> getSubtreeCodec(
+ final YangInstanceIdentifier domIdentifier) {
+
+ final BindingCodecTree currentCodecTree = codecRegistry.getCodecContext();
+ final InstanceIdentifier<?> bindingPath = codecRegistry.fromYangInstanceIdentifier(domIdentifier);
+ Preconditions.checkArgument(bindingPath != null);
+ /**
+ * If we are able to deserialize YANG instance identifier, getSubtreeCodec must
+ * return non-null value.
+ */
+ final BindingCodecTreeNode<?> codecContext = currentCodecTree.getSubtreeCodec(bindingPath);
+ return new SimpleEntry<InstanceIdentifier<?>, BindingCodecTreeNode<?>>(bindingPath, codecContext);
+ }
+
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeNode;
+import org.opendaylight.yangtools.yang.binding.Augmentation;
+import org.opendaylight.yangtools.yang.binding.ChildOf;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.Identifiable;
+import org.opendaylight.yangtools.yang.binding.Identifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Lazily translated {@link DataObjectModification} based on {@link DataTreeCandidateNode}.
+ *
+ * {@link LazyDataObjectModification} represents Data tree change event,
+ * but whole tree is not translated or resolved eagerly, but only child nodes
+ * which are directly accessed by user of data object modification.
+ *
+ * @param <T> Type of Binding Data Object
+ */
+class LazyDataObjectModification<T extends DataObject> implements DataObjectModification<T> {
+
+ private final static Logger LOG = LoggerFactory.getLogger(LazyDataObjectModification.class);
+
+ private final BindingCodecTreeNode<T> codec;
+ private final DataTreeCandidateNode domData;
+ private final PathArgument identifier;
+ private Collection<DataObjectModification<? extends DataObject>> childNodesCache;
+
+ private LazyDataObjectModification(final BindingCodecTreeNode<T> codec, final DataTreeCandidateNode domData) {
+ this.codec = Preconditions.checkNotNull(codec);
+ this.domData = Preconditions.checkNotNull(domData);
+ this.identifier = codec.deserializePathArgument(domData.getIdentifier());
+ }
+
+ static <T extends DataObject> DataObjectModification<T> create(final BindingCodecTreeNode<T> codec,
+ final DataTreeCandidateNode domData) {
+ return new LazyDataObjectModification<>(codec,domData);
+ }
+
+ static Collection<DataObjectModification<? extends DataObject>> from(final BindingCodecTreeNode<?> parentCodec,
+ final Collection<DataTreeCandidateNode> domChildNodes) {
+ final ArrayList<DataObjectModification<? extends DataObject>> result = new ArrayList<>(domChildNodes.size());
+ populateList(result, parentCodec, domChildNodes);
+ return result;
+ }
+
+ private static void populateList(final List<DataObjectModification<? extends DataObject>> result,
+ final BindingCodecTreeNode<?> parentCodec, final Collection<DataTreeCandidateNode> domChildNodes) {
+ for (final DataTreeCandidateNode domChildNode : domChildNodes) {
+ final BindingStructuralType type = BindingStructuralType.from(domChildNode);
+ if (type != BindingStructuralType.NOT_ADDRESSABLE) {
+ /*
+ * Even if type is UNKNOWN, from perspective of BindingStructuralType
+ * we try to load codec for it. We will use that type to further specify
+ * debug log.
+ */
+ try {
+ final BindingCodecTreeNode<?> childCodec =
+ parentCodec.yangPathArgumentChild(domChildNode.getIdentifier());
+ populateList(result,type, childCodec, domChildNode);
+ } catch (final IllegalArgumentException e) {
+ if(type == BindingStructuralType.UNKNOWN) {
+ LOG.debug("Unable to deserialize unknown DOM node {}",domChildNode,e);
+ } else {
+ LOG.debug("Binding representation for DOM node {} was not found",domChildNode,e);
+ }
+ }
+ }
+ }
+ }
+
+
+ private static void populateList(final List<DataObjectModification<? extends DataObject>> result,
+ final BindingStructuralType type, final BindingCodecTreeNode<?> childCodec,
+ final DataTreeCandidateNode domChildNode) {
+ switch (type) {
+ case INVISIBLE_LIST:
+ // We use parent codec intentionally.
+ populateListWithSingleCodec(result, childCodec, domChildNode.getChildNodes());
+ break;
+ case INVISIBLE_CONTAINER:
+ populateList(result, childCodec, domChildNode.getChildNodes());
+ break;
+ case UNKNOWN:
+ case VISIBLE_CONTAINER:
+ result.add(create(childCodec, domChildNode));
+ default:
+ break;
+ }
+ }
+
+ private static void populateListWithSingleCodec(final List<DataObjectModification<? extends DataObject>> result,
+ final BindingCodecTreeNode<?> codec, final Collection<DataTreeCandidateNode> childNodes) {
+ for (final DataTreeCandidateNode child : childNodes) {
+ result.add(create(codec, child));
+ }
+ }
+
+ @Override
+ public T getDataAfter() {
+ return deserialize(domData.getDataAfter());
+ }
+
+ @Override
+ public Class<T> getDataType() {
+ return codec.getBindingClass();
+ }
+
+ @Override
+ public PathArgument getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public DataObjectModification.ModificationType getModificationType() {
+ switch(domData.getModificationType()) {
+ case WRITE:
+ return DataObjectModification.ModificationType.WRITE;
+ case SUBTREE_MODIFIED:
+ return DataObjectModification.ModificationType.SUBTREE_MODIFIED;
+ case DELETE:
+ return DataObjectModification.ModificationType.DELETE;
+
+ default:
+ // TODO: Should we lie about modification type instead of exception?
+ throw new IllegalStateException("Unsupported DOM Modification type " + domData.getModificationType());
+ }
+ }
+
+ @Override
+ public Collection<DataObjectModification<? extends DataObject>> getModifiedChildren() {
+ if(childNodesCache == null) {
+ childNodesCache = from(codec,domData.getChildNodes());
+ }
+ return childNodesCache;
+ }
+
+ @Override
+ public DataObjectModification<? extends DataObject> getModifiedChild(final PathArgument arg) {
+ final List<YangInstanceIdentifier.PathArgument> domArgumentList = new ArrayList<>();
+ final BindingCodecTreeNode<?> childCodec = codec.bindingPathArgumentChild(arg, domArgumentList);
+ final Iterator<YangInstanceIdentifier.PathArgument> toEnter = domArgumentList.iterator();
+ DataTreeCandidateNode current = domData;
+ while (toEnter.hasNext() && current != null) {
+ current = current.getModifiedChild(toEnter.next());
+ }
+ if (current != null) {
+ return create(childCodec, current);
+ }
+ return null;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <C extends Identifiable<K> & ChildOf<? super T>, K extends Identifier<C>> DataObjectModification<C> getModifiedChildListItem(
+ final Class<C> listItem, final K listKey) {
+ return (DataObjectModification<C>) getModifiedChild(new InstanceIdentifier.IdentifiableItem<>(listItem, listKey));
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <C extends ChildOf<? super T>> DataObjectModification<C> getModifiedChildContainer(final Class<C> arg) {
+ return (DataObjectModification<C>) getModifiedChild(new InstanceIdentifier.Item<>(arg));
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <C extends Augmentation<T> & DataObject> DataObjectModification<C> getModifiedAugmentation(
+ final Class<C> augmentation) {
+ return (DataObjectModification<C>) getModifiedChild(new InstanceIdentifier.Item<>(augmentation));
+ }
+
+ private T deserialize(final Optional<NormalizedNode<?, ?>> dataAfter) {
+ if(dataAfter.isPresent()) {
+ return codec.deserialize(dataAfter.get());
+ }
+ return null;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeNode;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Lazily translated {@link DataTreeModification} based on {@link DataTreeCandidate}.
+ *
+ * {@link DataTreeModification} represents Data tree change event,
+ * but whole tree is not translated or resolved eagerly, but only child nodes
+ * which are directly accessed by user of data object modification.
+ *
+ */
+class LazyDataTreeModification<T extends DataObject> implements DataTreeModification<T> {
+
+ private final DataTreeIdentifier<T> path;
+ private final DataObjectModification<T> rootNode;
+
+ LazyDataTreeModification(final LogicalDatastoreType datastoreType, final InstanceIdentifier<T> path, final BindingCodecTreeNode<T> codec, final DataTreeCandidate domChange) {
+ this.path = new DataTreeIdentifier<>(datastoreType, path);
+ this.rootNode = LazyDataObjectModification.create(codec, domChange.getRootNode());
+ }
+
+ @Override
+ public DataObjectModification<T> getRootNode() {
+ return rootNode;
+ }
+
+ @Override
+ public DataTreeIdentifier<T> getRootPath() {
+ return path;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ static <T extends DataObject> DataTreeModification<T> create(final BindingToNormalizedNodeCodec codec, final DataTreeCandidate domChange,
+ final LogicalDatastoreType datastoreType) {
+ final Entry<InstanceIdentifier<?>, BindingCodecTreeNode<?>> codecCtx =
+ codec.getSubtreeCodec(domChange.getRootPath());
+ return (DataTreeModification<T>) new LazyDataTreeModification(datastoreType, codecCtx.getKey(), codecCtx.getValue(), domChange);
+ }
+
+ static <T extends DataObject> Collection<DataTreeModification<T>> from(final BindingToNormalizedNodeCodec codec,
+ final Collection<DataTreeCandidate> domChanges, final LogicalDatastoreType datastoreType) {
+ final List<DataTreeModification<T>> result = new ArrayList<>(domChanges.size());
+ for (final DataTreeCandidate domChange : domChanges) {
+ result.add(LazyDataTreeModification.<T>create(codec, domChange, datastoreType));
+ }
+ return result;
+ }
+
+}
--- /dev/null
+package org.opendaylight.controller.md.sal.binding.impl.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.TOP_BAR_KEY;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.TOP_FOO_KEY;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.USES_ONE_KEY;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.complexUsesAugment;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.path;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.top;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.topLevelList;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMDataBrokerAdapter;
+import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.augment.rev140709.TreeComplexUsesAugment;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.TwoLevelList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
+import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
+
+public class DataTreeChangeListenerTest extends AbstractDataBrokerTest {
+
+ private static final InstanceIdentifier<Top> TOP_PATH = InstanceIdentifier.create(Top.class);
+ private static final PathArgument TOP_ARGUMENT= TOP_PATH.getPathArguments().iterator().next();
+ private static final InstanceIdentifier<TopLevelList> FOO_PATH = path(TOP_FOO_KEY);
+ private static final PathArgument FOO_ARGUMENT = Iterables.getLast(FOO_PATH.getPathArguments());
+ private static final TopLevelList FOO_DATA = topLevelList(TOP_FOO_KEY, complexUsesAugment(USES_ONE_KEY));
+ private static final InstanceIdentifier<TopLevelList> BAR_PATH = path(TOP_BAR_KEY);
+ private static final PathArgument BAR_ARGUMENT = Iterables.getLast(BAR_PATH.getPathArguments());
+ private static final TopLevelList BAR_DATA = topLevelList(TOP_BAR_KEY);
+private static final DataTreeIdentifier<Top> TOP_IDENTIFIER = new DataTreeIdentifier<Top>(LogicalDatastoreType.OPERATIONAL,
+ TOP_PATH);
+
+ private static final Top TOP_INITIAL_DATA = top(FOO_DATA);
+
+ private BindingDOMDataBrokerAdapter dataBrokerImpl;
+
+ private static final class EventCapturingListener<T extends DataObject> implements DataTreeChangeListener<T> {
+
+ private SettableFuture<Collection<DataTreeModification<T>>> changes = SettableFuture.create();
+
+ @Override
+ public void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
+ this.changes.set(changes);
+
+ }
+
+ Collection<DataTreeModification<T>> nextEvent() throws Exception {
+ final Collection<DataTreeModification<T>> result = changes.get(200,TimeUnit.MILLISECONDS);
+ changes = SettableFuture.create();
+ return result;
+ }
+
+ }
+
+ @Override
+ protected Iterable<YangModuleInfo> getModuleInfos() throws Exception {
+ return ImmutableSet.of(
+ BindingReflections.getModuleInfo(TwoLevelList.class),
+ BindingReflections.getModuleInfo(TreeComplexUsesAugment.class)
+ );
+ }
+
+ @Override
+ protected void setupWithDataBroker(final DataBroker dataBroker) {
+ dataBrokerImpl = (BindingDOMDataBrokerAdapter) dataBroker;
+ }
+
+ @Test
+ public void testTopLevelListener() throws Exception {
+ final EventCapturingListener<Top> listener = new EventCapturingListener<>();
+ dataBrokerImpl.registerDataTreeChangeListener(TOP_IDENTIFIER, listener);
+
+ createAndVerifyTop(listener);
+
+ putTx(BAR_PATH, BAR_DATA).submit().checkedGet();
+ final DataObjectModification<Top> afterBarPutEvent = Iterables.getOnlyElement(listener.nextEvent()).getRootNode();
+ verifyModification(afterBarPutEvent, TOP_ARGUMENT, ModificationType.SUBTREE_MODIFIED);
+ final DataObjectModification<TopLevelList> barPutMod = afterBarPutEvent.getModifiedChildListItem(TopLevelList.class, TOP_BAR_KEY);
+ assertNotNull(barPutMod);
+ verifyModification(barPutMod, BAR_ARGUMENT, ModificationType.WRITE);
+
+ deleteTx(BAR_PATH).submit().checkedGet();
+ final DataObjectModification<Top> afterBarDeleteEvent = Iterables.getOnlyElement(listener.nextEvent()).getRootNode();
+ verifyModification(afterBarDeleteEvent, TOP_ARGUMENT, ModificationType.SUBTREE_MODIFIED);
+ final DataObjectModification<TopLevelList> barDeleteMod = afterBarDeleteEvent.getModifiedChildListItem(TopLevelList.class, TOP_BAR_KEY);
+ verifyModification(barDeleteMod, BAR_ARGUMENT, ModificationType.DELETE);
+ }
+
+ @Test
+ public void testWildcardedListListener() throws Exception {
+ final EventCapturingListener<TopLevelList> listener = new EventCapturingListener<>();
+ final DataTreeIdentifier<TopLevelList> wildcard = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, TOP_PATH.child(TopLevelList.class));
+ dataBrokerImpl.registerDataTreeChangeListener(wildcard, listener);
+
+ putTx(TOP_PATH, TOP_INITIAL_DATA).submit().checkedGet();
+
+ final DataTreeModification<TopLevelList> fooWriteEvent = Iterables.getOnlyElement(listener.nextEvent());
+ assertEquals(FOO_PATH, fooWriteEvent.getRootPath().getRootIdentifier());
+ verifyModification(fooWriteEvent.getRootNode(), FOO_ARGUMENT, ModificationType.WRITE);
+
+ putTx(BAR_PATH, BAR_DATA).submit().checkedGet();
+ final DataTreeModification<TopLevelList> barWriteEvent = Iterables.getOnlyElement(listener.nextEvent());
+ assertEquals(BAR_PATH, barWriteEvent.getRootPath().getRootIdentifier());
+ verifyModification(barWriteEvent.getRootNode(), BAR_ARGUMENT, ModificationType.WRITE);
+
+ deleteTx(BAR_PATH).submit().checkedGet();
+ final DataTreeModification<TopLevelList> barDeleteEvent = Iterables.getOnlyElement(listener.nextEvent());
+ assertEquals(BAR_PATH, barDeleteEvent.getRootPath().getRootIdentifier());
+ verifyModification(barDeleteEvent.getRootNode(), BAR_ARGUMENT, ModificationType.DELETE);
+ }
+
+
+
+ private void createAndVerifyTop(final EventCapturingListener<Top> listener) throws Exception {
+ putTx(TOP_PATH,TOP_INITIAL_DATA).submit().checkedGet();
+ final Collection<DataTreeModification<Top>> events = listener.nextEvent();
+
+ assertFalse("Non empty collection should be received.",events.isEmpty());
+ final DataTreeModification<Top> initialWrite = Iterables.getOnlyElement(events);
+ final DataObjectModification<? extends DataObject> initialNode = initialWrite.getRootNode();
+ verifyModification(initialNode,TOP_PATH.getPathArguments().iterator().next(),ModificationType.WRITE);
+ assertEquals(TOP_INITIAL_DATA, initialNode.getDataAfter());
+ }
+
+ private void verifyModification(final DataObjectModification<? extends DataObject> barWrite, final PathArgument pathArg,
+ final ModificationType eventType) {
+ assertEquals(pathArg.getType(), barWrite.getDataType());
+ assertEquals(eventType,barWrite.getModificationType());
+ assertEquals(pathArg, barWrite.getIdentifier());
+ }
+
+ private <T extends DataObject> WriteTransaction putTx(final InstanceIdentifier<T> path,final T data) {
+ final WriteTransaction tx = dataBrokerImpl.newWriteOnlyTransaction();
+ tx.put(LogicalDatastoreType.OPERATIONAL, path, data);
+ return tx;
+ }
+
+ private WriteTransaction deleteTx(final InstanceIdentifier<?> path) {
+ final WriteTransaction tx = dataBrokerImpl.newWriteOnlyTransaction();
+ tx.delete(LogicalDatastoreType.OPERATIONAL, path);
+ return tx;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import java.util.List;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import scala.concurrent.Future;
+
+/**
+ * Abstract base class for {@link DOMStoreThreePhaseCommitCohort} instances returned by this
+ * implementation. In addition to the usual set of methods it also contains the list of actor
+ * futures.
+ */
+abstract class AbstractThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+ abstract List<Future<ActorSelection>> getCohortFutures();
+}
}
@Override
- protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
+ public AbstractThreePhaseCommitCohort ready() {
+ final AbstractThreePhaseCommitCohort ret = super.ready();
+ readyFutures = ret.getCohortFutures();
LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
- readyFutures.size(), getTransactionChainId());
- this.readyFutures = readyFutures;
+ readyFutures.size(), getTransactionChainId());
+ return ret;
}
/**
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorSelection;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import java.util.Collections;
+import java.util.List;
+import scala.concurrent.Future;
/**
- * A {@link DOMStoreThreePhaseCommitCohort} instance given out for empty transactions.
+ * A {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort}
+ * instance given out for empty transactions.
*/
-final class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort {
static final NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS = Futures.immediateFuture(null);
public ListenableFuture<Void> commit() {
return IMMEDIATE_VOID_SUCCESS;
}
-}
\ No newline at end of file
+
+ @Override
+ List<Future<ActorSelection>> getCohortFutures() {
+ return Collections.emptyList();
+ }
+}
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
*/
-public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
+public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort {
private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
}, actorContext.getClientDispatcher());
}
- @VisibleForTesting
+ @Override
List<Future<ActorSelection>> getCohortFutures() {
return Collections.unmodifiableList(cohortFutures);
}
}
@Override
- public void readData(
- final YangInstanceIdentifier path,final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
+ public void readData(final YangInstanceIdentifier path,
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
- // Send the remaining batched modifications if any.
+ // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
+ // public API contract.
sendAndRecordBatchedModifications();
- // If there were any previous recorded put/merge/delete operation reply Futures then we
- // must wait for them to successfully complete. This is necessary to honor the read
- // uncommitted semantics of the public API contract. If any one fails then fail the read.
-
- if(recordedOperationCount() == 0) {
- finishReadData(path, returnFuture);
- } else {
- LOG.debug("Tx {} readData: verifying {} previous recorded operations",
- getIdentifier(), recordedOperationCount());
-
- // Note: we make a copy of recordedOperationFutures to be on the safe side in case
- // Futures#sequence accesses the passed List on a different thread, as
- // recordedOperationFutures is not synchronized.
-
- Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
- copyRecordedOperationFutures(), actorContext.getClientDispatcher());
-
- OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
- @Override
- public void onComplete(Throwable failure, Iterable<Object> notUsed)
- throws Throwable {
- if(failure != null) {
- LOG.debug("Tx {} readData: a recorded operation failed: {}",
- getIdentifier(), failure);
- returnFuture.setException(new ReadFailedException(
- "The read could not be performed because a previous put, merge,"
- + "or delete operation failed", failure));
- } else {
- finishReadData(path, returnFuture);
- }
- }
- };
-
- combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
- }
-
- }
-
- private void finishReadData(final YangInstanceIdentifier path,
- final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
-
- LOG.debug("Tx {} finishReadData called path = {}", getIdentifier(), path);
-
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object readResponse) throws Throwable {
LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
- // Send the remaining batched modifications if any.
+ // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
+ // public API contract.
sendAndRecordBatchedModifications();
- // If there were any previous recorded put/merge/delete operation reply Futures then we
- // must wait for them to successfully complete. This is necessary to honor the read
- // uncommitted semantics of the public API contract. If any one fails then fail this
- // request.
-
- if(recordedOperationCount() == 0) {
- finishDataExists(path, returnFuture);
- } else {
- LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
- getIdentifier(), recordedOperationCount());
-
- // Note: we make a copy of recordedOperationFutures to be on the safe side in case
- // Futures#sequence accesses the passed List on a different thread, as
- // recordedOperationFutures is not synchronized.
-
- Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
- copyRecordedOperationFutures(),
- actorContext.getClientDispatcher());
- OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
- @Override
- public void onComplete(Throwable failure, Iterable<Object> notUsed)
- throws Throwable {
- if(failure != null) {
- LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
- getIdentifier(), failure);
- returnFuture.setException(new ReadFailedException(
- "The data exists could not be performed because a previous "
- + "put, merge, or delete operation failed", failure));
- } else {
- finishDataExists(path, returnFuture);
- }
- }
- };
-
- combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
- }
- }
-
- private void finishDataExists(final YangInstanceIdentifier path,
- final SettableFuture<Boolean> returnFuture) {
-
- LOG.debug("Tx {} finishDataExists called path = {}", getIdentifier(), path);
-
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) throws Throwable {
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.FinalizablePhantomReference;
-import com.google.common.base.FinalizableReferenceQueue;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
}
}
+ private static enum TransactionState {
+ OPEN,
+ READY,
+ CLOSED,
+ }
+
static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
new Mapper<Throwable, Throwable>() {
@Override
private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
FiniteDuration.create(1, TimeUnit.SECONDS);
- /**
- * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
- * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
- * trickery to clean up its internal thread when the bundle is unloaded.
- */
- private static final FinalizableReferenceQueue phantomReferenceQueue =
- new FinalizableReferenceQueue();
-
- /**
- * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
- * necessary because PhantomReferences need a hard reference so they're not garbage collected.
- * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
- * and thus becomes eligible for garbage collection.
- */
- private static final Map<TransactionProxyCleanupPhantomReference,
- TransactionProxyCleanupPhantomReference> phantomReferenceCache =
- new ConcurrentHashMap<>();
-
- /**
- * A PhantomReference that closes remote transactions for a TransactionProxy when it's
- * garbage collected. This is used for read-only transactions as they're not explicitly closed
- * by clients. So the only way to detect that a transaction is no longer in use and it's safe
- * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
- * but TransactionProxy instances should generally be short-lived enough to avoid being moved
- * to the old generation space and thus should be cleaned up in a timely manner as the GC
- * runs on the young generation (eden, swap1...) space much more frequently.
- */
- private static class TransactionProxyCleanupPhantomReference
- extends FinalizablePhantomReference<TransactionProxy> {
-
- private final List<ActorSelection> remoteTransactionActors;
- private final AtomicBoolean remoteTransactionActorsMB;
- private final ActorContext actorContext;
- private final TransactionIdentifier identifier;
-
- protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
- super(referent, phantomReferenceQueue);
-
- // Note we need to cache the relevant fields from the TransactionProxy as we can't
- // have a hard reference to the TransactionProxy instance itself.
-
- remoteTransactionActors = referent.remoteTransactionActors;
- remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
- actorContext = referent.actorContext;
- identifier = referent.getIdentifier();
- }
-
- @Override
- public void finalizeReferent() {
- LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
- remoteTransactionActors.size(), identifier);
-
- phantomReferenceCache.remove(this);
-
- // Access the memory barrier volatile to ensure all previous updates to the
- // remoteTransactionActors list are visible to this thread.
-
- if(remoteTransactionActorsMB.get()) {
- for(ActorSelection actor : remoteTransactionActors) {
- LOG.trace("Sending CloseTransaction to {}", actor);
- actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
- }
- }
- }
- }
-
/**
* Stores the remote Tx actors for each requested data store path to be used by the
* PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
* remoteTransactionActors list so they will be visible to the thread accessing the
* PhantomReference.
*/
- private List<ActorSelection> remoteTransactionActors;
- private volatile AtomicBoolean remoteTransactionActorsMB;
+ List<ActorSelection> remoteTransactionActors;
+ volatile AtomicBoolean remoteTransactionActorsMB;
/**
* Stores the create transaction results per shard.
private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
private final TransactionType transactionType;
- private final ActorContext actorContext;
+ final ActorContext actorContext;
private final String transactionChainId;
private final SchemaContext schemaContext;
- private boolean inReadyState;
+ private TransactionState state = TransactionState.OPEN;
private volatile boolean initialized;
private Semaphore operationLimiter;
private void checkModificationState() {
Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
"Modification operation on read-only transaction is not allowed");
- Preconditions.checkState(!inReadyState,
+ Preconditions.checkState(state == TransactionState.OPEN,
"Transaction is sealed - further modifications are not allowed");
}
}
}
-
@Override
public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
});
}
- @Override
- public DOMStoreThreePhaseCommitCohort ready() {
+ private boolean seal(final TransactionState newState) {
+ if (state == TransactionState.OPEN) {
+ state = newState;
+ return true;
+ } else {
+ return false;
+ }
+ }
- checkModificationState();
+ @Override
+ public AbstractThreePhaseCommitCohort ready() {
+ Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
+ "Read-only transactions cannot be readied");
- inReadyState = true;
+ final boolean success = seal(TransactionState.READY);
+ Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
txFutureCallbackMap.size());
if (txFutureCallbackMap.isEmpty()) {
- onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
}
throttleOperation(txFutureCallbackMap.size());
- List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
-
+ List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
cohortFutures.add(future);
}
- onTransactionReady(cohortFutures);
-
return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
getIdentifier().toString());
}
- /**
- * Method for derived classes to be notified when the transaction has been readied.
- *
- * @param cohortFutures the cohort Futures for each shard transaction.
- */
- protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
- }
-
@Override
public void close() {
+ if (!seal(TransactionState.CLOSED)) {
+ if (state == TransactionState.CLOSED) {
+ // Idempotent no-op as per AutoCloseable recommendation
+ return;
+ }
+
+ throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
+ getIdentifier()));
+ }
+
for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
@Override
remoteTransactionActors = Lists.newArrayList();
remoteTransactionActorsMB = new AtomicBoolean();
- TransactionProxyCleanupPhantomReference cleanup =
- new TransactionProxyCleanupPhantomReference(TransactionProxy.this);
- phantomReferenceCache.put(cleanup, cleanup);
+ TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
}
// Add the actor to the remoteTransactionActors list for access by the
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import com.google.common.base.FinalizablePhantomReference;
+import com.google.common.base.FinalizableReferenceQueue;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PhantomReference that closes remote transactions for a TransactionProxy when it's
+ * garbage collected. This is used for read-only transactions as they're not explicitly closed
+ * by clients. So the only way to detect that a transaction is no longer in use and it's safe
+ * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
+ * but TransactionProxy instances should generally be short-lived enough to avoid being moved
+ * to the old generation space and thus should be cleaned up in a timely manner as the GC
+ * runs on the young generation (eden, swap1...) space much more frequently.
+ */
+final class TransactionProxyCleanupPhantomReference
+ extends FinalizablePhantomReference<TransactionProxy> {
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionProxyCleanupPhantomReference.class);
+ /**
+ * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
+ * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
+ * trickery to clean up its internal thread when the bundle is unloaded.
+ */
+ private static final FinalizableReferenceQueue phantomReferenceQueue =
+ new FinalizableReferenceQueue();
+
+ /**
+ * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
+ * necessary because PhantomReferences need a hard reference so they're not garbage collected.
+ * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
+ * and thus becomes eligible for garbage collection.
+ */
+ private static final Map<TransactionProxyCleanupPhantomReference,
+ TransactionProxyCleanupPhantomReference> phantomReferenceCache =
+ new ConcurrentHashMap<>();
+
+ private final List<ActorSelection> remoteTransactionActors;
+ private final AtomicBoolean remoteTransactionActorsMB;
+ private final ActorContext actorContext;
+ private final TransactionIdentifier identifier;
+
+ private TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
+ super(referent, phantomReferenceQueue);
+
+ // Note we need to cache the relevant fields from the TransactionProxy as we can't
+ // have a hard reference to the TransactionProxy instance itself.
+
+ remoteTransactionActors = referent.remoteTransactionActors;
+ remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
+ actorContext = referent.actorContext;
+ identifier = referent.getIdentifier();
+ }
+
+ static void track(TransactionProxy referent) {
+ final TransactionProxyCleanupPhantomReference ret = new TransactionProxyCleanupPhantomReference(referent);
+ phantomReferenceCache.put(ret, ret);
+ }
+
+ @Override
+ public void finalizeReferent() {
+ LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
+ remoteTransactionActors.size(), identifier);
+
+ phantomReferenceCache.remove(this);
+
+ // Access the memory barrier volatile to ensure all previous updates to the
+ // remoteTransactionActors list are visible to this thread.
+
+ if(remoteTransactionActorsMB.get()) {
+ for(ActorSelection actor : remoteTransactionActors) {
+ LOG.trace("Sending CloseTransaction to {}", actor);
+ actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
+ }
+ }
+ }
+}
\ No newline at end of file
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
testReadWithExceptionOnInitialCreateTransaction(new TestException());
}
- @Test(expected = TestException.class)
- public void testReadWithPriorRecordingOperationFailure() throws Throwable {
- doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
- when(mockActorContext).getDatastoreContext();
-
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectFailedBatchedModifications(actorRef);
-
- doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.delete(TestModel.TEST_PATH);
-
- try {
- propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
- } finally {
- verify(mockActorContext, times(0)).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedReadData());
- }
- }
-
@Test
public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
}
- @Test(expected = TestException.class)
- public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
- doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
- when(mockActorContext).getDatastoreContext();
-
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectFailedBatchedModifications(actorRef);
-
- doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDataExists());
-
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_WRITE);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.delete(TestModel.TEST_PATH);
-
- try {
- propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
- } finally {
- verify(mockActorContext, times(0)).executeOperationAsync(
- eq(actorSelection(actorRef)), eqSerializedDataExists());
- }
- }
-
@Test
public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
</type>
<name>XSQL</name>
<data-broker>
- <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-data-broker</type>
- <name>binding-data-broker</name>
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-async-data-broker</type>
+ <name>binding-data-broker</name>
</data-broker>
<async-data-broker>
<type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-async-data-broker</type>
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.odl.xsql;
import java.sql.Connection;
import java.util.logging.Logger;
import org.opendaylight.controller.md.sal.dom.xsql.jdbc.JDBCConnection;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
public class JDBCDriver implements Driver {
public static JDBCDriver drv = new JDBCDriver();
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.md.sal.dom.xsql;
import java.io.InputStream;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Map;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
public class TablesResultSet implements ResultSet {
private String tables[] = null;
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.md.sal.dom.xsql;
import java.io.File;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
public class XSQLAdapter extends Thread implements SchemaContextListener {
private static final int SLEEP = 10000;
private String pinningFile;
private ServerSocket serverSocket = null;
private DOMDataBroker domDataBroker = null;
+ private static final String REFERENCE_FIELD_NAME = "reference";
private XSQLAdapter() {
XSQLAdapter.log("Starting Adapter");
List<Object> result = new LinkedList<Object>();
YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier
.builder()
- .node(XSQLODLUtils.getPath(table.getODLNode()).get(0))
+ .node(XSQLODLUtils.getPath(table.getFirstFromSchemaNodes()).get(0))
.toInstance();
DOMDataReadTransaction t = this.domDataBroker
.newReadOnlyTransaction();
Object node = t.read(type,
instanceIdentifier).get();
- node = XSQLODLUtils.get(node, "reference");
+ node = XSQLODLUtils.get(node, REFERENCE_FIELD_NAME);
if (node == null) {
return result;
}
-
- Map<?, ?> children = XSQLODLUtils.getChildren(node);
- for (Object c : children.values()) {
- result.add(c);
- /* I don't remember why i did this... possibly to prevent different siblings queried together
- Map<?, ?> sons = XSQLODLUtils.getChildren(c);
- for (Object child : sons.values()) {
- result.add(child);
- }*/
- }
-
+ result.add(node);
return result;
} catch (Exception err) {
XSQLAdapter.log(err);
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.md.sal.dom.xsql;
import java.io.DataInputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
public class XSQLBluePrint implements DatabaseMetaData, Serializable {
private static final long serialVersionUID = 1L;
return result;
}
- public void addToBluePrintCache(XSQLBluePrintNode blNode) {
- this.tableNameToBluePrint.put(blNode.getBluePrintNodeName(), blNode);
- Map<String, XSQLBluePrintNode> map = this.odlNameToBluePrint.get(blNode
- .getODLTableName());
- if (map == null) {
- map = new HashMap<String, XSQLBluePrintNode>();
- this.odlNameToBluePrint.put(blNode.getODLTableName(), map);
+ public XSQLBluePrintNode addToBluePrintCache(XSQLBluePrintNode blNode,XSQLBluePrintNode parent) {
+ XSQLBluePrintNode existingNode = this.tableNameToBluePrint.get(blNode.getBluePrintNodeName());
+ if(existingNode!=null){
+ existingNode.mergeAugmentation(blNode);
+ return existingNode;
+ }else{
+ this.tableNameToBluePrint.put(blNode.getBluePrintNodeName(), blNode);
+ Map<String, XSQLBluePrintNode> map = this.odlNameToBluePrint.get(blNode.getODLTableName());
+ if (map == null) {
+ map = new HashMap<String, XSQLBluePrintNode>();
+ this.odlNameToBluePrint.put(blNode.getODLTableName(), map);
+ }
+ map.put(blNode.getBluePrintNodeName(), blNode);
+ if(parent!=null)
+ parent.addChild(blNode);
+ return blNode;
}
- map.put(blNode.getBluePrintNodeName(), blNode);
}
public Class<?> getGenericType(ParameterizedType type) {
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.md.sal.dom.xsql;
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
public class XSQLBluePrintNode implements Serializable {
private static final long serialVersionUID = 1L;
private Set<XSQLColumn> columns = new HashSet<XSQLColumn>();
private Map<String, XSQLColumn> origNameToColumn = new HashMap<String, XSQLColumn>();
- private transient Object odlNode = null;
+ private transient Object[] odlSchemaNodes = null;
private boolean module = false;
private String bluePrintTableName = null;
private String odlTableName = null;
private String origName = null;
+ public void mergeAugmentation(XSQLBluePrintNode aug) {
+ this.relations.addAll(aug.relations);
+ this.inheritingNodes.addAll(aug.inheritingNodes);
+ this.children.addAll(aug.children);
+ this.columns.addAll(aug.columns);
+ this.origNameToColumn.putAll(aug.origNameToColumn);
+ if (aug.odlSchemaNodes != null) {
+ for (Object sn : aug.odlSchemaNodes) {
+ addToSchemaNodes(sn);
+ }
+ }
+ }
+
public XSQLBluePrintNode(String name, String _origName, int _level) {
this.level = _level;
this.odlTableName = name;
public XSQLBluePrintNode(Object _odlNode, int _level,
XSQLBluePrintNode _parent) {
- this.odlNode = _odlNode;
+ addToSchemaNodes(_odlNode);
this.level = _level;
this.module = XSQLODLUtils.isModule(_odlNode);
this.parent = _parent;
this.bluePrintTableName = XSQLODLUtils.getBluePrintName(_odlNode);
- this.odlTableName = XSQLODLUtils.getODLNodeName(this.odlNode);
+ this.odlTableName = XSQLODLUtils
+ .getODLNodeName(getFirstFromSchemaNodes());
+ }
+
+ private void addToSchemaNodes(Object schemaObject) {
+ if (this.odlSchemaNodes == null)
+ this.odlSchemaNodes = new Object[1];
+ else {
+ Object[] temp = new Object[this.odlSchemaNodes.length + 1];
+ System.arraycopy(this.odlSchemaNodes, 0, temp, 0,
+ this.odlSchemaNodes.length);
+ this.odlSchemaNodes = temp;
+ }
+ this.odlSchemaNodes[this.odlSchemaNodes.length - 1] = schemaObject;
+ }
+
+ public Object getFirstFromSchemaNodes() {
+ if (this.odlSchemaNodes == null) {
+ return null;
+ }
+ return this.odlSchemaNodes[0];
}
public String getOrigName() {
public String getODLTableName() {
if (this.odlTableName == null) {
- this.odlTableName = XSQLODLUtils.getODLNodeName(this.odlNode);
+ this.odlTableName = XSQLODLUtils
+ .getODLNodeName(getFirstFromSchemaNodes());
}
return this.odlTableName;
}
- public Object getODLNode() {
- return this.odlNode;
- }
-
- public void AddChild(XSQLBluePrintNode ch) {
+ public void addChild(XSQLBluePrintNode ch) {
this.children.add(ch);
}
if (myInterfaceName != null) {
return myInterfaceName;
}
- if (odlNode != null) {
+ if (this.odlSchemaNodes != null) {
return getBluePrintNodeName();
}
if (odlTableName != null) {
@Override
public boolean equals(Object obj) {
XSQLBluePrintNode other = (XSQLBluePrintNode) obj;
- if (odlNode != null) {
+ if (this.odlSchemaNodes != null) {
return getBluePrintNodeName().equals(other.getBluePrintNodeName());
} else if (this.odlTableName == null && other.odlTableName != null) {
return false;
}
if (this.odlTableName != null && other.odlTableName == null) {
return false;
- }
- else {
+ } else {
return this.odlTableName.equals(other.odlTableName);
}
}
public int hashCode() {
if (myInterfaceString != null) {
return myInterfaceString.hashCode();
- } else if (odlNode != null) {
+ } else if (this.odlSchemaNodes != null) {
return bluePrintTableName.hashCode();
}
return 0;
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.md.sal.dom.xsql;
import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.model.api.AugmentationSchema;
+import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
import org.opendaylight.yangtools.yang.model.util.Uint32;
import org.opendaylight.yangtools.yang.model.util.Uint64;
import org.opendaylight.yangtools.yang.model.util.Uint8;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
public class XSQLODLUtils {
private static Map<Class<?>, Class<?>> types =
public static boolean createOpenDaylightCache(XSQLBluePrint bluePrint,Object module) {
XSQLBluePrintNode node = new XSQLBluePrintNode(module, 0,null);
- bluePrint.addToBluePrintCache(node);
+ bluePrint.addToBluePrintCache(node,null);
collectODL(bluePrint, node, ((Module) module).getChildNodes(), 1);
return true;
}
return;
}
for (DataSchemaNode n : nodes) {
- if (n instanceof DataNodeContainer /*|| n instanceof LeafListSchemaNode*/
- || n instanceof ListSchemaNode) {
+ if (n instanceof DataNodeContainer) {
XSQLBluePrintNode bn = new XSQLBluePrintNode(n, level,parent);
- bluePrint.addToBluePrintCache(bn);
- parent.AddChild(bn);
- if (n instanceof DataNodeContainer) {
+ bn = bluePrint.addToBluePrintCache(bn,parent);
+ if (n instanceof ListSchemaNode) {
level++;
- collectODL(bluePrint, bn,
- ((DataNodeContainer) n).getChildNodes(), level);
+ collectODL(bluePrint, bn,((ListSchemaNode) n).getChildNodes(), level);
+ Set<AugmentationSchema> s = ((ListSchemaNode)n).getAvailableAugmentations();
+ if(s!=null){
+ for(AugmentationSchema as:s){
+ collectODL(bluePrint, bn,as.getChildNodes(), level);
+ }
+ }
level--;
- } else if (n instanceof ListSchemaNode) {
+ }else{
level++;
- collectODL(bluePrint, bn,
- ((ListSchemaNode) n).getChildNodes(), level);
+ collectODL(bluePrint, bn,((DataNodeContainer) n).getChildNodes(), level);
+ if(n instanceof ContainerSchemaNode){
+ Set<AugmentationSchema> s = ((ContainerSchemaNode)n).getAvailableAugmentations();
+ if(s!=null){
+ for(AugmentationSchema as:s){
+ collectODL(bluePrint, bn,as.getChildNodes(), level);
+ }
+ }
+ }
level--;
}
} else {
Field f = findField(c, name);
return f.get(o);
} catch (Exception err) {
- XSQLAdapter.log(err);
+ //XSQLAdapter.log(err);
}
return null;
}
return (Map<?, ?>) get(o, "children");
}
+ public static Collection<?> getChildrenCollection(Object o) {
+ Object value = get(o, "children");
+ if(value==null)
+ return Collections.emptyList();
+ if(value instanceof Map)
+ return ((Map<?,?>)value).values();
+ else
+ if(value instanceof Collection){
+ return (Collection<?>)value;
+ }else{
+ XSQLAdapter.log("Unknown Child Value Type="+value.getClass().getName());
+ return new ArrayList();
+ }
+ }
+
public static Object getValue(Object o) {
return get(o, "value");
}
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.controller.md.sal.dom.xsql.jdbc;
import java.io.InputStream;
import java.io.Reader;
import java.io.Serializable;
import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.Array;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.controller.md.sal.dom.xsql.XSQLAdapter;
import org.opendaylight.controller.md.sal.dom.xsql.XSQLBluePrint;
import org.opendaylight.controller.md.sal.dom.xsql.XSQLBluePrintNode;
import org.opendaylight.controller.md.sal.dom.xsql.XSQLColumn;
import org.opendaylight.controller.md.sal.dom.xsql.XSQLCriteria;
import org.opendaylight.controller.md.sal.dom.xsql.XSQLODLUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
-public class JDBCResultSet implements Serializable, ResultSet,
- ResultSetMetaData {
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
+public class JDBCResultSet implements Serializable, ResultSet, ResultSetMetaData {
private static final long serialVersionUID = -7450200738431047057L;
private static final ClassLoader CLASS_LOADER = JDBCResultSet.class.getClassLoader();
private static final Class<?>[] PROXY_INTERFACES = new Class[] { ResultSet.class };
private Map<String, Map<XSQLColumn, List<XSQLCriteria>>> criteria = new ConcurrentHashMap<String, Map<XSQLColumn, List<XSQLCriteria>>>();
private Exception err = null;
private List<Record> EMPTY_RESULT = new LinkedList<Record>();
- private transient Map<String,JDBCResultSet> subQueries = new HashMap<String,JDBCResultSet>();
+ private transient Map<String, JDBCResultSet> subQueries = new HashMap<String, JDBCResultSet>();
public ResultSet getProxy() {
- return (ResultSet) Proxy.newProxyInstance(CLASS_LOADER, PROXY_INTERFACES, new JDBCProxy(this));
+ return this;
+ //return (ResultSet) Proxy.newProxyInstance(CLASS_LOADER, PROXY_INTERFACES, new JDBCProxy(this));
}
public void setSQL(String _sql) {
this.sql = _sql;
}
- public JDBCResultSet addSubQuery(String _sql,String logicalName) {
+ public JDBCResultSet addSubQuery(String _sql, String logicalName) {
if (subQueries == null) {
- subQueries = new HashMap<String,JDBCResultSet>();
+ subQueries = new HashMap<String, JDBCResultSet>();
}
JDBCResultSet rs = new JDBCResultSet(_sql);
- this.subQueries.put(logicalName,rs);
+ this.subQueries.put(logicalName, rs);
return rs;
}
- public Map<String,JDBCResultSet> getSubQueries() {
- if (this.subQueries==null) {
+ public Map<String, JDBCResultSet> getSubQueries() {
+ if (this.subQueries == null) {
this.subQueries = new HashMap<>();
}
return this.subQueries;
}
}
- public int isObjectFitCriteria(Map<String, Object> objValues, String tableName) {
+ public int isObjectFitCriteria(Map<String, Object> objValues,
+ String tableName) {
Map<XSQLColumn, List<XSQLCriteria>> tblCriteria = criteria
.get(tableName);
if (tblCriteria == null) {
}
public static class Record {
+ // The map container the Attribute 2 the attribute value
public Map<String, Object> data = new HashMap<>();
+ // The Element Object (Possibly some kind of NormalizedNode
public Object element = null;
+ // Does this record fit the criteria
+ // In case of a list property, we first collect the list and only then
+ // we
+ // we decide which list item should be included or not.
+ public boolean fitCriteria = true;
public Map<String, Object> getRecord() {
return this.data;
}
}
- private Map<String, Object> collectColumnValues(Object node, XSQLBluePrintNode bpn) {
- Map<?, ?> subChildren = XSQLODLUtils.getChildren(node);
- Map<String, Object> result = new HashMap<>();
- for (Object stc : subChildren.values()) {
- if (stc.getClass().getName().endsWith("ImmutableAugmentationNode")) {
+ public static class RecordsContainer {
+ public List<Record> records = new LinkedList<Record>();
+ public List<Record> fitRecords = new LinkedList<Record>();
+ public Object currentObject = null;
+ }
+
+ private void collectColumnValues(RecordsContainer rContainer,
+ XSQLBluePrintNode bpn) {
+ Collection<?> subChildren = XSQLODLUtils
+ .getChildrenCollection(rContainer.currentObject);
+ Record r = new Record();
+ r.element = rContainer.currentObject;
+ for (Object stc : subChildren) {
+ if (stc.getClass().getName()
+ .endsWith("ImmutableUnkeyedListEntryNode")) {
+ r.fitCriteria = false;
+ rContainer.currentObject = stc;
+ collectColumnValues(rContainer, bpn);
+ } else if (stc.getClass().getName()
+ .endsWith("ImmutableAugmentationNode")) {
Map<?, ?> values = XSQLODLUtils.getChildren(stc);
for (Object key : values.keySet()) {
Object val = values.get(key);
Object value = XSQLODLUtils.getValue(val);
String k = XSQLODLUtils.getNodeName(val);
if (value != null) {
- result.put(bpn.getBluePrintNodeName() + "." + k,
+ r.data.put(bpn.getBluePrintNodeName() + "." + k,
value.toString());
}
}
String k = XSQLODLUtils.getNodeName(stc);
Object value = XSQLODLUtils.getValue(stc);
if (value != null) {
- result.put(bpn.getBluePrintNodeName() + "." + k,
+ r.data.put(bpn.getBluePrintNodeName() + "." + k,
value.toString());
}
}
}
- return result;
+ if (r.fitCriteria) {
+ rContainer.records.add(r);
+ }
}
- private void addToData(Record rec, XSQLBluePrintNode bpn,
- XSQLBluePrint bluePrint, Map<String, Object> fullRecord) {
+ private void addToData(Record rec, XSQLBluePrintNode bpn,XSQLBluePrint bluePrint, Map<String, Object> fullRecord) {
XSQLBluePrintNode eNodes[] = bluePrint
.getBluePrintNodeByODLTableName(XSQLODLUtils
.getNodeIdentiofier(rec.element));
String odlNodeName = XSQLODLUtils.getNodeIdentiofier(child);
if (odlNodeName == null) {
+ if (child instanceof DataContainerNode) {
+ List<Object> augChidlren = getChildren(child, tableName,
+ bluePrint);
+ result.addAll(augChidlren);
+ }
continue;
}
continue;
}
- if (child.getClass().getName().endsWith("ImmutableContainerNode")) {
+ if (child.getClass().getName().endsWith("ImmutableUnkeyedListNode")) {
+ result.add(child);
+ } else if (child.getClass().getName()
+ .endsWith("ImmutableContainerNode")) {
result.add(child);
} else if (child.getClass().getName()
.endsWith("ImmutableAugmentationNode")) {
}
} else if (child.getClass().getName().endsWith("ImmutableMapNode")) {
result.addAll(XSQLODLUtils.getMChildren(child));
+ } else {
+ XSQLAdapter.log("Missed Node Data OF Type="
+ + child.getClass().getName());
}
}
return result;
}
- public List<Record> addRecords(Object element, XSQLBluePrintNode node,boolean root, String tableName, XSQLBluePrint bluePrint) {
+ public List<Record> addRecords(Object element, XSQLBluePrintNode node,
+ boolean root, String tableName, XSQLBluePrint bluePrint) {
List<Record> result = new LinkedList<Record>();
- //In case this is a sibling to the requested table, the elenment type
- //won't be in the path of the leaf node
- if(node==null){
- return result;
- }
String nodeID = XSQLODLUtils.getNodeIdentiofier(element);
if (node.getODLTableName().equals(nodeID)) {
- XSQLBluePrintNode bluePrintNode = bluePrint.getBluePrintNodeByODLTableName(nodeID)[0];
- Record rec = new Record();
- rec.element = element;
- XSQLBluePrintNode bpn = this.tablesInQueryMap.get(bluePrintNode.getBluePrintNodeName());
- if (this.criteria.containsKey(bluePrintNode.getBluePrintNodeName()) || bpn != null) {
- Map<String, Object> allKeyValues = collectColumnValues(element, bpn);
- if (!(isObjectFitCriteria(allKeyValues,
- bpn.getBluePrintNodeName()) == 1)) {
- return EMPTY_RESULT;
+ XSQLBluePrintNode bluePrintNode = bluePrint
+ .getBluePrintNodeByODLTableName(nodeID)[0];
+ RecordsContainer rContainer = new RecordsContainer();
+ rContainer.currentObject = element;
+ XSQLBluePrintNode bpn = this.tablesInQueryMap.get(bluePrintNode
+ .getBluePrintNodeName());
+ if (this.criteria.containsKey(bluePrintNode.getBluePrintNodeName())
+ || bpn != null) {
+ collectColumnValues(rContainer, bpn);
+ for (Record r : rContainer.records) {
+ if (!(isObjectFitCriteria(r.data,
+ bpn.getBluePrintNodeName()) == 1)) {
+ r.fitCriteria = false;
+ }
+ if (r.fitCriteria) {
+ Record rec = new Record();
+ rec.element = r.element;
+ addToData(rec, bpn, bluePrint, r.data);
+ rContainer.fitRecords.add(rec);
+ }
}
- addToData(rec, bpn, bluePrint, allKeyValues);
+ if (rContainer.fitRecords.isEmpty())
+ return EMPTY_RESULT;
}
- if (root) {
- addRecord(rec.data);
+ if (rContainer.records.isEmpty()) {
+ Record rec = new Record();
+ rec.element = rContainer.currentObject;
+ if (root) {
+ addRecord(rec.data);
+ } else {
+ result.add(rec);
+ }
} else {
- result.add(rec);
+ for (Record rec : rContainer.fitRecords) {
+ if (root) {
+ addRecord(rec.data);
+ } else {
+ result.add(rec);
+ }
+ }
}
return result;
}
XSQLBluePrintNode parent = node.getParent();
- List<Record> subRecords = addRecords(element, parent, false, tableName,bluePrint);
+ List<Record> subRecords = addRecords(element, parent, false, tableName,
+ bluePrint);
for (Record subRec : subRecords) {
List<Object> subO = getChildren(subRec.element, tableName,
bluePrint);
if (subO != null) {
for (Object subData : subO) {
- Record rec = new Record();
- rec.element = subData;
- rec.data.putAll(subRec.data);
+ RecordsContainer rContainer = new RecordsContainer();
+ rContainer.currentObject = subData;
- String recID = XSQLODLUtils.getNodeIdentiofier(rec.element);
+ String recID = XSQLODLUtils
+ .getNodeIdentiofier(rContainer.currentObject);
XSQLBluePrintNode eNodes[] = bluePrint
.getBluePrintNodeByODLTableName(recID);
XSQLBluePrintNode bpn = null;
break;
}
}
- boolean isObjectInCriteria = true;
if (bpn != null) {
- Map<String, Object> allKeyValues = collectColumnValues(rec.element, bpn);
- if ((isObjectFitCriteria(allKeyValues,
- bpn.getBluePrintNodeName()) == 1)) {
- addToData(rec, bpn, bluePrint, allKeyValues);
- } else {
- isObjectInCriteria = false;
+ collectColumnValues(rContainer, bpn);
+ for (Record r : rContainer.records) {
+ if ((isObjectFitCriteria(r.data,
+ bpn.getBluePrintNodeName()) == 1)) {
+ Record rec = new Record();
+ rec.data.putAll(subRec.data);
+ rec.element = r.element;
+ addToData(rec, bpn, bluePrint, r.data);
+ } else {
+ r.fitCriteria = false;
+ }
}
}
-
- if (isObjectInCriteria) {
+ if (rContainer.records.isEmpty()) {
+ Record rec = new Record();
+ rec.data.putAll(subRec.data);
+ rec.element = rContainer.currentObject;
if (root) {
if (!rec.data.isEmpty()) {
addRecord(rec.data);
} else {
result.add(rec);
}
+ } else {
+ for (Record r : rContainer.records) {
+ r.data.putAll(subRec.data);
+ if (r.fitCriteria) {
+ if (root) {
+ if (!r.data.isEmpty()) {
+ addRecord(r.data);
+ }
+ } else {
+ result.add(r);
+ }
+ }
+ }
}
}
}
}
-
return result;
}
}
}
- public static void execute(JDBCResultSet rs, XSQLAdapter adapter)
- throws SQLException {
+ public static void execute(JDBCResultSet rs, XSQLAdapter adapter)throws SQLException {
if(rs.getSQL().toLowerCase().trim().equals("select 1")){
rs.setFinished(true);
return;
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.xsql;
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.xsql.XSQLAdapter;
import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626.XSQL;
import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626.XSQLBuilder;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * Created by root on 6/26/14.
- */
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
public class XSQLProvider implements AutoCloseable {
public static final InstanceIdentifier<XSQL> ID = InstanceIdentifier.builder(XSQL.class).build();
- private static final Logger LOG = LoggerFactory.getLogger(XSQLProvider.class);
+ //public static final InstanceIdentifier<SalTest> ID2 = InstanceIdentifier.builder(SalTest.class).build();
public void close() {
}
- public XSQL buildXSQL(DataProviderService dps) {
+ public XSQL buildXSQL(DataBroker dps) {
+ XSQLAdapter.log("Building XSL...");
XSQLBuilder builder = new XSQLBuilder();
builder.setPort("34343");
XSQL xsql = builder.build();
try {
if (dps != null) {
- final DataModificationTransaction t = dps.beginTransaction();
- t.removeOperationalData(ID);
- t.putOperationalData(ID,xsql);
- t.commit().get();
+ XSQLAdapter.log("Starting TRansaction...");
+ WriteTransaction t = dps.newReadWriteTransaction();
+ t.delete(LogicalDatastoreType.OPERATIONAL, ID);
+ t.put(LogicalDatastoreType.OPERATIONAL,ID,xsql);
+ XSQLAdapter.log("Submitting...");
+ t.submit();
}
} catch (Exception e) {
- LOG.warn("Failed to update XSQL port status, ", e);
+ XSQLAdapter.log(e);
}
return xsql;
}
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
package org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626;
import org.opendaylight.controller.md.sal.dom.xsql.XSQLAdapter;
import org.opendaylight.xsql.XSQLProvider;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
public class XSQLModule extends org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626.AbstractXSQLModule {
+ private static final long SLEEP_TIME_BEFORE_CREATING_TRANSACTION = 10000;
public XSQLModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
XSQLAdapter xsqlAdapter = XSQLAdapter.getInstance();
getSchemaServiceDependency().registerSchemaContextListener(xsqlAdapter);
xsqlAdapter.setDataBroker(getAsyncDataBrokerDependency());
- XSQLProvider p = new XSQLProvider();
- //p.buildXSQL(getDataBrokerDependency());
+ final XSQLProvider p = new XSQLProvider();
+ Runnable runthis = new Runnable() {
+ @Override
+ public void run() {
+ try{Thread.sleep(SLEEP_TIME_BEFORE_CREATING_TRANSACTION);}catch(Exception err){}
+ p.buildXSQL(getDataBrokerDependency());
+ }
+ };
return p;
}
-
}
</type>
<name>XSQL</name>
<data-broker>
- <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-data-broker</type>
- <name>binding-data-broker</name>
+ <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-async-data-broker</type>
+ <name>binding-data-broker</name>
</data-broker>
<async-data-broker>
<type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-async-data-broker</type>
case XSQL {
when "/config:modules/config:module/config:type = 'XSQL'";
- container data-broker {
+ container data-broker {
uses config:service-ref {
refine type {
mandatory false;
- config:required-identity mdsal:binding-data-broker;
+ config:required-identity mdsal:binding-async-data-broker;
}
}
- }
+ }
container async-data-broker {
uses config:service-ref {
import java.io.InputStream;
import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Set;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.md.sal.dom.xsql.XSQLAdapter;
import org.opendaylight.controller.md.sal.dom.xsql.XSQLBluePrint;
import org.opendaylight.controller.md.sal.dom.xsql.jdbc.JDBCResultSet;
import org.opendaylight.controller.md.sal.dom.xsql.jdbc.JDBCServer;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
public class XSQLTest {
-
- XSQLBluePrint bluePrint = null;
+ private static final String DATASTORE_TEST_YANG = "/sal-persisted-dom-test.yang";
+ private XSQLBluePrint bluePrint = null;
+ //private static SchemaContext schemaContext = null;
+ @BeforeClass
+ public static void loadSchemaContext(){
+ //schemaContext = createTestContext();
+ }
@Before
public void before() {
System.out.print("*** XSQL Tests -");
System.out.println(str);
}
+
+ public static final InputStream getDatastoreTestInputStream() {
+ return getInputStream(DATASTORE_TEST_YANG);
+ }
+
+ private static InputStream getInputStream(final String resourceName) {
+ return XSQLTest.class.getResourceAsStream(DATASTORE_TEST_YANG);
+ }
+
+ public static SchemaContext createTestContext() {
+ YangParserImpl parser = new YangParserImpl();
+ Set<Module> modules = parser.parseYangModelsFromStreams(Collections.singletonList(getDatastoreTestInputStream()));
+ return parser.resolveSchemaContext(modules);
+ }
}
<param-name>javax.ws.rs.Application</param-name>
<param-value>org.opendaylight.controller.sal.rest.doc.jaxrs.ApiDocApplication</param-value>
</init-param>
+ <!-- AAA Auth Filter -->
+ <init-param>
+ <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
+ <param-value> org.opendaylight.aaa.sts.TokenAuthFilter</param-value>
+ </init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<security-constraint>
<web-resource-collection>
- <web-resource-name>free access</web-resource-name>
- <url-pattern>/explorer/css/*</url-pattern>
- <url-pattern>/explorer/images/*</url-pattern>
- <url-pattern>/explorer/lib/*</url-pattern>
- <url-pattern>/explorer/*</url-pattern>
+ <web-resource-name>API Doc</web-resource-name>
+ <url-pattern>/*</url-pattern>
</web-resource-collection>
</security-constraint>