Merge "BUG 2799: SPI for EventSources"
authorTony Tkacik <ttkacik@cisco.com>
Wed, 8 Apr 2015 19:04:09 +0000 (19:04 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 8 Apr 2015 19:04:10 +0000 (19:04 +0000)
91 files changed:
features/mdsal/src/main/resources/features.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataObjectModification.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeChangeListener.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeChangeService.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeIdentifier.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeModification.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMDataBrokerAdapter.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMDataTreeChangeListenerAdapter.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMDataTreeChangeServiceAdapter.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataTreeChangeListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingStructuralType.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingToNormalizedNodeCodec.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/LazyDataObjectModification.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/LazyDataTreeModification.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/DataTreeChangeListenerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxyCleanupPhantomReference.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataTreeIdentifier.java
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/ForwardingDOMStoreThreePhaseCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-xsql-config/src/main/resources/04-xsql.xml
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/odl/xsql/JDBCDriver.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/TablesResultSet.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLAdapter.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLBluePrint.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLBluePrintNode.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/XSQLODLUtils.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/jdbc/JDBCResultSet.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/jdbc/JDBCServer.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/xsql/XSQLProvider.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/yang/gen/v1/http/netconfcentral/org/ns/xsql/rev140626/XSQLModule.java
opendaylight/md-sal/sal-dom-xsql/src/main/resources/04-xsql.xml
opendaylight/md-sal/sal-dom-xsql/src/main/yang/XSQL.yang
opendaylight/md-sal/sal-dom-xsql/src/test/java/org/opendaylight/xsql/test/XSQLTest.java
opendaylight/md-sal/sal-dom-xsql/src/test/resources/BluePrintCache.dat
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreTreeChangePublisher.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java [deleted file]
opendaylight/md-sal/sal-rest-docgen/src/main/resources/WEB-INF/web.xml
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpc.java
opendaylight/netconf/netconf-cli/pom.xml
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/Cli.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/NetconfDeviceConnectionHandler.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/NetconfDeviceConnectionManager.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/CommandDispatcher.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/input/Input.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/local/Connect.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/local/Disconnect.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/local/Help.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/output/Output.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/remote/RemoteCommand.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/AbstractReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/Reader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/custom/ConfigReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/custom/EditContentReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/custom/FilterReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/AnyXmlReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/BasicDataHolderReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/ChoiceReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/ContainerReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/GenericListReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/GenericReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/ListEntryReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/writer/Writer.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/writer/custom/DataWriter.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/writer/impl/AbstractWriter.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/writer/impl/CompositeNodeWriter.java [deleted file]
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/writer/impl/NormalizedNodeWriter.java
opendaylight/netconf/netconf-cli/src/test/java/org/opendaylight/controller/netconf/cli/NetconfCliTest.java
opendaylight/netconf/pom.xml

index 021ed1d..f8bbfec 100644 (file)
@@ -11,7 +11,7 @@
     <repository>mvn:org.opendaylight.controller/features-akka/${commons.opendaylight.version}/xml/features</repository>
     <feature name='odl-mdsal-all' version='${project.version}' description="OpenDaylight :: MDSAL :: All">
         <feature version='${project.version}'>odl-mdsal-broker</feature>
-        <feature version='${project.version}'>odl-mdsal-clustering</feature>
+        <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${project.version}'>odl-mdsal-xsql</feature>
         <feature version='${project.version}'>odl-toaster</feature>
     </feature>
@@ -41,7 +41,7 @@
         <configfile finalname='${config.configfile.directory}/${config.netconf.mdsal.configfile}'>mvn:org.opendaylight.controller/netconf-mdsal-config/${netconf.version}/xml/config</configfile>
     </feature>
 
-    <feature name='odl-mdsal-broker' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
+    <feature name='odl-mdsal-broker-local' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
         <feature version='${yangtools.version}'>odl-yangtools-common</feature>
         <feature version='${yangtools.version}'>odl-yangtools-binding</feature>
         <feature version='${yangtools.version}'>odl-yangtools-models</feature>
@@ -76,7 +76,7 @@
         <configfile finalname="${config.configfile.directory}/${config.xsql.configfile}">mvn:org.opendaylight.controller/sal-dom-xsql-config/${project.version}/xml/config</configfile>
     </feature>
     <feature name ='odl-mdsal-clustering-commons' version='${project.version}'>
-        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${akka.version}'>odl-akka-system</feature>
         <feature version='${akka.version}'>odl-akka-persistence</feature>
         <bundle>mvn:org.opendaylight.controller/sal-clustering-commons/${project.version}</bundle>
         <bundle>mvn:com.codahale.metrics/metrics-core/3.0.1</bundle>
     </feature>
     <feature name ='odl-mdsal-distributed-datastore' version='${project.version}'>
-        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
         <feature version='${akka.version}'>odl-akka-clustering</feature>
         <bundle>mvn:org.opendaylight.controller/sal-distributed-datastore/${project.version}</bundle>
     </feature>
     <feature name ='odl-mdsal-remoterpc-connector' version='${project.version}'>
-        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <feature version='${project.version}'>odl-mdsal-clustering-commons</feature>
         <feature version='${akka.version}'>odl-akka-clustering</feature>
         <feature version='0.7'>odl-akka-leveldb</feature>
         <bundle>mvn:org.opendaylight.controller/sal-remoterpc-connector/${project.version}</bundle>
     </feature>
-    <feature name ='odl-mdsal-clustering' version='${project.version}'>
+    <feature name ='odl-mdsal-broker' version='${project.version}'>
         <feature version='${project.version}'>odl-mdsal-remoterpc-connector</feature>
         <feature version='${project.version}'>odl-mdsal-distributed-datastore</feature>
         <configfile finalname="${config.configfile.directory}/${config.clustering.configfile}">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/config</configfile>
         <configfile finalname="configuration/initial/modules.conf">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/xml/moduleconf</configfile>
         <configfile finalname="etc/org.opendaylight.controller.cluster.datastore.cfg">mvn:org.opendaylight.controller/sal-clustering-config/${project.version}/cfg/datastore</configfile>
     </feature>
-
+    <feature name ='odl-mdsal-clustering' version='${project.version}'>
+      <feature version='${project.version}'>odl-mdsal-broker</feature>
+    </feature>
     <feature name='odl-clustering-test-app' version='${project.version}'>
         <feature version='${yangtools.version}'>odl-yangtools-models</feature>
-        <feature version='${project.version}'>odl-mdsal-broker</feature>
+        <feature version='${project.version}'>odl-mdsal-broker-local</feature>
         <bundle>mvn:org.opendaylight.controller.samples/clustering-it-model/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller.samples/clustering-it-provider/${project.version}</bundle>
         <configfile finalname="${config.configfile.directory}/20-clustering-test-app.xml">mvn:org.opendaylight.controller.samples/clustering-it-config/${project.version}/xml/config</configfile>
index a13b6ff..1c30fe2 100644 (file)
@@ -43,7 +43,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
-import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -107,7 +107,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * The current state determines the current behavior of a RaftActor
      * A Raft Actor always starts off in the Follower State
      */
-    private RaftActorBehavior currentBehavior;
+    private final DelegatingRaftActorBehavior currentBehavior = new DelegatingRaftActorBehavior();
 
     /**
      * This context should NOT be passed directly to any other actor it is
@@ -119,11 +119,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
 
-    /**
-     * The in-memory journal
-     */
-    private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
@@ -139,9 +134,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
-            -1, -1, replicatedLog, peerAddresses,
-            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
-            LOG);
+            -1, -1, peerAddresses,
+            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), LOG);
+
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
     }
 
     private void initRecoveryTimer() {
@@ -184,7 +180,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             } else if (message instanceof ApplyJournalEntries) {
                 onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
             } else if (message instanceof DeleteEntries) {
-                replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+                replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
             } else if (message instanceof UpdateElectionTerm) {
                 context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
                         ((UpdateElectionTerm) message).getVotedFor());
@@ -219,9 +215,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         // Create a replicated log with the snapshot information
         // The replicated log can be used later on to retrieve this snapshot
         // when we need to install it on a peer
-        replicatedLog = new ReplicatedLogImpl(snapshot);
 
-        context.setReplicatedLog(replicatedLog);
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
+                currentBehavior));
         context.setLastApplied(snapshot.getLastAppliedIndex());
         context.setCommitIndex(snapshot.getLastAppliedIndex());
 
@@ -232,8 +228,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         timer.stop();
         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
-                replicatedLog.size(), persistenceId(), timer.toString(),
-                replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
+                replicatedLog().size(), persistenceId(), timer.toString(),
+                replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
     }
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
@@ -241,7 +237,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
         }
 
-        replicatedLog.append(logEntry);
+        replicatedLog().append(logEntry);
     }
 
     private void onRecoveredApplyLogEntries(long toIndex) {
@@ -251,7 +247,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
 
         for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
-            batchRecoveredLogEntry(replicatedLog.get(i));
+            batchRecoveredLogEntry(replicatedLog().get(i));
         }
 
         context.setLastApplied(toIndex);
@@ -297,8 +293,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 "Persistence Id =  " + persistenceId() +
                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
                 "journal-size={}",
-            replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
-            replicatedLog.getSnapshotTerm(), replicatedLog.size());
+            replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
+            replicatedLog().getSnapshotTerm(), replicatedLog().size());
 
         initializeBehavior();
     }
@@ -308,9 +304,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
-        reusableBehaviorStateHolder.init(currentBehavior);
-        currentBehavior = newBehavior;
-        handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
+        reusableBehaviorStateHolder.init(getCurrentBehavior());
+        setCurrentBehavior(newBehavior);
+        handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
     }
 
     @Override public void handleCommand(Object message) {
@@ -353,8 +349,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             applySnapshot(snapshot.getState());
 
             //clears the followers log, sets the snapshot index to ensure adjusted-index works
-            replicatedLog = new ReplicatedLogImpl(snapshot);
-            context.setReplicatedLog(replicatedLog);
+            context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
+                    currentBehavior));
             context.setLastApplied(snapshot.getLastAppliedIndex());
 
         } else if (message instanceof FindLeader) {
@@ -391,11 +387,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message.equals(COMMIT_SNAPSHOT)) {
             commitSnapshot(-1);
         } else {
-            reusableBehaviorStateHolder.init(currentBehavior);
+            reusableBehaviorStateHolder.init(getCurrentBehavior());
 
-            currentBehavior = currentBehavior.handleMessage(getSender(), message);
+            setCurrentBehavior(currentBehavior.handleMessage(getSender(), message));
 
-            handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
+            handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
         }
     }
 
@@ -405,17 +401,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
-                .inMemoryJournalDataSize(replicatedLog.dataSize())
-                .inMemoryJournalLogSize(replicatedLog.size())
+                .inMemoryJournalDataSize(replicatedLog().dataSize())
+                .inMemoryJournalLogSize(replicatedLog().size())
                 .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
                 .lastApplied(context.getLastApplied())
-                .lastIndex(replicatedLog.lastIndex())
-                .lastTerm(replicatedLog.lastTerm())
+                .lastIndex(replicatedLog().lastIndex())
+                .lastTerm(replicatedLog().lastTerm())
                 .leader(getLeaderId())
                 .raftState(currentBehavior.state().toString())
                 .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
-                .snapshotIndex(replicatedLog.getSnapshotIndex())
-                .snapshotTerm(replicatedLog.getSnapshotTerm())
+                .snapshotIndex(replicatedLog().getSnapshotIndex())
+                .snapshotTerm(replicatedLog().getSnapshotTerm())
                 .votedFor(context.getTermInformation().getVotedFor())
                 .peerAddresses(ImmutableMap.copyOf(context.getPeerAddresses()));
 
@@ -425,8 +421,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             builder.lastLogTerm(lastLogEntry.getTerm());
         }
 
-        if(currentBehavior instanceof AbstractLeader) {
-            AbstractLeader leader = (AbstractLeader)currentBehavior;
+        if(getCurrentBehavior() instanceof AbstractLeader) {
+            AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
             Collection<String> followerIds = leader.getFollowerIds();
             List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
             for(String id: followerIds) {
@@ -490,39 +486,49 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         final RaftActorContext raftContext = getRaftActorContext();
 
-        replicatedLog
-                .appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
-                    @Override
-                    public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
-                        if(!hasFollowers()){
-                            // Increment the Commit Index and the Last Applied values
-                            raftContext.setCommitIndex(replicatedLogEntry.getIndex());
-                            raftContext.setLastApplied(replicatedLogEntry.getIndex());
+        replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
+            @Override
+            public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
+                if(!hasFollowers()){
+                    // Increment the Commit Index and the Last Applied values
+                    raftContext.setCommitIndex(replicatedLogEntry.getIndex());
+                    raftContext.setLastApplied(replicatedLogEntry.getIndex());
 
-                            // Apply the state immediately
-                            applyState(clientActor, identifier, data);
+                    // Apply the state immediately
+                    applyState(clientActor, identifier, data);
 
-                            // Send a ApplyJournalEntries message so that we write the fact that we applied
-                            // the state to durable storage
-                            self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
+                    // Send a ApplyJournalEntries message so that we write the fact that we applied
+                    // the state to durable storage
+                    self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
 
-                            context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+                    context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
 
-                        } else if (clientActor != null) {
-                            // Send message for replication
-                            currentBehavior.handleMessage(getSelf(),
-                                    new Replicate(clientActor, identifier,
-                                            replicatedLogEntry)
-                            );
-                        }
+                } else if (clientActor != null) {
+                    // Send message for replication
+                    currentBehavior.handleMessage(getSelf(),
+                            new Replicate(clientActor, identifier, replicatedLogEntry));
+                }
+            }
+        });
+    }
 
-                    }
-                });    }
+    private ReplicatedLog replicatedLog() {
+        return context.getReplicatedLog();
+    }
 
     protected String getId() {
         return context.getId();
     }
 
+    @VisibleForTesting
+    void setCurrentBehavior(RaftActorBehavior behavior) {
+        currentBehavior.setDelegate(behavior);
+    }
+
+    protected RaftActorBehavior getCurrentBehavior() {
+        return currentBehavior.getDelegate();
+    }
+
     /**
      * Derived actors can call the isLeader method to check if the current
      * RaftActor is the Leader or not
@@ -563,7 +569,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected ReplicatedLogEntry getLastLogEntry() {
-        return replicatedLog.last();
+        return replicatedLog().last();
     }
 
     protected Long getCurrentTerm(){
@@ -750,125 +756,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
         LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
 
-        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
-    }
-
-    protected long getTotalMemory() {
-        return Runtime.getRuntime().totalMemory();
+        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory());
     }
 
     protected boolean hasFollowers(){
-        return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
-    }
-
-    private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-        private static final int DATA_SIZE_DIVIDER = 5;
-        private long dataSizeSinceLastSnapshot = 0L;
-
-
-        public ReplicatedLogImpl(Snapshot snapshot) {
-            super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
-                snapshot.getUnAppliedEntries());
-        }
-
-        public ReplicatedLogImpl() {
-            super();
-        }
-
-        @Override public void removeFromAndPersist(long logEntryIndex) {
-            int adjustedIndex = adjustedIndex(logEntryIndex);
-
-            if (adjustedIndex < 0) {
-                return;
-            }
-
-            // FIXME: Maybe this should be done after the command is saved
-            journal.subList(adjustedIndex , journal.size()).clear();
-
-            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
-
-                @Override
-                public void apply(DeleteEntries param)
-                        throws Exception {
-                    //FIXME : Doing nothing for now
-                    dataSize = 0;
-                    for (ReplicatedLogEntry entry : journal) {
-                        dataSize += entry.size();
-                    }
-                }
-            });
-        }
-
-        @Override public void appendAndPersist(
-            final ReplicatedLogEntry replicatedLogEntry) {
-            appendAndPersist(replicatedLogEntry, null);
-        }
-
-        public void appendAndPersist(
-            final ReplicatedLogEntry replicatedLogEntry,
-            final Procedure<ReplicatedLogEntry> callback)  {
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
-            }
-
-            // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
-            journal.add(replicatedLogEntry);
-
-            // When persisting events with persist it is guaranteed that the
-            // persistent actor will not receive further commands between the
-            // persist call and the execution(s) of the associated event
-            // handler. This also holds for multiple persist calls in context
-            // of a single command.
-            persistence().persist(replicatedLogEntry,
-                new Procedure<ReplicatedLogEntry>() {
-                    @Override
-                    public void apply(ReplicatedLogEntry evt) throws Exception {
-                        int logEntrySize = replicatedLogEntry.size();
-
-                        dataSize += logEntrySize;
-                        long dataSizeForCheck = dataSize;
-
-                        dataSizeSinceLastSnapshot += logEntrySize;
-
-                        if (!hasFollowers()) {
-                            // When we do not have followers we do not maintain an in-memory log
-                            // due to this the journalSize will never become anything close to the
-                            // snapshot batch count. In fact will mostly be 1.
-                            // Similarly since the journal's dataSize depends on the entries in the
-                            // journal the journal's dataSize will never reach a value close to the
-                            // memory threshold.
-                            // By maintaining the dataSize outside the journal we are tracking essentially
-                            // what we have written to the disk however since we no longer are in
-                            // need of doing a snapshot just for the sake of freeing up memory we adjust
-                            // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
-                            // as if we were maintaining a real snapshot
-                            dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
-                        }
-                        long journalSize = replicatedLogEntry.getIndex() + 1;
-                        long dataThreshold = getTotalMemory() *
-                                context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
-                        if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
-                                || dataSizeForCheck > dataThreshold)) {
-
-                            boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
-                                    currentBehavior.getReplicatedToAllIndex());
-
-                            if(started){
-                                dataSizeSinceLastSnapshot = 0;
-                            }
-
-                        }
-
-                        if (callback != null){
-                            callback.apply(replicatedLogEntry);
-                        }
-                    }
-                }
-            );
-        }
-
+        return getRaftActorContext().hasFollowers();
     }
 
     static class DeleteEntries implements Serializable {
@@ -911,15 +803,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    @VisibleForTesting
-    void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
-        currentBehavior = behavior;
-    }
-
-    protected RaftActorBehavior getCurrentBehavior() {
-        return currentBehavior;
-    }
-
     private static class BehaviorStateHolder {
         private RaftActorBehavior behavior;
         private String leaderId;
index 2e7eb5e..9f4b7cb 100644 (file)
@@ -12,6 +12,8 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
 import java.util.Map;
 import org.slf4j.Logger;
 
@@ -168,4 +170,11 @@ public interface RaftActorContext {
 
     SnapshotManager getSnapshotManager();
 
+    boolean hasFollowers();
+
+    long getTotalMemory();
+
+    @VisibleForTesting
+    void setTotalMemoryRetriever(Supplier<Long> retriever);
+
 }
index eb059d6..684845c 100644 (file)
@@ -14,6 +14,8 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActorContext;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
 import java.util.Map;
 import org.slf4j.Logger;
 
@@ -39,25 +41,22 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private ConfigParams configParams;
 
-    private boolean snapshotCaptureInitiated;
+    @VisibleForTesting
+    private Supplier<Long> totalMemoryRetriever;
 
     // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
     // be passed to it in the constructor
     private SnapshotManager snapshotManager;
 
-    public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
-        String id,
-        ElectionTerm termInformation, long commitIndex,
-        long lastApplied, ReplicatedLog replicatedLog,
-        Map<String, String> peerAddresses, ConfigParams configParams,
-        Logger logger) {
+    public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id,
+            ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
+            ConfigParams configParams, Logger logger) {
         this.actor = actor;
         this.context = context;
         this.id = id;
         this.termInformation = termInformation;
         this.commitIndex = commitIndex;
         this.lastApplied = lastApplied;
-        this.replicatedLog = replicatedLog;
         this.peerAddresses = peerAddresses;
         this.configParams = configParams;
         this.LOG = logger;
@@ -161,10 +160,26 @@ public class RaftActorContextImpl implements RaftActorContext {
         peerAddresses.put(peerId, peerAddress);
     }
 
+    @Override
     public SnapshotManager getSnapshotManager() {
         if(snapshotManager == null){
             snapshotManager = new SnapshotManager(this, LOG);
         }
         return snapshotManager;
     }
+
+    @Override
+    public long getTotalMemory() {
+        return totalMemoryRetriever != null ? totalMemoryRetriever.get() : Runtime.getRuntime().totalMemory();
+    }
+
+    @Override
+    public void setTotalMemoryRetriever(Supplier<Long> retriever) {
+        totalMemoryRetriever = retriever;
+    }
+
+    @Override
+    public boolean hasFollowers() {
+        return getPeerAddresses().keySet().size() > 0;
+    }
 }
index 82d0839..3e4d727 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import akka.japi.Procedure;
 import java.util.List;
 
 /**
@@ -85,6 +86,8 @@ public interface ReplicatedLog {
      */
     void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry);
 
