Merge "Bug 2933: BidningDOMDataBrokerAdaper implements DataTreeChangeService"
authorTom Pantelis <tpanteli@brocade.com>
Tue, 7 Apr 2015 18:17:20 +0000 (18:17 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 7 Apr 2015 18:17:20 +0000 (18:17 +0000)
42 files changed:
features/mdsal/src/main/resources/features.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxyCleanupPhantomReference.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-dom-xsql-config/src/main/resources/04-xsql.xml
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/odl/xsql/JDBCDriver.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/TablesResultSet.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLAdapter.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLBluePrint.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLBluePrintNode.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLODLUtils.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/jdbc/JDBCResultSet.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/jdbc/JDBCServer.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/xsql/XSQLProvider.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/yang/gen/v1/http/netconfcentral/org/ns/xsql/rev140626/XSQLModule.java
opendaylight/md-sal/sal-dom-xsql/src/main/resources/04-xsql.xml
opendaylight/md-sal/sal-dom-xsql/src/main/yang/XSQL.yang
opendaylight/md-sal/sal-dom-xsql/src/test/java/org/opendaylight/xsql/test/XSQLTest.java
opendaylight/md-sal/sal-dom-xsql/src/test/resources/BluePrintCache.dat
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreTreeChangePublisher.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java [deleted file]
opendaylight/md-sal/sal-rest-docgen/src/main/resources/WEB-INF/web.xml

index 021ed1d..f8bbfec 100644 (file)
@@ -11,7 +11,7 @@
     <repository>mvn:org.opendaylight.controller/features-akka/${commons.opendaylight.version}/xml/features</repository>
     <feature name='odl-mdsal-all' version='${project.version}' description="OpenDaylight :: MDSAL :: All">
         <feature version='${project.version}'>odl-mdsal-broker</feature>
-        <feature version='${project.version}'>odl-mdsal-clustering</feature>
+        <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${project.version}'>odl-mdsal-xsql</feature>
         <feature version='${project.version}'>odl-toaster</feature>
     </feature>
@@ -41,7 +41,7 @@
         <configfile finalname='${config.configfile.directory}/${config.netconf.mdsal.configfile}'>mvn:org.opendaylight.controller/netconf-mdsal-config/${netconf.version}/xml/config</configfile>
     </feature>
 
-    <feature name='odl-mdsal-broker' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
+    <feature name='odl-mdsal-broker-local' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
         <feature version='${yangtools.version}'>odl-yangtools-common</feature>
         <feature version='${yangtools.version}'>odl-yangtools-binding</feature>
         <feature version='${yangtools.version}'>odl-yangtools-models</feature>
@@ -76,7 +76,7 @@
         <configfile finalname="${config.configfile.directory}/${config.xsql.configfile}">mvn:org.opendaylight.controller/sal-dom-xsql-config/${project.version}/xml/config</configfile>
     </feature>
     <feature name ='odl-mdsal-clustering-commons' version='${project.version}'>
-        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${akka.version}'>odl-akka-system</feature>
         <feature version='${akka.version}'>odl-akka-persistence</feature>
         <bundle>mvn:org.opendaylight.controller/sal-clustering-commons/${project.version}</bundle>
         <bundle>mvn:com.codahale.metrics/metrics-core/3.0.1</bundle>
     </feature>
     <feature name ='odl-mdsal-distributed-datastore' version='${project.version}'>
-        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
         <feature version='${akka.version}'>odl-akka-clustering</feature>
         <bundle>mvn:org.opendaylight.controller/sal-distributed-datastore/${project.version}</bundle>
     </feature>
     <feature name ='odl-mdsal-remoterpc-connector' version='${project.version}'>
-        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
         <feature version='${akka.version}'>odl-akka-clustering</feature>
         <feature version='0.7'>odl-akka-leveldb</feature>
         <bundle>mvn:org.opendaylight.controller/sal-remoterpc-connector/${project.version}</bundle>
     </feature>
-    <feature name ='odl-mdsal-clustering' version='${project.version}'>
+    <feature name ='odl-mdsal-broker' version='${project.version}'>
         <feature version='${project.version}'>odl-mdsal-remoterpc-connector</feature>
         <feature version='${project.version}'>odl-mdsal-distributed-datastore</feature>
         <configfile finalname="${config.configfile.directory}/${config.clustering.configfile}">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/config</configfile>
         <configfile finalname="configuration/initial/modules.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleconf</configfile>
         <configfile finalname="etc/org.opendaylight.controller.cluster.datastore.cfg">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/cfg/datastore</configfile>
     </feature>
-
+    <feature name ='odl-mdsal-clustering' version='${project.version}'>
+      <feature version='${project.version}'>odl-mdsal-broker</feature>
+    </feature>
     <feature name='odl-clustering-test-app' version='${project.version}'>
         <feature version='${yangtools.version}'>odl-yangtools-models</feature>
-        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <bundle>mvn:org.opendaylight.controller.samples/clustering-it-model/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller.samples/clustering-it-provider/${project.version}</bundle>
         <configfile finalname="${config.configfile.directory}/20-clustering-test-app.xml">mvn:org.opendaylight.controller.samples/clustering-it-config/${project.version}/xml/config</configfile>
index a13b6ff..1c30fe2 100644 (file)
@@ -43,7 +43,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
-import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -107,7 +107,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * The current state determines the current behavior of a RaftActor
      * A Raft Actor always starts off in the Follower State
      */
-    private RaftActorBehavior currentBehavior;
+    private final DelegatingRaftActorBehavior currentBehavior = new DelegatingRaftActorBehavior();
 
     /**
      * This context should NOT be passed directly to any other actor it is
@@ -119,11 +119,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
 
-    /**
-     * The in-memory journal
-     */
-    private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
@@ -139,9 +134,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
-            -1, -1, replicatedLog, peerAddresses,
-            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
-            LOG);
+            -1, -1, peerAddresses,
+            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), LOG);
+
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
     }
 
     private void initRecoveryTimer() {
@@ -184,7 +180,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             } else if (message instanceof ApplyJournalEntries) {
                 onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
             } else if (message instanceof DeleteEntries) {
-                replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+                replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
             } else if (message instanceof UpdateElectionTerm) {
                 context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
                         ((UpdateElectionTerm) message).getVotedFor());
@@ -219,9 +215,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         // Create a replicated log with the snapshot information
         // The replicated log can be used later on to retrieve this snapshot
         // when we need to install it on a peer
-        replicatedLog = new ReplicatedLogImpl(snapshot);
 
-        context.setReplicatedLog(replicatedLog);
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
+                currentBehavior));
         context.setLastApplied(snapshot.getLastAppliedIndex());
         context.setCommitIndex(snapshot.getLastAppliedIndex());
 
@@ -232,8 +228,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         timer.stop();
         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
-                replicatedLog.size(), persistenceId(), timer.toString(),
-                replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
+                replicatedLog().size(), persistenceId(), timer.toString(),
+                replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
     }
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
@@ -241,7 +237,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
         }
 
-        replicatedLog.append(logEntry);
+        replicatedLog().append(logEntry);
     }
 
     private void onRecoveredApplyLogEntries(long toIndex) {
@@ -251,7 +247,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
-            batchRecoveredLogEntry(replicatedLog.get(i));
+            batchRecoveredLogEntry(replicatedLog().get(i));
         }
 
         context.setLastApplied(toIndex);
@@ -297,8 +293,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 "Persistence Id =  " + persistenceId() +
                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
                 "journal-size={}",
-            replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
-            replicatedLog.getSnapshotTerm(), replicatedLog.size());
+            replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
+            replicatedLog().getSnapshotTerm(), replicatedLog().size());
 
         initializeBehavior();
     }
@@ -308,9 +304,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
-        reusableBehaviorStateHolder.init(currentBehavior);
-        currentBehavior = newBehavior;
-        handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
+        reusableBehaviorStateHolder.init(getCurrentBehavior());
+        setCurrentBehavior(newBehavior);
+        handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
     }
 
     @Override public void handleCommand(Object message) {
@@ -353,8 +349,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             applySnapshot(snapshot.getState());
 
             //clears the followers log, sets the snapshot index to ensure adjusted-index works
-            replicatedLog = new ReplicatedLogImpl(snapshot);
-            context.setReplicatedLog(replicatedLog);
+            context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
+                    currentBehavior));
             context.setLastApplied(snapshot.getLastAppliedIndex());
 
         } else if (message instanceof FindLeader) {
@@ -391,11 +387,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message.equals(COMMIT_SNAPSHOT)) {
             commitSnapshot(-1);
         } else {
-            reusableBehaviorStateHolder.init(currentBehavior);
+            reusableBehaviorStateHolder.init(getCurrentBehavior());
 
-            currentBehavior = currentBehavior.handleMessage(getSender(), message);
+            setCurrentBehavior(currentBehavior.handleMessage(getSender(), message));
 
-            handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
+            handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
         }
     }
 
@@ -405,17 +401,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
-                .inMemoryJournalDataSize(replicatedLog.dataSize())
-                .inMemoryJournalLogSize(replicatedLog.size())
+                .inMemoryJournalDataSize(replicatedLog().dataSize())
+                .inMemoryJournalLogSize(replicatedLog().size())
                 .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
                 .lastApplied(context.getLastApplied())
-                .lastIndex(replicatedLog.lastIndex())
-                .lastTerm(replicatedLog.lastTerm())
+                .lastIndex(replicatedLog().lastIndex())
+                .lastTerm(replicatedLog().lastTerm())
                 .leader(getLeaderId())
                 .raftState(currentBehavior.state().toString())
                 .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
-                .snapshotIndex(replicatedLog.getSnapshotIndex())
-                .snapshotTerm(replicatedLog.getSnapshotTerm())
+                .snapshotIndex(replicatedLog().getSnapshotIndex())
+                .snapshotTerm(replicatedLog().getSnapshotTerm())
                 .votedFor(context.getTermInformation().getVotedFor())
                 .peerAddresses(ImmutableMap.copyOf(context.getPeerAddresses()));
 
@@ -425,8 +421,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             builder.lastLogTerm(lastLogEntry.getTerm());
         }
 
-        if(currentBehavior instanceof AbstractLeader) {
-            AbstractLeader leader = (AbstractLeader)currentBehavior;
+        if(getCurrentBehavior() instanceof AbstractLeader) {
+            AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
             Collection<String> followerIds = leader.getFollowerIds();
             List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
             for(String id: followerIds) {
@@ -490,39 +486,49 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         final RaftActorContext raftContext = getRaftActorContext();
 
-        replicatedLog
-                .appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
-                    @Override
-                    public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
-                        if(!hasFollowers()){
-                            // Increment the Commit Index and the Last Applied values
-                            raftContext.setCommitIndex(replicatedLogEntry.getIndex());
-                            raftContext.setLastApplied(replicatedLogEntry.getIndex());
+        replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
+            @Override
+            public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
+                if(!hasFollowers()){
+                    // Increment the Commit Index and the Last Applied values
+                    raftContext.setCommitIndex(replicatedLogEntry.getIndex());
+                    raftContext.setLastApplied(replicatedLogEntry.getIndex());
 
-                            // Apply the state immediately
-                            applyState(clientActor, identifier, data);
+                    // Apply the state immediately
+                    applyState(clientActor, identifier, data);
 
-                            // Send a ApplyJournalEntries message so that we write the fact that we applied
-                            // the state to durable storage
-                            self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
+                    // Send a ApplyJournalEntries message so that we write the fact that we applied
+                    // the state to durable storage
+                    self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
 
-                            context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+                    context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
 
-                        } else if (clientActor != null) {
-                            // Send message for replication
-                            currentBehavior.handleMessage(getSelf(),
-                                    new Replicate(clientActor, identifier,
-                                            replicatedLogEntry)
-                            );
-                        }
+                } else if (clientActor != null) {
+                    // Send message for replication
+                    currentBehavior.handleMessage(getSelf(),
+                            new Replicate(clientActor, identifier, replicatedLogEntry));
+                }
+            }
+        });
+    }
 
-                    }
-                });    }
+    private ReplicatedLog replicatedLog() {
+        return context.getReplicatedLog();
+    }
 
     protected String getId() {
         return context.getId();
     }
 
+    @VisibleForTesting
+    void setCurrentBehavior(RaftActorBehavior behavior) {
+        currentBehavior.setDelegate(behavior);
+    }
+
+    protected RaftActorBehavior getCurrentBehavior() {
+        return currentBehavior.getDelegate();
+    }
+
     /**
      * Derived actors can call the isLeader method to check if the current
      * RaftActor is the Leader or not
@@ -563,7 +569,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected ReplicatedLogEntry getLastLogEntry() {
-        return replicatedLog.last();
+        return replicatedLog().last();
     }
 
     protected Long getCurrentTerm(){
@@ -750,125 +756,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
         LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
 
-        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
-    }
-
-    protected long getTotalMemory() {
-        return Runtime.getRuntime().totalMemory();
+        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory());
     }
 
     protected boolean hasFollowers(){
-        return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
-    }
-
-    private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-        private static final int DATA_SIZE_DIVIDER = 5;
-        private long dataSizeSinceLastSnapshot = 0L;
-
-
-        public ReplicatedLogImpl(Snapshot snapshot) {
-            super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
-                snapshot.getUnAppliedEntries());
-        }
-
-        public ReplicatedLogImpl() {
-            super();
-        }
-
-        @Override public void removeFromAndPersist(long logEntryIndex) {
-            int adjustedIndex = adjustedIndex(logEntryIndex);
-
-            if (adjustedIndex < 0) {
-                return;
-            }
-
-            // FIXME: Maybe this should be done after the command is saved
-            journal.subList(adjustedIndex , journal.size()).clear();
-
-            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
-
-                @Override
-                public void apply(DeleteEntries param)
-                        throws Exception {
-                    //FIXME : Doing nothing for now
-                    dataSize = 0;
-                    for (ReplicatedLogEntry entry : journal) {
-                        dataSize += entry.size();
-                    }
-                }
-            });
-        }
-
-        @Override public void appendAndPersist(
-            final ReplicatedLogEntry replicatedLogEntry) {
-            appendAndPersist(replicatedLogEntry, null);
-        }
-
-        public void appendAndPersist(
-            final ReplicatedLogEntry replicatedLogEntry,
-            final Procedure<ReplicatedLogEntry> callback)  {
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
-            }
-
-            // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
-            journal.add(replicatedLogEntry);
-
-            // When persisting events with persist it is guaranteed that the
-            // persistent actor will not receive further commands between the
-            // persist call and the execution(s) of the associated event
-            // handler. This also holds for multiple persist calls in context
-            // of a single command.
-            persistence().persist(replicatedLogEntry,
-                new Procedure<ReplicatedLogEntry>() {
-                    @Override
-                    public void apply(ReplicatedLogEntry evt) throws Exception {
-                        int logEntrySize = replicatedLogEntry.size();
-
-                        dataSize += logEntrySize;
-                        long dataSizeForCheck = dataSize;
-
-                        dataSizeSinceLastSnapshot += logEntrySize;
-
-                        if (!hasFollowers()) {
-                            // When we do not have followers we do not maintain an in-memory log
-                            // due to this the journalSize will never become anything close to the
-                            // snapshot batch count. In fact will mostly be 1.
-                            // Similarly since the journal's dataSize depends on the entries in the
-                            // journal the journal's dataSize will never reach a value close to the
-                            // memory threshold.
-                            // By maintaining the dataSize outside the journal we are tracking essentially
-                            // what we have written to the disk however since we no longer are in
-                            // need of doing a snapshot just for the sake of freeing up memory we adjust
-                            // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
-                            // as if we were maintaining a real snapshot
-                            dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
-                        }
-                        long journalSize = replicatedLogEntry.getIndex() + 1;
-                        long dataThreshold = getTotalMemory() *
-                                context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
-                        if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
-                                || dataSizeForCheck > dataThreshold)) {
-
-                            boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
-                                    currentBehavior.getReplicatedToAllIndex());
-
-                            if(started){
-                                dataSizeSinceLastSnapshot = 0;
-                            }
-
-                        }
-
-                        if (callback != null){
-                            callback.apply(replicatedLogEntry);
-                        }
-                    }
-                }
-            );
-        }
-
+        return getRaftActorContext().hasFollowers();
     }
 
     static class DeleteEntries implements Serializable {
@@ -911,15 +803,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    @VisibleForTesting
-    void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
-        currentBehavior = behavior;
-    }
-
-    protected RaftActorBehavior getCurrentBehavior() {
-        return currentBehavior;
-    }
-
     private static class BehaviorStateHolder {
         private RaftActorBehavior behavior;
         private String leaderId;
index 2e7eb5e..9f4b7cb 100644 (file)
@@ -12,6 +12,8 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
 import java.util.Map;
 import org.slf4j.Logger;
 
@@ -168,4 +170,11 @@ public interface RaftActorContext {
 
     SnapshotManager getSnapshotManager();
 
+    boolean hasFollowers();
+
+    long getTotalMemory();
+
+    @VisibleForTesting
+    void setTotalMemoryRetriever(Supplier<Long> retriever);
+
 }
index eb059d6..684845c 100644 (file)
@@ -14,6 +14,8 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActorContext;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
 import java.util.Map;
 import org.slf4j.Logger;
 
@@ -39,25 +41,22 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private ConfigParams configParams;
 
-    private boolean snapshotCaptureInitiated;
+    @VisibleForTesting
+    private Supplier<Long> totalMemoryRetriever;
 
     // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
     // be passed to it in the constructor
     private SnapshotManager snapshotManager;
 
-    public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
-        String id,
-        ElectionTerm termInformation, long commitIndex,
-        long lastApplied, ReplicatedLog replicatedLog,
-        Map<String, String> peerAddresses, ConfigParams configParams,
-        Logger logger) {
+    public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id,
+            ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
+            ConfigParams configParams, Logger logger) {
         this.actor = actor;
         this.context = context;
         this.id = id;
         this.termInformation = termInformation;
         this.commitIndex = commitIndex;
         this.lastApplied = lastApplied;
-        this.replicatedLog = replicatedLog;
         this.peerAddresses = peerAddresses;
         this.configParams = configParams;
         this.LOG = logger;
@@ -161,10 +160,26 @@ public class RaftActorContextImpl implements RaftActorContext {
         peerAddresses.put(peerId, peerAddress);
     }
 
+    @Override
     public SnapshotManager getSnapshotManager() {
         if(snapshotManager == null){
             snapshotManager = new SnapshotManager(this, LOG);
         }
         return snapshotManager;
     }
+
+    @Override
+    public long getTotalMemory() {
+        return totalMemoryRetriever != null ? totalMemoryRetriever.get() : Runtime.getRuntime().totalMemory();
+    }
+
+    @Override
+    public void setTotalMemoryRetriever(Supplier<Long> retriever) {
+        totalMemoryRetriever = retriever;
+    }
+
+    @Override
+    public boolean hasFollowers() {
+        return getPeerAddresses().keySet().size() > 0;
+    }
 }
index 82d0839..3e4d727 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import akka.japi.Procedure;
 import java.util.List;
 
 /**
@@ -85,6 +86,8 @@ public interface ReplicatedLog {
      */
     void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
 