+    void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback);
+
     /**
      *
      * @param index the index of the log entry
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
new file mode 100644 (file)
index 0000000..fdb6305
--- /dev/null
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.japi.Procedure;
+import java.util.Collections;
+import java.util.List;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
+/**
+ * Implementation of ReplicatedLog used by the RaftActor.
+ */
+class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
+    private static final int DATA_SIZE_DIVIDER = 5;
+
+    private long dataSizeSinceLastSnapshot = 0L;
+    private final RaftActorContext context;
+    private final DataPersistenceProvider persistence;
+    private final RaftActorBehavior currentBehavior;
+
+    private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
+        @Override
+        public void apply(DeleteEntries param) {
+            dataSize = 0;
+            for (ReplicatedLogEntry entry : journal) {
+                dataSize += entry.size();
+            }
+        }
+    };
+
+    static ReplicatedLog newInstance(Snapshot snapshot, RaftActorContext context,
+            DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+        return new ReplicatedLogImpl(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+                snapshot.getUnAppliedEntries(), context, persistence, currentBehavior);
+    }
+
+    static ReplicatedLog newInstance(RaftActorContext context,
+            DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+        return new ReplicatedLogImpl(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), context,
+                persistence, currentBehavior);
+    }
+
+    private ReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries,
+            RaftActorContext context, DataPersistenceProvider persistence, RaftActorBehavior currentBehavior) {
+        super(snapshotIndex, snapshotTerm, unAppliedEntries);
+        this.context = context;
+        this.persistence = persistence;
+        this.currentBehavior = currentBehavior;
+    }
+
+    @Override
+    public void removeFromAndPersist(long logEntryIndex) {
+        int adjustedIndex = adjustedIndex(logEntryIndex);
+
+        if (adjustedIndex < 0) {
+            return;
+        }
+
+        // FIXME: Maybe this should be done after the command is saved
+        journal.subList(adjustedIndex , journal.size()).clear();
+
+        persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+    }
+
+    @Override
+    public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry) {
+        appendAndPersist(replicatedLogEntry, null);
+    }
+
+    @Override
+    public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
+            final Procedure<ReplicatedLogEntry> callback)  {
+
+        if(context.getLogger().isDebugEnabled()) {
+            context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
+        }
+
+        // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
+        journal.add(replicatedLogEntry);
+
+        // When persisting events with persist it is guaranteed that the
+        // persistent actor will not receive further commands between the
+        // persist call and the execution(s) of the associated event
+        // handler. This also holds for multiple persist calls in context
+        // of a single command.
+        persistence.persist(replicatedLogEntry,
+            new Procedure<ReplicatedLogEntry>() {
+                @Override
+                public void apply(ReplicatedLogEntry evt) throws Exception {
+                    int logEntrySize = replicatedLogEntry.size();
+
+                    dataSize += logEntrySize;
+                    long dataSizeForCheck = dataSize;
+
+                    dataSizeSinceLastSnapshot += logEntrySize;
+
+                    if (!context.hasFollowers()) {
+                        // When we do not have followers we do not maintain an in-memory log
+                        // due to this the journalSize will never become anything close to the
+                        // snapshot batch count. In fact will mostly be 1.
+                        // Similarly since the journal's dataSize depends on the entries in the
+                        // journal the journal's dataSize will never reach a value close to the
+                        // memory threshold.
+                        // By maintaining the dataSize outside the journal we are tracking essentially
+                        // what we have written to the disk however since we no longer are in
+                        // need of doing a snapshot just for the sake of freeing up memory we adjust
+                        // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
+                        // as if we were maintaining a real snapshot
+                        dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
+                    }
+                    long journalSize = replicatedLogEntry.getIndex() + 1;
+                    long dataThreshold = context.getTotalMemory() *
+                            context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
+                    if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+                            || dataSizeForCheck > dataThreshold)) {
+
+                        boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+                                currentBehavior.getReplicatedToAllIndex());
+
+                        if(started){
+                            dataSizeSinceLastSnapshot = 0;
+                        }
+                    }
+
+                    if (callback != null){
+                        callback.apply(replicatedLogEntry);
+                    }
+                }
+            }
+        );
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/DelegatingRaftActorBehavior.java
new file mode 100644 (file)
index 0000000..776dae7
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftState;
+
+/**
+ * A RaftActorBehavior implementation that delegates to another implementation.
+ *
+ * @author Thomas Pantelis
+ */
+public class DelegatingRaftActorBehavior implements RaftActorBehavior {
+    private RaftActorBehavior delegate;
+
+    public RaftActorBehavior getDelegate() {
+        return delegate;
+    }
+
+    public void setDelegate(RaftActorBehavior delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void close() throws Exception {
+        delegate.close();
+    }
+
+    @Override
+    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+        return delegate.handleMessage(sender, message);
+    }
+
+    @Override
+    public RaftState state() {
+        return delegate.state();
+    }
+
+    @Override
+    public String getLeaderId() {
+        return delegate.getLeaderId();
+    }
+
+    @Override
+    public void setReplicatedToAllIndex(long replicatedToAllIndex) {
+        delegate.setReplicatedToAllIndex(replicatedToAllIndex);
+    }
+
+    @Override
+    public long getReplicatedToAllIndex() {
+        return delegate.getReplicatedToAllIndex();
+    }
+}
index dfaa8d5..b910313 100644 (file)
@@ -18,6 +18,7 @@ import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.List;
@@ -52,7 +53,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         private final TestActorRef<MessageCollectorActor> collectorActor;
         private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
         private volatile byte[] snapshot;
-        private volatile long mockTotalMemory;
 
         private TestRaftActor(String id, Map<String, String> peerAddresses, ConfigParams config,
                 TestActorRef<MessageCollectorActor> collectorActor) {
@@ -74,13 +74,18 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
             dropMessages.remove(msgClass);
         }
 
-        void setMockTotalMemory(long mockTotalMemory) {
-            this.mockTotalMemory = mockTotalMemory;
-        }
+        void setMockTotalMemory(final long mockTotalMemory) {
+            if(mockTotalMemory > 0) {
+                getRaftActorContext().setTotalMemoryRetriever(new Supplier<Long>() {
+                    @Override
+                    public Long get() {
+                        return mockTotalMemory;
+                    }
 
-        @Override
-        protected long getTotalMemory() {
-            return mockTotalMemory > 0 ? mockTotalMemory : super.getTotalMemory();
+                });
+            } else {
+                getRaftActorContext().setTotalMemoryRetriever(null);
+            }
         }
 
         @Override
index 885c3ab..8fdb7ea 100644 (file)
@@ -11,6 +11,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import akka.japi.Procedure;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -220,5 +221,9 @@ public class AbstractReplicatedLogImplTest {
         public List<ReplicatedLogEntry> getEntriesTill(final int index) {
             return journal.subList(0, index);
         }
+
+        @Override
+        public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
+        }
     }
 }
index 53cca23..63f0df2 100644 (file)
@@ -12,7 +12,9 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.japi.Procedure;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.protobuf.GeneratedMessage;
 import java.io.Serializable;
 import java.util.HashMap;
@@ -203,6 +205,20 @@ public class MockRaftActorContext implements RaftActorContext {
         this.configParams = configParams;
     }
 