+    void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback);
+
     /**
      *
      * @param index the index of the log entry
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
new file mode 100644 (file)
index 0000000..fdb6305
--- /dev/null
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import java.util.Collections;
+import java.util.List;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+/**
+ * Implementation of ReplicatedLog used by the RaftActor.
+ */
+class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
+    private static final int DATA_SIZE_DIVIDER = 5;
+
+    private long dataSizeSinceLastSnapshot = 0L;
+    private final RaftActorContext context;
+    private final DataPersistenceProvider persistence;
+    private final RaftActorBehavior currentBehavior;
+
+    private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
+        @Override
+        public void apply(DeleteEntries param) {
+            dataSize = 0;
+            for (ReplicatedLogEntry entry : journal) {
+                dataSize += entry.size();
+            }
+        }
+    };
+
+    static ReplicatedLog newInstance(Snapshot snapshot, RaftActorContext context,
+            DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+        return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+                snapshot.getUnAppliedEntries(), context, persistence, currentBehavior);
+    }
+
+    static ReplicatedLog newInstance(RaftActorContext context,
+            DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+        return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context,
+                persistence, currentBehavior);
+    }
+
+    private ReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries,
+            RaftActorContext context, DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+        super(snapshotIndex, snapshotTerm, unAppliedEntries);
+        this.context = context;
+        this.persistence = persistence;
+        this.currentBehavior = currentBehavior;
+    }
+
+    @Override
+    public void removeFromAndPersist(long logEntryIndex) {
+        int adjustedIndex = adjustedIndex(logEntryIndex);
+
+        if (adjustedIndex < 0) {
+            return;
+        }
+
+        // FIXME: Maybe this should be done after the command is saved
+        journal.subList(adjustedIndex , journal.size()).clear();
+
+        persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+    }
+
+    @Override
+    public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
+        appendAndPersist(replicatedLogEntry, null);
+    }
+
+    @Override
+    public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
+            final Procedure<ReplicatedLogEntry> callback)  {
+
+        if(context.getLogger().isDebugEnabled()) {
+            context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
+        }
+
+        // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
+        journal.add(replicatedLogEntry);
+
+        // When persisting events with persist it is guaranteed that the
+        // persistent actor will not receive further commands between the
+        // persist call and the execution(s) of the associated event
+        // handler. This also holds for multiple persist calls in context
+        // of a single command.
+        persistence.persist(replicatedLogEntry,
+            new Procedure<ReplicatedLogEntry>() {
+                @Override
+                public void apply(ReplicatedLogEntry evt) throws Exception {
+                    int logEntrySize = replicatedLogEntry.size();
+
+                    dataSize += logEntrySize;
+                    long dataSizeForCheck = dataSize;
+
+                    dataSizeSinceLastSnapshot += logEntrySize;
+
+                    if (!context.hasFollowers()) {
+                        // When we do not have followers we do not maintain an in-memory log
+                        // due to this the journalSize will never become anything close to the
+                        // snapshot batch count. In fact will mostly be 1.
+                        // Similarly since the journal's dataSize depends on the entries in the
+                        // journal the journal's dataSize will never reach a value close to the
+                        // memory threshold.
+                        // By maintaining the dataSize outside the journal we are tracking essentially
+                        // what we have written to the disk however since we no longer are in
+                        // need of doing a snapshot just for the sake of freeing up memory we adjust
+                        // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
+                        // as if we were maintaining a real snapshot
+                        dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
+                    }
+                    long journalSize = replicatedLogEntry.getIndex() + 1;
+                    long dataThreshold = context.getTotalMemory() *
+                            context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
+                    if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+                            || dataSizeForCheck > dataThreshold)) {
+
+                        boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+                                currentBehavior.getReplicatedToAllIndex());
+
+                        if(started){
+                            dataSizeSinceLastSnapshot = 0;
+                        }
+                    }
+
+                    if (callback != null){
+                        callback.apply(replicatedLogEntry);
+                    }
+                }
+            }
+        );
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java
new file mode 100644 (file)
index 0000000..776dae7
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftState;
+
+/**
+ * A RaftActorBehavior implementation that delegates to another implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class DelegatingRaftActorBehavior implements RaftActorBehavior {
+    private RaftActorBehavior delegate;
+
+    public RaftActorBehavior getDelegate() {
+        return delegate;
+    }
+
+    public void setDelegate(RaftActorBehavior delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void close() throws Exception {
+        delegate.close();
+    }
+
+    @Override
+    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+        return delegate.handleMessage(sender, message);
+    }
+
+    @Override
+    public RaftState state() {
+        return delegate.state();
+    }
+
+    @Override
+    public String getLeaderId() {
+        return delegate.getLeaderId();
+    }
+
+    @Override
+    public void setReplicatedToAllIndex(long replicatedToAllIndex) {
+        delegate.setReplicatedToAllIndex(replicatedToAllIndex);
+    }
+
+    @Override
+    public long getReplicatedToAllIndex() {
+        return delegate.getReplicatedToAllIndex();
+    }
+}
index dfaa8d5..b910313 100644 (file)
@@ -18,6 +18,7 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.List;
@@ -52,7 +53,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         private final TestActorRef<MessageCollectorActor> collectorActor;
         private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
         private volatile byte[] snapshot;
-        private volatile long mockTotalMemory;
 
         private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
                 TestActorRef<MessageCollectorActor> collectorActor) {
@@ -74,13 +74,18 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
             dropMessages.remove(msgClass);
         }
 
-        void setMockTotalMemory(long mockTotalMemory) {
-            this.mockTotalMemory = mockTotalMemory;
-        }
+        void setMockTotalMemory(final long mockTotalMemory) {
+            if(mockTotalMemory > 0) {
+                getRaftActorContext().setTotalMemoryRetriever(new Supplier<Long>() {
+                    @Override
+                    public Long get() {
+                        return mockTotalMemory;
+                    }
 
-        @Override
-        protected long getTotalMemory() {
-            return mockTotalMemory > 0 ? mockTotalMemory : super.getTotalMemory();
+                });
+            } else {
+                getRaftActorContext().setTotalMemoryRetriever(null);
+            }
         }
 
         @Override
index 885c3ab..8fdb7ea 100644 (file)
@@ -11,6 +11,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import akka.japi.Procedure;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -220,5 +221,9 @@ public class AbstractReplicatedLogImplTest {
         public List<ReplicatedLogEntry> getEntriesTill(final int index) {
             return journal.subList(0, index);
         }
+
+        @Override
+        public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
+        }
     }
 }
index 53cca23..63f0df2 100644 (file)
@@ -12,7 +12,9 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.japi.Procedure;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.protobuf.GeneratedMessage;
 import java.io.Serializable;
 import java.util.HashMap;
@@ -203,6 +205,20 @@ public class MockRaftActorContext implements RaftActorContext {
         this.configParams = configParams;
     }
 
+    @Override
+    public long getTotalMemory() {
+        return Runtime.getRuntime().totalMemory();
+    }
+
+    @Override
+    public void setTotalMemoryRetriever(Supplier<Long> retriever) {
+    }
+
+    @Override
+    public boolean hasFollowers() {
+        return getPeerAddresses().keySet().size() > 0;
+    }
+
     public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
         @Override public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
@@ -217,6 +233,19 @@ public class MockRaftActorContext implements RaftActorContext {
         @Override public void removeFromAndPersist(long index) {
             removeFrom(index);
         }
+
+        @Override
+        public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
+            append(replicatedLogEntry);
+
+            if(callback != null) {
+                try {
+                    callback.apply(replicatedLogEntry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
     }
 
     public static class MockPayload extends Payload implements Serializable {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..cac0f51
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import java.util.List;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import scala.concurrent.Future;
+
+/**
+ * Abstract base class for {@link DOMStoreThreePhaseCommitCohort} instances returned by this
+ * implementation. In addition to the usual set of methods it also contains the list of actor
+ * futures.
+ */
+abstract class AbstractThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+    abstract List<Future<ActorSelection>> getCohortFutures();
+}
index 933e87a..d94e1c6 100644 (file)
@@ -7,22 +7,40 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import scala.concurrent.Future;
 
 abstract class AbstractTransactionContext implements TransactionContext {
 
-    protected final TransactionIdentifier identifier;
-    protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+    private final List<Future<Object>> recordedOperationFutures = new ArrayList<>();
+    private final TransactionIdentifier identifier;
 
-    AbstractTransactionContext(TransactionIdentifier identifier) {
+    protected AbstractTransactionContext(TransactionIdentifier identifier) {
         this.identifier = identifier;
     }
 
     @Override
-    public List<Future<Object>> getRecordedOperationFutures() {
-        return recordedOperationFutures;
+    public final void copyRecordedOperationFutures(Collection<Future<Object>> target) {
+        target.addAll(recordedOperationFutures);
     }
-}
\ No newline at end of file
+
+    protected final TransactionIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    protected final Collection<Future<Object>> copyRecordedOperationFutures() {
+        return ImmutableList.copyOf(recordedOperationFutures);
+    }
+
+    protected final int recordedOperationCount() {
+        return recordedOperationFutures.size();
+    }
+
+    protected final void recordOperationFuture(Future<Object> future) {
+        recordedOperationFutures.add(future);
+    }
+}
index c59a277..ed3aa85 100644 (file)
@@ -44,10 +44,12 @@ final class ChainedTransactionProxy extends TransactionProxy {
     }
 
     @Override
-    protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
+    public AbstractThreePhaseCommitCohort ready() {
+        final AbstractThreePhaseCommitCohort ret = super.ready();
+        readyFutures = ret.getCohortFutures();
         LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
-                readyFutures.size(), getTransactionChainId());
-        this.readyFutures = readyFutures;
+            readyFutures.size(), getTransactionChainId());
+        return ret;
     }
 
     /**
index 766cf1d..376b658 100644 (file)
@@ -7,14 +7,18 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorSelection;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import java.util.Collections;
+import java.util.List;
+import scala.concurrent.Future;
 
 /**
- * A {@link DOMStoreThreePhaseCommitCohort} instance given out for empty transactions.
+ * A {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort}
+ * instance given out for empty transactions.
  */
-final class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort {
     static final NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
 
     private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS = Futures.immediateFuture(null);
@@ -43,4 +47,9 @@ final class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseComm
     public ListenableFuture<Void> commit() {
         return IMMEDIATE_VOID_SUCCESS;
     }
-}
\ No newline at end of file
+
+    @Override
+    List<Future<ActorSelection>> getCohortFutures() {
+        return Collections.emptyList();
+    }
+}
index 84f0776..672560b 100644 (file)
@@ -33,44 +33,44 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
 
     @Override
     public void closeTransaction() {
-        LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
+        LOG.debug("NoOpTransactionContext {} closeTransaction called", getIdentifier());
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called", identifier);
+        LOG.debug("Tx {} readyTransaction called", getIdentifier());
         operationLimiter.release();
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
-        LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+        LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
         operationLimiter.release();
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+        LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
         operationLimiter.release();
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        LOG.debug("Tx {} writeData called path = {}", identifier, path);
+        LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
         operationLimiter.release();
     }
 
     @Override
     public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
-        LOG.debug("Tx {} readData called path = {}", identifier, path);
+        LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
         operationLimiter.release();
         proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, failure));
     }
 
     @Override
     public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
-        LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+        LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
         operationLimiter.release();
         proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
     }
index aeb4062..3a2bcf2 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -25,7 +24,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -34,7 +32,7 @@ import scala.runtime.AbstractFunction1;
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
  */
-public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
+public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort {
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
 
@@ -209,7 +207,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
     @Override
     public ListenableFuture<Void> commit() {
-        OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
+        OperationCallback operationCallback = cohortFutures.isEmpty() ? NO_OP_CALLBACK :
                 new TransactionRateLimitingCallback(actorContext);
 
         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
@@ -322,7 +320,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
         }, actorContext.getClientDispatcher());
     }
 
-    @VisibleForTesting
+    @Override
     List<Future<ActorSelection>> getCohortFutures() {
         return Collections.unmodifiableList(cohortFutures);
     }
index 1b8e65e..a5a7494 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.List;
+import java.util.Collection;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.Future;
@@ -34,5 +34,5 @@ interface TransactionContext {
 
     void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
 
-    List<Future<Object>> getRecordedOperationFutures();
+    void copyRecordedOperationFutures(Collection<Future<Object>> target);
 }
index 3a20963..c61682d 100644 (file)
@@ -86,7 +86,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     @Override
     public void closeTransaction() {
-        LOG.debug("Tx {} closeTransaction called", identifier);
+        LOG.debug("Tx {} closeTransaction called", getIdentifier());
 
         actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
     }
@@ -94,7 +94,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     @Override
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-                identifier, recordedOperationFutures.size());
+            getIdentifier(), recordedOperationCount());
 
         // Send the remaining batched modifications if any.
 
@@ -113,8 +113,8 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         // Future will fail. We need all prior operations and the ready operation to succeed
         // in order to attempt commit.
 