+    @Override
+    public long getTotalMemory() {
+        return Runtime.getRuntime().totalMemory();
+    }
+
+    @Override
+    public void setTotalMemoryRetriever(Supplier<Long> retriever) {
+    }
+
+    @Override
+    public boolean hasFollowers() {
+        return getPeerAddresses().keySet().size() > 0;
+    }
+
     public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
         @Override public void appendAndPersist(
             ReplicatedLogEntry replicatedLogEntry) {
@@ -217,6 +233,19 @@ public class MockRaftActorContext implements RaftActorContext {
         @Override public void removeFromAndPersist(long index) {
             removeFrom(index);
         }
+
+        @Override
+        public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
+            append(replicatedLogEntry);
+
+            if(callback != null) {
+                try {
+                    callback.apply(replicatedLogEntry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
     }
 
     public static class MockPayload extends Payload implements Serializable {
index 2eee0e8..678ac34 100644 (file)
@@ -10,8 +10,11 @@ package org.opendaylight.controller.md.sal.binding.api;
 import java.util.Collection;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.binding.Augmentation;
+import org.opendaylight.yangtools.yang.binding.ChildOf;
 import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.Identifiable;
+import org.opendaylight.yangtools.yang.binding.Identifier;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
 
 /**
@@ -20,7 +23,7 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
  * Represents modification of Data Object.
  *
  */
-public interface DataObjectModification<T extends DataObject> extends Identifiable<PathArgument> {
+public interface DataObjectModification<T extends DataObject> extends org.opendaylight.yangtools.concepts.Identifiable<PathArgument> {
 
     enum ModificationType {
         /**
@@ -76,5 +79,56 @@ public interface DataObjectModification<T extends DataObject> extends Identifiab
      */
     @Nonnull Collection<DataObjectModification<? extends DataObject>> getModifiedChildren();
 
+    /**
+     * Returns container child modification if {@code child} was modified by this
+     * modification.
+     *
+     * For accessing all modified list items consider iterating over {@link #getModifiedChildren()}.
+     *
+     * @param child Type of child - must be only container
+     * @return Modification of {@code child} if {@code child} was modified, null otherwise.
+     * @throws IllegalArgumentException If supplied {@code child} class is not valid child according
+     *         to generated model.
+     */
+    @Nullable <C extends ChildOf<? super T>> DataObjectModification<C> getModifiedChildContainer(@Nonnull Class<C> child);
+
+    /**
+     * Returns augmentation child modification if {@code augmentation} was modified by this
+     * modification.
+     *
+     * For accessing all modified list items consider iterating over {@link #getModifiedChildren()}.
+     *
+     * @param augmentation Type of augmentation - must be only container
+     * @return Modification of {@code augmentation} if {@code augmentation} was modified, null otherwise.
+     * @throws IllegalArgumentException If supplied {@code augmentation} class is not valid augmentation
+     *         according to generated model.
+     */
+    @Nullable <C extends Augmentation<T> & DataObject> DataObjectModification<C> getModifiedAugmentation(@Nonnull Class<C> augmentation);
+
+
+    /**
+     * Returns child list item modification if {@code child} was modified by this modification.
+     *
+     * @param listItem Type of list item - must be list item with key
+     * @param listKey List item key
+     * @return Modification of {@code child} if {@code child} was modified, null otherwise.
+     * @throws IllegalArgumentException If supplied {@code listItem} class is not valid child according
+     *         to generated model.
+     */
+    <C extends Identifiable<K> & ChildOf<? super T>, K extends Identifier<C>> DataObjectModification<C> getModifiedChildListItem(
+            @Nonnull Class<C> listItem,@Nonnull  K listKey);
+
+    /**
+     * Returns a child modification if a node identified by {@code childArgument} was modified by
+     * this modification.
+     *
+     * @param childArgument Path Argument of child node
+     * @return Modification of child identified by {@code childArgument} if {@code childArgument}
+     *         was modified, null otherwise.
+     * @throws IllegalArgumentException If supplied path argument is not valid child according to
+     *         generated model.
+     *
+     */
+    @Nullable DataObjectModification<? extends DataObject> getModifiedChild(PathArgument childArgument);
 
 }
index 6b1df71..93ab968 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.md.sal.binding.api;
 import java.util.Collection;
 import java.util.EventListener;
 import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.binding.DataObject;
 
 /**
  * Interface implemented by classes interested in receiving notifications about
@@ -17,7 +18,7 @@ import javax.annotation.Nonnull;
  * in that it provides a cursor-based view of the change, which has potentially
  * lower overhead and allow more flexible consumption of change event.
  */
-public interface DataTreeChangeListener extends EventListener {
+public interface DataTreeChangeListener<T extends DataObject> extends EventListener {
     /**
      * Invoked when there was data change for the supplied path, which was used
      * to register this listener.
@@ -39,5 +40,5 @@ public interface DataTreeChangeListener extends EventListener {
      *
      * @param changes Collection of change events, may not be null or empty.
      */
-    void onDataTreeChanged(@Nonnull Collection<DataTreeModification> changes);
+    void onDataTreeChanged(@Nonnull Collection<DataTreeModification<T>> changes);
 }
index ae4e36f..9d12e44 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.md.sal.binding.api;
 
 import javax.annotation.Nonnull;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
 
 /**
  * A {@link DOMService} which allows users to register for changes to a
@@ -50,5 +51,5 @@ public interface DataTreeChangeService extends BindingService {
      *         your listener using {@link ListenerRegistration#close()} to stop
      *         delivery of change events.
      */
-    @Nonnull <L extends DataTreeChangeListener> ListenerRegistration<L> registerDataTreeChangeListener(@Nonnull DataTreeIdentifier treeId, @Nonnull L listener);
+    @Nonnull <T extends DataObject,L extends DataTreeChangeListener<T>> ListenerRegistration<L> registerDataTreeChangeListener(@Nonnull DataTreeIdentifier<T> treeId, @Nonnull L listener);
 }
\ No newline at end of file
index 428957e..c1c23d5 100644 (file)
@@ -13,18 +13,19 @@ import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yangtools.concepts.Immutable;
 import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
 /**
  * A unique identifier for a particular subtree. It is composed of the logical
  * data store type and the instance identifier of the root node.
  */
-public final class DataTreeIdentifier implements Immutable, Path<DataTreeIdentifier>, Serializable {
+public final class DataTreeIdentifier<T extends DataObject> implements Immutable, Path<DataTreeIdentifier<?>>, Serializable {
     private static final long serialVersionUID = 1L;
-    private final InstanceIdentifier<?> rootIdentifier;
+    private final InstanceIdentifier<T> rootIdentifier;
     private final LogicalDatastoreType datastoreType;
 
-    public DataTreeIdentifier(final LogicalDatastoreType datastoreType, final InstanceIdentifier<?> rootIdentifier) {
+    public DataTreeIdentifier(final LogicalDatastoreType datastoreType, final InstanceIdentifier<T> rootIdentifier) {
         this.datastoreType = Preconditions.checkNotNull(datastoreType);
         this.rootIdentifier = Preconditions.checkNotNull(rootIdentifier);
     }
@@ -48,7 +49,7 @@ public final class DataTreeIdentifier implements Immutable, Path<DataTreeIdentif
     }
 
     @Override
-    public boolean contains(final DataTreeIdentifier other) {
+    public boolean contains(final DataTreeIdentifier<?> other) {
         return datastoreType == other.datastoreType && rootIdentifier.contains(other.rootIdentifier);
     }
 
@@ -69,7 +70,7 @@ public final class DataTreeIdentifier implements Immutable, Path<DataTreeIdentif
         if (!(obj instanceof DataTreeIdentifier)) {
             return false;
         }
-        DataTreeIdentifier other = (DataTreeIdentifier) obj;
+        final DataTreeIdentifier<?> other = (DataTreeIdentifier<?>) obj;
         if (datastoreType != other.datastoreType) {
             return false;
         }
index aac51a6..8163bac 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
  * @author Tony Tkacik &lt;ttkacik@cisco.com&gt;
  *
  */
-public interface DataTreeModification {
+public interface DataTreeModification<T extends DataObject> {
 
     /**
      * Get the modification root path. This is the path of the root node
@@ -25,13 +25,13 @@ public interface DataTreeModification {
      *
      * @return absolute path of the root node
      */
-    @Nonnull DataTreeIdentifier getRootPath();
+    @Nonnull DataTreeIdentifier<T> getRootPath();
 
     /**
      * Get the modification root node.
      *
      * @return modification root node
      */
-    @Nonnull DataObjectModification<? extends DataObject> getRootNode();
+    @Nonnull DataObjectModification<T> getRootNode();
 
 }
index b17be16..1c43f12 100644 (file)
@@ -13,14 +13,19 @@ import com.google.common.collect.ImmutableSet;
 import java.util.Set;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeService;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.binding.impl.BindingDOMAdapterBuilder.Factory;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
 import org.opendaylight.controller.md.sal.dom.api.DOMService;
-import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
 
 /**
  * The DataBrokerImpl simply defers to the DOMDataBroker for all its operations.
@@ -32,7 +37,7 @@ import org.opendaylight.controller.sal.core.api.model.SchemaService;
  *
 
  */
-public class BindingDOMDataBrokerAdapter extends AbstractForwardedDataBroker implements DataBroker {
+public class BindingDOMDataBrokerAdapter extends AbstractForwardedDataBroker implements DataBroker, DataTreeChangeService {
 
 
     static final Factory<DataBroker> BUILDER_FACTORY = new BindingDOMAdapterBuilder.Factory<DataBroker>() {
@@ -43,14 +48,16 @@ public class BindingDOMDataBrokerAdapter extends AbstractForwardedDataBroker imp
         }
 
     };
+    private final DataTreeChangeService treeChangeService;
 
     public BindingDOMDataBrokerAdapter(final DOMDataBroker domDataBroker, final BindingToNormalizedNodeCodec codec) {
         super(domDataBroker, codec);
-    }
-
-    @Deprecated
-    public BindingDOMDataBrokerAdapter(final DOMDataBroker domDataBroker, final BindingToNormalizedNodeCodec codec, final SchemaService schemaService) {
-        super(domDataBroker, codec,schemaService);
+        final DOMDataTreeChangeService domTreeChange = (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
+        if(domTreeChange != null) {
+            treeChangeService = BindingDOMDataTreeChangeServiceAdapter.create(codec, domTreeChange);
+        } else {
+            treeChangeService = null;
+        }
     }
 
     @Override
@@ -82,13 +89,21 @@ public class BindingDOMDataBrokerAdapter extends AbstractForwardedDataBroker imp
         }
 
         @Override
-        protected DataBroker createInstance(BindingToNormalizedNodeCodec codec,
-                ClassToInstanceMap<DOMService> delegates) {
-            DOMDataBroker domDataBroker = delegates.getInstance(DOMDataBroker.class);
+        protected DataBroker createInstance(final BindingToNormalizedNodeCodec codec,
+                final ClassToInstanceMap<DOMService> delegates) {
+            final DOMDataBroker domDataBroker = delegates.getInstance(DOMDataBroker.class);
             return new BindingDOMDataBrokerAdapter(domDataBroker, codec);
         }
 
+    }
 
-
+    @Override
+    public <T extends DataObject, L extends DataTreeChangeListener<T>> ListenerRegistration<L> registerDataTreeChangeListener(
+            final DataTreeIdentifier<T> treeId, final L listener) {
+        if(treeChangeService == null) {
+            throw new UnsupportedOperationException("Underlying data broker does not expose DOMDataTreeChangeService.");
+        }
+        return treeChangeService.registerDataTreeChangeListener(treeId, listener);
     }
+
 }
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMDataTreeChangeListenerAdapter.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMDataTreeChangeListenerAdapter.java
new file mode 100644 (file)
index 0000000..ab1348f
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Adapter wrapping Binding {@link DataTreeChangeListener} and exposing
+ * it as {@link DOMDataTreeChangeListener} and translated DOM events
+ * to their Binding equivalent.
+ *
+ */
+final class BindingDOMDataTreeChangeListenerAdapter<T extends DataObject> implements DOMDataTreeChangeListener {
+
+    private final BindingToNormalizedNodeCodec codec;
+    private final DataTreeChangeListener<T> listener;
+    private final LogicalDatastoreType store;
+
+    BindingDOMDataTreeChangeListenerAdapter(final BindingToNormalizedNodeCodec codec, final DataTreeChangeListener<T> listener,
+            final LogicalDatastoreType store) {
+        this.codec = Preconditions.checkNotNull(codec);
+        this.listener = Preconditions.checkNotNull(listener);
+        this.store = Preconditions.checkNotNull(store);
+    }
+
+    @Override
+    public void onDataTreeChanged(final Collection<DataTreeCandidate> domChanges) {
+        final Collection<DataTreeModification<T>> bindingChanges = LazyDataTreeModification.from(codec, domChanges, store);
+        listener.onDataTreeChanged(bindingChanges);
+    }
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMDataTreeChangeServiceAdapter.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDOMDataTreeChangeServiceAdapter.java
new file mode 100644 (file)
index 0000000..ad0ab54
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeService;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+
+/**
+ *
+ * Adapter exposing Binding {@link DataTreeChangeService} and wrapping
+ * {@link DOMDataTreeChangeService} and is responsible for translation
+ * and instantiation of {@link BindingDOMDataTreeChangeListenerAdapter}
+ * adapters.
+ *
+ * Each registered {@link DataTreeChangeListener} is wrapped using
+ * adapter and registered directly to DOM service.
+ */
+final class BindingDOMDataTreeChangeServiceAdapter implements DataTreeChangeService {
+
+    private final BindingToNormalizedNodeCodec codec;
+    private final DOMDataTreeChangeService dataTreeChangeService;
+
+    private BindingDOMDataTreeChangeServiceAdapter(final BindingToNormalizedNodeCodec codec,
+            final DOMDataTreeChangeService dataTreeChangeService) {
+        this.codec = Preconditions.checkNotNull(codec);
+        this.dataTreeChangeService = Preconditions.checkNotNull(dataTreeChangeService);
+    }
+
+    static DataTreeChangeService create(final BindingToNormalizedNodeCodec codec,
+            final DOMDataTreeChangeService dataTreeChangeService) {
+        return new BindingDOMDataTreeChangeServiceAdapter(codec, dataTreeChangeService);
+    }
+
+    @Override
+    public <T extends DataObject, L extends DataTreeChangeListener<T>> ListenerRegistration<L> registerDataTreeChangeListener(
+            final DataTreeIdentifier<T> treeId, final L listener) {
+        final DOMDataTreeIdentifier domIdentifier = toDomTreeIdentifier(treeId);
+        final BindingDOMDataTreeChangeListenerAdapter<T> domListener = new BindingDOMDataTreeChangeListenerAdapter<>(codec,listener, treeId.getDatastoreType());
+        final ListenerRegistration<BindingDOMDataTreeChangeListenerAdapter<T>> domReg = dataTreeChangeService.registerDataTreeChangeListener(domIdentifier, domListener);
+        return new BindingDataTreeChangeListenerRegistration<>(listener,domReg);
+    }
+
+    private DOMDataTreeIdentifier toDomTreeIdentifier(final DataTreeIdentifier<?> treeId) {
+        final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifier(treeId.getRootIdentifier());
+        return new DOMDataTreeIdentifier(treeId.getDatastoreType(), domPath);
+    }
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataTreeChangeListenerRegistration.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataTreeChangeListenerRegistration.java
new file mode 100644 (file)
index 0000000..8a92e5f
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+class BindingDataTreeChangeListenerRegistration<L extends DataTreeChangeListener<?>> extends AbstractListenerRegistration<L> {
+
+    private final ListenerRegistration<?> domReg;
+
+    BindingDataTreeChangeListenerRegistration(final L listener, final ListenerRegistration<?> domReg) {
+        super(listener);
+        this.domReg = Preconditions.checkNotNull(domReg);
+    }
+
+    @Override
+    protected void removeRegistration() {
+        domReg.close();
+    }
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingStructuralType.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingStructuralType.java
new file mode 100644 (file)
index 0000000..7cd17dc
--- /dev/null
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import com.google.common.base.Optional;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.AugmentationIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.AnyXmlNode;
+import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+
+/**
+ *
+ * Defines structural mapping of Normalized Node to Binding data
+ * addressable by Instance Identifier.
+ *
+ * Not all binding data are addressable by instance identifier
+ * and there are some differences.
+ *
+ * See {@link #NOT_ADDRESSABLE},{@link #INVISIBLE_CONTAINER},{@link #VISIBLE_CONTAINER}
+ * for more details.
+ *
+ *
+ */
+enum BindingStructuralType {
+
+    /**
+     * DOM Item is not addressable in Binding Instance Identifier,
+     * data is not lost, but are available only via parent object.
+     *
+     * Such types of data are leaf-lists, leafs, list without keys
+     * or anyxml.
+     *
+     */
+    NOT_ADDRESSABLE,
+    /**
+     * Data container is addressable in NormalizedNode format,
+     * but in Binding it is not represented in Instance Identifier.
+     *
+     * This are choice / case nodes.
+     *
+     * This data is still accessible using parent object and their
+     * children are addressable.
+     *
+     */
+    INVISIBLE_CONTAINER,
+    /**
+     * Data container is addressable in NormalizedNode format,
+     * but in Binding it is not represented in Instance Identifier.
+     *
+     * This are list nodes.
+     *
+     * This data is still accessible using parent object and their
+     * children are addressable.
+     *
+     */
+    INVISIBLE_LIST,
+    /**
+     * Data container is addressable in Binding Instance Identifier format
+     * and also YangInstanceIdentifier format.
+     *
+     */
+    VISIBLE_CONTAINER,
+    /**
+     * Mapping algorithm was unable to detect type or was not updated after introduction
+     * of new NormalizedNode type.
+     */
+    UNKNOWN;
+
+    static BindingStructuralType from(final DataTreeCandidateNode domChildNode) {
+        final Optional<NormalizedNode<?, ?>> dataBased = domChildNode.getDataAfter().or(domChildNode.getDataBefore());
+        if(dataBased.isPresent()) {
+            return from(dataBased.get());
+        }
+        return from(domChildNode.getIdentifier());
+    }
+
+    private static BindingStructuralType from(final PathArgument identifier) {
+        if(identifier instanceof NodeIdentifierWithPredicates || identifier instanceof AugmentationIdentifier) {
+            return VISIBLE_CONTAINER;
+        }
+        if(identifier instanceof NodeWithValue) {
+            return NOT_ADDRESSABLE;
+        }
+        return UNKNOWN;
+    }
+
+    static BindingStructuralType from(final NormalizedNode<?, ?> data) {
+        if(isNotAddressable(data)) {
+            return NOT_ADDRESSABLE;
+        }
+        if(data instanceof MapNode) {
+            return INVISIBLE_LIST;
+        }
+        if(data instanceof ChoiceNode) {
+            return INVISIBLE_CONTAINER;
+        }
+        if(isVisibleContainer(data)) {
+            return VISIBLE_CONTAINER;
+        }
+        return UNKNOWN;
+    }
+
+    private static boolean isVisibleContainer(final NormalizedNode<?, ?> data) {
+        return data instanceof MapEntryNode || data instanceof ContainerNode || data instanceof AugmentationNode;
+    }
+
+    private static boolean isNotAddressable(final NormalizedNode<?, ?> d) {
+        return d instanceof LeafNode
+                || d instanceof AnyXmlNode
+                || d instanceof LeafSetNode
+                || d instanceof LeafSetEntryNode;
+    }
+
+}
index d9e58e5..b727e53 100644 (file)
@@ -9,9 +9,12 @@ package org.opendaylight.controller.md.sal.binding.impl;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableBiMap;
 import java.lang.reflect.Method;
+import java.util.AbstractMap.SimpleEntry;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
@@ -19,6 +22,7 @@ import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizat
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
 import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTree;
 import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeFactory;
+import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeNode;
 import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
 import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
@@ -61,13 +65,13 @@ public class BindingToNormalizedNodeCodec implements BindingCodecTreeFactory, Bi
     }
 
     @Override
-    public YangInstanceIdentifier toYangInstanceIdentifier(InstanceIdentifier<?> binding) {
+    public YangInstanceIdentifier toYangInstanceIdentifier(final InstanceIdentifier<?> binding) {
         return codecRegistry.toYangInstanceIdentifier(binding);
     }
 
     @Override
     public <T extends DataObject> Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
-            InstanceIdentifier<T> path, T data) {
+            final InstanceIdentifier<T> path, final T data) {
         return codecRegistry.toNormalizedNode(path, data);
     }
 
@@ -78,33 +82,33 @@ public class BindingToNormalizedNodeCodec implements BindingCodecTreeFactory, Bi
     }
 
     @Override
-    public Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode(YangInstanceIdentifier path,
-            NormalizedNode<?, ?> data) {
+    public Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode(final YangInstanceIdentifier path,
+            final NormalizedNode<?, ?> data) {
         return codecRegistry.fromNormalizedNode(path, data);
     }
 
     @Override
-    public Notification fromNormalizedNodeNotification(SchemaPath path, ContainerNode data) {
+    public Notification fromNormalizedNodeNotification(final SchemaPath path, final ContainerNode data) {
         return codecRegistry.fromNormalizedNodeNotification(path, data);
     }
 
     @Override
-    public DataObject fromNormalizedNodeRpcData(SchemaPath path, ContainerNode data) {
+    public DataObject fromNormalizedNodeRpcData(final SchemaPath path, final ContainerNode data) {
         return codecRegistry.fromNormalizedNodeRpcData(path, data);
     }
 
     @Override
-    public InstanceIdentifier<?> fromYangInstanceIdentifier(YangInstanceIdentifier dom) {
+    public InstanceIdentifier<?> fromYangInstanceIdentifier(final YangInstanceIdentifier dom) {
         return codecRegistry.fromYangInstanceIdentifier(dom);
     }
 
     @Override
-    public ContainerNode toNormalizedNodeNotification(Notification data) {
+    public ContainerNode toNormalizedNodeNotification(final Notification data) {
         return codecRegistry.toNormalizedNodeNotification(data);
     }
 
     @Override
-    public ContainerNode toNormalizedNodeRpcData(DataContainer data) {
+    public ContainerNode toNormalizedNodeRpcData(final DataContainer data) {
         return codecRegistry.toNormalizedNodeRpcData(data);
     }
 
@@ -225,13 +229,27 @@ public class BindingToNormalizedNodeCodec implements BindingCodecTreeFactory, Bi
     }
 
     @Override
-    public BindingCodecTree create(BindingRuntimeContext context) {
+    public BindingCodecTree create(final BindingRuntimeContext context) {
         return codecRegistry.create(context);
     }
 
     @Override
-    public BindingCodecTree create(SchemaContext context, Class<?>... bindingClasses) {
+    public BindingCodecTree create(final SchemaContext context, final Class<?>... bindingClasses) {
         return codecRegistry.create(context, bindingClasses);
     }
 
+    @Nonnull protected Map.Entry<InstanceIdentifier<?>, BindingCodecTreeNode<?>> getSubtreeCodec(
+            final YangInstanceIdentifier domIdentifier) {
+
+        final BindingCodecTree currentCodecTree = codecRegistry.getCodecContext();
+        final InstanceIdentifier<?> bindingPath = codecRegistry.fromYangInstanceIdentifier(domIdentifier);
+        Preconditions.checkArgument(bindingPath != null);
+        /**
+         * If we are able to deserialize YANG instance identifier, getSubtreeCodec must
+         * return non-null value.
+         */
+        final BindingCodecTreeNode<?> codecContext = currentCodecTree.getSubtreeCodec(bindingPath);
+        return new SimpleEntry<InstanceIdentifier<?>, BindingCodecTreeNode<?>>(bindingPath, codecContext);
+    }
+
 }
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/LazyDataObjectModification.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/LazyDataObjectModification.java
new file mode 100644 (file)
index 0000000..a165242
--- /dev/null
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeNode;
+import org.opendaylight.yangtools.yang.binding.Augmentation;
+import org.opendaylight.yangtools.yang.binding.ChildOf;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.Identifiable;
+import org.opendaylight.yangtools.yang.binding.Identifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Lazily translated {@link DataObjectModification} based on {@link DataTreeCandidateNode}.
+ *
+ * {@link LazyDataObjectModification} represents Data tree change event,
+ * but whole tree is not translated or resolved eagerly, but only child nodes
+ * which are directly accessed by user of data object modification.
+ *
+ * @param <T> Type of Binding Data Object
+ */
+class LazyDataObjectModification<T extends DataObject> implements DataObjectModification<T> {
+
+    private final static Logger LOG = LoggerFactory.getLogger(LazyDataObjectModification.class);
+
+    private final BindingCodecTreeNode<T> codec;
+    private final DataTreeCandidateNode domData;
+    private final PathArgument identifier;
+    private Collection<DataObjectModification<? extends DataObject>> childNodesCache;
+
+    private LazyDataObjectModification(final BindingCodecTreeNode<T> codec, final DataTreeCandidateNode domData) {
+        this.codec = Preconditions.checkNotNull(codec);
+        this.domData = Preconditions.checkNotNull(domData);
+        this.identifier = codec.deserializePathArgument(domData.getIdentifier());
+    }
+
+    static <T extends DataObject> DataObjectModification<T> create(final BindingCodecTreeNode<T> codec,
+            final DataTreeCandidateNode domData) {
+        return new LazyDataObjectModification<>(codec,domData);
+    }
+
+    static Collection<DataObjectModification<? extends DataObject>> from(final BindingCodecTreeNode<?> parentCodec,
+            final Collection<DataTreeCandidateNode> domChildNodes) {
+        final ArrayList<DataObjectModification<? extends DataObject>> result = new ArrayList<>(domChildNodes.size());
+        populateList(result, parentCodec, domChildNodes);
+        return result;
+    }
+
+    private static void populateList(final List<DataObjectModification<? extends DataObject>> result,
+            final BindingCodecTreeNode<?> parentCodec, final Collection<DataTreeCandidateNode> domChildNodes) {
+        for (final DataTreeCandidateNode domChildNode : domChildNodes) {
+            final BindingStructuralType type = BindingStructuralType.from(domChildNode);
+            if (type != BindingStructuralType.NOT_ADDRESSABLE) {
+                /*
+                 *  Even if type is UNKNOWN, from perspective of BindingStructuralType
+                 *  we try to load codec for it. We will use that type to further specify
+                 *  debug log.
+                 */
+                try {
+                    final BindingCodecTreeNode<?> childCodec =
+                            parentCodec.yangPathArgumentChild(domChildNode.getIdentifier());
+                    populateList(result,type, childCodec, domChildNode);
+                } catch (final IllegalArgumentException e) {
+                    if(type == BindingStructuralType.UNKNOWN) {
+                        LOG.debug("Unable to deserialize unknown DOM node {}",domChildNode,e);
+                    } else {
+                        LOG.debug("Binding representation for DOM node {} was not found",domChildNode,e);
+                    }
+                }
+            }
+        }
+    }
+
+
+    private static void populateList(final List<DataObjectModification<? extends DataObject>> result,
+            final BindingStructuralType type, final BindingCodecTreeNode<?> childCodec,
+            final DataTreeCandidateNode domChildNode) {
+        switch (type) {
+            case INVISIBLE_LIST:
+                // We use parent codec intentionally.
+                populateListWithSingleCodec(result, childCodec, domChildNode.getChildNodes());
+                break;
+            case INVISIBLE_CONTAINER:
+                populateList(result, childCodec, domChildNode.getChildNodes());
+                break;
+            case UNKNOWN:
+            case VISIBLE_CONTAINER:
+                result.add(create(childCodec, domChildNode));
+            default:
+                break;
+        }
+    }
+
+    private static void populateListWithSingleCodec(final List<DataObjectModification<? extends DataObject>> result,
+            final BindingCodecTreeNode<?> codec, final Collection<DataTreeCandidateNode> childNodes) {
+        for (final DataTreeCandidateNode child : childNodes) {
+            result.add(create(codec, child));
+        }
+    }
+
+    @Override
+    public T getDataAfter() {
+        return deserialize(domData.getDataAfter());
+    }
+
+    @Override
+    public Class<T> getDataType() {
+        return codec.getBindingClass();
+    }
+
+    @Override
+    public PathArgument getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public DataObjectModification.ModificationType getModificationType() {
+        switch(domData.getModificationType()) {
+            case WRITE:
+                return DataObjectModification.ModificationType.WRITE;
+            case SUBTREE_MODIFIED:
+                return DataObjectModification.ModificationType.SUBTREE_MODIFIED;
+            case DELETE:
+                return DataObjectModification.ModificationType.DELETE;
+
+            default:
+                // TODO: Should we lie about modification type instead of exception?
+                throw new IllegalStateException("Unsupported DOM Modification type " + domData.getModificationType());
+        }
+    }
+
+    @Override
+    public Collection<DataObjectModification<? extends DataObject>> getModifiedChildren() {
+        if(childNodesCache == null) {
+            childNodesCache = from(codec,domData.getChildNodes());
+        }
+        return childNodesCache;
+    }
+
+    @Override
+    public DataObjectModification<? extends DataObject> getModifiedChild(final PathArgument arg) {
+        final List<YangInstanceIdentifier.PathArgument> domArgumentList = new ArrayList<>();
+        final BindingCodecTreeNode<?> childCodec = codec.bindingPathArgumentChild(arg, domArgumentList);
+        final Iterator<YangInstanceIdentifier.PathArgument> toEnter = domArgumentList.iterator();
+        DataTreeCandidateNode current = domData;
+        while (toEnter.hasNext() && current != null) {
+            current = current.getModifiedChild(toEnter.next());
+        }
+        if (current != null) {
+            return create(childCodec, current);
+        }
+        return null;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <C extends Identifiable<K> & ChildOf<? super T>, K extends Identifier<C>> DataObjectModification<C> getModifiedChildListItem(
+            final Class<C> listItem, final K listKey) {
+        return (DataObjectModification<C>) getModifiedChild(new InstanceIdentifier.IdentifiableItem<>(listItem, listKey));
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <C extends ChildOf<? super T>> DataObjectModification<C> getModifiedChildContainer(final Class<C> arg) {
+        return (DataObjectModification<C>) getModifiedChild(new InstanceIdentifier.Item<>(arg));
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <C extends Augmentation<T> & DataObject> DataObjectModification<C> getModifiedAugmentation(
+            final Class<C> augmentation) {
+        return (DataObjectModification<C>) getModifiedChild(new InstanceIdentifier.Item<>(augmentation));
+    }
+
+    private T deserialize(final Optional<NormalizedNode<?, ?>> dataAfter) {
+        if(dataAfter.isPresent()) {
+            return codec.deserialize(dataAfter.get());
+        }
+        return null;
+    }
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/LazyDataTreeModification.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/LazyDataTreeModification.java
new file mode 100644 (file)
index 0000000..2a90f96
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.binding.data.codec.api.BindingCodecTreeNode;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Lazily translated {@link DataTreeModification} based on {@link DataTreeCandidate}.
+ *
+ * {@link DataTreeModification} represents Data tree change event,
+ * but whole tree is not translated or resolved eagerly, but only child nodes
+ * which are directly accessed by user of data object modification.
+ *
+ */
+class LazyDataTreeModification<T extends DataObject> implements DataTreeModification<T> {
+
+    private final DataTreeIdentifier<T> path;
+    private final DataObjectModification<T> rootNode;
+
+    LazyDataTreeModification(final LogicalDatastoreType datastoreType, final InstanceIdentifier<T> path, final BindingCodecTreeNode<T> codec, final DataTreeCandidate domChange) {
+        this.path = new DataTreeIdentifier<>(datastoreType, path);
+        this.rootNode = LazyDataObjectModification.create(codec, domChange.getRootNode());
+    }
+
+    @Override
+    public DataObjectModification<T> getRootNode() {
+        return rootNode;
+    }
+
+    @Override
+    public DataTreeIdentifier<T> getRootPath() {
+        return path;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    static <T extends DataObject> DataTreeModification<T> create(final BindingToNormalizedNodeCodec codec, final DataTreeCandidate domChange,
+            final LogicalDatastoreType datastoreType) {
+        final Entry<InstanceIdentifier<?>, BindingCodecTreeNode<?>> codecCtx =
+                codec.getSubtreeCodec(domChange.getRootPath());
+        return (DataTreeModification<T>) new LazyDataTreeModification(datastoreType, codecCtx.getKey(), codecCtx.getValue(), domChange);
+    }
+
+    static <T extends DataObject> Collection<DataTreeModification<T>> from(final BindingToNormalizedNodeCodec codec,
+            final Collection<DataTreeCandidate> domChanges, final LogicalDatastoreType datastoreType) {
+        final List<DataTreeModification<T>> result = new ArrayList<>(domChanges.size());
+        for (final DataTreeCandidate domChange : domChanges) {
+            result.add(LazyDataTreeModification.<T>create(codec, domChange, datastoreType));
+        }
+        return result;
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/DataTreeChangeListenerTest.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/DataTreeChangeListenerTest.java
new file mode 100644 (file)
index 0000000..888a628
--- /dev/null
@@ -0,0 +1,163 @@
+package org.opendaylight.controller.md.sal.binding.impl.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.TOP_BAR_KEY;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.TOP_FOO_KEY;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.USES_ONE_KEY;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.complexUsesAugment;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.path;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.top;
+import static org.opendaylight.controller.md.sal.test.model.util.ListsBindingUtils.topLevelList;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMDataBrokerAdapter;
+import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.augment.rev140709.TreeComplexUsesAugment;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.TwoLevelList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
+import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
+
+public class DataTreeChangeListenerTest extends AbstractDataBrokerTest {
+
+    private static final InstanceIdentifier<Top> TOP_PATH = InstanceIdentifier.create(Top.class);
+    private static final PathArgument TOP_ARGUMENT= TOP_PATH.getPathArguments().iterator().next();
+    private static final InstanceIdentifier<TopLevelList> FOO_PATH = path(TOP_FOO_KEY);
+    private static final PathArgument FOO_ARGUMENT = Iterables.getLast(FOO_PATH.getPathArguments());
+    private static final TopLevelList FOO_DATA = topLevelList(TOP_FOO_KEY, complexUsesAugment(USES_ONE_KEY));
+    private static final InstanceIdentifier<TopLevelList> BAR_PATH = path(TOP_BAR_KEY);
+    private static final PathArgument BAR_ARGUMENT = Iterables.getLast(BAR_PATH.getPathArguments());
+    private static final TopLevelList BAR_DATA = topLevelList(TOP_BAR_KEY);
+private static final DataTreeIdentifier<Top> TOP_IDENTIFIER = new DataTreeIdentifier<Top>(LogicalDatastoreType.OPERATIONAL,
+            TOP_PATH);
+
+    private static final Top TOP_INITIAL_DATA = top(FOO_DATA);
+
+    private BindingDOMDataBrokerAdapter dataBrokerImpl;
+
+    private static final class EventCapturingListener<T extends DataObject> implements DataTreeChangeListener<T> {
+
+        private SettableFuture<Collection<DataTreeModification<T>>> changes = SettableFuture.create();
+
+        @Override
+        public void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
+            this.changes.set(changes);
+
+        }
+
+        Collection<DataTreeModification<T>> nextEvent() throws Exception {
+            final Collection<DataTreeModification<T>> result = changes.get(200,TimeUnit.MILLISECONDS);
+            changes = SettableFuture.create();
+            return result;
+        }
+
+    }
+
+    @Override
+    protected Iterable<YangModuleInfo> getModuleInfos() throws Exception {
+        return ImmutableSet.of(
+                BindingReflections.getModuleInfo(TwoLevelList.class),
+                BindingReflections.getModuleInfo(TreeComplexUsesAugment.class)
+                );
+    }
+
+    @Override
+    protected void setupWithDataBroker(final DataBroker dataBroker) {
+        dataBrokerImpl = (BindingDOMDataBrokerAdapter) dataBroker;
+    }
+
+    @Test
+    public void testTopLevelListener() throws Exception {
+        final EventCapturingListener<Top> listener = new EventCapturingListener<>();
+        dataBrokerImpl.registerDataTreeChangeListener(TOP_IDENTIFIER, listener);
+
+        createAndVerifyTop(listener);
+
+        putTx(BAR_PATH, BAR_DATA).submit().checkedGet();
+        final DataObjectModification<Top> afterBarPutEvent = Iterables.getOnlyElement(listener.nextEvent()).getRootNode();
+        verifyModification(afterBarPutEvent, TOP_ARGUMENT, ModificationType.SUBTREE_MODIFIED);
+        final DataObjectModification<TopLevelList> barPutMod = afterBarPutEvent.getModifiedChildListItem(TopLevelList.class, TOP_BAR_KEY);
+        assertNotNull(barPutMod);
+        verifyModification(barPutMod, BAR_ARGUMENT, ModificationType.WRITE);
+
+        deleteTx(BAR_PATH).submit().checkedGet();
+        final DataObjectModification<Top> afterBarDeleteEvent = Iterables.getOnlyElement(listener.nextEvent()).getRootNode();
+        verifyModification(afterBarDeleteEvent, TOP_ARGUMENT, ModificationType.SUBTREE_MODIFIED);
+        final DataObjectModification<TopLevelList> barDeleteMod = afterBarDeleteEvent.getModifiedChildListItem(TopLevelList.class, TOP_BAR_KEY);
+        verifyModification(barDeleteMod, BAR_ARGUMENT, ModificationType.DELETE);
+    }
+
+    @Test
+    public void testWildcardedListListener() throws Exception {
+        final EventCapturingListener<TopLevelList> listener = new EventCapturingListener<>();
+        final DataTreeIdentifier<TopLevelList> wildcard = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, TOP_PATH.child(TopLevelList.class));
+        dataBrokerImpl.registerDataTreeChangeListener(wildcard, listener);
+
+        putTx(TOP_PATH, TOP_INITIAL_DATA).submit().checkedGet();
+
+        final DataTreeModification<TopLevelList> fooWriteEvent = Iterables.getOnlyElement(listener.nextEvent());
+        assertEquals(FOO_PATH, fooWriteEvent.getRootPath().getRootIdentifier());
+        verifyModification(fooWriteEvent.getRootNode(), FOO_ARGUMENT, ModificationType.WRITE);
+
+        putTx(BAR_PATH, BAR_DATA).submit().checkedGet();
+        final DataTreeModification<TopLevelList> barWriteEvent = Iterables.getOnlyElement(listener.nextEvent());
+        assertEquals(BAR_PATH, barWriteEvent.getRootPath().getRootIdentifier());
+        verifyModification(barWriteEvent.getRootNode(), BAR_ARGUMENT, ModificationType.WRITE);
+
+        deleteTx(BAR_PATH).submit().checkedGet();
+        final DataTreeModification<TopLevelList> barDeleteEvent = Iterables.getOnlyElement(listener.nextEvent());
+        assertEquals(BAR_PATH, barDeleteEvent.getRootPath().getRootIdentifier());
+        verifyModification(barDeleteEvent.getRootNode(), BAR_ARGUMENT, ModificationType.DELETE);
+    }
+
+
+
+    private void createAndVerifyTop(final EventCapturingListener<Top> listener) throws Exception {
+        putTx(TOP_PATH,TOP_INITIAL_DATA).submit().checkedGet();
+        final Collection<DataTreeModification<Top>> events = listener.nextEvent();
+
+        assertFalse("Non empty collection should be received.",events.isEmpty());
+        final DataTreeModification<Top> initialWrite = Iterables.getOnlyElement(events);
+        final DataObjectModification<? extends DataObject> initialNode = initialWrite.getRootNode();
+        verifyModification(initialNode,TOP_PATH.getPathArguments().iterator().next(),ModificationType.WRITE);
+        assertEquals(TOP_INITIAL_DATA, initialNode.getDataAfter());
+    }
+
+    private void verifyModification(final DataObjectModification<? extends DataObject> barWrite, final PathArgument pathArg,
+            final ModificationType eventType) {
+        assertEquals(pathArg.getType(), barWrite.getDataType());
+        assertEquals(eventType,barWrite.getModificationType());
+        assertEquals(pathArg, barWrite.getIdentifier());
+    }
+
+    private <T extends DataObject> WriteTransaction putTx(final InstanceIdentifier<T> path,final T data) {
+        final WriteTransaction tx = dataBrokerImpl.newWriteOnlyTransaction();
+        tx.put(LogicalDatastoreType.OPERATIONAL, path, data);
+        return tx;
+    }
+
+    private WriteTransaction deleteTx(final InstanceIdentifier<?> path) {
+        final WriteTransaction tx = dataBrokerImpl.newWriteOnlyTransaction();
+        tx.delete(LogicalDatastoreType.OPERATIONAL, path);
+        return tx;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..cac0f51
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import java.util.List;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import scala.concurrent.Future;
+
+/**
+ * Abstract base class for {@link DOMStoreThreePhaseCommitCohort} instances returned by this
+ * implementation. In addition to the usual set of methods it also contains the list of actor
+ * futures.
+ */
+abstract class AbstractThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+    abstract List<Future<ActorSelection>> getCohortFutures();
+}
index 933e87a..d94e1c6 100644 (file)
@@ -7,22 +7,40 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import scala.concurrent.Future;
 
 abstract class AbstractTransactionContext implements TransactionContext {
 
-    protected final TransactionIdentifier identifier;
-    protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
+    private final List<Future<Object>> recordedOperationFutures = new ArrayList<>();
+    private final TransactionIdentifier identifier;
 
-    AbstractTransactionContext(TransactionIdentifier identifier) {
+    protected AbstractTransactionContext(TransactionIdentifier identifier) {
         this.identifier = identifier;
     }
 
     @Override
-    public List<Future<Object>> getRecordedOperationFutures() {
-        return recordedOperationFutures;
+    public final void copyRecordedOperationFutures(Collection<Future<Object>> target) {
+        target.addAll(recordedOperationFutures);
     }
-}
\ No newline at end of file
+
+    protected final TransactionIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    protected final Collection<Future<Object>> copyRecordedOperationFutures() {
+        return ImmutableList.copyOf(recordedOperationFutures);
+    }
+
+    protected final int recordedOperationCount() {
+        return recordedOperationFutures.size();
+    }
+
+    protected final void recordOperationFuture(Future<Object> future) {
+        recordedOperationFutures.add(future);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java
new file mode 100644 (file)
index 0000000..ed3aa85
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import akka.dispatch.OnComplete;
+import java.util.List;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+final class ChainedTransactionProxy extends TransactionProxy {
+    private static final Logger LOG = LoggerFactory.getLogger(ChainedTransactionProxy.class);
+
+    /**
+     * Stores the ready Futures from the previous Tx in the chain.
+     */
+    private final List<Future<ActorSelection>> previousReadyFutures;
+
+    /**
+     * Stores the ready Futures from this transaction when it is readied.
+     */
+    private volatile List<Future<ActorSelection>> readyFutures;
+
+    ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
+            String transactionChainId, List<Future<ActorSelection>> previousReadyFutures) {
+        super(actorContext, transactionType, transactionChainId);
+        this.previousReadyFutures = previousReadyFutures;
+    }
+
+    List<Future<ActorSelection>> getReadyFutures() {
+        return readyFutures;
+    }
+
+    boolean isReady() {
+        return readyFutures != null;
+    }
+
+    @Override
+    public AbstractThreePhaseCommitCohort ready() {
+        final AbstractThreePhaseCommitCohort ret = super.ready();
+        readyFutures = ret.getCohortFutures();
+        LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
+            readyFutures.size(), getTransactionChainId());
+        return ret;
+    }
+
+    /**
+     * This method is overridden to ensure the previous Tx's ready operations complete
+     * before we initiate the next Tx in the chain to avoid creation failures if the
+     * previous Tx's ready operations haven't completed yet.
+     */
+    @Override
+    protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
+        // Check if there are any previous ready Futures, otherwise let the super class handle it.
+        if(previousReadyFutures.isEmpty()) {
+            return super.sendFindPrimaryShardAsync(shardName);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
+                    previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
+        }
+
+        // Combine the ready Futures into 1.
+        Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
+                previousReadyFutures, getActorContext().getClientDispatcher());
+
+        // Add a callback for completion of the combined Futures.
+        final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
+        OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
+            @Override
+            public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
+                if(failure != null) {
+                    // A Ready Future failed so fail the returned Promise.
+                    returnPromise.failure(failure);
+                } else {
+                    LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
+                            getIdentifier(), getTransactionChainId());
+
+                    // Send the FindPrimaryShard message and use the resulting Future to complete the
+                    // returned Promise.
+                    returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
+                }
+            }
+        };
+
+        combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
+
+        return returnPromise.future();
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpDOMStoreThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..376b658
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
+import java.util.List;
+import scala.concurrent.Future;
+
+/**
+ * A {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort}
+ * instance given out for empty transactions.
+ */
+final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort {
+    static final NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
+
+    private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS = Futures.immediateFuture(null);
+    private static final ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS = Futures.immediateFuture(Boolean.TRUE);
+
+    private NoOpDOMStoreThreePhaseCommitCohort() {
+        // Hidden to prevent instantiation
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        return IMMEDIATE_BOOLEAN_SUCCESS;
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        return IMMEDIATE_VOID_SUCCESS;
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        return IMMEDIATE_VOID_SUCCESS;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        return IMMEDIATE_VOID_SUCCESS;
+    }
+
+    @Override
+    List<Future<ActorSelection>> getCohortFutures() {
+        return Collections.emptyList();
+    }
+}
index 84f0776..672560b 100644 (file)
@@ -33,44 +33,44 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
 
     @Override
     public void closeTransaction() {
-        LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
+        LOG.debug("NoOpTransactionContext {} closeTransaction called", getIdentifier());
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called", identifier);
+        LOG.debug("Tx {} readyTransaction called", getIdentifier());
         operationLimiter.release();
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
-        LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+        LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
         operationLimiter.release();
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+        LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
         operationLimiter.release();
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        LOG.debug("Tx {} writeData called path = {}", identifier, path);
+        LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
         operationLimiter.release();
     }
 
     @Override
     public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
-        LOG.debug("Tx {} readData called path = {}", identifier, path);
+        LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
         operationLimiter.release();
         proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, failure));
     }
 
     @Override
     public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
-        LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+        LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
         operationLimiter.release();
         proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
     }
index aeb4062..3a2bcf2 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -25,7 +24,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -34,7 +32,7 @@ import scala.runtime.AbstractFunction1;
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
  */
-public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort{
+public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort {
 
     private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
 
@@ -209,7 +207,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
     @Override
     public ListenableFuture<Void> commit() {
-        OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
+        OperationCallback operationCallback = cohortFutures.isEmpty() ? NO_OP_CALLBACK :
                 new TransactionRateLimitingCallback(actorContext);
 
         return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
@@ -322,7 +320,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
         }, actorContext.getClientDispatcher());
     }
 
-    @VisibleForTesting
+    @Override
     List<Future<ActorSelection>> getCohortFutures() {
         return Collections.unmodifiableList(cohortFutures);
     }
index 58ac1d8..11066ed 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.List;
@@ -21,18 +20,13 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
-import scala.concurrent.Promise;
 
 /**
  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
  */
 public class TransactionChainProxy implements DOMStoreTransactionChain {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
-
     private interface State {
         boolean isReady();
 
@@ -139,83 +133,4 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
     private void checkReadyState(State state) {
         Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet");
     }
-
-    private static class ChainedTransactionProxy extends TransactionProxy {
-
-        /**
-         * Stores the ready Futures from the previous Tx in the chain.
-         */
-        private final List<Future<ActorSelection>> previousReadyFutures;
-
-        /**
-         * Stores the ready Futures from this transaction when it is readied.
-         */
-        private volatile List<Future<ActorSelection>> readyFutures;
-
-        private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
-                String transactionChainId, List<Future<ActorSelection>> previousReadyFutures) {
-            super(actorContext, transactionType, transactionChainId);
-            this.previousReadyFutures = previousReadyFutures;
-        }
-
-        List<Future<ActorSelection>> getReadyFutures() {
-            return readyFutures;
-        }
-
-        boolean isReady() {
-            return readyFutures != null;
-        }
-
-        @Override
-        protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
-            LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
-                    readyFutures.size(), getTransactionChainId());
-            this.readyFutures = readyFutures;
-        }
-
-        /**
-         * This method is overridden to ensure the previous Tx's ready operations complete
-         * before we initiate the next Tx in the chain to avoid creation failures if the
-         * previous Tx's ready operations haven't completed yet.
-         */
-        @Override
-        protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
-            // Check if there are any previous ready Futures, otherwise let the super class handle it.
-            if(previousReadyFutures.isEmpty()) {
-                return super.sendFindPrimaryShardAsync(shardName);
-            }
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
-                        previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
-            }
-
-            // Combine the ready Futures into 1.
-            Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
-                    previousReadyFutures, getActorContext().getClientDispatcher());
-
-            // Add a callback for completion of the combined Futures.
-            final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
-            OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
-                @Override
-                public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
-                    if(failure != null) {
-                        // A Ready Future failed so fail the returned Promise.
-                        returnPromise.failure(failure);
-                    } else {
-                        LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
-                                getIdentifier(), getTransactionChainId());
-
-                        // Send the FindPrimaryShard message and use the resulting Future to complete the
-                        // returned Promise.
-                        returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
-                    }
-                }
-            };
-
-            combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
-
-            return returnPromise.future();
-        }
-    }
 }
index 1b8e65e..a5a7494 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.List;
+import java.util.Collection;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.Future;
@@ -34,5 +34,5 @@ interface TransactionContext {
 
     void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
 
-    List<Future<Object>> getRecordedOperationFutures();
+    void copyRecordedOperationFutures(Collection<Future<Object>> target);
 }
index 3a20963..c61682d 100644 (file)
@@ -86,7 +86,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     @Override
     public void closeTransaction() {
-        LOG.debug("Tx {} closeTransaction called", identifier);
+        LOG.debug("Tx {} closeTransaction called", getIdentifier());
 
         actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
     }
@@ -94,7 +94,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     @Override
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-                identifier, recordedOperationFutures.size());
+            getIdentifier(), recordedOperationCount());
 
         // Send the remaining batched modifications if any.
 
@@ -113,8 +113,8 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         // Future will fail. We need all prior operations and the ready operation to succeed
         // in order to attempt commit.
 
-        List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
-        futureList.addAll(recordedOperationFutures);
+        List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationCount() + 1);
+        copyRecordedOperationFutures(futureList);
         futureList.add(withLastReplyFuture);
 
         Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
@@ -127,7 +127,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             @Override
             public ActorSelection checkedApply(Iterable<Object> notUsed) {
                 LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
-                        identifier);
+                    getIdentifier());
 
                 // At this point all the Futures succeeded and we need to extract the cohort
                 // actor path from the ReadyTransactionReply. For the recorded operations, they
@@ -149,7 +149,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
                 } else {
                     // Throwing an exception here will fail the Future.
                     throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
-                            identifier, serializedReadyReply.getClass()));
+                        getIdentifier(), serializedReadyReply.getClass()));
                 }
             }
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
@@ -161,7 +161,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     private void batchModification(Modification modification) {
         if(batchedModifications == null) {
-            batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
                     transactionChainId);
         }
 