-        List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
-        futureList.addAll(recordedOperationFutures);
+        List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationCount() + 1);
+        copyRecordedOperationFutures(futureList);
         futureList.add(withLastReplyFuture);
 
         Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
@@ -127,7 +127,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             @Override
             public ActorSelection checkedApply(Iterable<Object> notUsed) {
                 LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
-                        identifier);
+                    getIdentifier());
 
                 // At this point all the Futures succeeded and we need to extract the cohort
                 // actor path from the ReadyTransactionReply. For the recorded operations, they
@@ -149,7 +149,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
                 } else {
                     // Throwing an exception here will fail the Future.
                     throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
-                            identifier, serializedReadyReply.getClass()));
+                        getIdentifier(), serializedReadyReply.getClass()));
                 }
             }
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
@@ -161,7 +161,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     private void batchModification(Modification modification) {
         if(batchedModifications == null) {
-            batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
                     transactionChainId);
         }
 
@@ -176,7 +176,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     private void sendAndRecordBatchedModifications() {
         Future<Object> sentFuture = sendBatchedModifications();
         if(sentFuture != null) {
-            recordedOperationFutures.add(sentFuture);
+            recordOperationFuture(sentFuture);
         }
     }
 
@@ -188,14 +188,14 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         Future<Object> sent = null;
         if(batchedModifications != null) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Tx {} sending {} batched modifications, ready: {}", identifier,
+                LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
                         batchedModifications.getModifications().size(), ready);
             }
 
             batchedModifications.setReady(ready);
             sent = executeOperationAsync(batchedModifications);
 
-            batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
                     transactionChainId);
         }
 
@@ -204,89 +204,46 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
-        LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+        LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
 
         batchModification(new DeleteModification(path));
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+        LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
 
         batchModification(new MergeModification(path, data));
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        LOG.debug("Tx {} writeData called path = {}", identifier, path);
+        LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
 
         batchModification(new WriteModification(path, data));
     }
 
     @Override
-    public void readData(
-            final YangInstanceIdentifier path,final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
+    public void readData(final YangInstanceIdentifier path,
+            final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
 
-        LOG.debug("Tx {} readData called path = {}", identifier, path);
+        LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
 
-        // Send the remaining batched modifications if any.
+        // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
+        // public API contract.
 
         sendAndRecordBatchedModifications();
 
-        // If there were any previous recorded put/merge/delete operation reply Futures then we
-        // must wait for them to successfully complete. This is necessary to honor the read
-        // uncommitted semantics of the public API contract. If any one fails then fail the read.
-
-        if(recordedOperationFutures.isEmpty()) {
-            finishReadData(path, returnFuture);
-        } else {
-            LOG.debug("Tx {} readData: verifying {} previous recorded operations",
-                    identifier, recordedOperationFutures.size());
-
-            // Note: we make a copy of recordedOperationFutures to be on the safe side in case
-            // Futures#sequence accesses the passed List on a different thread, as
-            // recordedOperationFutures is not synchronized.
-
-            Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
-                    Lists.newArrayList(recordedOperationFutures),
-                    actorContext.getClientDispatcher());
-
-            OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
-                @Override
-                public void onComplete(Throwable failure, Iterable<Object> notUsed)
-                        throws Throwable {
-                    if(failure != null) {
-                        LOG.debug("Tx {} readData: a recorded operation failed: {}",
-                                identifier, failure);
-                        returnFuture.setException(new ReadFailedException(
-                                "The read could not be performed because a previous put, merge,"
-                                + "or delete operation failed", failure));
-                    } else {
-                        finishReadData(path, returnFuture);
-                    }
-                }
-            };
-
-            combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
-        }
-
-    }
-
-    private void finishReadData(final YangInstanceIdentifier path,
-            final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
-
-        LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
-
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object readResponse) throws Throwable {
                 if(failure != null) {
-                    LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+                    LOG.debug("Tx {} read operation failed: {}", getIdentifier(), failure);
                     returnFuture.setException(new ReadFailedException(
                             "Error reading data for path " + path, failure));
 
                 } else {
-                    LOG.debug("Tx {} read operation succeeded", identifier, failure);
+                    LOG.debug("Tx {} read operation succeeded", getIdentifier(), failure);
 
                     if (readResponse instanceof ReadDataReply) {
                         ReadDataReply reply = (ReadDataReply) readResponse;
@@ -312,64 +269,22 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     @Override
     public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> returnFuture) {
 
-        LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+        LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
 
-        // Send the remaining batched modifications if any.
+        // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
+        // public API contract.
 
         sendAndRecordBatchedModifications();
 
-        // If there were any previous recorded put/merge/delete operation reply Futures then we
-        // must wait for them to successfully complete. This is necessary to honor the read
-        // uncommitted semantics of the public API contract. If any one fails then fail this
-        // request.
-
-        if(recordedOperationFutures.isEmpty()) {
-            finishDataExists(path, returnFuture);
-        } else {
-            LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
-                    identifier, recordedOperationFutures.size());
-
-            // Note: we make a copy of recordedOperationFutures to be on the safe side in case
-            // Futures#sequence accesses the passed List on a different thread, as
-            // recordedOperationFutures is not synchronized.
-
-            Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
-                    Lists.newArrayList(recordedOperationFutures),
-                    actorContext.getClientDispatcher());
-            OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
-                @Override
-                public void onComplete(Throwable failure, Iterable<Object> notUsed)
-                        throws Throwable {
-                    if(failure != null) {
-                        LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
-                                identifier, failure);
-                        returnFuture.setException(new ReadFailedException(
-                                "The data exists could not be performed because a previous "
-                                + "put, merge, or delete operation failed", failure));
-                    } else {
-                        finishDataExists(path, returnFuture);
-                    }
-                }
-            };
-
-            combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
-        }
-    }
-
-    private void finishDataExists(final YangInstanceIdentifier path,
-            final SettableFuture<Boolean> returnFuture) {
-
-        LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
-
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object response) throws Throwable {
                 if(failure != null) {
-                    LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+                    LOG.debug("Tx {} dataExists operation failed: {}", getIdentifier(), failure);
                     returnFuture.setException(new ReadFailedException(
                             "Error checking data exists for path " + path, failure));
                 } else {
-                    LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+                    LOG.debug("Tx {} dataExists operation succeeded", getIdentifier(), failure);
 
                     if (response instanceof DataExistsReply) {
                         returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java
new file mode 100644 (file)
index 0000000..dc965ed
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Abstract superclass for transaction operations which should be executed
+ * on a {@link TransactionContext} at a later point in time.
+ */
+abstract class TransactionOperation {
+    /**
+     * Execute the delayed operation.
+     *
+     * @param transactionContext
+     */
+    protected abstract void invoke(TransactionContext transactionContext);
+}
index 7eabf9e..59c9298 100644 (file)
@@ -12,8 +12,6 @@ import akka.actor.ActorSelection;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.FinalizablePhantomReference;
-import com.google.common.base.FinalizableReferenceQueue;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -21,11 +19,9 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,7 +30,6 @@ import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
@@ -42,7 +37,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -84,6 +78,12 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         }
     }
 
+    private static enum TransactionState {
+        OPEN,
+        READY,
+        CLOSED,
+    }
+
     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
                                                               new Mapper<Throwable, Throwable>() {
         @Override
@@ -102,72 +102,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
             FiniteDuration.create(1, TimeUnit.SECONDS);
 
-    /**
-     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
-     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
-     * trickery to clean up its internal thread when the bundle is unloaded.
-     */
-    private static final FinalizableReferenceQueue phantomReferenceQueue =
-                                                                  new FinalizableReferenceQueue();
-
-    /**
-     * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
-     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
-     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
-     * and thus becomes eligible for garbage collection.
-     */
-    private static final Map<TransactionProxyCleanupPhantomReference,
-                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
-                                                                        new ConcurrentHashMap<>();
-
-    /**
-     * A PhantomReference that closes remote transactions for a TransactionProxy when it's
-     * garbage collected. This is used for read-only transactions as they're not explicitly closed
-     * by clients. So the only way to detect that a transaction is no longer in use and it's safe
-     * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
-     * but TransactionProxy instances should generally be short-lived enough to avoid being moved
-     * to the old generation space and thus should be cleaned up in a timely manner as the GC
-     * runs on the young generation (eden, swap1...) space much more frequently.
-     */
-    private static class TransactionProxyCleanupPhantomReference
-                                           extends FinalizablePhantomReference<TransactionProxy> {
-
-        private final List<ActorSelection> remoteTransactionActors;
-        private final AtomicBoolean remoteTransactionActorsMB;
-        private final ActorContext actorContext;
-        private final TransactionIdentifier identifier;
-
-        protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
-            super(referent, phantomReferenceQueue);
-
-            // Note we need to cache the relevant fields from the TransactionProxy as we can't
-            // have a hard reference to the TransactionProxy instance itself.
-
-            remoteTransactionActors = referent.remoteTransactionActors;
-            remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
-            actorContext = referent.actorContext;
-            identifier = referent.getIdentifier();
-        }
-
-        @Override
-        public void finalizeReferent() {
-            LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
-                    remoteTransactionActors.size(), identifier);
-
-            phantomReferenceCache.remove(this);
-
-            // Access the memory barrier volatile to ensure all previous updates to the
-            // remoteTransactionActors list are visible to this thread.
-
-            if(remoteTransactionActorsMB.get()) {
-                for(ActorSelection actor : remoteTransactionActors) {
-                    LOG.trace("Sending CloseTransaction to {}", actor);
-                    actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
-                }
-            }
-        }
-    }
-
     /**
      * Stores the remote Tx actors for each requested data store path to be used by the
      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
@@ -175,8 +109,8 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
      * remoteTransactionActors list so they will be visible to the thread accessing the
      * PhantomReference.
      */
-    private List<ActorSelection> remoteTransactionActors;
-    private volatile AtomicBoolean remoteTransactionActorsMB;
+    List<ActorSelection> remoteTransactionActors;
+    volatile AtomicBoolean remoteTransactionActorsMB;
 
     /**
      * Stores the create transaction results per shard.
@@ -184,10 +118,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
 
     private final TransactionType transactionType;
-    private final ActorContext actorContext;
+    final ActorContext actorContext;
     private final String transactionChainId;
     private final SchemaContext schemaContext;
-    private boolean inReadyState;
+    private TransactionState state = TransactionState.OPEN;
 
     private volatile boolean initialized;
     private Semaphore operationLimiter;
@@ -224,8 +158,8 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
-            if(transactionContext != null) {
-                recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+            if (transactionContext != null) {
+                transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
             }
         }
 
@@ -293,7 +227,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private void checkModificationState() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
                 "Modification operation on read-only transaction is not allowed");
-        Preconditions.checkState(!inReadyState,
+        Preconditions.checkState(state == TransactionState.OPEN,
                 "Transaction is sealed - further modifications are not allowed");
     }
 
@@ -326,7 +260,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         }
     }
 
-
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
 
@@ -381,26 +314,34 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         });
     }
 
-    @Override
-    public DOMStoreThreePhaseCommitCohort ready() {
+    private boolean seal(final TransactionState newState) {
+        if (state == TransactionState.OPEN) {
+            state = newState;
+            return true;
+        } else {
+            return false;
+        }
+    }
 
-        checkModificationState();
+    @Override
+    public AbstractThreePhaseCommitCohort ready() {
+        Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
+                "Read-only transactions cannot be readied");
 
-        inReadyState = true;
+        final boolean success = seal(TransactionState.READY);
+        Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
 
         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
                     txFutureCallbackMap.size());
 
-        if(txFutureCallbackMap.size() == 0) {
-            onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
+        if (txFutureCallbackMap.isEmpty()) {
             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
         }
 
         throttleOperation(txFutureCallbackMap.size());
 
-        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
-
+        List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
 
             LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
@@ -425,22 +366,22 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             cohortFutures.add(future);
         }
 
-        onTransactionReady(cohortFutures);
-
         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
             getIdentifier().toString());
     }
 
-    /**
-     * Method for derived classes to be notified when the transaction has been readied.
-     *
-     * @param cohortFutures the cohort Futures for each shard transaction.
-     */
-    protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
-    }
-
     @Override
     public void close() {
+        if (!seal(TransactionState.CLOSED)) {
+            if (state == TransactionState.CLOSED) {
+                // Idempotent no-op as per AutoCloseable recommendation
+                return;
+            }
+
+            throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
+                getIdentifier()));
+        }
+
         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
                 @Override
@@ -500,13 +441,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return actorContext;
     }
 
-    /**
-     * Interfaces for transaction operations to be invoked later.
-     */
-    private static interface TransactionOperation {
-        void invoke(TransactionContext transactionContext);
-    }
-
     /**
      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
@@ -731,9 +665,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
                     remoteTransactionActors = Lists.newArrayList();
                     remoteTransactionActorsMB = new AtomicBoolean();
 
-                    TransactionProxyCleanupPhantomReference cleanup =
-                            new TransactionProxyCleanupPhantomReference(TransactionProxy.this);
-                    phantomReferenceCache.put(cleanup, cleanup);
+                    TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
                 }
 
                 // Add the actor to the remoteTransactionActors list for access by the
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxyCleanupPhantomReference.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxyCleanupPhantomReference.java
new file mode 100644 (file)
index 0000000..77834d9
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import com.google.common.base.FinalizablePhantomReference;
+import com.google.common.base.FinalizableReferenceQueue;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PhantomReference that closes remote transactions for a TransactionProxy when it's
+ * garbage collected. This is used for read-only transactions as they're not explicitly closed
+ * by clients. So the only way to detect that a transaction is no longer in use and it's safe
+ * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
+ * but TransactionProxy instances should generally be short-lived enough to avoid being moved
+ * to the old generation space and thus should be cleaned up in a timely manner as the GC
+ * runs on the young generation (eden, swap1...) space much more frequently.
+ */
+final class TransactionProxyCleanupPhantomReference
+                                       extends FinalizablePhantomReference<TransactionProxy> {
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionProxyCleanupPhantomReference.class);
+    /**
+     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
+     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
+     * trickery to clean up its internal thread when the bundle is unloaded.
+     */
+    private static final FinalizableReferenceQueue phantomReferenceQueue =
+                                                                  new FinalizableReferenceQueue();
+
+    /**
+     * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
+     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
+     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
+     * and thus becomes eligible for garbage collection.
+     */
+    private static final Map<TransactionProxyCleanupPhantomReference,
+                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
+                                                                        new ConcurrentHashMap<>();
+
+    private final List<ActorSelection> remoteTransactionActors;
+    private final AtomicBoolean remoteTransactionActorsMB;
+    private final ActorContext actorContext;
+    private final TransactionIdentifier identifier;
+
+    private TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
+        super(referent, phantomReferenceQueue);
+
+        // Note we need to cache the relevant fields from the TransactionProxy as we can't
+        // have a hard reference to the TransactionProxy instance itself.
+
+        remoteTransactionActors = referent.remoteTransactionActors;
+        remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
+        actorContext = referent.actorContext;
+        identifier = referent.getIdentifier();
+    }
+
+    static void track(TransactionProxy referent) {
+        final TransactionProxyCleanupPhantomReference ret = new TransactionProxyCleanupPhantomReference(referent);
+        phantomReferenceCache.put(ret, ret);
+    }
+
+    @Override
+    public void finalizeReferent() {
+        LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
+                remoteTransactionActors.size(), identifier);
+
+        phantomReferenceCache.remove(this);
+
+        // Access the memory barrier volatile to ensure all previous updates to the
+        // remoteTransactionActors list are visible to this thread.
+
+        if(remoteTransactionActorsMB.get()) {
+            for(ActorSelection actor : remoteTransactionActors) {
+                LOG.trace("Sending CloseTransaction to {}", actor);
+                actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
+            }
+        }
+    }
+}
\ No newline at end of file
index 3b4a190..e131354 100644 (file)
@@ -33,7 +33,7 @@ public class WriteOnlyTransactionContextImpl extends TransactionContextImpl {
     @Override
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-                identifier, recordedOperationFutures.size());
+            getIdentifier(), recordedOperationCount());
 
         // Send the remaining batched modifications if any.
 