@@ -176,7 +176,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     private void sendAndRecordBatchedModifications() {
         Future<Object> sentFuture = sendBatchedModifications();
         if(sentFuture != null) {
-            recordedOperationFutures.add(sentFuture);
+            recordOperationFuture(sentFuture);
         }
     }
 
@@ -188,14 +188,14 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         Future<Object> sent = null;
         if(batchedModifications != null) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Tx {} sending {} batched modifications, ready: {}", identifier,
+                LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
                         batchedModifications.getModifications().size(), ready);
             }
 
             batchedModifications.setReady(ready);
             sent = executeOperationAsync(batchedModifications);
 
-            batchedModifications = new BatchedModifications(identifier.toString(), remoteTransactionVersion,
+            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
                     transactionChainId);
         }
 
@@ -204,89 +204,46 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
-        LOG.debug("Tx {} deleteData called path = {}", identifier, path);
+        LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
 
         batchModification(new DeleteModification(path));
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        LOG.debug("Tx {} mergeData called path = {}", identifier, path);
+        LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
 
         batchModification(new MergeModification(path, data));
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        LOG.debug("Tx {} writeData called path = {}", identifier, path);
+        LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
 
         batchModification(new WriteModification(path, data));
     }
 
     @Override
-    public void readData(
-            final YangInstanceIdentifier path,final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
+    public void readData(final YangInstanceIdentifier path,
+            final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
 
-        LOG.debug("Tx {} readData called path = {}", identifier, path);
+        LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
 
-        // Send the remaining batched modifications if any.
+        // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
+        // public API contract.
 
         sendAndRecordBatchedModifications();
 
-        // If there were any previous recorded put/merge/delete operation reply Futures then we
-        // must wait for them to successfully complete. This is necessary to honor the read
-        // uncommitted semantics of the public API contract. If any one fails then fail the read.
-
-        if(recordedOperationFutures.isEmpty()) {
-            finishReadData(path, returnFuture);
-        } else {
-            LOG.debug("Tx {} readData: verifying {} previous recorded operations",
-                    identifier, recordedOperationFutures.size());
-
-            // Note: we make a copy of recordedOperationFutures to be on the safe side in case
-            // Futures#sequence accesses the passed List on a different thread, as
-            // recordedOperationFutures is not synchronized.
-
-            Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
-                    Lists.newArrayList(recordedOperationFutures),
-                    actorContext.getClientDispatcher());
-
-            OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
-                @Override
-                public void onComplete(Throwable failure, Iterable<Object> notUsed)
-                        throws Throwable {
-                    if(failure != null) {
-                        LOG.debug("Tx {} readData: a recorded operation failed: {}",
-                                identifier, failure);
-                        returnFuture.setException(new ReadFailedException(
-                                "The read could not be performed because a previous put, merge,"
-                                + "or delete operation failed", failure));
-                    } else {
-                        finishReadData(path, returnFuture);
-                    }
-                }
-            };
-
-            combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
-        }
-
-    }
-
-    private void finishReadData(final YangInstanceIdentifier path,
-            final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
-
-        LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
-
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object readResponse) throws Throwable {
                 if(failure != null) {
-                    LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+                    LOG.debug("Tx {} read operation failed: {}", getIdentifier(), failure);
                     returnFuture.setException(new ReadFailedException(
                             "Error reading data for path " + path, failure));
 
                 } else {
-                    LOG.debug("Tx {} read operation succeeded", identifier, failure);
+                    LOG.debug("Tx {} read operation succeeded", getIdentifier(), failure);
 
                     if (readResponse instanceof ReadDataReply) {
                         ReadDataReply reply = (ReadDataReply) readResponse;
@@ -312,64 +269,22 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     @Override
     public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> returnFuture) {
 
-        LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+        LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
 
-        // Send the remaining batched modifications if any.
+        // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
+        // public API contract.
 
         sendAndRecordBatchedModifications();
 
-        // If there were any previous recorded put/merge/delete operation reply Futures then we
-        // must wait for them to successfully complete. This is necessary to honor the read
-        // uncommitted semantics of the public API contract. If any one fails then fail this
-        // request.
-
-        if(recordedOperationFutures.isEmpty()) {
-            finishDataExists(path, returnFuture);
-        } else {
-            LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
-                    identifier, recordedOperationFutures.size());
-
-            // Note: we make a copy of recordedOperationFutures to be on the safe side in case
-            // Futures#sequence accesses the passed List on a different thread, as
-            // recordedOperationFutures is not synchronized.
-
-            Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
-                    Lists.newArrayList(recordedOperationFutures),
-                    actorContext.getClientDispatcher());
-            OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
-                @Override
-                public void onComplete(Throwable failure, Iterable<Object> notUsed)
-                        throws Throwable {
-                    if(failure != null) {
-                        LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
-                                identifier, failure);
-                        returnFuture.setException(new ReadFailedException(
-                                "The data exists could not be performed because a previous "
-                                + "put, merge, or delete operation failed", failure));
-                    } else {
-                        finishDataExists(path, returnFuture);
-                    }
-                }
-            };
-
-            combinedFutures.onComplete(onComplete, actorContext.getClientDispatcher());
-        }
-    }
-
-    private void finishDataExists(final YangInstanceIdentifier path,
-            final SettableFuture<Boolean> returnFuture) {
-
-        LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
-
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object response) throws Throwable {
                 if(failure != null) {
-                    LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+                    LOG.debug("Tx {} dataExists operation failed: {}", getIdentifier(), failure);
                     returnFuture.setException(new ReadFailedException(
                             "Error checking data exists for path " + path, failure));
                 } else {
-                    LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+                    LOG.debug("Tx {} dataExists operation succeeded", getIdentifier(), failure);
 
                     if (response instanceof DataExistsReply) {
                         returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java
new file mode 100644 (file)
index 0000000..dc965ed
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Abstract superclass for transaction operations which should be executed
+ * on a {@link TransactionContext} at a later point in time.
+ */
+abstract class TransactionOperation {
+    /**
+     * Execute the delayed operation.
+     *
+     * @param transactionContext
+     */
+    protected abstract void invoke(TransactionContext transactionContext);
+}
index a7effbc..59c9298 100644 (file)
@@ -12,21 +12,16 @@ import akka.actor.ActorSelection;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.FinalizablePhantomReference;
-import com.google.common.base.FinalizableReferenceQueue;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,7 +30,6 @@ import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
@@ -43,7 +37,6 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -73,19 +66,24 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         WRITE_ONLY,
         READ_WRITE;
 
-        public static TransactionType fromInt(int type) {
-            if(type == WRITE_ONLY.ordinal()) {
-                return WRITE_ONLY;
-            } else if(type == READ_WRITE.ordinal()) {
-                return READ_WRITE;
-            } else if(type == READ_ONLY.ordinal()) {
-                return READ_ONLY;
-            } else {
-                throw new IllegalArgumentException("In TransactionType enum value" + type);
+        // Cache all values
+        private static final TransactionType[] VALUES = values();
+
+        public static TransactionType fromInt(final int type) {
+            try {
+                return VALUES[type];
+            } catch (IndexOutOfBoundsException e) {
+                throw new IllegalArgumentException("In TransactionType enum value " + type, e);
             }
         }
     }
 
+    private static enum TransactionState {
+        OPEN,
+        READY,
+        CLOSED,
+    }
+
     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
                                                               new Mapper<Throwable, Throwable>() {
         @Override
@@ -104,72 +102,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
             FiniteDuration.create(1, TimeUnit.SECONDS);
 
-    /**
-     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
-     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
-     * trickery to clean up its internal thread when the bundle is unloaded.
-     */
-    private static final FinalizableReferenceQueue phantomReferenceQueue =
-                                                                  new FinalizableReferenceQueue();
-
-    /**
-     * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
-     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
-     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
-     * and thus becomes eligible for garbage collection.
-     */
-    private static final Map<TransactionProxyCleanupPhantomReference,
-                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
-                                                                        new ConcurrentHashMap<>();
-
-    /**
-     * A PhantomReference that closes remote transactions for a TransactionProxy when it's
-     * garbage collected. This is used for read-only transactions as they're not explicitly closed
-     * by clients. So the only way to detect that a transaction is no longer in use and it's safe
-     * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
-     * but TransactionProxy instances should generally be short-lived enough to avoid being moved
-     * to the old generation space and thus should be cleaned up in a timely manner as the GC
-     * runs on the young generation (eden, swap1...) space much more frequently.
-     */
-    private static class TransactionProxyCleanupPhantomReference
-                                           extends FinalizablePhantomReference<TransactionProxy> {
-
-        private final List<ActorSelection> remoteTransactionActors;
-        private final AtomicBoolean remoteTransactionActorsMB;
-        private final ActorContext actorContext;
-        private final TransactionIdentifier identifier;
-
-        protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
-            super(referent, phantomReferenceQueue);
-
-            // Note we need to cache the relevant fields from the TransactionProxy as we can't
-            // have a hard reference to the TransactionProxy instance itself.
-
-            remoteTransactionActors = referent.remoteTransactionActors;
-            remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
-            actorContext = referent.actorContext;
-            identifier = referent.getIdentifier();
-        }
-
-        @Override
-        public void finalizeReferent() {
-            LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
-                    remoteTransactionActors.size(), identifier);
-
-            phantomReferenceCache.remove(this);
-
-            // Access the memory barrier volatile to ensure all previous updates to the
-            // remoteTransactionActors list are visible to this thread.
-
-            if(remoteTransactionActorsMB.get()) {
-                for(ActorSelection actor : remoteTransactionActors) {
-                    LOG.trace("Sending CloseTransaction to {}", actor);
-                    actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
-                }
-            }
-        }
-    }
-
     /**
      * Stores the remote Tx actors for each requested data store path to be used by the
      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
@@ -177,8 +109,8 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
      * remoteTransactionActors list so they will be visible to the thread accessing the
      * PhantomReference.
      */
-    private List<ActorSelection> remoteTransactionActors;
-    private volatile AtomicBoolean remoteTransactionActorsMB;
+    List<ActorSelection> remoteTransactionActors;
+    volatile AtomicBoolean remoteTransactionActorsMB;
 
     /**
      * Stores the create transaction results per shard.
@@ -186,10 +118,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
 
     private final TransactionType transactionType;
-    private final ActorContext actorContext;
+    final ActorContext actorContext;
     private final String transactionChainId;
     private final SchemaContext schemaContext;
-    private boolean inReadyState;
+    private TransactionState state = TransactionState.OPEN;
 
     private volatile boolean initialized;
     private Semaphore operationLimiter;
@@ -226,8 +158,8 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
-            if(transactionContext != null) {
-                recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
+            if (transactionContext != null) {
+                transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
             }
         }
 
@@ -295,7 +227,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private void checkModificationState() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
                 "Modification operation on read-only transaction is not allowed");
-        Preconditions.checkState(!inReadyState,
+        Preconditions.checkState(state == TransactionState.OPEN,
                 "Transaction is sealed - further modifications are not allowed");
     }
 
@@ -328,7 +260,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         }
     }
 
-
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
 
@@ -383,26 +314,34 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         });
     }
 
-    @Override
-    public DOMStoreThreePhaseCommitCohort ready() {
+    private boolean seal(final TransactionState newState) {
+        if (state == TransactionState.OPEN) {
+            state = newState;
+            return true;
+        } else {
+            return false;
+        }
+    }
 
-        checkModificationState();
+    @Override
+    public AbstractThreePhaseCommitCohort ready() {
+        Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
+                "Read-only transactions cannot be readied");
 
-        inReadyState = true;
+        final boolean success = seal(TransactionState.READY);
+        Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
 
         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
                     txFutureCallbackMap.size());
 
-        if(txFutureCallbackMap.size() == 0) {
-            onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
+        if (txFutureCallbackMap.isEmpty()) {
             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
         }
 
         throttleOperation(txFutureCallbackMap.size());
 
-        List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
-
+        List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
 
             LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
@@ -427,22 +366,22 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             cohortFutures.add(future);
         }
 
-        onTransactionReady(cohortFutures);
-
         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
             getIdentifier().toString());
     }
 
-    /**
-     * Method for derived classes to be notified when the transaction has been readied.
-     *
-     * @param cohortFutures the cohort Futures for each shard transaction.
-     */
-    protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
-    }
-
     @Override
     public void close() {
+        if (!seal(TransactionState.CLOSED)) {
+            if (state == TransactionState.CLOSED) {
+                // Idempotent no-op as per AutoCloseable recommendation
+                return;
+            }
+
+            throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
+                getIdentifier()));
+        }
+
         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
                 @Override
@@ -502,13 +441,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return actorContext;
     }
 
-    /**
-     * Interfaces for transaction operations to be invoked later.
-     */
-    private static interface TransactionOperation {
-        void invoke(TransactionContext transactionContext);
-    }
-
     /**
      * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
      * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
@@ -668,7 +600,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
                 LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
 
                 localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), operationLimiter);
-            } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+            } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
                 localTransactionContext = createValidTransactionContext(
                         CreateTransactionReply.fromSerializable(response));
             } else {
@@ -733,9 +665,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
                     remoteTransactionActors = Lists.newArrayList();
                     remoteTransactionActorsMB = new AtomicBoolean();
 
-                    TransactionProxyCleanupPhantomReference cleanup =
-                            new TransactionProxyCleanupPhantomReference(TransactionProxy.this);
-                    phantomReferenceCache.put(cleanup, cleanup);
+                    TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
                 }
 
                 // Add the actor to the remoteTransactionActors list for access by the
@@ -765,36 +695,4 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             }
         }
     }
-
-    private static class NoOpDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
-        static NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
-
-        private static final ListenableFuture<Void> IMMEDIATE_VOID_SUCCESS =
-                com.google.common.util.concurrent.Futures.immediateFuture(null);
-        private static final ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS =
-                com.google.common.util.concurrent.Futures.immediateFuture(Boolean.TRUE);
-
-        private NoOpDOMStoreThreePhaseCommitCohort() {
-        }
-
-        @Override
-        public ListenableFuture<Boolean> canCommit() {
-            return IMMEDIATE_BOOLEAN_SUCCESS;
-        }
-
-        @Override
-        public ListenableFuture<Void> preCommit() {
-            return IMMEDIATE_VOID_SUCCESS;
-        }
-
-        @Override
-        public ListenableFuture<Void> abort() {
-            return IMMEDIATE_VOID_SUCCESS;
-        }
-
-        @Override
-        public ListenableFuture<Void> commit() {
-            return IMMEDIATE_VOID_SUCCESS;
-        }
-    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxyCleanupPhantomReference.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxyCleanupPhantomReference.java
new file mode 100644 (file)
index 0000000..77834d9
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import com.google.common.base.FinalizablePhantomReference;
+import com.google.common.base.FinalizableReferenceQueue;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PhantomReference that closes remote transactions for a TransactionProxy when it's
+ * garbage collected. This is used for read-only transactions as they're not explicitly closed
+ * by clients. So the only way to detect that a transaction is no longer in use and it's safe
+ * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
+ * but TransactionProxy instances should generally be short-lived enough to avoid being moved
+ * to the old generation space and thus should be cleaned up in a timely manner as the GC
+ * runs on the young generation (eden, swap1...) space much more frequently.
+ */
+final class TransactionProxyCleanupPhantomReference
+                                       extends FinalizablePhantomReference<TransactionProxy> {
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionProxyCleanupPhantomReference.class);
+    /**
+     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
+     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
+     * trickery to clean up its internal thread when the bundle is unloaded.
+     */
+    private static final FinalizableReferenceQueue phantomReferenceQueue =
+                                                                  new FinalizableReferenceQueue();
+
+    /**
+     * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
+     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
+     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
+     * and thus becomes eligible for garbage collection.
+     */
+    private static final Map<TransactionProxyCleanupPhantomReference,
+                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
+                                                                        new ConcurrentHashMap<>();
+
+    private final List<ActorSelection> remoteTransactionActors;
+    private final AtomicBoolean remoteTransactionActorsMB;
+    private final ActorContext actorContext;
+    private final TransactionIdentifier identifier;
+
+    private TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
+        super(referent, phantomReferenceQueue);
+
+        // Note we need to cache the relevant fields from the TransactionProxy as we can't
+        // have a hard reference to the TransactionProxy instance itself.
+
+        remoteTransactionActors = referent.remoteTransactionActors;
+        remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
+        actorContext = referent.actorContext;
+        identifier = referent.getIdentifier();
+    }
+
+    static void track(TransactionProxy referent) {
+        final TransactionProxyCleanupPhantomReference ret = new TransactionProxyCleanupPhantomReference(referent);
+        phantomReferenceCache.put(ret, ret);
+    }
+
+    @Override
+    public void finalizeReferent() {
+        LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
+                remoteTransactionActors.size(), identifier);
+
+        phantomReferenceCache.remove(this);
+
+        // Access the memory barrier volatile to ensure all previous updates to the
+        // remoteTransactionActors list are visible to this thread.
+
+        if(remoteTransactionActorsMB.get()) {
+            for(ActorSelection actor : remoteTransactionActors) {
+                LOG.trace("Sending CloseTransaction to {}", actor);
+                actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
+            }
+        }
+    }
+}
\ No newline at end of file
index 3b4a190..e131354 100644 (file)
@@ -33,7 +33,7 @@ public class WriteOnlyTransactionContextImpl extends TransactionContextImpl {
     @Override
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-                identifier, recordedOperationFutures.size());
+            getIdentifier(), recordedOperationCount());
 
         // Send the remaining batched modifications if any.
 
index ccfb329..c345033 100644 (file)
@@ -45,26 +45,26 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
-        recordedOperationFutures.add(executeOperationAsync(
+        recordOperationFuture(executeOperationAsync(
                 new DeleteData(path, getRemoteTransactionVersion())));
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        recordedOperationFutures.add(executeOperationAsync(
+        recordOperationFuture(executeOperationAsync(
                 new MergeData(path, data, getRemoteTransactionVersion())));
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        recordedOperationFutures.add(executeOperationAsync(
+        recordOperationFuture(executeOperationAsync(
                 new WriteData(path, data, getRemoteTransactionVersion())));
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-                identifier, recordedOperationFutures.size());
+            getIdentifier(), recordedOperationCount());
 
         // Send the ReadyTransaction message to the Tx actor.
 
index 265ec59..a247100 100644 (file)
@@ -10,7 +10,6 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
@@ -164,34 +163,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         testReadWithExceptionOnInitialCreateTransaction(new TestException());
     }
 
-    @Test(expected = TestException.class)
-    public void testReadWithPriorRecordingOperationFailure() throws Throwable {
-        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
-                when(mockActorContext).getDatastoreContext();
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectFailedBatchedModifications(actorRef);
-
-        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedReadData());
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        try {
-            propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
-        } finally {
-            verify(mockActorContext, times(0)).executeOperationAsync(
-                    eq(actorSelection(actorRef)), eqSerializedReadData());
-        }
-    }
-
     @Test
     public void testReadWithPriorRecordingOperationSuccessful() throws Throwable {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
@@ -301,35 +272,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
     }
 
-    @Test(expected = TestException.class)
-    public void testExistsWithPriorRecordingOperationFailure() throws Throwable {
-        doReturn(dataStoreContextBuilder.shardBatchedModificationCount(2).build()).
-                when(mockActorContext).getDatastoreContext();
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
-        expectFailedBatchedModifications(actorRef);
-
-        doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), eqSerializedDataExists());
-
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE);
-
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
-        transactionProxy.delete(TestModel.TEST_PATH);
-
-        try {
-            propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
-        } finally {
-            verify(mockActorContext, times(0)).executeOperationAsync(
-                    eq(actorSelection(actorRef)), eqSerializedDataExists());
-        }
-    }
-
     @Test
     public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
index f404c06..8b3a830 100644 (file)
@@ -6,6 +6,7 @@
  */
 package org.opendaylight.controller.md.sal.dom.api;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import java.io.Serializable;
 import java.util.Iterator;
@@ -102,4 +103,9 @@ public final class DOMDataTreeIdentifier implements Immutable, Path<DOMDataTreeI
 
         return oi.hasNext() ? -1 : 0;
     }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("datastore", datastoreType).add("root", rootIdentifier).toString();
+    }
 }
diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/ForwardingDOMStoreThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/ForwardingDOMStoreThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..4c817dd
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.spi.data;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Abstract base class for {@link DOMStoreThreePhaseCommitCohort} implementations,
+ * which forward most of their functionality to a backend {@link #delegate()}.
+ */
+@Beta
+public abstract class ForwardingDOMStoreThreePhaseCommitCohort extends ForwardingObject implements DOMStoreThreePhaseCommitCohort {
+    @Override
+    protected abstract DOMStoreThreePhaseCommitCohort delegate();
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        return delegate().canCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        return delegate().preCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        return delegate().abort();
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        return delegate().commit();
+    }
+}
index d7d547d..1b9a37d 100644 (file)
@@ -8,8 +8,8 @@
                     </type>
                     <name>XSQL</name>
                     <data-broker>
-                       <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-data-broker</type>
-                       <name>binding-data-broker</name>
+                        <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-async-data-broker</type>
+                        <name>binding-data-broker</name>
                     </data-broker>
                     <async-data-broker>
                         <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-async-data-broker</type>
index cc92b48..2cb2e7b 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.odl.xsql;
 
 import java.sql.Connection;