index ccfb329..c345033 100644 (file)
@@ -45,26 +45,26 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
-        recordedOperationFutures.add(executeOperationAsync(
+        recordOperationFuture(executeOperationAsync(
                 new DeleteData(path, getRemoteTransactionVersion())));
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        recordedOperationFutures.add(executeOperationAsync(
+        recordOperationFuture(executeOperationAsync(
                 new MergeData(path, data, getRemoteTransactionVersion())));
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        recordedOperationFutures.add(executeOperationAsync(
+        recordOperationFuture(executeOperationAsync(
                 new WriteData(path, data, getRemoteTransactionVersion())));
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-                identifier, recordedOperationFutures.size());
+            getIdentifier(), recordedOperationCount());
 
         // Send the ReadyTransaction message to the Tx actor.
 
index 265ec59..a247100 100644 (file)
@@ -10,7 +10,6 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
@@ -164,34 +163,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         testReadWithExceptionOnInitialCreateTransaction(new TestException());
     }
 
-    @Test(expected = TestException.class)
-    public void testReadWithPriorRecordingOperationFailure() throws Throwable {
-        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
-                when(mockActorContext).getDatastoreContext();
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectFailedBatchedModifications(actorRef);
-
-        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        try {
-            propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
-        } finally {
-            verify(mockActorContext, times(0)).executeOperationAsync(
-                    eq(actorSelection(actorRef)), eqSerializedReadData());
-        }
-    }
-
     @Test
     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
@@ -301,35 +272,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
     }
 
-    @Test(expected = TestException.class)
-    public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
-        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
-                when(mockActorContext).getDatastoreContext();
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectFailedBatchedModifications(actorRef);
-
-        doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        try {
-            propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
-        } finally {
-            verify(mockActorContext, times(0)).executeOperationAsync(
-                    eq(actorSelection(actorRef)), eqSerializedDataExists());
-        }
-    }
-
     @Test
     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
index d7d547d..1b9a37d 100644 (file)
@@ -8,8 +8,8 @@
                     </type>
                     <name>XSQL</name>
                     <data-broker>
-                       <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-data-broker</type>
-                       <name>binding-data-broker</name>
+                        <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-async-data-broker</type>
+                        <name>binding-data-broker</name>
                     </data-broker>
                     <async-data-broker>
                         <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-async-data-broker</type>
index cc92b48..2cb2e7b 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.odl.xsql;
 
 import java.sql.Connection;
@@ -10,7 +17,9 @@ import java.util.Properties;
 import java.util.logging.Logger;
 
 import org.opendaylight.controller.md.sal.dom.xsql.jdbc.JDBCConnection;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class JDBCDriver implements Driver {
 
     public static JDBCDriver drv = new JDBCDriver();
index 2f28052..938d25e 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.dom.xsql;
 
 import java.io.InputStream;
@@ -21,7 +28,9 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Calendar;
 import java.util.Map;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class TablesResultSet implements ResultSet {
 
     private String tables[] = null;
index a5658cc..05f6522 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.dom.xsql;
 
 import java.io.File;
@@ -24,7 +31,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class XSQLAdapter extends Thread implements SchemaContextListener {
 
     private static final int SLEEP = 10000;
@@ -51,6 +60,7 @@ public class XSQLAdapter extends Thread implements SchemaContextListener {
     private String pinningFile;
     private ServerSocket serverSocket = null;
     private DOMDataBroker domDataBroker = null;
+    private static final String REFERENCE_FIELD_NAME = "reference";
 
     private XSQLAdapter() {
         XSQLAdapter.log("Starting Adapter");
@@ -152,28 +162,18 @@ public class XSQLAdapter extends Thread implements SchemaContextListener {
                 List<Object> result = new LinkedList<Object>();
                 YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier
                         .builder()
-                        .node(XSQLODLUtils.getPath(table.getODLNode()).get(0))
+                        .node(XSQLODLUtils.getPath(table.getFirstFromSchemaNodes()).get(0))
                         .toInstance();
                 DOMDataReadTransaction t = this.domDataBroker
                         .newReadOnlyTransaction();
                 Object node = t.read(type,
                         instanceIdentifier).get();
 
-                node = XSQLODLUtils.get(node, "reference");
+                node = XSQLODLUtils.get(node, REFERENCE_FIELD_NAME);
                 if (node == null) {
                     return result;
                 }
-
-                Map<?, ?> children = XSQLODLUtils.getChildren(node);
-                for (Object c : children.values()) {
-                    result.add(c);
-                    /* I don't remember why i did this... possibly to prevent different siblings queried together
-                    Map<?, ?> sons = XSQLODLUtils.getChildren(c);
-                    for (Object child : sons.values()) {
-                        result.add(child);
-                    }*/
-                }
-
+                result.add(node);
                 return result;
             } catch (Exception err) {
                 XSQLAdapter.log(err);
index a9c0f69..7615296 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.dom.xsql;
 
 import java.io.DataInputStream;
@@ -23,7 +30,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class XSQLBluePrint implements DatabaseMetaData, Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -203,15 +212,23 @@ public class XSQLBluePrint implements DatabaseMetaData, Serializable {
         return result;
     }
 
-    public void addToBluePrintCache(XSQLBluePrintNode blNode) {
-        this.tableNameToBluePrint.put(blNode.getBluePrintNodeName(), blNode);
-        Map<String, XSQLBluePrintNode> map = this.odlNameToBluePrint.get(blNode
-                .getODLTableName());
-        if (map == null) {
-            map = new HashMap<String, XSQLBluePrintNode>();
-            this.odlNameToBluePrint.put(blNode.getODLTableName(), map);
+    public XSQLBluePrintNode addToBluePrintCache(XSQLBluePrintNode blNode,XSQLBluePrintNode parent) {
+        XSQLBluePrintNode existingNode = this.tableNameToBluePrint.get(blNode.getBluePrintNodeName());
+        if(existingNode!=null){
+            existingNode.mergeAugmentation(blNode);
+            return existingNode;
+        }else{
+            this.tableNameToBluePrint.put(blNode.getBluePrintNodeName(), blNode);
+            Map<String, XSQLBluePrintNode> map = this.odlNameToBluePrint.get(blNode.getODLTableName());
+            if (map == null) {
+                map = new HashMap<String, XSQLBluePrintNode>();
+                this.odlNameToBluePrint.put(blNode.getODLTableName(), map);
+            }
+            map.put(blNode.getBluePrintNodeName(), blNode);
+            if(parent!=null)
+                parent.addChild(blNode);
+            return blNode;
         }
-        map.put(blNode.getBluePrintNodeName(), blNode);
     }
 
     public Class<?> getGenericType(ParameterizedType type) {
index 4a56545..d3cd91a 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.dom.xsql;
 
 import java.io.Serializable;
@@ -8,6 +15,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class XSQLBluePrintNode implements Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -24,12 +34,25 @@ public class XSQLBluePrintNode implements Serializable {
     private Set<XSQLColumn> columns = new HashSet<XSQLColumn>();
     private Map<String, XSQLColumn> origNameToColumn = new HashMap<String, XSQLColumn>();
 
-    private transient Object odlNode = null;
+    private transient Object[] odlSchemaNodes = null;
     private boolean module = false;
     private String bluePrintTableName = null;
     private String odlTableName = null;
     private String origName = null;
 
+    public void mergeAugmentation(XSQLBluePrintNode aug) {
+        this.relations.addAll(aug.relations);
+        this.inheritingNodes.addAll(aug.inheritingNodes);
+        this.children.addAll(aug.children);
+        this.columns.addAll(aug.columns);
+        this.origNameToColumn.putAll(aug.origNameToColumn);
+        if (aug.odlSchemaNodes != null) {
+            for (Object sn : aug.odlSchemaNodes) {
+                addToSchemaNodes(sn);
+            }
+        }
+    }
+
     public XSQLBluePrintNode(String name, String _origName, int _level) {
         this.level = _level;
         this.odlTableName = name;
@@ -46,12 +69,32 @@ public class XSQLBluePrintNode implements Serializable {
 
     public XSQLBluePrintNode(Object _odlNode, int _level,
             XSQLBluePrintNode _parent) {
-        this.odlNode = _odlNode;
+        addToSchemaNodes(_odlNode);
         this.level = _level;
         this.module = XSQLODLUtils.isModule(_odlNode);
         this.parent = _parent;
         this.bluePrintTableName = XSQLODLUtils.getBluePrintName(_odlNode);
-        this.odlTableName = XSQLODLUtils.getODLNodeName(this.odlNode);
+        this.odlTableName = XSQLODLUtils
+                .getODLNodeName(getFirstFromSchemaNodes());
+    }
+
+    private void addToSchemaNodes(Object schemaObject) {
+        if (this.odlSchemaNodes == null)
+            this.odlSchemaNodes = new Object[1];
+        else {
+            Object[] temp = new Object[this.odlSchemaNodes.length + 1];
+            System.arraycopy(this.odlSchemaNodes, 0, temp, 0,
+                    this.odlSchemaNodes.length);
+            this.odlSchemaNodes = temp;
+        }
+        this.odlSchemaNodes[this.odlSchemaNodes.length - 1] = schemaObject;
+    }
+
+    public Object getFirstFromSchemaNodes() {
+        if (this.odlSchemaNodes == null) {
+            return null;
+        }
+        return this.odlSchemaNodes[0];
     }
 
     public String getOrigName() {
@@ -72,16 +115,13 @@ public class XSQLBluePrintNode implements Serializable {
 
     public String getODLTableName() {
         if (this.odlTableName == null) {
-            this.odlTableName = XSQLODLUtils.getODLNodeName(this.odlNode);
+            this.odlTableName = XSQLODLUtils
+                    .getODLNodeName(getFirstFromSchemaNodes());
         }
         return this.odlTableName;
     }
 
-    public Object getODLNode() {
-        return this.odlNode;
-    }
-
-    public void AddChild(XSQLBluePrintNode ch) {
+    public void addChild(XSQLBluePrintNode ch) {
         this.children.add(ch);
     }
 
@@ -218,7 +258,7 @@ public class XSQLBluePrintNode implements Serializable {
         if (myInterfaceName != null) {
             return myInterfaceName;
         }
-        if (odlNode != null) {
+        if (this.odlSchemaNodes != null) {
             return getBluePrintNodeName();
         }
         if (odlTableName != null) {
@@ -238,15 +278,14 @@ public class XSQLBluePrintNode implements Serializable {
     @Override
     public boolean equals(Object obj) {
         XSQLBluePrintNode other = (XSQLBluePrintNode) obj;
-        if (odlNode != null) {
+        if (this.odlSchemaNodes != null) {
             return getBluePrintNodeName().equals(other.getBluePrintNodeName());
         } else if (this.odlTableName == null && other.odlTableName != null) {
             return false;
         }
         if (this.odlTableName != null && other.odlTableName == null) {
             return false;
-        }
-        else {
+        } else {
             return this.odlTableName.equals(other.odlTableName);
         }
     }
@@ -255,7 +294,7 @@ public class XSQLBluePrintNode implements Serializable {
     public int hashCode() {
         if (myInterfaceString != null) {
             return myInterfaceString.hashCode();
-        } else if (odlNode != null) {
+        } else if (this.odlSchemaNodes != null) {
             return bluePrintTableName.hashCode();
         }
         return 0;
index 17b8ae5..16a33b3 100644 (file)
@@ -1,15 +1,27 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.dom.xsql;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.model.api.AugmentationSchema;
+import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
@@ -20,7 +32,9 @@ import org.opendaylight.yangtools.yang.model.util.Uint16;
 import org.opendaylight.yangtools.yang.model.util.Uint32;
 import org.opendaylight.yangtools.yang.model.util.Uint64;
 import org.opendaylight.yangtools.yang.model.util.Uint8;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class XSQLODLUtils {
 
     private static Map<Class<?>, Class<?>> types =
@@ -113,7 +127,7 @@ public class XSQLODLUtils {
 
     public static boolean createOpenDaylightCache(XSQLBluePrint bluePrint,Object module) {
         XSQLBluePrintNode node = new XSQLBluePrintNode(module, 0,null);
-        bluePrint.addToBluePrintCache(node);
+        bluePrint.addToBluePrintCache(node,null);
         collectODL(bluePrint, node, ((Module) module).getChildNodes(), 1);
         return true;
     }
@@ -124,20 +138,30 @@ public class XSQLODLUtils {
             return;
         }
         for (DataSchemaNode n : nodes) {
-            if (n instanceof DataNodeContainer /*|| n instanceof LeafListSchemaNode*/
-                || n instanceof ListSchemaNode) {
+            if (n instanceof DataNodeContainer) {
                 XSQLBluePrintNode bn = new XSQLBluePrintNode(n, level,parent);
-                bluePrint.addToBluePrintCache(bn);
-                parent.AddChild(bn);
-                if (n instanceof DataNodeContainer) {
+                bn = bluePrint.addToBluePrintCache(bn,parent);
+                if (n instanceof ListSchemaNode) {
                     level++;
-                    collectODL(bluePrint, bn,
-                        ((DataNodeContainer) n).getChildNodes(), level);
+                    collectODL(bluePrint, bn,((ListSchemaNode) n).getChildNodes(), level);
+                    Set<AugmentationSchema> s = ((ListSchemaNode)n).getAvailableAugmentations();
+                    if(s!=null){
+                        for(AugmentationSchema as:s){
+                            collectODL(bluePrint, bn,as.getChildNodes(), level);
+                        }
+                    }
                     level--;
-                } else if (n instanceof ListSchemaNode) {
+                }else{
                     level++;
-                    collectODL(bluePrint, bn,
-                        ((ListSchemaNode) n).getChildNodes(), level);
+                    collectODL(bluePrint, bn,((DataNodeContainer) n).getChildNodes(), level);
+                    if(n instanceof ContainerSchemaNode){
+                       Set<AugmentationSchema> s = ((ContainerSchemaNode)n).getAvailableAugmentations();
+                       if(s!=null){
+                           for(AugmentationSchema as:s){
+                               collectODL(bluePrint, bn,as.getChildNodes(), level);
+                           }
+                       }
+                    }
                     level--;
                 }
             } else {
@@ -189,7 +213,7 @@ public class XSQLODLUtils {
             Field f = findField(c, name);
             return f.get(o);
         } catch (Exception err) {
-            XSQLAdapter.log(err);
+            //XSQLAdapter.log(err);
         }
         return null;
     }
@@ -207,6 +231,21 @@ public class XSQLODLUtils {
         return (Map<?, ?>) get(o, "children");
     }
 
+    public static Collection<?> getChildrenCollection(Object o) {
+        Object value = get(o, "children");
+        if(value==null)
+            return Collections.emptyList();
+        if(value instanceof Map)
+            return ((Map<?,?>)value).values();
+        else
+        if(value instanceof Collection){
+            return (Collection<?>)value;
+        }else{
+            XSQLAdapter.log("Unknown Child Value Type="+value.getClass().getName());
+            return new ArrayList();
+        }
+    }
+
     public static Object getValue(Object o) {
         return get(o, "value");
     }
index 6689908..ea16e72 100644 (file)
@@ -1,10 +1,16 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.dom.xsql.jdbc;
 
 import java.io.InputStream;
 import java.io.Reader;
 import java.io.Serializable;
 import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
 import java.math.BigDecimal;
 import java.net.URL;
 import java.sql.Array;
@@ -24,6 +30,7 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -32,14 +39,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.opendaylight.controller.md.sal.dom.xsql.XSQLAdapter;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLBluePrint;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLBluePrintNode;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLColumn;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLCriteria;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLODLUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
 
-public class JDBCResultSet implements Serializable, ResultSet,
-        ResultSetMetaData {
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
+public class JDBCResultSet implements Serializable, ResultSet, ResultSetMetaData {
     private static final long serialVersionUID = -7450200738431047057L;
     private static final ClassLoader CLASS_LOADER = JDBCResultSet.class.getClassLoader();
     private static final Class<?>[] PROXY_INTERFACES = new Class[] { ResultSet.class };
@@ -57,27 +68,28 @@ public class JDBCResultSet implements Serializable, ResultSet,
     private Map<String, Map<XSQLColumn, List<XSQLCriteria>>> criteria = new ConcurrentHashMap<String, Map<XSQLColumn, List<XSQLCriteria>>>();
     private Exception err = null;
     private List<Record> EMPTY_RESULT = new LinkedList<Record>();
-    private transient Map<String,JDBCResultSet> subQueries = new HashMap<String,JDBCResultSet>();
+    private transient Map<String, JDBCResultSet> subQueries = new HashMap<String, JDBCResultSet>();
 
     public ResultSet getProxy() {
-         return (ResultSet) Proxy.newProxyInstance(CLASS_LOADER, PROXY_INTERFACES, new JDBCProxy(this));
+        return this;
+        //return (ResultSet) Proxy.newProxyInstance(CLASS_LOADER, PROXY_INTERFACES, new JDBCProxy(this));
     }
 
     public void setSQL(String _sql) {
         this.sql = _sql;
     }
 
-    public JDBCResultSet addSubQuery(String _sql,String logicalName) {
+    public JDBCResultSet addSubQuery(String _sql, String logicalName) {
         if (subQueries == null) {
-            subQueries = new HashMap<String,JDBCResultSet>();
+            subQueries = new HashMap<String, JDBCResultSet>();
         }
         JDBCResultSet rs = new JDBCResultSet(_sql);
-        this.subQueries.put(logicalName,rs);
+        this.subQueries.put(logicalName, rs);
         return rs;
     }
 
-    public Map<String,JDBCResultSet> getSubQueries() {
-        if (this.subQueries==null) {
+    public Map<String, JDBCResultSet> getSubQueries() {
+        if (this.subQueries == null) {
             this.subQueries = new HashMap<>();
         }
         return this.subQueries;
@@ -112,7 +124,8 @@ public class JDBCResultSet implements Serializable, ResultSet,
         }
     }
 
-    public int isObjectFitCriteria(Map<String, Object> objValues, String tableName) {
+    public int isObjectFitCriteria(Map<String, Object> objValues,
+            String tableName) {
         Map<XSQLColumn, List<XSQLCriteria>> tblCriteria = criteria
                 .get(tableName);
         if (tblCriteria == null) {
@@ -289,19 +302,41 @@ public class JDBCResultSet implements Serializable, ResultSet,
     }
 
     public static class Record {
+        // The map container the Attribute 2 the attribute value
         public Map<String, Object> data = new HashMap<>();
+        // The Element Object (Possibly some kind of NormalizedNode
         public Object element = null;
+        // Does this record fit the criteria
+        // In case of a list property, we first collect the list and only then
+        // we
+        // we decide which list item should be included or not.
+        public boolean fitCriteria = true;
 
         public Map<String, Object> getRecord() {
             return this.data;
         }
     }
 
-    private Map<String, Object> collectColumnValues(Object node, XSQLBluePrintNode bpn) {
-        Map<?, ?> subChildren = XSQLODLUtils.getChildren(node);
-        Map<String, Object> result = new HashMap<>();
-        for (Object stc : subChildren.values()) {
-            if (stc.getClass().getName().endsWith("ImmutableAugmentationNode")) {
+    public static class RecordsContainer {
+        public List<Record> records = new LinkedList<Record>();
+        public List<Record> fitRecords = new LinkedList<Record>();
+        public Object currentObject = null;
+    }
+
+    private void collectColumnValues(RecordsContainer rContainer,
+            XSQLBluePrintNode bpn) {
+        Collection<?> subChildren = XSQLODLUtils
+                .getChildrenCollection(rContainer.currentObject);
+        Record r = new Record();
+        r.element = rContainer.currentObject;
+        for (Object stc : subChildren) {
+            if (stc.getClass().getName()
+                    .endsWith("ImmutableUnkeyedListEntryNode")) {
+                r.fitCriteria = false;
+                rContainer.currentObject = stc;
+                collectColumnValues(rContainer, bpn);
+            } else if (stc.getClass().getName()
+                    .endsWith("ImmutableAugmentationNode")) {
                 Map<?, ?> values = XSQLODLUtils.getChildren(stc);
                 for (Object key : values.keySet()) {
                     Object val = values.get(key);
@@ -309,7 +344,7 @@ public class JDBCResultSet implements Serializable, ResultSet,
                         Object value = XSQLODLUtils.getValue(val);
                         String k = XSQLODLUtils.getNodeName(val);
                         if (value != null) {
-                            result.put(bpn.getBluePrintNodeName() + "." + k,
+                            r.data.put(bpn.getBluePrintNodeName() + "." + k,
                                     value.toString());
                         }
                     }
@@ -318,16 +353,17 @@ public class JDBCResultSet implements Serializable, ResultSet,
                 String k = XSQLODLUtils.getNodeName(stc);
                 Object value = XSQLODLUtils.getValue(stc);
                 if (value != null) {
-                    result.put(bpn.getBluePrintNodeName() + "." + k,
+                    r.data.put(bpn.getBluePrintNodeName() + "." + k,
                             value.toString());
                 }
             }
         }
-        return result;
+        if (r.fitCriteria) {
+            rContainer.records.add(r);
+        }
     }
 
-    private void addToData(Record rec, XSQLBluePrintNode bpn,
-            XSQLBluePrint bluePrint, Map<String, Object> fullRecord) {
+    private void addToData(Record rec, XSQLBluePrintNode bpn,XSQLBluePrint bluePrint, Map<String, Object> fullRecord) {
         XSQLBluePrintNode eNodes[] = bluePrint
                 .getBluePrintNodeByODLTableName(XSQLODLUtils
                         .getNodeIdentiofier(rec.element));
@@ -386,6 +422,11 @@ public class JDBCResultSet implements Serializable, ResultSet,
 
             String odlNodeName = XSQLODLUtils.getNodeIdentiofier(child);
             if (odlNodeName == null) {
+                if (child instanceof DataContainerNode) {
+                    List<Object> augChidlren = getChildren(child, tableName,
+                            bluePrint);
+                    result.addAll(augChidlren);
+                }
                 continue;
             }
 
@@ -407,7 +448,10 @@ public class JDBCResultSet implements Serializable, ResultSet,
                 continue;
             }
 
-            if (child.getClass().getName().endsWith("ImmutableContainerNode")) {
+            if (child.getClass().getName().endsWith("ImmutableUnkeyedListNode")) {
+                result.add(child);
+            } else if (child.getClass().getName()
+                    .endsWith("ImmutableContainerNode")) {
                 result.add(child);
             } else if (child.getClass().getName()
                     .endsWith("ImmutableAugmentationNode")) {
@@ -420,52 +464,76 @@ public class JDBCResultSet implements Serializable, ResultSet,
                 }
             } else if (child.getClass().getName().endsWith("ImmutableMapNode")) {
                 result.addAll(XSQLODLUtils.getMChildren(child));
+            } else {
+                XSQLAdapter.log("Missed Node Data OF Type="
+                        + child.getClass().getName());
             }
         }
         return result;
     }
 
-    public List<Record> addRecords(Object element, XSQLBluePrintNode node,boolean root, String tableName, XSQLBluePrint bluePrint) {
+    public List<Record> addRecords(Object element, XSQLBluePrintNode node,
+            boolean root, String tableName, XSQLBluePrint bluePrint) {
         List<Record> result = new LinkedList<Record>();
-        //In case this is a sibling to the requested table, the elenment type
-        //won't be in the path of the leaf node
-        if(node==null){
-            return result;
-        }
         String nodeID = XSQLODLUtils.getNodeIdentiofier(element);
         if (node.getODLTableName().equals(nodeID)) {
-            XSQLBluePrintNode bluePrintNode = bluePrint.getBluePrintNodeByODLTableName(nodeID)[0];
-            Record rec = new Record();
-            rec.element = element;
-            XSQLBluePrintNode bpn = this.tablesInQueryMap.get(bluePrintNode.getBluePrintNodeName());
-            if (this.criteria.containsKey(bluePrintNode.getBluePrintNodeName()) || bpn != null) {
-                Map<String, Object> allKeyValues = collectColumnValues(element, bpn);
-                if (!(isObjectFitCriteria(allKeyValues,
-                        bpn.getBluePrintNodeName()) == 1)) {
-                    return EMPTY_RESULT;
+            XSQLBluePrintNode bluePrintNode = bluePrint
+                    .getBluePrintNodeByODLTableName(nodeID)[0];
+            RecordsContainer rContainer = new RecordsContainer();
+            rContainer.currentObject = element;
+            XSQLBluePrintNode bpn = this.tablesInQueryMap.get(bluePrintNode
+                    .getBluePrintNodeName());
+            if (this.criteria.containsKey(bluePrintNode.getBluePrintNodeName())
+                    || bpn != null) {
+                collectColumnValues(rContainer, bpn);
+                for (Record r : rContainer.records) {
+                    if (!(isObjectFitCriteria(r.data,
+                            bpn.getBluePrintNodeName()) == 1)) {
+                        r.fitCriteria = false;
+                    }
+                    if (r.fitCriteria) {
+                        Record rec = new Record();
+                        rec.element = r.element;
+                        addToData(rec, bpn, bluePrint, r.data);
+                        rContainer.fitRecords.add(rec);
+                    }
                 }
-                addToData(rec, bpn, bluePrint, allKeyValues);
+                if (rContainer.fitRecords.isEmpty())
+                    return EMPTY_RESULT;
             }
-            if (root) {
-                addRecord(rec.data);
+            if (rContainer.records.isEmpty()) {
+                Record rec = new Record();
+                rec.element = rContainer.currentObject;
+                if (root) {
+                    addRecord(rec.data);
+                } else {
+                    result.add(rec);
+                }
             } else {
-                result.add(rec);
+                for (Record rec : rContainer.fitRecords) {
+                    if (root) {
+                        addRecord(rec.data);
+                    } else {
+                        result.add(rec);
+                    }
+                }
             }
             return result;
         }
 
         XSQLBluePrintNode parent = node.getParent();
-        List<Record> subRecords = addRecords(element, parent, false, tableName,bluePrint);
+        List<Record> subRecords = addRecords(element, parent, false, tableName,
+                bluePrint);
         for (Record subRec : subRecords) {
             List<Object> subO = getChildren(subRec.element, tableName,
                     bluePrint);
             if (subO != null) {
                 for (Object subData : subO) {
-                    Record rec = new Record();
-                    rec.element = subData;
-                    rec.data.putAll(subRec.data);
+                    RecordsContainer rContainer = new RecordsContainer();
+                    rContainer.currentObject = subData;
 
-                    String recID = XSQLODLUtils.getNodeIdentiofier(rec.element);
+                    String recID = XSQLODLUtils
+                            .getNodeIdentiofier(rContainer.currentObject);
                     XSQLBluePrintNode eNodes[] = bluePrint
                             .getBluePrintNodeByODLTableName(recID);
                     XSQLBluePrintNode bpn = null;
@@ -476,18 +544,24 @@ public class JDBCResultSet implements Serializable, ResultSet,
                             break;
                         }
                     }
-                    boolean isObjectInCriteria = true;
                     if (bpn != null) {
-                        Map<String, Object> allKeyValues = collectColumnValues(rec.element, bpn);
-                        if ((isObjectFitCriteria(allKeyValues,
-                                bpn.getBluePrintNodeName()) == 1)) {
-                            addToData(rec, bpn, bluePrint, allKeyValues);
-                        } else {
-                            isObjectInCriteria = false;
+                        collectColumnValues(rContainer, bpn);
+                        for (Record r : rContainer.records) {
+                            if ((isObjectFitCriteria(r.data,
+                                    bpn.getBluePrintNodeName()) == 1)) {
+                                Record rec = new Record();
+                                rec.data.putAll(subRec.data);
+                                rec.element = r.element;
+                                addToData(rec, bpn, bluePrint, r.data);
+                            } else {
+                                r.fitCriteria = false;
+                            }
                         }
                     }
-
-                    if (isObjectInCriteria) {
+                    if (rContainer.records.isEmpty()) {
+                        Record rec = new Record();
+                        rec.data.putAll(subRec.data);
+                        rec.element = rContainer.currentObject;
                         if (root) {
                             if (!rec.data.isEmpty()) {
                                 addRecord(rec.data);
@@ -495,11 +569,23 @@ public class JDBCResultSet implements Serializable, ResultSet,
                         } else {
                             result.add(rec);
                         }
+                    } else {
+                        for (Record r : rContainer.records) {
+                            r.data.putAll(subRec.data);
+                            if (r.fitCriteria) {
+                                if (root) {
+                                    if (!r.data.isEmpty()) {
+                                        addRecord(r.data);
+                                    }
+                                } else {
+                                    result.add(r);
+                                }
+                            }
+                        }
                     }
                 }
             }
         }
-
         return result;
     }
 
index 7b2733c..31941e4 100644 (file)
@@ -46,8 +46,7 @@ public class JDBCServer extends Thread {
         }
     }
 
-    public static void execute(JDBCResultSet rs, XSQLAdapter adapter)
-            throws SQLException {
+    public static void execute(JDBCResultSet rs, XSQLAdapter adapter)throws SQLException {
         if(rs.getSQL().toLowerCase().trim().equals("select 1")){
             rs.setFinished(true);
             return;
index cde0157..29a1945 100644 (file)
@@ -1,37 +1,47 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.xsql;
 
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.xsql.XSQLAdapter;
 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626.XSQL;
 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626.XSQLBuilder;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Created by root on 6/26/14.
- */
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class XSQLProvider implements AutoCloseable {
 
     public static final InstanceIdentifier<XSQL> ID = InstanceIdentifier.builder(XSQL.class).build();
-    private static final Logger LOG = LoggerFactory.getLogger(XSQLProvider.class);
+    //public static final InstanceIdentifier<SalTest> ID2 = InstanceIdentifier.builder(SalTest.class).build();
 
     public void close() {
     }
 
-    public XSQL buildXSQL(DataProviderService dps) {
+    public XSQL buildXSQL(DataBroker dps) {
+            XSQLAdapter.log("Building XSL...");
             XSQLBuilder builder = new XSQLBuilder();
             builder.setPort("34343");
             XSQL xsql = builder.build();
             try {
                 if (dps != null) {
-                    final DataModificationTransaction t = dps.beginTransaction();
-                    t.removeOperationalData(ID);
-                    t.putOperationalData(ID,xsql);
-                    t.commit().get();
+                    XSQLAdapter.log("Starting TRansaction...");
+                    WriteTransaction t = dps.newReadWriteTransaction();
+                    t.delete(LogicalDatastoreType.OPERATIONAL, ID);
+                    t.put(LogicalDatastoreType.OPERATIONAL,ID,xsql);
+                    XSQLAdapter.log("Submitting...");
+                    t.submit();
                 }
             } catch (Exception e) {
-                LOG.warn("Failed to update XSQL port status, ", e);
+                XSQLAdapter.log(e);
             }
         return xsql;
     }
index c8a5a85..a669345 100644 (file)
@@ -1,9 +1,19 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626;
 
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLAdapter;
 import org.opendaylight.xsql.XSQLProvider;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class XSQLModule extends org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626.AbstractXSQLModule {
+    private static final long SLEEP_TIME_BEFORE_CREATING_TRANSACTION = 10000;
     public XSQLModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
@@ -22,9 +32,14 @@ public class XSQLModule extends org.opendaylight.yang.gen.v1.http.netconfcentral
         XSQLAdapter xsqlAdapter = XSQLAdapter.getInstance();
         getSchemaServiceDependency().registerSchemaContextListener(xsqlAdapter);
         xsqlAdapter.setDataBroker(getAsyncDataBrokerDependency());
-        XSQLProvider p = new XSQLProvider();
-        //p.buildXSQL(getDataBrokerDependency());
+        final XSQLProvider p = new XSQLProvider();
+        Runnable runthis = new Runnable() {
+            @Override
+            public void run() {
+                try{Thread.sleep(SLEEP_TIME_BEFORE_CREATING_TRANSACTION);}catch(Exception err){}
+                p.buildXSQL(getDataBrokerDependency());
+            }
+        };
         return p;
     }
-
 }
index d7d547d..1b9a37d 100644 (file)
@@ -8,8 +8,8 @@
                     </type>
                     <name>XSQL</name>
                     <data-broker>
-                       <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-data-broker</type>
-                       <name>binding-data-broker</name>
+                        <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-async-data-broker</type>
+                        <name>binding-data-broker</name>
                     </data-broker>
                     <async-data-broker>
                         <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-async-data-broker</type>
index 0437e10..f0f5269 100644 (file)
@@ -37,14 +37,14 @@ module XSQL{
         case XSQL {
             when "/config:modules/config:module/config:type = 'XSQL'";
 
-                       container data-broker {
+            container data-broker {
                 uses config:service-ref {
                     refine type {
                         mandatory false;
-                        config:required-identity mdsal:binding-data-broker;
+                        config:required-identity mdsal:binding-async-data-broker;
                     }
                 }
-            }         
+            }
 
             container async-data-broker {
                 uses config:service-ref {
index 8a6b184..e3f5fbb 100644 (file)
@@ -2,18 +2,29 @@ package org.opendaylight.xsql.test;
 
 import java.io.InputStream;
 import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLAdapter;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLBluePrint;
 import org.opendaylight.controller.md.sal.dom.xsql.jdbc.JDBCResultSet;
 import org.opendaylight.controller.md.sal.dom.xsql.jdbc.JDBCServer;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
 public class XSQLTest {
-
-    XSQLBluePrint bluePrint = null;
+    private static final String DATASTORE_TEST_YANG = "/sal-persisted-dom-test.yang";
+    private XSQLBluePrint bluePrint = null;
+    //private static SchemaContext schemaContext = null;
+    @BeforeClass
+    public static void loadSchemaContext(){
+        //schemaContext = createTestContext();
+    }
 
     @Before
     public void before() {
@@ -167,4 +178,18 @@ public class XSQLTest {
         System.out.print("*** XSQL Tests -");
         System.out.println(str);
     }
+
+    public static final InputStream getDatastoreTestInputStream() {
+        return getInputStream(DATASTORE_TEST_YANG);
+    }
+
+    private static InputStream getInputStream(final String resourceName) {
+        return XSQLTest.class.getResourceAsStream(DATASTORE_TEST_YANG);
+    }
+
+    public static SchemaContext createTestContext() {
+        YangParserImpl parser = new YangParserImpl();
+        Set<Module> modules = parser.parseYangModelsFromStreams(Collections.singletonList(getDatastoreTestInputStream()));
+        return parser.resolveSchemaContext(modules);
+    }
 }
index b6b34ac..152f787 100644 (file)
Binary files a/opendaylight/md-sal/sal-dom-xsql/src/test/resources/BluePrintCache.dat and b/opendaylight/md-sal/sal-dom-xsql/src/test/resources/BluePrintCache.dat differ
index 999fb91..feb1b66 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invo
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.spi.DefaultDataTreeCandidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,7 @@ final class InMemoryDOMStoreTreeChangePublisher extends AbstractDOMStoreTreeChan
 
     @Override
     protected void notifyListeners(final Collection<AbstractDOMDataTreeChangeListenerRegistration<?>> registrations, final YangInstanceIdentifier path, final DataTreeCandidateNode node) {
-        final DataTreeCandidate candidate = new SimpleDataTreeCandidate(path, node);
+        final DataTreeCandidate candidate = new DefaultDataTreeCandidate(path, node);
 
         for (AbstractDOMDataTreeChangeListenerRegistration<?> reg : registrations) {
             LOG.debug("Enqueueing candidate {} to registration {}", candidate, registrations);
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java
deleted file mode 100644 (file)
index 701841c..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.store.impl;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
-
-final class SimpleDataTreeCandidate implements DataTreeCandidate {
-    private final YangInstanceIdentifier rootPath;
-    private final DataTreeCandidateNode rootNode;
-
-    SimpleDataTreeCandidate(final YangInstanceIdentifier rootPath, final DataTreeCandidateNode rootNode) {
-        this.rootPath = Preconditions.checkNotNull(rootPath);
-        this.rootNode = Preconditions.checkNotNull(rootNode);
-    }
-
-    @Override
-    public DataTreeCandidateNode getRootNode() {
-        return rootNode;
-    }
-
-    @Override
-    public YangInstanceIdentifier getRootPath() {
-        return rootPath;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this).add("rootPath", rootPath).add("rootNode", rootNode).toString();
-    }
-}
\ No newline at end of file
index d777942..0e660ea 100644 (file)
             <param-name>javax.ws.rs.Application</param-name>
             <param-value>org.opendaylight.controller.sal.rest.doc.jaxrs.ApiDocApplication</param-value>
         </init-param>
+        <!-- AAA Auth Filter -->
+        <init-param>
+            <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
+            <param-value> org.opendaylight.aaa.sts.TokenAuthFilter</param-value>
+        </init-param>
         <load-on-startup>1</load-on-startup>
     </servlet>
 
 
     <security-constraint>
       <web-resource-collection>
-        <web-resource-name>free access</web-resource-name>
-        <url-pattern>/explorer/css/*</url-pattern>
-        <url-pattern>/explorer/images/*</url-pattern>
-        <url-pattern>/explorer/lib/*</url-pattern>
-        <url-pattern>/explorer/*</url-pattern>
+        <web-resource-name>API Doc</web-resource-name>
+        <url-pattern>/*</url-pattern>
       </web-resource-collection>
     </security-constraint>