@@ -10,7 +17,9 @@ import java.util.Properties;
 import java.util.logging.Logger;
 
 import org.opendaylight.controller.md.sal.dom.xsql.jdbc.JDBCConnection;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class JDBCDriver implements Driver {
 
     public static JDBCDriver drv = new JDBCDriver();
index 2f28052..938d25e 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.dom.xsql;
 
 import java.io.InputStream;
@@ -21,7 +28,9 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Calendar;
 import java.util.Map;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class TablesResultSet implements ResultSet {
 
     private String tables[] = null;
index a5658cc..05f6522 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.dom.xsql;
 
 import java.io.File;
@@ -24,7 +31,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class XSQLAdapter extends Thread implements SchemaContextListener {
 
     private static final int SLEEP = 10000;
@@ -51,6 +60,7 @@ public class XSQLAdapter extends Thread implements SchemaContextListener {
     private String pinningFile;
     private ServerSocket serverSocket = null;
     private DOMDataBroker domDataBroker = null;
+    private static final String REFERENCE_FIELD_NAME = "reference";
 
     private XSQLAdapter() {
         XSQLAdapter.log("Starting Adapter");
@@ -152,28 +162,18 @@ public class XSQLAdapter extends Thread implements SchemaContextListener {
                 List<Object> result = new LinkedList<Object>();
                 YangInstanceIdentifier instanceIdentifier = YangInstanceIdentifier
                         .builder()
-                        .node(XSQLODLUtils.getPath(table.getODLNode()).get(0))
+                        .node(XSQLODLUtils.getPath(table.getFirstFromSchemaNodes()).get(0))
                         .toInstance();
                 DOMDataReadTransaction t = this.domDataBroker
                         .newReadOnlyTransaction();
                 Object node = t.read(type,
                         instanceIdentifier).get();
 
-                node = XSQLODLUtils.get(node, "reference");
+                node = XSQLODLUtils.get(node, REFERENCE_FIELD_NAME);
                 if (node == null) {
                     return result;
                 }
-
-                Map<?, ?> children = XSQLODLUtils.getChildren(node);
-                for (Object c : children.values()) {
-                    result.add(c);
-                    /* I don't remember why i did this... possibly to prevent different siblings queried together
-                    Map<?, ?> sons = XSQLODLUtils.getChildren(c);
-                    for (Object child : sons.values()) {
-                        result.add(child);
-                    }*/
-                }
-
+                result.add(node);
                 return result;
             } catch (Exception err) {
                 XSQLAdapter.log(err);
index a9c0f69..7615296 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.dom.xsql;
 
 import java.io.DataInputStream;
@@ -23,7 +30,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class XSQLBluePrint implements DatabaseMetaData, Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -203,15 +212,23 @@ public class XSQLBluePrint implements DatabaseMetaData, Serializable {
         return result;
     }
 
-    public void addToBluePrintCache(XSQLBluePrintNode blNode) {
-        this.tableNameToBluePrint.put(blNode.getBluePrintNodeName(), blNode);
-        Map<String, XSQLBluePrintNode> map = this.odlNameToBluePrint.get(blNode
-                .getODLTableName());
-        if (map == null) {
-            map = new HashMap<String, XSQLBluePrintNode>();
-            this.odlNameToBluePrint.put(blNode.getODLTableName(), map);
+    public XSQLBluePrintNode addToBluePrintCache(XSQLBluePrintNode blNode,XSQLBluePrintNode parent) {
+        XSQLBluePrintNode existingNode = this.tableNameToBluePrint.get(blNode.getBluePrintNodeName());
+        if(existingNode!=null){
+            existingNode.mergeAugmentation(blNode);
+            return existingNode;
+        }else{
+            this.tableNameToBluePrint.put(blNode.getBluePrintNodeName(), blNode);
+            Map<String, XSQLBluePrintNode> map = this.odlNameToBluePrint.get(blNode.getODLTableName());
+            if (map == null) {
+                map = new HashMap<String, XSQLBluePrintNode>();
+                this.odlNameToBluePrint.put(blNode.getODLTableName(), map);
+            }
+            map.put(blNode.getBluePrintNodeName(), blNode);
+            if(parent!=null)
+                parent.addChild(blNode);
+            return blNode;
         }
-        map.put(blNode.getBluePrintNodeName(), blNode);
     }
 
     public Class<?> getGenericType(ParameterizedType type) {
index 4a56545..d3cd91a 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.dom.xsql;
 
 import java.io.Serializable;
@@ -8,6 +15,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class XSQLBluePrintNode implements Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -24,12 +34,25 @@ public class XSQLBluePrintNode implements Serializable {
     private Set<XSQLColumn> columns = new HashSet<XSQLColumn>();
     private Map<String, XSQLColumn> origNameToColumn = new HashMap<String, XSQLColumn>();
 
-    private transient Object odlNode = null;
+    private transient Object[] odlSchemaNodes = null;
     private boolean module = false;
     private String bluePrintTableName = null;
     private String odlTableName = null;
     private String origName = null;
 
+    public void mergeAugmentation(XSQLBluePrintNode aug) {
+        this.relations.addAll(aug.relations);
+        this.inheritingNodes.addAll(aug.inheritingNodes);
+        this.children.addAll(aug.children);
+        this.columns.addAll(aug.columns);
+        this.origNameToColumn.putAll(aug.origNameToColumn);
+        if (aug.odlSchemaNodes != null) {
+            for (Object sn : aug.odlSchemaNodes) {
+                addToSchemaNodes(sn);
+            }
+        }
+    }
+
     public XSQLBluePrintNode(String name, String _origName, int _level) {
         this.level = _level;
         this.odlTableName = name;
@@ -46,12 +69,32 @@ public class XSQLBluePrintNode implements Serializable {
 
     public XSQLBluePrintNode(Object _odlNode, int _level,
             XSQLBluePrintNode _parent) {
-        this.odlNode = _odlNode;
+        addToSchemaNodes(_odlNode);
         this.level = _level;
         this.module = XSQLODLUtils.isModule(_odlNode);
         this.parent = _parent;
         this.bluePrintTableName = XSQLODLUtils.getBluePrintName(_odlNode);
-        this.odlTableName = XSQLODLUtils.getODLNodeName(this.odlNode);
+        this.odlTableName = XSQLODLUtils
+                .getODLNodeName(getFirstFromSchemaNodes());
+    }
+
+    private void addToSchemaNodes(Object schemaObject) {
+        if (this.odlSchemaNodes == null)
+            this.odlSchemaNodes = new Object[1];
+        else {
+            Object[] temp = new Object[this.odlSchemaNodes.length + 1];
+            System.arraycopy(this.odlSchemaNodes, 0, temp, 0,
+                    this.odlSchemaNodes.length);
+            this.odlSchemaNodes = temp;
+        }
+        this.odlSchemaNodes[this.odlSchemaNodes.length - 1] = schemaObject;
+    }
+
+    public Object getFirstFromSchemaNodes() {
+        if (this.odlSchemaNodes == null) {
+            return null;
+        }
+        return this.odlSchemaNodes[0];
     }
 
     public String getOrigName() {
@@ -72,16 +115,13 @@ public class XSQLBluePrintNode implements Serializable {
 
     public String getODLTableName() {
         if (this.odlTableName == null) {
-            this.odlTableName = XSQLODLUtils.getODLNodeName(this.odlNode);
+            this.odlTableName = XSQLODLUtils
+                    .getODLNodeName(getFirstFromSchemaNodes());
         }
         return this.odlTableName;
     }
 
-    public Object getODLNode() {
-        return this.odlNode;
-    }
-
-    public void AddChild(XSQLBluePrintNode ch) {
+    public void addChild(XSQLBluePrintNode ch) {
         this.children.add(ch);
     }
 
@@ -218,7 +258,7 @@ public class XSQLBluePrintNode implements Serializable {
         if (myInterfaceName != null) {
             return myInterfaceName;
         }
-        if (odlNode != null) {
+        if (this.odlSchemaNodes != null) {
             return getBluePrintNodeName();
         }
         if (odlTableName != null) {
@@ -238,15 +278,14 @@ public class XSQLBluePrintNode implements Serializable {
     @Override
     public boolean equals(Object obj) {
         XSQLBluePrintNode other = (XSQLBluePrintNode) obj;
-        if (odlNode != null) {
+        if (this.odlSchemaNodes != null) {
             return getBluePrintNodeName().equals(other.getBluePrintNodeName());
         } else if (this.odlTableName == null && other.odlTableName != null) {
             return false;
         }
         if (this.odlTableName != null && other.odlTableName == null) {
             return false;
-        }
-        else {
+        } else {
             return this.odlTableName.equals(other.odlTableName);
         }
     }
@@ -255,7 +294,7 @@ public class XSQLBluePrintNode implements Serializable {
     public int hashCode() {
         if (myInterfaceString != null) {
             return myInterfaceString.hashCode();
-        } else if (odlNode != null) {
+        } else if (this.odlSchemaNodes != null) {
             return bluePrintTableName.hashCode();
         }
         return 0;
index 17b8ae5..16a33b3 100644 (file)
@@ -1,15 +1,27 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.dom.xsql;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.model.api.AugmentationSchema;
+import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
@@ -20,7 +32,9 @@ import org.opendaylight.yangtools.yang.model.util.Uint16;
 import org.opendaylight.yangtools.yang.model.util.Uint32;
 import org.opendaylight.yangtools.yang.model.util.Uint64;
 import org.opendaylight.yangtools.yang.model.util.Uint8;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class XSQLODLUtils {
 
     private static Map<Class<?>, Class<?>> types =
@@ -113,7 +127,7 @@ public class XSQLODLUtils {
 
     public static boolean createOpenDaylightCache(XSQLBluePrint bluePrint,Object module) {
         XSQLBluePrintNode node = new XSQLBluePrintNode(module, 0,null);
-        bluePrint.addToBluePrintCache(node);
+        bluePrint.addToBluePrintCache(node,null);
         collectODL(bluePrint, node, ((Module) module).getChildNodes(), 1);
         return true;
     }
@@ -124,20 +138,30 @@ public class XSQLODLUtils {
             return;
         }
         for (DataSchemaNode n : nodes) {
-            if (n instanceof DataNodeContainer /*|| n instanceof LeafListSchemaNode*/
-                || n instanceof ListSchemaNode) {
+            if (n instanceof DataNodeContainer) {
                 XSQLBluePrintNode bn = new XSQLBluePrintNode(n, level,parent);
-                bluePrint.addToBluePrintCache(bn);
-                parent.AddChild(bn);
-                if (n instanceof DataNodeContainer) {
+                bn = bluePrint.addToBluePrintCache(bn,parent);
+                if (n instanceof ListSchemaNode) {
                     level++;
-                    collectODL(bluePrint, bn,
-                        ((DataNodeContainer) n).getChildNodes(), level);
+                    collectODL(bluePrint, bn,((ListSchemaNode) n).getChildNodes(), level);
+                    Set<AugmentationSchema> s = ((ListSchemaNode)n).getAvailableAugmentations();
+                    if(s!=null){
+                        for(AugmentationSchema as:s){
+                            collectODL(bluePrint, bn,as.getChildNodes(), level);
+                        }
+                    }
                     level--;
-                } else if (n instanceof ListSchemaNode) {
+                }else{
                     level++;
-                    collectODL(bluePrint, bn,
-                        ((ListSchemaNode) n).getChildNodes(), level);
+                    collectODL(bluePrint, bn,((DataNodeContainer) n).getChildNodes(), level);
+                    if(n instanceof ContainerSchemaNode){
+                       Set<AugmentationSchema> s = ((ContainerSchemaNode)n).getAvailableAugmentations();
+                       if(s!=null){
+                           for(AugmentationSchema as:s){
+                               collectODL(bluePrint, bn,as.getChildNodes(), level);
+                           }
+                       }
+                    }
                     level--;
                 }
             } else {
@@ -189,7 +213,7 @@ public class XSQLODLUtils {
             Field f = findField(c, name);
             return f.get(o);
         } catch (Exception err) {
-            XSQLAdapter.log(err);
+            //XSQLAdapter.log(err);
         }
         return null;
     }
@@ -207,6 +231,21 @@ public class XSQLODLUtils {
         return (Map<?, ?>) get(o, "children");
     }
 
+    public static Collection<?> getChildrenCollection(Object o) {
+        Object value = get(o, "children");
+        if(value==null)
+            return Collections.emptyList();
+        if(value instanceof Map)
+            return ((Map<?,?>)value).values();
+        else
+        if(value instanceof Collection){
+            return (Collection<?>)value;
+        }else{
+            XSQLAdapter.log("Unknown Child Value Type="+value.getClass().getName());
+            return new ArrayList();
+        }
+    }
+
     public static Object getValue(Object o) {
         return get(o, "value");
     }
index 6689908..ea16e72 100644 (file)
@@ -1,10 +1,16 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.md.sal.dom.xsql.jdbc;
 
 import java.io.InputStream;
 import java.io.Reader;
 import java.io.Serializable;
 import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
 import java.math.BigDecimal;
 import java.net.URL;
 import java.sql.Array;
@@ -24,6 +30,7 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -32,14 +39,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.opendaylight.controller.md.sal.dom.xsql.XSQLAdapter;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLBluePrint;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLBluePrintNode;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLColumn;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLCriteria;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLODLUtils;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
 
-public class JDBCResultSet implements Serializable, ResultSet,
-        ResultSetMetaData {
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
+public class JDBCResultSet implements Serializable, ResultSet, ResultSetMetaData {
     private static final long serialVersionUID = -7450200738431047057L;
     private static final ClassLoader CLASS_LOADER = JDBCResultSet.class.getClassLoader();
     private static final Class<?>[] PROXY_INTERFACES = new Class[] { ResultSet.class };
@@ -57,27 +68,28 @@ public class JDBCResultSet implements Serializable, ResultSet,
     private Map<String, Map<XSQLColumn, List<XSQLCriteria>>> criteria = new ConcurrentHashMap<String, Map<XSQLColumn, List<XSQLCriteria>>>();
     private Exception err = null;
     private List<Record> EMPTY_RESULT = new LinkedList<Record>();
-    private transient Map<String,JDBCResultSet> subQueries = new HashMap<String,JDBCResultSet>();
+    private transient Map<String, JDBCResultSet> subQueries = new HashMap<String, JDBCResultSet>();
 
     public ResultSet getProxy() {
-         return (ResultSet) Proxy.newProxyInstance(CLASS_LOADER, PROXY_INTERFACES, new JDBCProxy(this));
+        return this;
+        //return (ResultSet) Proxy.newProxyInstance(CLASS_LOADER, PROXY_INTERFACES, new JDBCProxy(this));
     }
 
     public void setSQL(String _sql) {
         this.sql = _sql;
     }
 
-    public JDBCResultSet addSubQuery(String _sql,String logicalName) {
+    public JDBCResultSet addSubQuery(String _sql, String logicalName) {
         if (subQueries == null) {
-            subQueries = new HashMap<String,JDBCResultSet>();
+            subQueries = new HashMap<String, JDBCResultSet>();
         }
         JDBCResultSet rs = new JDBCResultSet(_sql);
-        this.subQueries.put(logicalName,rs);
+        this.subQueries.put(logicalName, rs);
         return rs;
     }
 
-    public Map<String,JDBCResultSet> getSubQueries() {
-        if (this.subQueries==null) {
+    public Map<String, JDBCResultSet> getSubQueries() {
+        if (this.subQueries == null) {
             this.subQueries = new HashMap<>();
         }
         return this.subQueries;
@@ -112,7 +124,8 @@ public class JDBCResultSet implements Serializable, ResultSet,
         }
     }
 
-    public int isObjectFitCriteria(Map<String, Object> objValues, String tableName) {
+    public int isObjectFitCriteria(Map<String, Object> objValues,
+            String tableName) {
         Map<XSQLColumn, List<XSQLCriteria>> tblCriteria = criteria
                 .get(tableName);
         if (tblCriteria == null) {
@@ -289,19 +302,41 @@ public class JDBCResultSet implements Serializable, ResultSet,
     }
 
     public static class Record {
+        // The map container the Attribute 2 the attribute value
         public Map<String, Object> data = new HashMap<>();
+        // The Element Object (Possibly some kind of NormalizedNode
         public Object element = null;
+        // Does this record fit the criteria
+        // In case of a list property, we first collect the list and only then
+        // we
+        // we decide which list item should be included or not.
+        public boolean fitCriteria = true;
 
         public Map<String, Object> getRecord() {
             return this.data;
         }
     }
 
-    private Map<String, Object> collectColumnValues(Object node, XSQLBluePrintNode bpn) {
-        Map<?, ?> subChildren = XSQLODLUtils.getChildren(node);
-        Map<String, Object> result = new HashMap<>();
-        for (Object stc : subChildren.values()) {
-            if (stc.getClass().getName().endsWith("ImmutableAugmentationNode")) {
+    public static class RecordsContainer {
+        public List<Record> records = new LinkedList<Record>();
+        public List<Record> fitRecords = new LinkedList<Record>();
+        public Object currentObject = null;
+    }
+
+    private void collectColumnValues(RecordsContainer rContainer,
+            XSQLBluePrintNode bpn) {
+        Collection<?> subChildren = XSQLODLUtils
+                .getChildrenCollection(rContainer.currentObject);
+        Record r = new Record();
+        r.element = rContainer.currentObject;
+        for (Object stc : subChildren) {
+            if (stc.getClass().getName()
+                    .endsWith("ImmutableUnkeyedListEntryNode")) {
+                r.fitCriteria = false;
+                rContainer.currentObject = stc;
+                collectColumnValues(rContainer, bpn);
+            } else if (stc.getClass().getName()
+                    .endsWith("ImmutableAugmentationNode")) {
                 Map<?, ?> values = XSQLODLUtils.getChildren(stc);
                 for (Object key : values.keySet()) {
                     Object val = values.get(key);
@@ -309,7 +344,7 @@ public class JDBCResultSet implements Serializable, ResultSet,
                         Object value = XSQLODLUtils.getValue(val);
                         String k = XSQLODLUtils.getNodeName(val);
                         if (value != null) {
-                            result.put(bpn.getBluePrintNodeName() + "." + k,
+                            r.data.put(bpn.getBluePrintNodeName() + "." + k,
                                     value.toString());
                         }
                     }
@@ -318,16 +353,17 @@ public class JDBCResultSet implements Serializable, ResultSet,
                 String k = XSQLODLUtils.getNodeName(stc);
                 Object value = XSQLODLUtils.getValue(stc);
                 if (value != null) {
-                    result.put(bpn.getBluePrintNodeName() + "." + k,
+                    r.data.put(bpn.getBluePrintNodeName() + "." + k,
                             value.toString());
                 }
             }
         }
-        return result;
+        if (r.fitCriteria) {
+            rContainer.records.add(r);
+        }
     }
 
-    private void addToData(Record rec, XSQLBluePrintNode bpn,
-            XSQLBluePrint bluePrint, Map<String, Object> fullRecord) {
+    private void addToData(Record rec, XSQLBluePrintNode bpn,XSQLBluePrint bluePrint, Map<String, Object> fullRecord) {
         XSQLBluePrintNode eNodes[] = bluePrint
                 .getBluePrintNodeByODLTableName(XSQLODLUtils
                         .getNodeIdentiofier(rec.element));
@@ -386,6 +422,11 @@ public class JDBCResultSet implements Serializable, ResultSet,
 
             String odlNodeName = XSQLODLUtils.getNodeIdentiofier(child);
             if (odlNodeName == null) {
+                if (child instanceof DataContainerNode) {
+                    List<Object> augChidlren = getChildren(child, tableName,
+                            bluePrint);
+                    result.addAll(augChidlren);
+                }
                 continue;
             }
 
@@ -407,7 +448,10 @@ public class JDBCResultSet implements Serializable, ResultSet,
                 continue;
             }
 
-            if (child.getClass().getName().endsWith("ImmutableContainerNode")) {
+            if (child.getClass().getName().endsWith("ImmutableUnkeyedListNode")) {
+                result.add(child);
+            } else if (child.getClass().getName()
+                    .endsWith("ImmutableContainerNode")) {
                 result.add(child);
             } else if (child.getClass().getName()
                     .endsWith("ImmutableAugmentationNode")) {
@@ -420,52 +464,76 @@ public class JDBCResultSet implements Serializable, ResultSet,
                 }
             } else if (child.getClass().getName().endsWith("ImmutableMapNode")) {
                 result.addAll(XSQLODLUtils.getMChildren(child));
+            } else {
+                XSQLAdapter.log("Missed Node Data OF Type="
+                        + child.getClass().getName());
             }
         }
         return result;
     }
 
-    public List<Record> addRecords(Object element, XSQLBluePrintNode node,boolean root, String tableName, XSQLBluePrint bluePrint) {
+    public List<Record> addRecords(Object element, XSQLBluePrintNode node,
+            boolean root, String tableName, XSQLBluePrint bluePrint) {
         List<Record> result = new LinkedList<Record>();
-        //In case this is a sibling to the requested table, the elenment type
-        //won't be in the path of the leaf node
-        if(node==null){
-            return result;
-        }
         String nodeID = XSQLODLUtils.getNodeIdentiofier(element);
         if (node.getODLTableName().equals(nodeID)) {
-            XSQLBluePrintNode bluePrintNode = bluePrint.getBluePrintNodeByODLTableName(nodeID)[0];
-            Record rec = new Record();
-            rec.element = element;
-            XSQLBluePrintNode bpn = this.tablesInQueryMap.get(bluePrintNode.getBluePrintNodeName());
-            if (this.criteria.containsKey(bluePrintNode.getBluePrintNodeName()) || bpn != null) {
-                Map<String, Object> allKeyValues = collectColumnValues(element, bpn);
-                if (!(isObjectFitCriteria(allKeyValues,
-                        bpn.getBluePrintNodeName()) == 1)) {
-                    return EMPTY_RESULT;
+            XSQLBluePrintNode bluePrintNode = bluePrint
+                    .getBluePrintNodeByODLTableName(nodeID)[0];
+            RecordsContainer rContainer = new RecordsContainer();
+            rContainer.currentObject = element;
+            XSQLBluePrintNode bpn = this.tablesInQueryMap.get(bluePrintNode
+                    .getBluePrintNodeName());
+            if (this.criteria.containsKey(bluePrintNode.getBluePrintNodeName())
+                    || bpn != null) {
+                collectColumnValues(rContainer, bpn);
+                for (Record r : rContainer.records) {
+                    if (!(isObjectFitCriteria(r.data,
+                            bpn.getBluePrintNodeName()) == 1)) {
+                        r.fitCriteria = false;
+                    }
+                    if (r.fitCriteria) {
+                        Record rec = new Record();
+                        rec.element = r.element;
+                        addToData(rec, bpn, bluePrint, r.data);
+                        rContainer.fitRecords.add(rec);
+                    }
                 }
-                addToData(rec, bpn, bluePrint, allKeyValues);
+                if (rContainer.fitRecords.isEmpty())
+                    return EMPTY_RESULT;
             }
-            if (root) {
-                addRecord(rec.data);
+            if (rContainer.records.isEmpty()) {
+                Record rec = new Record();
+                rec.element = rContainer.currentObject;
+                if (root) {
+                    addRecord(rec.data);
+                } else {
+                    result.add(rec);
+                }
             } else {
-                result.add(rec);
+                for (Record rec : rContainer.fitRecords) {
+                    if (root) {
+                        addRecord(rec.data);
+                    } else {
+                        result.add(rec);
+                    }
+                }
             }
             return result;
         }
 
         XSQLBluePrintNode parent = node.getParent();
-        List<Record> subRecords = addRecords(element, parent, false, tableName,bluePrint);
+        List<Record> subRecords = addRecords(element, parent, false, tableName,
+                bluePrint);
         for (Record subRec : subRecords) {
             List<Object> subO = getChildren(subRec.element, tableName,
                     bluePrint);
             if (subO != null) {
                 for (Object subData : subO) {
-                    Record rec = new Record();
-                    rec.element = subData;
-                    rec.data.putAll(subRec.data);
+                    RecordsContainer rContainer = new RecordsContainer();
+                    rContainer.currentObject = subData;
 
-                    String recID = XSQLODLUtils.getNodeIdentiofier(rec.element);
+                    String recID = XSQLODLUtils
+                            .getNodeIdentiofier(rContainer.currentObject);
                     XSQLBluePrintNode eNodes[] = bluePrint
                             .getBluePrintNodeByODLTableName(recID);
                     XSQLBluePrintNode bpn = null;
@@ -476,18 +544,24 @@ public class JDBCResultSet implements Serializable, ResultSet,
                             break;
                         }
                     }
-                    boolean isObjectInCriteria = true;
                     if (bpn != null) {
-                        Map<String, Object> allKeyValues = collectColumnValues(rec.element, bpn);
-                        if ((isObjectFitCriteria(allKeyValues,
-                                bpn.getBluePrintNodeName()) == 1)) {
-                            addToData(rec, bpn, bluePrint, allKeyValues);
-                        } else {
-                            isObjectInCriteria = false;
+                        collectColumnValues(rContainer, bpn);
+                        for (Record r : rContainer.records) {
+                            if ((isObjectFitCriteria(r.data,
+                                    bpn.getBluePrintNodeName()) == 1)) {
+                                Record rec = new Record();
+                                rec.data.putAll(subRec.data);
+                                rec.element = r.element;
+                                addToData(rec, bpn, bluePrint, r.data);
+                            } else {
+                                r.fitCriteria = false;
+                            }
                         }
                     }
-
-                    if (isObjectInCriteria) {
+                    if (rContainer.records.isEmpty()) {
+                        Record rec = new Record();
+                        rec.data.putAll(subRec.data);
+                        rec.element = rContainer.currentObject;
                         if (root) {
                             if (!rec.data.isEmpty()) {
                                 addRecord(rec.data);
@@ -495,11 +569,23 @@ public class JDBCResultSet implements Serializable, ResultSet,
                         } else {
                             result.add(rec);
                         }
+                    } else {
+                        for (Record r : rContainer.records) {
+                            r.data.putAll(subRec.data);
+                            if (r.fitCriteria) {
+                                if (root) {
+                                    if (!r.data.isEmpty()) {
+                                        addRecord(r.data);
+                                    }
+                                } else {
+                                    result.add(r);
+                                }
+                            }
+                        }
                     }
                 }
             }
         }
-
         return result;
     }
 
index 7b2733c..31941e4 100644 (file)
@@ -46,8 +46,7 @@ public class JDBCServer extends Thread {
         }
     }
 
-    public static void execute(JDBCResultSet rs, XSQLAdapter adapter)
-            throws SQLException {
+    public static void execute(JDBCResultSet rs, XSQLAdapter adapter)throws SQLException {
         if(rs.getSQL().toLowerCase().trim().equals("select 1")){
             rs.setFinished(true);
             return;
index cde0157..29a1945 100644 (file)
@@ -1,37 +1,47 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.xsql;
 
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.xsql.XSQLAdapter;
 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626.XSQL;
 import org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626.XSQLBuilder;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Created by root on 6/26/14.
- */
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class XSQLProvider implements AutoCloseable {
 
     public static final InstanceIdentifier<XSQL> ID = InstanceIdentifier.builder(XSQL.class).build();
-    private static final Logger LOG = LoggerFactory.getLogger(XSQLProvider.class);
+    //public static final InstanceIdentifier<SalTest> ID2 = InstanceIdentifier.builder(SalTest.class).build();
 
     public void close() {
     }
 
-    public XSQL buildXSQL(DataProviderService dps) {
+    public XSQL buildXSQL(DataBroker dps) {
+            XSQLAdapter.log("Building XSL...");
             XSQLBuilder builder = new XSQLBuilder();
             builder.setPort("34343");
             XSQL xsql = builder.build();
             try {
                 if (dps != null) {
-                    final DataModificationTransaction t = dps.beginTransaction();
-                    t.removeOperationalData(ID);
-                    t.putOperationalData(ID,xsql);
-                    t.commit().get();
+                    XSQLAdapter.log("Starting TRansaction...");
+                    WriteTransaction t = dps.newReadWriteTransaction();
+                    t.delete(LogicalDatastoreType.OPERATIONAL, ID);
+                    t.put(LogicalDatastoreType.OPERATIONAL,ID,xsql);
+                    XSQLAdapter.log("Submitting...");
+                    t.submit();
                 }
             } catch (Exception e) {
-                LOG.warn("Failed to update XSQL port status, ", e);
+                XSQLAdapter.log(e);
             }
         return xsql;
     }
index c8a5a85..a669345 100644 (file)
@@ -1,9 +1,19 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626;
 
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLAdapter;
 import org.opendaylight.xsql.XSQLProvider;
-
+/**
+ * @author Sharon Aicler(saichler@gmail.com)
+ **/
 public class XSQLModule extends org.opendaylight.yang.gen.v1.http.netconfcentral.org.ns.xsql.rev140626.AbstractXSQLModule {
+    private static final long SLEEP_TIME_BEFORE_CREATING_TRANSACTION = 10000;
     public XSQLModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
@@ -22,9 +32,14 @@ public class XSQLModule extends org.opendaylight.yang.gen.v1.http.netconfcentral
         XSQLAdapter xsqlAdapter = XSQLAdapter.getInstance();
         getSchemaServiceDependency().registerSchemaContextListener(xsqlAdapter);
         xsqlAdapter.setDataBroker(getAsyncDataBrokerDependency());
-        XSQLProvider p = new XSQLProvider();
-        //p.buildXSQL(getDataBrokerDependency());
+        final XSQLProvider p = new XSQLProvider();
+        Runnable runthis = new Runnable() {
+            @Override
+            public void run() {
+                try{Thread.sleep(SLEEP_TIME_BEFORE_CREATING_TRANSACTION);}catch(Exception err){}
+                p.buildXSQL(getDataBrokerDependency());
+            }
+        };
         return p;
     }
-
 }
index d7d547d..1b9a37d 100644 (file)
@@ -8,8 +8,8 @@
                     </type>
                     <name>XSQL</name>
                     <data-broker>
-                       <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-data-broker</type>
-                       <name>binding-data-broker</name>
+                        <type xmlns:binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">binding:binding-async-data-broker</type>
+                        <name>binding-data-broker</name>
                     </data-broker>
                     <async-data-broker>
                         <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-async-data-broker</type>
index 0437e10..f0f5269 100644 (file)
@@ -37,14 +37,14 @@ module XSQL{
         case XSQL {
             when "/config:modules/config:module/config:type = 'XSQL'";
 
-                       container data-broker {
+            container data-broker {
                 uses config:service-ref {
                     refine type {
                         mandatory false;
-                        config:required-identity mdsal:binding-data-broker;
+                        config:required-identity mdsal:binding-async-data-broker;
                     }
                 }
-            }         
+            }
 
             container async-data-broker {
                 uses config:service-ref {
index 8a6b184..e3f5fbb 100644 (file)
@@ -2,18 +2,29 @@ package org.opendaylight.xsql.test;
 
 import java.io.InputStream;
 import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLAdapter;
 import org.opendaylight.controller.md.sal.dom.xsql.XSQLBluePrint;
 import org.opendaylight.controller.md.sal.dom.xsql.jdbc.JDBCResultSet;
 import org.opendaylight.controller.md.sal.dom.xsql.jdbc.JDBCServer;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
 public class XSQLTest {
-
-    XSQLBluePrint bluePrint = null;
+    private static final String DATASTORE_TEST_YANG = "/sal-persisted-dom-test.yang";
+    private XSQLBluePrint bluePrint = null;
+    //private static SchemaContext schemaContext = null;
+    @BeforeClass
+    public static void loadSchemaContext(){
+        //schemaContext = createTestContext();
+    }
 
     @Before
     public void before() {
@@ -167,4 +178,18 @@ public class XSQLTest {
         System.out.print("*** XSQL Tests -");
         System.out.println(str);
     }
+
+    public static final InputStream getDatastoreTestInputStream() {
+        return getInputStream(DATASTORE_TEST_YANG);
+    }
+
+    private static InputStream getInputStream(final String resourceName) {
+        return XSQLTest.class.getResourceAsStream(DATASTORE_TEST_YANG);
+    }
+
+    public static SchemaContext createTestContext() {
+        YangParserImpl parser = new YangParserImpl();
+        Set<Module> modules = parser.parseYangModelsFromStreams(Collections.singletonList(getDatastoreTestInputStream()));
+        return parser.resolveSchemaContext(modules);
+    }
 }
index b6b34ac..152f787 100644 (file)
Binary files a/opendaylight/md-sal/sal-dom-xsql/src/test/resources/BluePrintCache.dat and b/opendaylight/md-sal/sal-dom-xsql/src/test/resources/BluePrintCache.dat differ
index 5b0f739..05e3d5c 100644 (file)
@@ -12,13 +12,14 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort;
 
-final class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
+final class ChainedTransactionCommitImpl extends ForwardingDOMStoreThreePhaseCommitCohort {
     private final SnapshotBackedWriteTransaction transaction;
     private final DOMStoreThreePhaseCommitCohort delegate;
     private final DOMStoreTransactionChainImpl txChain;
 
-    protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
+    ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
             final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
         this.transaction = Preconditions.checkNotNull(transaction);
         this.delegate = Preconditions.checkNotNull(delegate);
@@ -26,23 +27,13 @@ final class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCoho
     }
 
     @Override
-    public ListenableFuture<Boolean> canCommit() {
-        return delegate.canCommit();
-    }
-
-    @Override
-    public ListenableFuture<Void> preCommit() {
-        return delegate.preCommit();
-    }
-
-    @Override
-    public ListenableFuture<Void> abort() {
-        return delegate.abort();
+    protected DOMStoreThreePhaseCommitCohort delegate() {
+        return delegate;
     }
 
     @Override
     public ListenableFuture<Void> commit() {
-        ListenableFuture<Void> commitFuture = delegate.commit();
+        ListenableFuture<Void> commitFuture = super.commit();
         Futures.addCallback(commitFuture, new FutureCallback<Void>() {
             @Override
             public void onFailure(final Throwable t) {
@@ -56,4 +47,5 @@ final class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCoho
         });
         return commitFuture;
     }
+
 }
\ No newline at end of file
index 999fb91..feb1b66 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invo
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.spi.DefaultDataTreeCandidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,7 @@ final class InMemoryDOMStoreTreeChangePublisher extends AbstractDOMStoreTreeChan
 
     @Override
     protected void notifyListeners(final Collection<AbstractDOMDataTreeChangeListenerRegistration<?>> registrations, final YangInstanceIdentifier path, final DataTreeCandidateNode node) {
-        final DataTreeCandidate candidate = new SimpleDataTreeCandidate(path, node);
+        final DataTreeCandidate candidate = new DefaultDataTreeCandidate(path, node);
 
         for (AbstractDOMDataTreeChangeListenerRegistration<?> reg : registrations) {
             LOG.debug("Enqueueing candidate {} to registration {}", candidate, registrations);
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java
deleted file mode 100644 (file)
index 701841c..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.store.impl;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
-
-final class SimpleDataTreeCandidate implements DataTreeCandidate {
-    private final YangInstanceIdentifier rootPath;
-    private final DataTreeCandidateNode rootNode;
-
-    SimpleDataTreeCandidate(final YangInstanceIdentifier rootPath, final DataTreeCandidateNode rootNode) {
-        this.rootPath = Preconditions.checkNotNull(rootPath);
-        this.rootNode = Preconditions.checkNotNull(rootNode);
-    }
-
-    @Override
-    public DataTreeCandidateNode getRootNode() {
-        return rootNode;
-    }
-
-    @Override
-    public YangInstanceIdentifier getRootPath() {
-        return rootPath;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this).add("rootPath", rootPath).add("rootNode", rootNode).toString();
-    }
-}
\ No newline at end of file
index d777942..0e660ea 100644 (file)
             <param-name>javax.ws.rs.Application</param-name>
             <param-value>org.opendaylight.controller.sal.rest.doc.jaxrs.ApiDocApplication</param-value>
         </init-param>
+        <!-- AAA Auth Filter -->
+        <init-param>
+            <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
+            <param-value> org.opendaylight.aaa.sts.TokenAuthFilter</param-value>
+        </init-param>
         <load-on-startup>1</load-on-startup>
     </servlet>
 
 
     <security-constraint>
       <web-resource-collection>
-        <web-resource-name>free access</web-resource-name>
-        <url-pattern>/explorer/css/*</url-pattern>
-        <url-pattern>/explorer/images/*</url-pattern>
-        <url-pattern>/explorer/lib/*</url-pattern>
-        <url-pattern>/explorer/*</url-pattern>
+        <web-resource-name>API Doc</web-resource-name>
+        <url-pattern>/*</url-pattern>
       </web-resource-collection>
     </security-constraint>
 
index ff7d30d..a3cd3c7 100644 (file)
@@ -110,7 +110,7 @@ public class RuntimeRpc extends AbstractSingletonNetconfOperation {
 
     //this returns module with the newest revision if more then 1 module with same namespace is found
     private Optional<Module> getModule(final URI namespaceURI) {
-        return Optional.of(schemaContext.getCurrentContext().findModuleByNamespaceAndRevision(namespaceURI, null));
+        return Optional.fromNullable(schemaContext.getCurrentContext().findModuleByNamespaceAndRevision(namespaceURI, null));
     }
 
     private Optional<RpcDefinition> getRpcDefinitionFromModule(Module module, URI namespaceURI, String name) {
index c292d93..27c9bd8 100644 (file)
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-data-impl</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.opendaylight.yangtools</groupId>
-      <artifactId>yang-data-composite-node</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-model-api</artifactId>
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-parser-impl</artifactId>
     </dependency>
+      <dependency>
+          <groupId>org.opendaylight.yangtools</groupId>
+          <artifactId>yang-data-api</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.yangtools</groupId>
+          <artifactId>yang-common</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>netconf-client</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.yangtools</groupId>
+          <artifactId>yang-binding</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.yangtools</groupId>
+          <artifactId>yang-model-api</artifactId>
+      </dependency>
   </dependencies>
 
   <build>
index a49c7b9..0ae1be4 100644 (file)
@@ -30,9 +30,9 @@ import org.opendaylight.controller.netconf.cli.reader.ReadingException;
 import org.opendaylight.controller.netconf.cli.writer.OutFormatter;
 import org.opendaylight.controller.netconf.cli.writer.WriteException;
 import org.opendaylight.controller.netconf.cli.writer.Writer;
-import org.opendaylight.controller.netconf.cli.writer.impl.CompositeNodeWriter;
+import org.opendaylight.controller.netconf.cli.writer.impl.NormalizedNodeWriter;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
 
@@ -102,7 +102,7 @@ public class Cli implements Runnable {
 
     private void handleRegularOutput(final Output response, final OutputDefinition outputDefinition,
             final Writer<DataSchemaNode> outHandler) {
-        final Map<DataSchemaNode, List<Node<?>>> unwrap = response.unwrap(outputDefinition);
+        final Map<DataSchemaNode, List<NormalizedNode<?, ?>>> unwrap = response.unwrap(outputDefinition);
 
         for (final DataSchemaNode schemaNode : unwrap.keySet()) {
             Preconditions.checkNotNull(schemaNode);
@@ -132,8 +132,8 @@ public class Cli implements Runnable {
 
     private void handleEmptyOutput(final Command command, final Output response) {
         try {
-            new CompositeNodeWriter(consoleIO, new OutFormatter()).write(null,
-                    Collections.<Node<?>> singletonList(response.getOutput()));
+            new NormalizedNodeWriter(consoleIO, new OutFormatter()).write(null,
+                    Collections.<NormalizedNode<?, ?>>singletonList(response.getOutput()));
         } catch (final WriteException e) {
             throw new IllegalStateException("Unable to write value for: " + response.getOutput().getNodeType()
                     + " from: " + command.getCommandId(), e);
@@ -141,7 +141,7 @@ public class Cli implements Runnable {
     }
 
     private Input handleInput(final InputDefinition inputDefinition) {
-        List<Node<?>> allArgs = Collections.emptyList();
+        List<NormalizedNode<?, ?>> allArgs = Collections.emptyList();
         try {
             if (!inputDefinition.isEmpty()) {
                 allArgs = argumentHandlerRegistry.getGenericReader(schemaContextRegistry.getLocalSchemaContext()).read(
index bede549..50c3243 100644 (file)
@@ -10,13 +10,13 @@ package org.opendaylight.controller.netconf.cli;
 import com.google.common.base.Optional;
 import jline.console.completer.Completer;
 import jline.console.completer.NullCompleter;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.netconf.cli.commands.CommandDispatcher;
 import org.opendaylight.controller.netconf.cli.io.ConsoleContext;
 import org.opendaylight.controller.netconf.cli.io.ConsoleIO;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -42,7 +42,7 @@ public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler<Netco
 
     @Override
     public synchronized void onDeviceConnected(final SchemaContext context,
-            final NetconfSessionPreferences preferences, final RpcImplementation rpcImplementation) {
+            final NetconfSessionPreferences preferences, final DOMRpcService rpcService) {
         console.enterRootContext(new ConsoleContext() {
 
             @Override
@@ -60,7 +60,7 @@ public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler<Netco
         // possible
         // TODO detect netconf base version
         // TODO detect inet types version
-        commandDispatcher.addRemoteCommands(rpcImplementation, context);
+        commandDispatcher.addRemoteCommands(rpcService, context);
         schemaContextRegistry.setRemoteSchemaContext(context);
         up = true;
         this.notify();
@@ -87,8 +87,8 @@ public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler<Netco
     }
 
     @Override
-    public void onNotification(final CompositeNode compositeNode) {
-        // FIXME
+    public void onNotification(ContainerNode domNotification) {
+
     }
 
     @Override
index 67e9658..2438df4 100644 (file)
@@ -26,7 +26,6 @@ import org.opendaylight.controller.sal.connect.netconf.NetconfDevice;
 import org.opendaylight.controller.sal.connect.netconf.NetconfDevice.SchemaResourcesDTO;
 import org.opendaylight.controller.sal.connect.netconf.NetconfStateSchemas.NetconfStateSchemasResolverImpl;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
-import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
@@ -90,7 +89,7 @@ public class NetconfDeviceConnectionManager implements Closeable {
         repository.registerSchemaSourceListener(TextToASTTransformer.create(repository, repository));
 
         device = new NetconfDevice(new SchemaResourcesDTO(repository, schemaContextFactory, new NetconfStateSchemasResolverImpl()),
-                deviceId, handler, executor, new NetconfMessageTransformer());
+                deviceId, handler, executor, true);
         listener = new NetconfDeviceCommunicator(deviceId, device);
         configBuilder.withSessionListener(listener);
         listener.initializeRemoteConnection(netconfClientDispatcher, configBuilder.build());
index ec7b5b4..f1b14ea 100644 (file)
@@ -19,6 +19,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.netconf.cli.NetconfDeviceConnectionHandler;
 import org.opendaylight.controller.netconf.cli.NetconfDeviceConnectionManager;
 import org.opendaylight.controller.netconf.cli.commands.local.Close;
@@ -27,7 +28,6 @@ import org.opendaylight.controller.netconf.cli.commands.local.Disconnect;
 import org.opendaylight.controller.netconf.cli.commands.local.Help;
 import org.opendaylight.controller.netconf.cli.commands.remote.RemoteCommand;
 import org.opendaylight.controller.netconf.cli.io.IOUtil;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
@@ -106,15 +106,15 @@ public class CommandDispatcher {
     public static final Collection<String> BASE_NETCONF_SCHEMA_PATHS = Lists.newArrayList("/schema/remote/ietf-netconf.yang",
             "/schema/common/netconf-cli-ext.yang", "/schema/common/ietf-inet-types.yang");
 
-    public synchronized void addRemoteCommands(final RpcImplementation rpcInvoker, final SchemaContext remoteSchema) {
-        this.addRemoteCommands(rpcInvoker, remoteSchema, parseSchema(BASE_NETCONF_SCHEMA_PATHS));
+    public synchronized void addRemoteCommands(final DOMRpcService rpcService, final SchemaContext remoteSchema) {
+        this.addRemoteCommands(rpcService, remoteSchema, parseSchema(BASE_NETCONF_SCHEMA_PATHS));
     }
 
-    public synchronized void addRemoteCommands(final RpcImplementation rpcInvoker, final SchemaContext remoteSchema, final SchemaContext baseNetconfSchema) {
+    public synchronized void addRemoteCommands(final DOMRpcService rpcService, final SchemaContext remoteSchema, final SchemaContext baseNetconfSchema) {
         for (final SchemaContext context : Lists.newArrayList(remoteSchema, baseNetconfSchema)) {
             for (final Module module : context.getModules()) {
                 for (final RpcDefinition rpcDefinition : module.getRpcs()) {
-                    final Command command = RemoteCommand.fromRpc(rpcDefinition, rpcInvoker);
+                    final Command command = RemoteCommand.fromRpc(rpcDefinition, rpcService);
                     remoteCommands.put(rpcDefinition.getQName(), command);
                     nameToQNameRemote.put(getCommandName(rpcDefinition, module), rpcDefinition.getQName());
                 }
index 02173ac..e2cc83d 100644 (file)
@@ -8,46 +8,52 @@
 package org.opendaylight.controller.netconf.cli.commands.input;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 /**
  * Input arguments for and rpc/command execution
  */
 public class Input {
 
-    private final List<Node<?>> args;
+    private final List<NormalizedNode<?, ?>> args;
 
-    private final Map<String, Node<?>> nameToArg = new HashMap<String, Node<?>>();
+    private final Map<String, NormalizedNode<?, ?>> nameToArg = new HashMap<>();
 
-    public Input(final List<Node<?>> args) {
+    public Input(final List<NormalizedNode<?, ?>> args) {
         // FIXME empty Input should be constructed from static factory method
         if(args.isEmpty()) {
             this.args = Collections.emptyList();
             return;
         }
 
-        final Node<?> input = args.iterator().next();
+        final NormalizedNode<?, ?> input = args.iterator().next();
         Preconditions
-                .checkArgument(input instanceof CompositeNode, "Input container has to be of type composite node.");
-        this.args = ((CompositeNode) input).getValue();
+                .checkArgument(input instanceof DataContainerChild<?, ?>, "Input container has to be of type Data Container Child.");
+        this.args = new ArrayList<>((Collection) input.getValue());
 
-        for (final Node<?> arg : this.args) {
+        for (final NormalizedNode<?, ?> arg : this.args) {
             nameToArg.put(arg.getNodeType().getLocalName(), arg);
         }
     }
 
-    public Node<?> getArg(final String name) {
+    public NormalizedNode<?, ?> getArg(final String name) {
         return nameToArg.get(name);
     }
 
-    public CompositeNode wrap(final QName rpcQName) {
-        return new CompositeNodeTOImpl(rpcQName, null, args);
+    public NormalizedNode<?, ?> wrap(final QName rpcQName) {
+        //TODO just add the list as children to the node
+        return ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(rpcQName))
+                .withValue((Collection) args).build();
     }
 }
index 54706b8..b9abb5a 100644 (file)
@@ -29,10 +29,15 @@ import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.
 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.api.SimpleNode;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 
 /**
@@ -59,14 +64,21 @@ public class Connect extends AbstractCommand {
     private Output invoke(final NetconfClientConfigurationBuilder config, final String addressName, final Input inputArgs) {
         final Set<String> remoteCmds = connectManager.connectBlocking(addressName, getAdress(inputArgs), config);
 
-        final ArrayList<Node<?>> output = Lists.newArrayList();
-        output.add(new SimpleNodeTOImpl<>(QName.create(getCommandId(), "status"), null, "Connection initiated"));
+        final ArrayList<DataContainerChild<?, ?>> output = Lists.newArrayList();
+        output.add(ImmutableLeafNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(QName.create(getCommandId(), "status")))
+                .withValue("Connection initiated").build());
 
+        final ArrayList<LeafSetEntryNode<Object>> leafListChildren = Lists.newArrayList();
         for (final String cmdId : remoteCmds) {
-            output.add(new SimpleNodeTOImpl<>(QName.create(getCommandId(), "remote-commands"), null, cmdId));
+            leafListChildren.add(ImmutableLeafSetEntryNodeBuilder.create()
+                    .withNodeIdentifier(new NodeWithValue(QName.create(getCommandId(), "remote-commands"), cmdId))
+                    .withValue(cmdId).build());
         }
 
-        return new Output(new CompositeNodeTOImpl(getCommandId(), null, output));
+        return new Output(ImmutableLeafSetNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(QName.create(getCommandId(), "remote-commands")))
+                .withValue(leafListChildren).build());
     }
 
     private NetconfClientConfigurationBuilder getConfig(final Input inputArgs) {
@@ -105,11 +117,11 @@ public class Connect extends AbstractCommand {
 
     private <T> Optional<T> getArgumentOpt(final Input inputArgs, final String argName, final Class<T> type) {
         final QName argQName = QName.create(getCommandId(), argName);
-        final Node<?> argumentNode = inputArgs.getArg(argName);
+        final NormalizedNode<?, ?> argumentNode = inputArgs.getArg(argName);
         if (argumentNode == null) {
             return Optional.absent();
         }
-        Preconditions.checkArgument(argumentNode instanceof SimpleNode, "Only simple type argument supported, %s",
+        Preconditions.checkArgument(argumentNode instanceof LeafNode, "Only simple type argument supported, %s",
                 argQName);
 
         final Object value = argumentNode.getValue();
index 73088b4..2bb9c00 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.netconf.cli.commands.local;
 
-import com.google.common.collect.Lists;
 import org.opendaylight.controller.netconf.cli.NetconfDeviceConnectionManager;
 import org.opendaylight.controller.netconf.cli.commands.AbstractCommand;
 import org.opendaylight.controller.netconf.cli.commands.Command;
@@ -16,9 +15,9 @@ import org.opendaylight.controller.netconf.cli.commands.input.InputDefinition;
 import org.opendaylight.controller.netconf.cli.commands.output.Output;
 import org.opendaylight.controller.netconf.cli.commands.output.OutputDefinition;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 
 /**
@@ -39,9 +38,12 @@ public class Disconnect extends AbstractCommand {
     public Output invoke(final Input inputArgs) {
         connectionManager.disconnect();
 
-        return new Output(new CompositeNodeTOImpl(getCommandId(), null,
-                Lists.<Node<?>> newArrayList(new SimpleNodeTOImpl<>(QName.create(getCommandId(), "status"), null,
-                        "Connection disconnected"))));
+        return new Output(
+                ImmutableContainerNodeBuilder.create()
+                        .withNodeIdentifier(new NodeIdentifier(getCommandId()))
+                        .withChild(ImmutableLeafNodeBuilder.create()
+                                .withNodeIdentifier(new NodeIdentifier(QName.create(getCommandId(), "status")))
+                                .withValue("Connection disconnected").build()).build());
     }
 
     public static Command create(final RpcDefinition rpcDefinition,
index 1816469..0b22aaf 100644 (file)
@@ -20,10 +20,14 @@ import org.opendaylight.controller.netconf.cli.commands.input.InputDefinition;
 import org.opendaylight.controller.netconf.cli.commands.output.Output;
 import org.opendaylight.controller.netconf.cli.commands.output.OutputDefinition;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
-import org.opendaylight.yangtools.yang.data.impl.NodeFactory;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapEntryNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 
 /**
@@ -40,21 +44,34 @@ public class Help extends AbstractCommand {
 
     @Override
     public Output invoke(final Input inputArgs) {
-        final ArrayList<Node<?>> value = Lists.newArrayList();
+        final ArrayList<MapEntryNode> value = Lists.newArrayList();
 
         for (final String id : commandDispatcher.getCommandIds()) {
             final Optional<Command> cmd = commandDispatcher.getCommand(id);
             Preconditions.checkState(cmd.isPresent(), "Command %s has to be present in command dispatcher", id);
             final Optional<String> description = cmd.get().getCommandDescription();
-            final List<Node<?>> nameAndDescription = Lists.newArrayList();
-            nameAndDescription.add(NodeFactory.createImmutableSimpleNode(QName.create(getCommandId(), "id"), null, id));
+            final List<DataContainerChild<?, ?>> nameAndDescription = Lists.newArrayList();
+            nameAndDescription.add(
+                    ImmutableLeafNodeBuilder.create()
+                            .withNodeIdentifier(new NodeIdentifier(QName.create(getCommandId(), "id")))
+                            .withValue(id).build());
             if(description.isPresent()) {
-                nameAndDescription.add(NodeFactory.createImmutableSimpleNode(QName.create(getCommandId(), "description"), null, description.get()));
+                nameAndDescription.add(
+                        ImmutableLeafNodeBuilder.create()
+                                .withNodeIdentifier(new NodeIdentifier(QName.create(getCommandId(), "description")))
+                                .withValue(description.get()).build());
             }
-            value.add(ImmutableCompositeNode.create(QName.create(getCommandId(), "commands"), nameAndDescription));
+            value.add(ImmutableMapEntryNodeBuilder.create()
+                    .withValue(nameAndDescription)
+                    .withNodeIdentifier(
+                            new NodeIdentifierWithPredicates(QName.create(getCommandId(), "commands"),
+                                    QName.create(getCommandId(), "id"), id)).build());
         }
+        MapNode mappedHelp = ImmutableMapNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(QName.create(getCommandId(), "commands")))
+                .withValue(value).build();
 
-        return new Output(new CompositeNodeTOImpl(getCommandId(), null, value));
+        return new Output(mappedHelp);
     }
 
     public static Command create(final RpcDefinition rpcDefinition, final CommandDispatcher commandDispatcher) {
index c366c89..0d9880d 100644 (file)
@@ -13,8 +13,8 @@ import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 
 /**
@@ -22,30 +22,32 @@ import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
  */
 public class Output {
 
-    private final CompositeNode output;
+    private final NormalizedNode<?, ?> output;
 
-    public Output(final CompositeNode output) {
-        this.output = output;
+    public Output(final NormalizedNode<?, ?> output) {
+        if (output instanceof ContainerNode && output.getNodeType().getLocalName() == "rpc-reply") {
+            this.output = ((ContainerNode) output).getValue().iterator().next();
+        } else {
+            this.output = output;
+        }
     }
 
-    public Map<DataSchemaNode, List<Node<?>>> unwrap(final OutputDefinition outputDefinition) {
+    public Map<DataSchemaNode, List<NormalizedNode<?, ?>>> unwrap(final OutputDefinition outputDefinition) {
         Preconditions.checkArgument(outputDefinition.isEmpty() == false);
 
         final Map<QName, DataSchemaNode> mappedSchemaNodes = mapOutput(outputDefinition);
-        final Map<DataSchemaNode, List<Node<?>>> mappedNodesToSchema = Maps.newHashMap();
-
-        for (final Node<?> node : output.getValue()) {
-            final DataSchemaNode schemaNode = mappedSchemaNodes.get(node.getKey().withoutRevision());
-            final List<Node<?>> list = mappedNodesToSchema.get(schemaNode) == null ? Lists.<Node<?>> newArrayList()
-                    : mappedNodesToSchema.get(schemaNode);
-            list.add(node);
-            mappedNodesToSchema.put(schemaNode, list);
-        }
+        final Map<DataSchemaNode, List<NormalizedNode<?, ?>>> mappedNodesToSchema = Maps.newHashMap();
+
+        final DataSchemaNode schemaNode = mappedSchemaNodes.get(output.getNodeType().withoutRevision());
+        final List<NormalizedNode<?, ?>> list = mappedNodesToSchema.get(schemaNode) == null ? Lists.<NormalizedNode<?, ?>>newArrayList()
+                : mappedNodesToSchema.get(schemaNode);
+        list.add(output);
+        mappedNodesToSchema.put(schemaNode, list);
 
         return mappedNodesToSchema;
     }
 
-    public CompositeNode getOutput() {
+    public NormalizedNode<?, ?> getOutput() {
         return output;
     }
 
index 05b9e85..be2dc85 100644 (file)
@@ -7,10 +7,14 @@
  */
 package org.opendaylight.controller.netconf.cli.commands.remote;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.netconf.cli.commands.AbstractCommand;
 import org.opendaylight.controller.netconf.cli.commands.Command;
 import org.opendaylight.controller.netconf.cli.commands.CommandInvocationException;
@@ -18,11 +22,9 @@ import org.opendaylight.controller.netconf.cli.commands.input.Input;
 import org.opendaylight.controller.netconf.cli.commands.input.InputDefinition;
 import org.opendaylight.controller.netconf.cli.commands.output.Output;
 import org.opendaylight.controller.netconf.cli.commands.output.OutputDefinition;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 /**
  * Generic remote command implementation that sends the rpc xml to the remote device and waits for response
@@ -33,16 +35,18 @@ public class RemoteCommand extends AbstractCommand {
     // TODO make this configurable
     private static final long DEFAULT_TIMEOUT = 10000;
     private static final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
-    private final RpcImplementation rpc;
+    private final DOMRpcService rpcService;
 
-    public RemoteCommand(final QName qName, final InputDefinition args, final OutputDefinition output, final String description, final RpcImplementation rpc) {
+    public RemoteCommand(final QName qName, final InputDefinition args, final OutputDefinition output, final String description, final DOMRpcService rpcService) {
         super(qName, args, output, description);
-        this.rpc = rpc;
+        this.rpcService = rpcService;
     }
 
     @Override
     public Output invoke(final Input inputArgs) throws CommandInvocationException {
-        final ListenableFuture<RpcResult<CompositeNode>> invokeRpc = rpc.invokeRpc(getCommandId(), inputArgs.wrap(getCommandId()));
+        final CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc =
+                rpcService.invokeRpc(SchemaPath.create(Collections.singletonList(getCommandId()), true), inputArgs.wrap(getCommandId()));
+
         try {
             return new Output(invokeRpc.get(DEFAULT_TIMEOUT, DEFAULT_TIMEOUT_UNIT).getResult());
         } catch (final ExecutionException e) {
@@ -56,10 +60,10 @@ public class RemoteCommand extends AbstractCommand {
         }
     }
 
-    public static Command fromRpc(final RpcDefinition rpcDefinition, final RpcImplementation rpcInvoker) {
+    public static Command fromRpc(final RpcDefinition rpcDefinition, final DOMRpcService rpcService) {
         final InputDefinition args = getInputDefinition(rpcDefinition);
         final OutputDefinition retVal = getOutputDefinition(rpcDefinition);
 
-        return new RemoteCommand(rpcDefinition.getQName(), args, retVal, rpcDefinition.getDescription(), rpcInvoker);
+        return new RemoteCommand(rpcDefinition.getQName(), args, retVal, rpcDefinition.getDescription(), rpcService);
     }
 }
index 0c5e276..fef5f3a 100644 (file)
@@ -16,7 +16,7 @@ import jline.console.completer.Completer;
 import jline.console.completer.NullCompleter;
 import org.opendaylight.controller.netconf.cli.io.ConsoleContext;
 import org.opendaylight.controller.netconf.cli.io.ConsoleIO;
-import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.ChoiceSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
@@ -56,7 +56,7 @@ public abstract class AbstractReader<T extends DataSchemaNode> implements Reader
     }
 
     @Override
-    public List<Node<?>> read(final T schemaNode) throws ReadingException {
+    public List<NormalizedNode<?, ?>> read(final T schemaNode) throws ReadingException {
         if (isReadingWanted(schemaNode)) {
             final ConsoleContext ctx = getContext(schemaNode);
             console.enterContext(ctx);
@@ -80,7 +80,7 @@ public abstract class AbstractReader<T extends DataSchemaNode> implements Reader
 
     // TODO javadoc
 
-    protected abstract List<Node<?>> readWithContext(T schemaNode) throws IOException, ReadingException;
+    protected abstract List<NormalizedNode<?, ?>> readWithContext(T schemaNode) throws IOException, ReadingException;
 
     protected abstract ConsoleContext getContext(T schemaNode);
 
index 9f27b8f..4a23499 100644 (file)
@@ -8,7 +8,7 @@
 package org.opendaylight.controller.netconf.cli.reader;
 
 import java.util.List;
-import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 
 /**
@@ -16,6 +16,6 @@ import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
  */
 public interface Reader<T extends DataSchemaNode> {
 
-    List<Node<?>> read(T schemaNode) throws ReadingException;
+    List<NormalizedNode<?, ?>> read(T schemaNode) throws ReadingException;
 
 }
index 95fc098..111a242 100644 (file)
@@ -33,8 +33,10 @@ import org.opendaylight.controller.netconf.cli.io.IOUtil;
 import org.opendaylight.controller.netconf.cli.reader.AbstractReader;
 import org.opendaylight.controller.netconf.cli.reader.ReadingException;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.Module;
@@ -69,7 +71,7 @@ public class ConfigReader extends AbstractReader<DataSchemaNode> {
     // FIXME refactor + unite common code with FilterReader
 
     @Override
-    protected List<Node<?>> readWithContext(final DataSchemaNode schemaNode) throws IOException, ReadingException {
+    protected List<NormalizedNode<?, ?>> readWithContext(final DataSchemaNode schemaNode) throws IOException, ReadingException {
         console.writeLn("Config " + schemaNode.getQName().getLocalName());
         console.writeLn("Submit path of the data to edit. Use TAB for autocomplete");
 
@@ -87,20 +89,25 @@ public class ConfigReader extends AbstractReader<DataSchemaNode> {
             filterPartsQNames.add(qName);
         }
 
-        List<Node<?>> previous = readInnerNode(rawValue);
+        List<NormalizedNode<?, ?>> previous = readInnerNode(rawValue);
 
         for (final QName qName : Lists.reverse(filterPartsQNames).subList(1, filterPartsQNames.size())) {
-            previous = Collections.<Node<?>> singletonList(new CompositeNodeTOImpl(qName, null,
-                    previous == null ? Collections.<Node<?>> emptyList() : previous));
+            previous = Collections.<NormalizedNode<?, ?>>singletonList(
+                    ImmutableContainerNodeBuilder.create()
+                            .withNodeIdentifier(new NodeIdentifier(qName))
+                            .withValue(previous == null ? Collections.<DataContainerChild<?, ?>>emptyList() : (Collection) previous).build()
+            );
         }
 
-        final Node<?> newNode = previous == null ? null
-                : new CompositeNodeTOImpl(schemaNode.getQName(), null, previous);
+        final DataContainerChild<?, ?> newNode = previous == null ? null
+                : ImmutableContainerNodeBuilder.create()
+                        .withNodeIdentifier(new NodeIdentifier(schemaNode.getQName()))
+                        .withValue((Collection) previous).build();
 
-        return Collections.<Node<?>> singletonList(newNode);
+        return Collections.<NormalizedNode<?, ?>> singletonList(newNode);
     }
 
-    private List<Node<?>> readInnerNode(final String pathString) throws ReadingException {
+    private List<NormalizedNode<?, ?>> readInnerNode(final String pathString) throws ReadingException {
         final Optional<DataSchemaNode> schema = getCurrentNode(getSchemaContext(), pathString);
         Preconditions.checkState(schema.isPresent(), "Unable to find schema for %s", pathString);
         return commandArgHandlerRegistry.getGenericReader(getSchemaContext(), true).read(schema.get());
index bdd9cd0..edf05f1 100644 (file)
@@ -16,7 +16,7 @@ import org.opendaylight.controller.netconf.cli.io.ConsoleIO;
 import org.opendaylight.controller.netconf.cli.reader.ReadingException;
 import org.opendaylight.controller.netconf.cli.reader.impl.ChoiceReader;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.ChoiceCaseNode;
 import org.opendaylight.yangtools.yang.model.api.ChoiceSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -33,7 +33,7 @@ public class EditContentReader extends ChoiceReader {
     }
 
     @Override
-    public List<Node<?>> readWithContext(final ChoiceSchemaNode choiceNode) throws IOException, ReadingException {
+    public List<NormalizedNode<?, ?>> readWithContext(final ChoiceSchemaNode choiceNode) throws IOException, ReadingException {
         Preconditions.checkState(choiceNode.getQName().equals(EDIT_CONTENT_QNAME), "Unexpected choice %s, expected %s", choiceNode, EDIT_CONTENT_QNAME);
         final ChoiceCaseNode selectedCase = choiceNode.getCaseNodeByName(CONFIG_QNAME);
         Preconditions.checkNotNull(selectedCase, "Unexpected choice %s, expected %s that contains %s", choiceNode, EDIT_CONTENT_QNAME, CONFIG_QNAME);
index 7b37f69..ff1e2b1 100644 (file)
@@ -31,9 +31,10 @@ import org.opendaylight.controller.netconf.cli.io.IOUtil;
 import org.opendaylight.controller.netconf.cli.reader.AbstractReader;
 import org.opendaylight.controller.netconf.cli.reader.ReadingException;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.Module;
@@ -73,9 +74,9 @@ public class FilterReader extends AbstractReader<DataSchemaNode> {
     public static final String FILTER_TYPE_VALUE_DEFAULT = "subtree";
 
     @Override
-    protected List<Node<?>> readWithContext(final DataSchemaNode schemaNode) throws IOException, ReadingException {
+    protected List<NormalizedNode<?, ?>> readWithContext(final DataSchemaNode schemaNode) throws IOException, ReadingException {
         boolean redSuccessfuly = false;
-        Node<?> newNode = null;
+        DataContainerChild<?, ?> newNode = null;
         do {
             console.writeLn("Filter " + schemaNode.getQName().getLocalName());
             console.writeLn("Submit path of the data to retrieve. Use TAB for autocomplete");
@@ -95,18 +96,18 @@ public class FilterReader extends AbstractReader<DataSchemaNode> {
                     filterPartsQNames.add(qName);
                 }
 
-                Node<?> previous = null;
+                DataContainerChild<?, ?> previous = null;
 
                 for (final QName qName : Lists.reverse(filterPartsQNames)) {
-                    previous = new CompositeNodeTOImpl(qName, null,
-                            previous == null ? Collections.<Node<?>> emptyList()
-                                    : Collections.<Node<?>> singletonList(previous));
+                    previous = ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(qName))
+                            .withValue(previous == null ? Collections.<DataContainerChild<?, ?>>emptyList()
+                                    : Collections.<DataContainerChild<?, ?>>singletonList(previous)).build();
                 }
 
                 final Map<QName, String> attributes = Collections.singletonMap(FILTER_TYPE_QNAME,
                         FILTER_TYPE_VALUE_DEFAULT);
-                newNode = previous == null ? null : ImmutableCompositeNode.create(schemaNode.getQName(), attributes,
-                        Collections.<Node<?>> singletonList(previous));
+                newNode = previous == null ? null : ImmutableContainerNodeBuilder.create()
+                        .withNodeIdentifier(new NodeIdentifier(schemaNode.getQName())).withChild(previous).build();
                 redSuccessfuly = true;
             } catch (final ReadingException e) {
                 final String message = "Specified filter path isn't correct.";
@@ -114,7 +115,7 @@ public class FilterReader extends AbstractReader<DataSchemaNode> {
                 console.writeLn(message);
             }
         } while (!redSuccessfuly);
-        return Collections.<Node<?>> singletonList(newNode);
+        return Collections.<NormalizedNode<?, ?>> singletonList(newNode);
     }
 
     @Override
index 2ce2f64..a8d2590 100644 (file)
@@ -21,9 +21,13 @@ import org.opendaylight.controller.netconf.cli.io.ConsoleIO;
 import org.opendaylight.controller.netconf.cli.reader.AbstractReader;
 import org.opendaylight.controller.netconf.cli.reader.ReadingException;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.NodeFactory;
-import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.DomUtils;
+import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.parser.DomToNormalizedNodeParserFactory;
 import org.opendaylight.yangtools.yang.model.api.AnyXmlSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.w3c.dom.Document;
@@ -40,32 +44,38 @@ public class AnyXmlReader extends AbstractReader<AnyXmlSchemaNode> {
     }
 
     @Override
-    protected List<Node<?>> readWithContext(final AnyXmlSchemaNode schemaNode) throws IOException, ReadingException {
+    protected List<NormalizedNode<?, ?>> readWithContext(final AnyXmlSchemaNode schemaNode) throws IOException, ReadingException {
         console.writeLn(listType(schemaNode) + " " + schemaNode.getQName().getLocalName());
 
         final String rawValue = console.read();
 
-        Node<?> newNode = null;
+        DataContainerChild<?, ?> newNode = null;
         if (!isSkipInput(rawValue)) {
-            final Optional<Node<?>> value = tryParse(rawValue);
+            final Optional<DataContainerChild<?, ?>> value = tryParse(rawValue, schemaNode);
 
             if (value.isPresent()) {
-                newNode = NodeFactory.createImmutableCompositeNode(schemaNode.getQName(), null,
-                        Collections.<Node<?>> singletonList(value.get()));
+                newNode = ImmutableContainerNodeBuilder.create()
+                        .withNodeIdentifier(new NodeIdentifier(schemaNode.getQName()))
+                        .withChild(value.get()).build();
             } else {
-                newNode = NodeFactory.createImmutableSimpleNode(schemaNode.getQName(), null, rawValue);
+                newNode = ImmutableLeafNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(schemaNode.getQName())).withValue(rawValue).build();
             }
         }
 
-        final List<Node<?>> newNodes = new ArrayList<>();
+        final List<NormalizedNode<?, ?>> newNodes = new ArrayList<>();
         newNodes.add(newNode);
         return newNodes;
     }
 
-    private Optional<Node<?>> tryParse(final String rawValue) {
+    private Optional<DataContainerChild<?, ?>> tryParse(final String rawValue, final AnyXmlSchemaNode schemaNode) {
         try {
             final Document dom = XmlUtil.readXmlToDocument(rawValue);
-            return Optional.<Node<?>> of(XmlDocumentUtils.toDomNode(dom));
+            return Optional.<DataContainerChild<?, ?>> of(
+                    DomToNormalizedNodeParserFactory.
+          &nbs