Merge "BUG-2627: do not duplicate descriptions"
authorTony Tkacik <ttkacik@cisco.com>
Thu, 12 Feb 2015 12:18:47 +0000 (12:18 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 12 Feb 2015 12:18:48 +0000 (12:18 +0000)
82 files changed:
features/netconf/pom.xml
features/netconf/src/main/resources/features.xml
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ClientActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-netconf-connector/pom.xml
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDevice.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/api/RemoteDeviceHandler.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfStateSchemas.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCapabilities.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionPreferences.java [moved from opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilities.java with 88% similarity]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDataBroker.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDatastoreAdapter.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalProvider.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTopologyAdapter.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/AbstractWriteTx.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteCandidateRunningTx.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteCandidateTx.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/WriteRunningTx.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/util/RemoteDeviceId.java
opendaylight/md-sal/sal-netconf-connector/src/main/yang/netconf-node-topology.yang [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/NetconfDeviceTest.java
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionPreferencesTest.java [moved from opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilitiesTest.java with 81% similarity]
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTopologyAdapterTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceWriteOnlyTxTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/TerminationMonitor.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
opendaylight/netconf/netconf-cli/pom.xml
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/Main.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/NetconfDeviceConnectionHandler.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/NetconfDeviceConnectionManager.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/local/Connect.java
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITSecureTest.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToEXIEncoder.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/FakeModuleBuilderCapability.java [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java
pom.xml

index 028c16b..997b6a2 100644 (file)
@@ -9,7 +9,7 @@
   </parent>
   <artifactId>features-netconf</artifactId>
 
-  <packaging>pom</packaging>
+  <packaging>jar</packaging>
 
   <properties>
     <features.file>features.xml</features.file>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-auth</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-notifications-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-notifications-impl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>ietf-netconf</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>ietf-netconf-monitoring</artifactId>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>ietf-netconf-monitoring-extension</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>ietf-netconf-notifications</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.yangtools.model</groupId>
       <artifactId>ietf-inet-types</artifactId>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-monitoring</artifactId>
     </dependency>
+    <!-- test to validate features.xml -->
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>features-test</artifactId>
+      <version>${yangtools.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <!-- dependency for opendaylight-karaf-empty for use by testing -->
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>opendaylight-karaf-empty</artifactId>
+      <version>${commons.opendaylight.version}</version>
+      <type>zip</type>
+    </dependency>
   </dependencies>
 
   <build>
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${surefire.version}</version>
+        <configuration>
+          <systemPropertyVariables>
+            <karaf.distro.groupId>org.opendaylight.controller</karaf.distro.groupId>
+            <karaf.distro.artifactId>opendaylight-karaf-empty</karaf.distro.artifactId>
+            <karaf.distro.version>${commons.opendaylight.version}</karaf.distro.version>
+          </systemPropertyVariables>
+          <dependenciesToScan>
+            <dependency>org.opendaylight.yangtools:features-test</dependency>
+          </dependenciesToScan>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
   <scm>
index 2affa27..a655021 100644 (file)
@@ -85,6 +85,8 @@
   </feature>
   <feature name='odl-netconf-notifications-impl' version='${project.version}' description="OpenDaylight :: Netconf :: Monitoring :: Impl">
     <feature version='${project.version}'>odl-netconf-notifications-api</feature>
+    <feature version='${project.version}'>odl-netconf-util</feature>
+    <feature version='${yangtools.version}'>odl-yangtools-binding-generator</feature>
     <bundle>mvn:org.opendaylight.controller/netconf-notifications-impl/${project.version}</bundle>
   </feature>
 
index a05d02c..7f5233c 100644 (file)
@@ -238,7 +238,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
      * @param connectStrategyFactory Factory for creating reconnection strategy for every reconnect attempt
      *
      * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
-     *         success if it indicates no further attempts should be made and failure if it reports an error
+     *         success is never reported, only failure when it runs out of reconnection attempts.
      */
     protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
             final PipelineInitializer<S> initializer) {
index aaec95a..865c666 100644 (file)
@@ -15,6 +15,7 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import org.slf4j.Logger;
@@ -55,6 +56,15 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
                 channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this));
             }
         });
+
+        pending.addListener(new GenericFutureListener<Future<Object>>() {
+            @Override
+            public void operationComplete(Future<Object> future) throws Exception {
+                if (!future.isSuccess()) {
+                    ReconnectPromise.this.setFailure(future.cause());
+                }
+            }
+        });
     }
 
     /**
index 8022e72..fe25c75 100644 (file)
@@ -11,14 +11,13 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ClientActor extends UntypedActor {
-    protected final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
+    protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
     private final ActorRef target;
 
index 9aff86b..c5ae4c4 100644 (file)
@@ -125,7 +125,7 @@ public class ExampleActor extends RaftActor {
         try {
             bs = fromObject(state);
         } catch (Exception e) {
-            LOG.error(e, "Exception in creating snapshot");
+            LOG.error("Exception in creating snapshot", e);
         }
         getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
     }
@@ -135,7 +135,7 @@ public class ExampleActor extends RaftActor {
         try {
             state.putAll((HashMap) toObject(snapshot));
         } catch (Exception e) {
-           LOG.error(e, "Exception in applying snapshot");
+           LOG.error("Exception in applying snapshot", e);
         }
         if(LOG.isDebugEnabled()) {
             LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
index 766b80e..3dc6ae4 100644 (file)
@@ -10,8 +10,6 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import akka.japi.Procedure;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
@@ -43,6 +41,8 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
@@ -85,8 +85,7 @@ import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntries
  * </ul>
  */
 public abstract class RaftActor extends AbstractUntypedPersistentActor {
-    protected final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
+    protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
     /**
      * The current state determines the current behavior of a RaftActor
@@ -338,8 +337,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof SaveSnapshotFailure) {
             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
 
-            LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:",
-                    persistenceId());
+            LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
+                    persistenceId(), saveSnapshotFailure.cause());
 
             context.getReplicatedLog().snapshotRollback();
 
index 0e1f20b..9d391a1 100644 (file)
@@ -12,9 +12,8 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-import akka.event.LoggingAdapter;
-
 import java.util.Map;
+import org.slf4j.Logger;
 
 /**
  * The RaftActorContext contains that portion of the RaftActors state that
@@ -106,7 +105,7 @@ public interface RaftActorContext {
      *
      * @return
      */
-    LoggingAdapter getLogger();
+    Logger getLogger();
 
     /**
      * Get a mapping of peerId's to their addresses
index 5438fe7..b71b3be 100644 (file)
@@ -8,15 +8,14 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import static com.google.common.base.Preconditions.checkState;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActorContext;
-import akka.event.LoggingAdapter;
 import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkState;
+import org.slf4j.Logger;
 
 public class RaftActorContextImpl implements RaftActorContext {
 
@@ -36,7 +35,7 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private final Map<String, String> peerAddresses;
 
-    private final LoggingAdapter LOG;
+    private final Logger LOG;
 
     private final ConfigParams configParams;
 
@@ -47,7 +46,7 @@ public class RaftActorContextImpl implements RaftActorContext {
         ElectionTerm termInformation, long commitIndex,
         long lastApplied, ReplicatedLog replicatedLog,
         Map<String, String> peerAddresses, ConfigParams configParams,
-        LoggingAdapter logger) {
+        Logger logger) {
         this.actor = actor;
         this.context = context;
         this.id = id;
@@ -115,7 +114,7 @@ public class RaftActorContextImpl implements RaftActorContext {
         return context.system();
     }
 
-    @Override public LoggingAdapter getLogger() {
+    @Override public Logger getLogger() {
         return this.LOG;
     }
 
index 8f33d94..9b6c088 100644 (file)
@@ -26,7 +26,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -129,7 +128,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         // Upon election: send initial empty AppendEntries RPCs
         // (heartbeat) to each server; repeat during idle periods to
         // prevent election timeouts (ยง5.2)
-        scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+        sendAppendEntries(0);
     }
 
     /**
@@ -425,18 +424,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             context.setCommitIndex(logIndex);
             applyLogToStateMachine(logIndex);
         } else {
-            sendAppendEntries();
+            sendAppendEntries(0);
         }
     }
 
-    private void sendAppendEntries() {
+    private void sendAppendEntries(long timeSinceLastActivityInterval) {
         // Send an AppendEntries to all followers
-        long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis();
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
             final FollowerLogInformation followerLogInformation = e.getValue();
             // This checks helps not to send a repeat message to the follower
-            if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) {
+            if(!followerLogInformation.isFollowerActive() ||
+                    followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
                 sendUpdatesToFollower(followerId, followerLogInformation, true);
             }
         }
@@ -638,7 +637,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         followerToSnapshot.getTotalChunks());
             }
         } catch (IOException e) {
-            LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
+            LOG.error("{}: InstallSnapshot failed for Leader.", context.getId(), e);
         }
     }
 
@@ -661,7 +660,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private void sendHeartBeat() {
         if (!followerToLog.isEmpty()) {
-            sendAppendEntries();
+            sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis());
         }
     }
 
index 99824b0..075b287 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
-import akka.event.LoggingAdapter;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
@@ -24,6 +23,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.slf4j.Logger;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -46,7 +46,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     /**
      *
      */
-    protected final LoggingAdapter LOG;
+    protected final Logger LOG;
 
     /**
      *
@@ -349,7 +349,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             } else {
                 //if one index is not present in the log, no point in looping
                 // around as the rest wont be present either
-                LOG.warning(
+                LOG.warn(
                         "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
                         context.getId(), i, i, index);
                 break;
@@ -394,7 +394,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         try {
             close();
         } catch (Exception e) {
-            LOG.error(e, "{}: Failed to close behavior : {}", context.getId(), this.state());
+            LOG.error("{}: Failed to close behavior : {}", context.getId(), this.state(), e);
         }
 
         return behavior;
index 410b3c2..8a07887 100644 (file)
@@ -342,7 +342,7 @@ public class Follower extends AbstractRaftActorBehavior {
             snapshotTracker = null;
 
         } catch (Exception e){
-            LOG.error(e, "{}: Exception in InstallSnapshot of follower", context.getId());
+            LOG.error("{}: Exception in InstallSnapshot of follower", context.getId(), e);
             //send reply with success as false. The chunk will be sent again on failure
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
                     installSnapshot.getChunkIndex(), false), actor());
index 26fbde0..d26837f 100644 (file)
@@ -8,22 +8,22 @@
 
 package org.opendaylight.controller.cluster.raft.behaviors;
 
-import akka.event.LoggingAdapter;
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
+import org.slf4j.Logger;
 
 /**
  * SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower
  */
 public class SnapshotTracker {
-    private final LoggingAdapter LOG;
+    private final Logger LOG;
     private final int totalChunks;
     private ByteString collectedChunks = ByteString.EMPTY;
     private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
     private boolean sealed = false;
     private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
 
-    SnapshotTracker(LoggingAdapter LOG, int totalChunks){
+    SnapshotTracker(Logger LOG, int totalChunks){
         this.LOG = LOG;
         this.totalChunks = totalChunks;
     }
@@ -77,6 +77,8 @@ public class SnapshotTracker {
     }
 
     public static class InvalidChunkException extends Exception {
+        private static final long serialVersionUID = 1L;
+
         InvalidChunkException(String message){
             super(message);
         }
index 9d3e5dc..4d33152 100644 (file)
@@ -12,8 +12,6 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import java.io.Serializable;
@@ -22,6 +20,8 @@ import java.util.Map;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MockRaftActorContext implements RaftActorContext {
 
@@ -88,7 +88,8 @@ public class MockRaftActorContext implements RaftActorContext {
 
     public void initReplicatedLog(){
         this.replicatedLog = new SimpleReplicatedLog();
-        this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("")));
+        this.replicatedLog.append(new MockReplicatedLogEntry(1, 0, new MockPayload("1")));
+        this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("2")));
     }
 
     @Override public ActorRef actorOf(Props props) {
@@ -144,8 +145,8 @@ public class MockRaftActorContext implements RaftActorContext {
         return this.system;
     }
 
-    @Override public LoggingAdapter getLogger() {
-        return Logging.getLogger(system, this);
+    @Override public Logger getLogger() {
+        return LoggerFactory.getLogger(getClass());
     }
 
     @Override public Map<String, String> getPeerAddresses() {
index cf7af43..9e0e06c 100644 (file)
@@ -1,5 +1,18 @@
 package org.opendaylight.controller.cluster.raft;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
@@ -62,20 +75,6 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
 public class RaftActorTest extends AbstractActorTest {
 
 
@@ -1173,7 +1172,10 @@ public class RaftActorTest extends AbstractActorTest {
                 // simulate a real snapshot
                 leaderActor.onReceiveCommand(new InitiateInstallSnapshot());
                 assertEquals(5, leaderActor.getReplicatedLog().size());
-                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+                assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
+                        leaderActor.getCurrentBehavior().state(),leaderActor.getLeaderId())
+                        , RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
 
                 //reply from a slow follower does not initiate a fake snapshot
                 leaderActor.onReceiveCommand(new AppendEntriesReply("follower-2", 1, true, 9, 1));
index 666cea6..3f551b3 100644 (file)
@@ -1,5 +1,8 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -41,14 +44,16 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+import org.slf4j.impl.SimpleLogger;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
+    static {
+        // This enables trace logging for the tests.
+        System.setProperty(SimpleLogger.LOG_KEY_PREFIX + MockRaftActorContext.class.getName(), "trace");
+    }
+
     private final ActorRef leaderActor =
         getSystem().actorOf(Props.create(DoNothingActor.class));
     private final ActorRef senderActor =
@@ -70,47 +75,50 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
         new JavaTestKit(getSystem()) {{
-
             new Within(duration("1 seconds")) {
                 @Override
                 protected void run() {
-
                     ActorRef followerActor = getTestActor();
 
                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
 
                     Map<String, String> peerAddresses = new HashMap<>();
 
-                    peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+                    String followerId = "follower";
+                    peerAddresses.put(followerId, followerActor.path().toString());
 
                     actorContext.setPeerAddresses(peerAddresses);
 
+                    long term = 1;
+                    actorContext.getTermInformation().update(term, "");
+
                     Leader leader = new Leader(actorContext);
-                    leader.markFollowerActive(followerActor.path().toString());
-                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                        TimeUnit.MILLISECONDS);
-                    leader.handleMessage(senderActor, new SendHeartBeat());
 
-                    final String out =
-                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                            // do not put code outside this method, will run afterwards
-                            @Override
-                            protected String match(Object in) {
-                                Object msg = fromSerializableMessage(in);
-                                if (msg instanceof AppendEntries) {
-                                    if (((AppendEntries)msg).getTerm() == 0) {
-                                        return "match";
-                                    }
-                                    return null;
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
+                    // Leader should send an immediate heartbeat with no entries as follower is inactive.
+                    long lastIndex = actorContext.getReplicatedLog().lastIndex();
+                    AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+                    assertEquals("getTerm", term, appendEntries.getTerm());
+                    assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+                    assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
+                    assertEquals("Entries size", 0, appendEntries.getEntries().size());
 
-                    assertEquals("match", out);
+                    // The follower would normally reply - simulate that explicitly here.
+                    leader.handleMessage(followerActor, new AppendEntriesReply(
+                            followerId, term, true, lastIndex - 1, term));
+                    assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+
+                    // Sleep for the heartbeat interval so AppendEntries is sent.
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
+                            getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
 
+                    leader.handleMessage(senderActor, new SendHeartBeat());
+
+                    appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+                    assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
+                    assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+                    assertEquals("Entries size", 1, appendEntries.getEntries().size());
+                    assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
+                    assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
                 }
             };
         }};
@@ -119,55 +127,51 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
         new JavaTestKit(getSystem()) {{
-
             new Within(duration("1 seconds")) {
                 @Override
                 protected void run() {
-
                     ActorRef followerActor = getTestActor();
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
+                    MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
 
                     Map<String, String> peerAddresses = new HashMap<>();
 
-                    peerAddresses.put(followerActor.path().toString(),
-                            followerActor.path().toString());
+                    String followerId = "follower";
+                    peerAddresses.put(followerId, followerActor.path().toString());
 
                     actorContext.setPeerAddresses(peerAddresses);
 
+                    long term = 1;
+                    actorContext.getTermInformation().update(term, "");
+
                     Leader leader = new Leader(actorContext);
-                    leader.markFollowerActive(followerActor.path().toString());
-                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                        TimeUnit.MILLISECONDS);
-                    RaftActorBehavior raftBehavior = leader
-                        .handleMessage(senderActor, new Replicate(null, null,
-                            new MockRaftActorContext.MockReplicatedLogEntry(1,
-                                100,
-                                new MockRaftActorContext.MockPayload("foo"))
-                        ));
+
+                    // Leader will send an immediate heartbeat - ignore it.
+                    expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
+                    // The follower would normally reply - simulate that explicitly here.
+                    long lastIndex = actorContext.getReplicatedLog().lastIndex();
+                    leader.handleMessage(followerActor, new AppendEntriesReply(
+                            followerId, term, true, lastIndex, term));
+                    assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+
+                    MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+                    MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+                            1, lastIndex + 1, payload);
+                    actorContext.getReplicatedLog().append(newEntry);
+                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+                            new Replicate(null, null, newEntry));
 
                     // State should not change
                     assertTrue(raftBehavior instanceof Leader);
 
-                    final String out =
-                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                            // do not put code outside this method, will run afterwards
-                            @Override
-                            protected String match(Object in) {
-                                Object msg = fromSerializableMessage(in);
-                                if (msg instanceof AppendEntries) {
-                                    if (((AppendEntries)msg).getTerm() == 0) {
-                                        return "match";
-                                    }
-                                    return null;
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
-
-                    assertEquals("match", out);
+                    AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+                    assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+                    assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+                    assertEquals("Entries size", 1, appendEntries.getEntries().size());
+                    assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+                    assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+                    assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
                 }
             };
         }};
@@ -176,7 +180,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
         new JavaTestKit(getSystem()) {{
-
             new Within(duration("1 seconds")) {
                 @Override
                 protected void run() {
@@ -282,7 +285,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
+            AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
                 followerActor, AppendEntries.class);
 
             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
@@ -297,9 +300,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leader.handleMessage(senderActor, new SendHeartBeat());
 
-            InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
-                MessageCollectorActor.getFirstMatching(followerActor,
-                    InstallSnapshot.SERIALIZABLE_CLASS);
+            InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
+                InstallSnapshot.SERIALIZABLE_CLASS);
 
             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
                 isproto);
@@ -435,7 +437,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             RaftActorBehavior raftBehavior = leader.handleMessage(
                 leaderActor, new InitiateInstallSnapshot());
 
-            CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
+            CaptureSnapshot cs = MessageCollectorActor.
                 getFirstMatching(leaderActor, CaptureSnapshot.class);
 
             assertNotNull(cs);
@@ -491,6 +493,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             Leader leader = new Leader(actorContext);
 
+            // Ignore initial heartbeat.
+            expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
             // new entry
             ReplicatedLogImplEntry entry =
                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
@@ -558,6 +563,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             MockLeader leader = new MockLeader(actorContext);
 
+            // Ignore initial heartbeat.
+            expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
             Map<String, String> leadersSnapshot = new HashMap<>();
             leadersSnapshot.put("1", "A");
             leadersSnapshot.put("2", "B");
@@ -734,15 +742,17 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-            Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+            MessageCollectorActor.getAllMatching(followerActor,
+                    InstallSnapshotMessages.InstallSnapshot.class);
 
-            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
-            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
+                    followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+            assertNotNull(installSnapshot);
 
             assertEquals(1, installSnapshot.getChunkIndex());
             assertEquals(3, installSnapshot.getTotalChunks());
 
+            followerActor.underlyingActor().clear();
 
             leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
                 followerActor.path().toString(), -1, false));
@@ -752,11 +762,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
-
-            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
-            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+            installSnapshot = MessageCollectorActor.getFirstMatching(
+                    followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+            assertNotNull(installSnapshot);
 
             assertEquals(1, installSnapshot.getChunkIndex());
             assertEquals(3, installSnapshot.getTotalChunks());
@@ -769,7 +777,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
         new JavaTestKit(getSystem()) {
             {
-
                 TestActorRef<MessageCollectorActor> followerActor =
                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
 
@@ -811,11 +818,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                 leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-                Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
-
-                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
-
-                InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+                InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
+                        followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+                assertNotNull(installSnapshot);
 
                 assertEquals(1, installSnapshot.getChunkIndex());
                 assertEquals(3, installSnapshot.getTotalChunks());
@@ -823,17 +828,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                 int hashCode = installSnapshot.getData().hashCode();
 
-                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
-
-                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-
-                leader.handleMessage(leaderActor, new SendHeartBeat());
+                followerActor.underlyingActor().clear();
 
-                o = MessageCollectorActor.getAllMessages(followerActor).get(1);
-
-                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
 
-                installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+                installSnapshot = MessageCollectorActor.getFirstMatching(
+                        followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+                assertNotNull(installSnapshot);
 
                 assertEquals(2, installSnapshot.getChunkIndex());
                 assertEquals(3, installSnapshot.getTotalChunks());
@@ -902,7 +903,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
     @Override
     protected RaftActorContext createActorContext(ActorRef actorRef) {
-        return new MockRaftActorContext("test", getSystem(), actorRef);
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(100000);
+        MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
+        context.setConfigParams(configParams);
+        return context;
     }
 
     private ByteString toByteString(Map<String, String> state) {
@@ -931,43 +937,41 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     }
 
     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
-        private static AbstractRaftActorBehavior behavior;
-
-        public ForwardMessageToBehaviorActor(){
-
-        }
+        AbstractRaftActorBehavior behavior;
 
         @Override public void onReceive(Object message) throws Exception {
+            if(behavior != null) {
+                behavior.handleMessage(sender(), message);
+            }
+
             super.onReceive(message);
-            behavior.handleMessage(sender(), message);
         }
 
-        public static void setBehavior(AbstractRaftActorBehavior behavior){
-            ForwardMessageToBehaviorActor.behavior = behavior;
+        public static Props props() {
+            return Props.create(ForwardMessageToBehaviorActor.class);
         }
     }
 
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
         new JavaTestKit(getSystem()) {{
-
-            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
+                    Props.create(ForwardMessageToBehaviorActor.class));
 
             MockRaftActorContext leaderActorContext =
-                new MockRaftActorContext("leader", getSystem(), leaderActor);
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
 
-            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+                    ForwardMessageToBehaviorActor.props());
 
             MockRaftActorContext followerActorContext =
-                new MockRaftActorContext("follower", getSystem(), followerActor);
+                    new MockRaftActorContext("follower", getSystem(), followerActor);
 
             Follower follower = new Follower(followerActorContext);
-
-            ForwardMessageToBehaviorActor.setBehavior(follower);
+            followerActor.underlyingActor().behavior = follower;
 
             Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
+            peerAddresses.put("follower", followerActor.path().toString());
 
             leaderActorContext.setPeerAddresses(peerAddresses);
 
@@ -975,7 +979,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             //create 3 entries
             leaderActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
             leaderActorContext.setCommitIndex(1);
 
@@ -983,37 +987,29 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             // follower too has the exact same log entries and has the same commit index
             followerActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
             followerActorContext.setCommitIndex(1);
 
             Leader leader = new Leader(leaderActorContext);
-            leader.markFollowerActive(followerActor.path().toString());
-
-            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                TimeUnit.MILLISECONDS);
-
-            leader.handleMessage(leaderActor, new SendHeartBeat());
-
-            AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
-                    .getFirstMatching(followerActor, AppendEntries.class);
 
+            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
             assertNotNull(appendEntries);
 
             assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(0, appendEntries.getEntries().size());
             assertEquals(0, appendEntries.getPrevLogIndex());
 
-            AppendEntriesReply appendEntriesReply =
-                (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
                     leaderActor, AppendEntriesReply.class);
-
             assertNotNull(appendEntriesReply);
 
-            // follower returns its next index
             assertEquals(2, appendEntriesReply.getLogLastIndex());
             assertEquals(1, appendEntriesReply.getLogLastTerm());
 
+            // follower returns its next index
+            assertEquals(2, appendEntriesReply.getLogLastIndex());
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
         }};
     }
 
@@ -1021,68 +1017,83 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
         new JavaTestKit(getSystem()) {{
-
-            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
+                    Props.create(ForwardMessageToBehaviorActor.class));
 
             MockRaftActorContext leaderActorContext =
-                new MockRaftActorContext("leader", getSystem(), leaderActor);
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
 
-            ActorRef followerActor = getSystem().actorOf(
-                Props.create(ForwardMessageToBehaviorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+                    ForwardMessageToBehaviorActor.props());
 
             MockRaftActorContext followerActorContext =
-                new MockRaftActorContext("follower", getSystem(), followerActor);
+                    new MockRaftActorContext("follower", getSystem(), followerActor);
 
             Follower follower = new Follower(followerActorContext);
-
-            ForwardMessageToBehaviorActor.setBehavior(follower);
+            followerActor.underlyingActor().behavior = follower;
 
             Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
+            peerAddresses.put("follower", followerActor.path().toString());
 
             leaderActorContext.setPeerAddresses(peerAddresses);
 
             leaderActorContext.getReplicatedLog().removeFrom(0);
 
             leaderActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
             leaderActorContext.setCommitIndex(1);
 
             followerActorContext.getReplicatedLog().removeFrom(0);
 
             followerActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
             // follower has the same log entries but its commit index > leaders commit index
             followerActorContext.setCommitIndex(2);
 
             Leader leader = new Leader(leaderActorContext);
-            leader.markFollowerActive(followerActor.path().toString());
-
-            Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
-
-            leader.handleMessage(leaderActor, new SendHeartBeat());
-
-            AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
-                    .getFirstMatching(followerActor, AppendEntries.class);
 
+            // Initial heartbeat
+            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
             assertNotNull(appendEntries);
 
             assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(0, appendEntries.getEntries().size());
             assertEquals(0, appendEntries.getPrevLogIndex());
 
-            AppendEntriesReply appendEntriesReply =
-                (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
                     leaderActor, AppendEntriesReply.class);
+            assertNotNull(appendEntriesReply);
+
+            assertEquals(2, appendEntriesReply.getLogLastIndex());
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+            leaderActor.underlyingActor().behavior = leader;
+            leader.handleMessage(followerActor, appendEntriesReply);
+
+            leaderActor.underlyingActor().clear();
+            followerActor.underlyingActor().clear();
+
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                    TimeUnit.MILLISECONDS);
 
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(0, appendEntries.getEntries().size());
+            assertEquals(2, appendEntries.getPrevLogIndex());
+
+            appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
             assertNotNull(appendEntriesReply);
 
             assertEquals(2, appendEntriesReply.getLogLastIndex());
             assertEquals(1, appendEntriesReply.getLogLastTerm());
 
+            assertEquals(1, followerActorContext.getCommitIndex());
         }};
     }
 
@@ -1156,8 +1167,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                 assertEquals(2, leaderActorContext.getCommitIndex());
 
                 ApplyLogEntries applyLogEntries =
-                    (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
-                        ApplyLogEntries.class);
+                    MessageCollectorActor.getFirstMatching(leaderActor,
+                    ApplyLogEntries.class);
 
                 assertNotNull(applyLogEntries);
 
@@ -1295,78 +1306,91 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
         new JavaTestKit(getSystem()) {{
-
-            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
+                    Props.create(MessageCollectorActor.class));
 
             MockRaftActorContext leaderActorContext =
-                new MockRaftActorContext("leader", getSystem(), leaderActor);
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
 
             DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
             configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 
             leaderActorContext.setConfigParams(configParams);
 
-            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+                    ForwardMessageToBehaviorActor.props());
 
             MockRaftActorContext followerActorContext =
-                new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+                    new MockRaftActorContext("follower-reply", getSystem(), followerActor);
 
             followerActorContext.setConfigParams(configParams);
 
             Follower follower = new Follower(followerActorContext);
-
-            ForwardMessageToBehaviorActor.setBehavior(follower);
+            followerActor.underlyingActor().behavior = follower;
 
             Map<String, String> peerAddresses = new HashMap<>();
             peerAddresses.put("follower-reply",
-                followerActor.path().toString());
+                    followerActor.path().toString());
 
             leaderActorContext.setPeerAddresses(peerAddresses);
 
             leaderActorContext.getReplicatedLog().removeFrom(0);
+            leaderActorContext.setCommitIndex(-1);
+            leaderActorContext.setLastApplied(-1);
 
-            //create 3 entries
-            leaderActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
-
-            leaderActorContext.setCommitIndex(1);
+            followerActorContext.getReplicatedLog().removeFrom(0);
+            followerActorContext.setCommitIndex(-1);
+            followerActorContext.setLastApplied(-1);
 
             Leader leader = new Leader(leaderActorContext);
-            leader.markFollowerActive("follower-reply");
+
+            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
+                    leaderActor, AppendEntriesReply.class);
+            assertNotNull(appendEntriesReply);
+            System.out.println("appendEntriesReply: "+appendEntriesReply);
+            leader.handleMessage(followerActor, appendEntriesReply);
+
+            // Clear initial heartbeat messages
+
+            leaderActor.underlyingActor().clear();
+            followerActor.underlyingActor().clear();
+
+            // create 3 entries
+            leaderActorContext.setReplicatedLog(
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+            leaderActorContext.setCommitIndex(1);
+            leaderActorContext.setLastApplied(1);
 
             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                TimeUnit.MILLISECONDS);
+                    TimeUnit.MILLISECONDS);
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
-                .getFirstMatching(followerActor, AppendEntries.class);
-
+            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
             assertNotNull(appendEntries);
 
+            // Should send first log entry
             assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
-            assertEquals(0, appendEntries.getPrevLogIndex());
-
-            AppendEntriesReply appendEntriesReply =
-                (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+            assertEquals(0, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(-1, appendEntries.getPrevLogIndex());
 
+            appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
             assertNotNull(appendEntriesReply);
 
-            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
-
-            List<Object> entries = ForwardMessageToBehaviorActor
-                .getAllMatching(followerActor, AppendEntries.class);
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
+            assertEquals(0, appendEntriesReply.getLogLastIndex());
 
-            assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+            followerActor.underlyingActor().clear();
 
-            AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
 
-            assertEquals(1, appendEntriesSecond.getLeaderCommit());
-            assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
-            assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+            appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+            assertNotNull(appendEntries);
 
+            // Should send second log entry
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
         }};
     }
 
index 1b3a8f5..f103abc 100644 (file)
@@ -1,8 +1,6 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import akka.event.LoggingAdapter;
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
 import java.io.ByteArrayOutputStream;
@@ -13,9 +11,13 @@ import java.util.Map;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SnapshotTrackerTest {
 
+    Logger logger = LoggerFactory.getLogger(getClass());
+
     Map<String, String> data;
     ByteString byteString;
     ByteString chunk1;
@@ -37,14 +39,14 @@ public class SnapshotTrackerTest {
 
     @Test
     public void testAddChunk() throws SnapshotTracker.InvalidChunkException {
-        SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+        SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
 
         tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
         tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
         tracker1.addChunk(3, chunk3, Optional.<Integer>absent());
 
         // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker
-        SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+        SnapshotTracker tracker2 = new SnapshotTracker(logger, 2);
 
         tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
         tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
@@ -57,7 +59,7 @@ public class SnapshotTrackerTest {
         }
 
         // The first chunk's index must at least be FIRST_CHUNK_INDEX
-        SnapshotTracker tracker3 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+        SnapshotTracker tracker3 = new SnapshotTracker(logger, 2);
 
         try {
             tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
@@ -67,7 +69,7 @@ public class SnapshotTrackerTest {
         }
 
         // Out of sequence chunk indexes won't work
-        SnapshotTracker tracker4 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+        SnapshotTracker tracker4 = new SnapshotTracker(logger, 2);
 
         tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
 
@@ -80,7 +82,7 @@ public class SnapshotTrackerTest {
 
         // No exceptions will be thrown when invalid chunk is added with the right sequence
         // If the lastChunkHashCode is missing
-        SnapshotTracker tracker5 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+        SnapshotTracker tracker5 = new SnapshotTracker(logger, 2);
 
         tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
         // Look I can add the same chunk again
@@ -88,7 +90,7 @@ public class SnapshotTrackerTest {
 
         // An exception will be thrown when an invalid chunk is addedd with the right sequence
         // when the lastChunkHashCode is present
-        SnapshotTracker tracker6 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+        SnapshotTracker tracker6 = new SnapshotTracker(logger, 2);
 
         tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
 
@@ -106,7 +108,7 @@ public class SnapshotTrackerTest {
     public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException {
 
         // Trying to get a snapshot before all chunks have been received will throw an exception
-        SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+        SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
 
         tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
         try {
@@ -116,7 +118,7 @@ public class SnapshotTrackerTest {
 
         }
 
-        SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 3);
+        SnapshotTracker tracker2 = new SnapshotTracker(logger, 3);
 
         tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
         tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
@@ -129,7 +131,7 @@ public class SnapshotTrackerTest {
 
     @Test
     public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
-        SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+        SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
 
         ByteString chunks = chunk1.concat(chunk2);
 
index 3469a95..c5acb1f 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.UntypedActor;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -23,7 +24,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 
 public class MessageCollectorActor extends UntypedActor {
-    private List<Object> messages = new ArrayList<>();
+    private final List<Object> messages = new ArrayList<>();
 
     @Override public void onReceive(Object message) throws Exception {
         if(message instanceof String){
@@ -35,6 +36,10 @@ public class MessageCollectorActor extends UntypedActor {
         }
     }
 
+    public void clear() {
+        messages.clear();
+    }
+
     public static List<Object> getAllMessages(ActorRef actor) throws Exception {
         FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
         Timeout operationTimeout = new Timeout(operationDuration);
@@ -53,13 +58,17 @@ public class MessageCollectorActor extends UntypedActor {
      * @param clazz
      * @return
      */
-    public static Object getFirstMatching(ActorRef actor, Class<?> clazz) throws Exception {
-        List<Object> allMessages = getAllMessages(actor);
+    public static <T> T getFirstMatching(ActorRef actor, Class<T> clazz) throws Exception {
+        for(int i = 0; i < 50; i++) {
+            List<Object> allMessages = getAllMessages(actor);
 
-        for(Object message : allMessages){
-            if(message.getClass().equals(clazz)){
-                return message;
+            for(Object message : allMessages){
+                if(message.getClass().equals(clazz)){
+                    return (T) message;
+                }
             }
+
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
         }
 
         return null;
index 21a0cb6..a604b05 100644 (file)
@@ -9,12 +9,11 @@
 package org.opendaylight.controller.cluster.common.actor;
 
 import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class AbstractUntypedActor extends UntypedActor {
-    protected final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
+    protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
     public AbstractUntypedActor() {
         if(LOG.isDebugEnabled()) {
index 8a6217d..95ee216 100644 (file)
@@ -8,17 +8,16 @@
 
 package org.opendaylight.controller.cluster.common.actor;
 
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor {
 
-    protected final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
+    protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
     public AbstractUntypedPersistentActor() {
         if(LOG.isDebugEnabled()) {
@@ -119,7 +118,7 @@ public abstract class AbstractUntypedPersistentActor extends UntypedPersistentAc
             try {
                 procedure.apply(o);
             } catch (Exception e) {
-                LOG.error(e, "An unexpected error occurred");
+                LOG.error("An unexpected error occurred", e);
             }
         }
 
index d6030ea..e27546f 100644 (file)
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-data-impl</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
   </dependencies>
 
   <build>
index 01e42db..cee781f 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.util.Timeout;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.text.WordUtils;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
 import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
@@ -25,37 +26,43 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class DatastoreContext {
 
-    private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
-    private final Duration shardTransactionIdleTimeout;
-    private final int operationTimeoutInSeconds;
-    private final String dataStoreMXBeanType;
-    private final ConfigParams shardRaftConfig;
-    private final int shardTransactionCommitTimeoutInSeconds;
-    private final int shardTransactionCommitQueueCapacity;
-    private final Timeout shardInitializationTimeout;
-    private final Timeout shardLeaderElectionTimeout;
-    private final boolean persistent;
-    private final ConfigurationReader configurationReader;
-    private final long shardElectionTimeoutFactor;
-
-    private DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
-            ConfigParams shardRaftConfig, String dataStoreMXBeanType, int operationTimeoutInSeconds,
-            Duration shardTransactionIdleTimeout, int shardTransactionCommitTimeoutInSeconds,
-            int shardTransactionCommitQueueCapacity, Timeout shardInitializationTimeout,
-            Timeout shardLeaderElectionTimeout,
-            boolean persistent, ConfigurationReader configurationReader, long shardElectionTimeoutFactor) {
-        this.dataStoreProperties = dataStoreProperties;
-        this.shardRaftConfig = shardRaftConfig;
-        this.dataStoreMXBeanType = dataStoreMXBeanType;
-        this.operationTimeoutInSeconds = operationTimeoutInSeconds;
-        this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
-        this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
-        this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
-        this.shardInitializationTimeout = shardInitializationTimeout;
-        this.shardLeaderElectionTimeout = shardLeaderElectionTimeout;
-        this.persistent = persistent;
-        this.configurationReader = configurationReader;
-        this.shardElectionTimeoutFactor = shardElectionTimeoutFactor;
+    public static final Duration DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT = Duration.create(10, TimeUnit.MINUTES);
+    public static final int DEFAULT_OPERATION_TIMEOUT_IN_SECONDS = 5;
+    public static final int DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS = 30;
+    public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1000;
+    public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
+    public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
+    public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
+    public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 20000;
+    public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
+    public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
+    public static final boolean DEFAULT_PERSISTENT = true;
+    public static final FileConfigurationReader DEFAULT_CONFIGURATION_READER = new FileConfigurationReader();
+    public static final int DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE = 12;
+    public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
+    public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
+    public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+
+    private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+    private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
+    private int operationTimeoutInSeconds = DEFAULT_OPERATION_TIMEOUT_IN_SECONDS;
+    private String dataStoreMXBeanType;
+    private int shardTransactionCommitTimeoutInSeconds = DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS;
+    private int shardTransactionCommitQueueCapacity = DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY;
+    private Timeout shardInitializationTimeout = DEFAULT_SHARD_INITIALIZATION_TIMEOUT;
+    private Timeout shardLeaderElectionTimeout = DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT;
+    private boolean persistent = DEFAULT_PERSISTENT;
+    private ConfigurationReader configurationReader = DEFAULT_CONFIGURATION_READER;
+    private long transactionCreationInitialRateLimit = DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT;
+    private DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
+    private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
+
+    private DatastoreContext(){
+        setShardJournalRecoveryLogBatchSize(DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE);
+        setSnapshotBatchCount(DEFAULT_SNAPSHOT_BATCH_COUNT);
+        setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS);
+        setIsolatedLeaderCheckInterval(DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS);
+        setSnapshotDataThresholdPercentage(DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE);
     }
 
     public static Builder newBuilder() {
@@ -79,7 +86,7 @@ public class DatastoreContext {
     }
 
     public ConfigParams getShardRaftConfig() {
-        return shardRaftConfig;
+        return raftConfig;
     }
 
     public int getShardTransactionCommitTimeoutInSeconds() {
@@ -107,125 +114,140 @@ public class DatastoreContext {
     }
 
     public long getShardElectionTimeoutFactor(){
-        return this.shardElectionTimeoutFactor;
+        return raftConfig.getElectionTimeoutFactor();
+    }
+
+    public String getDataStoreType(){
+        return dataStoreType;
+    }
+
+    public long getTransactionCreationInitialRateLimit() {
+        return transactionCreationInitialRateLimit;
+    }
+
+    private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){
+        raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
+                TimeUnit.MILLISECONDS));
+    }
+
+    private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){
+        raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
+    }
+
+
+    private void setIsolatedLeaderCheckInterval(long shardIsolatedLeaderCheckIntervalInMillis) {
+        raftConfig.setIsolatedLeaderCheckInterval(
+                new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
+    }
+
+    private void setElectionTimeoutFactor(long shardElectionTimeoutFactor) {
+        raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
+    }
+
+    private void setSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+        raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
+    }
+
+    private void setSnapshotBatchCount(int shardSnapshotBatchCount) {
+        raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
     }
 
     public static class Builder {
-        private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
-        private Duration shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
-        private int operationTimeoutInSeconds = 5;
-        private String dataStoreMXBeanType;
-        private int shardTransactionCommitTimeoutInSeconds = 30;
-        private int shardJournalRecoveryLogBatchSize = 1000;
-        private int shardSnapshotBatchCount = 20000;
-        private int shardHeartbeatIntervalInMillis = 500;
-        private int shardTransactionCommitQueueCapacity = 20000;
-        private Timeout shardInitializationTimeout = new Timeout(5, TimeUnit.MINUTES);
-        private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS);
-        private boolean persistent = true;
-        private ConfigurationReader configurationReader = new FileConfigurationReader();
-        private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
-        private int shardSnapshotDataThresholdPercentage = 12;
-        private long shardElectionTimeoutFactor = 2;
+        private DatastoreContext datastoreContext = new DatastoreContext();
 
         public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
-            this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+            datastoreContext.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
             return this;
         }
 
         public Builder operationTimeoutInSeconds(int operationTimeoutInSeconds) {
-            this.operationTimeoutInSeconds = operationTimeoutInSeconds;
+            datastoreContext.operationTimeoutInSeconds = operationTimeoutInSeconds;
             return this;
         }
 
         public Builder dataStoreMXBeanType(String dataStoreMXBeanType) {
-            this.dataStoreMXBeanType = dataStoreMXBeanType;
+            datastoreContext.dataStoreMXBeanType = dataStoreMXBeanType;
             return this;
         }
 
         public Builder dataStoreProperties(InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
-            this.dataStoreProperties = dataStoreProperties;
+            datastoreContext.dataStoreProperties = dataStoreProperties;
             return this;
         }
 
         public Builder shardTransactionCommitTimeoutInSeconds(int shardTransactionCommitTimeoutInSeconds) {
-            this.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
+            datastoreContext.shardTransactionCommitTimeoutInSeconds = shardTransactionCommitTimeoutInSeconds;
             return this;
         }
 
         public Builder shardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
-            this.shardJournalRecoveryLogBatchSize = shardJournalRecoveryLogBatchSize;
+            datastoreContext.setShardJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
             return this;
         }
 
         public Builder shardSnapshotBatchCount(int shardSnapshotBatchCount) {
-            this.shardSnapshotBatchCount = shardSnapshotBatchCount;
+            datastoreContext.setSnapshotBatchCount(shardSnapshotBatchCount);
             return this;
         }
 
         public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
-            this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage;
+            datastoreContext.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
             return this;
         }
 
-
         public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
-            this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis;
+            datastoreContext.setHeartbeatInterval(shardHeartbeatIntervalInMillis);
             return this;
         }
 
         public Builder shardTransactionCommitQueueCapacity(int shardTransactionCommitQueueCapacity) {
-            this.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
+            datastoreContext.shardTransactionCommitQueueCapacity = shardTransactionCommitQueueCapacity;
             return this;
         }
 
         public Builder shardInitializationTimeout(long timeout, TimeUnit unit) {
-            this.shardInitializationTimeout = new Timeout(timeout, unit);
+            datastoreContext.shardInitializationTimeout = new Timeout(timeout, unit);
             return this;
         }
 
         public Builder shardLeaderElectionTimeout(long timeout, TimeUnit unit) {
-            this.shardLeaderElectionTimeout = new Timeout(timeout, unit);
+            datastoreContext.shardLeaderElectionTimeout = new Timeout(timeout, unit);
             return this;
         }
 
         public Builder configurationReader(ConfigurationReader configurationReader){
-            this.configurationReader = configurationReader;
+            datastoreContext.configurationReader = configurationReader;
             return this;
         }
 
         public Builder persistent(boolean persistent){
-            this.persistent = persistent;
+            datastoreContext.persistent = persistent;
             return this;
         }
 
         public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
-            this.shardIsolatedLeaderCheckIntervalInMillis = shardIsolatedLeaderCheckIntervalInMillis;
+            datastoreContext.setIsolatedLeaderCheckInterval(shardIsolatedLeaderCheckIntervalInMillis);
             return this;
         }
 
         public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){
-            this.shardElectionTimeoutFactor = shardElectionTimeoutFactor;
+            datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
             return this;
         }
 
+        public Builder transactionCreationInitialRateLimit(long initialRateLimit){
+            datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
+            return this;
+        }
 
-        public DatastoreContext build() {
-            DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
-            raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
-                    TimeUnit.MILLISECONDS));
-            raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
-            raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
-            raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
-            raftConfig.setElectionTimeoutFactor(shardElectionTimeoutFactor);
-            raftConfig.setIsolatedLeaderCheckInterval(
-                new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
+        public Builder dataStoreType(String dataStoreType){
+            datastoreContext.dataStoreType = dataStoreType;
+            datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreType) + "Datastore";
+            return this;
+        }
 
-            return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType,
-                    operationTimeoutInSeconds, shardTransactionIdleTimeout,
-                    shardTransactionCommitTimeoutInSeconds, shardTransactionCommitQueueCapacity,
-                    shardInitializationTimeout, shardLeaderElectionTimeout,
-                    persistent, configurationReader, shardElectionTimeoutFactor);
+        public DatastoreContext build() {
+            return datastoreContext;
         }
     }
 }
index 930c5f7..107c959 100644 (file)
@@ -39,20 +39,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
     private final ActorContext actorContext;
 
-    public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
+    public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
             Configuration configuration, DatastoreContext datastoreContext) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
-        Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
 
+        String type = datastoreContext.getDataStoreType();
+
         String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
 
         LOG.info("Creating ShardManager : {}", shardManagerId);
 
         actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
-                ShardManager.props(type, cluster, configuration, datastoreContext)
+                ShardManager.props(cluster, configuration, datastoreContext)
                     .withMailbox(ActorContext.MAILBOX), shardManagerId ),
                 cluster, configuration, datastoreContext);
     }
@@ -94,11 +95,13 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        actorContext.acquireTxCreationPermit();
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        actorContext.acquireTxCreationPermit();
         return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
     }
 
index 5d63c92..a9a735e 100644 (file)
@@ -22,13 +22,13 @@ public class DistributedDataStoreFactory {
 
     private static volatile ActorSystem persistentActorSystem = null;
 
-    public static DistributedDataStore createInstance(String name, SchemaService schemaService,
+    public static DistributedDataStore createInstance(SchemaService schemaService,
                                                       DatastoreContext datastoreContext, BundleContext bundleContext) {
 
         ActorSystem actorSystem = getOrCreateInstance(bundleContext, datastoreContext.getConfigurationReader());
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore =
-                new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
+                new DistributedDataStore(actorSystem, new ClusterWrapperImpl(actorSystem),
                         config, datastoreContext);
 
         ShardStrategyFactory.setConfiguration(config);
index 744e2c2..87a0fb9 100644 (file)
@@ -12,8 +12,6 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
@@ -101,8 +99,6 @@ public class Shard extends RaftActor {
     // The state of this Shard
     private final InMemoryDOMDataStore store;
 
-    private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
-
     /// The name of this shard
     private final ShardIdentifier name;
 
@@ -220,8 +216,8 @@ public class Shard extends RaftActor {
         }
 
         if (message instanceof RecoveryFailure){
-            LOG.error(((RecoveryFailure) message).cause(), "{}: Recovery failed because of this cause",
-                    persistenceId());
+            LOG.error("{}: Recovery failed because of this cause",
+                    persistenceId(), ((RecoveryFailure) message).cause());
 
             // Even though recovery failed, we still need to finish our recovery, eg send the
             // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
@@ -274,7 +270,7 @@ public class Shard extends RaftActor {
         if(cohortEntry != null) {
             long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
             if(elapsed > transactionCommitTimeout) {
-                LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting",
+                LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
                         persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
 
                 doAbortTransaction(cohortEntry.getTransactionID(), null);
@@ -322,8 +318,8 @@ public class Shard extends RaftActor {
                         new ModificationPayload(cohortEntry.getModification()));
             }
         } catch (Exception e) {
-            LOG.error(e, "{} An exception occurred while preCommitting transaction {}",
-                    persistenceId(), cohortEntry.getTransactionID());
+            LOG.error("{} An exception occurred while preCommitting transaction {}",
+                    persistenceId(), cohortEntry.getTransactionID(), e);
             shardMBean.incrementFailedTransactionsCount();
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
@@ -376,7 +372,8 @@ public class Shard extends RaftActor {
         } catch (Exception e) {
             sender.tell(new akka.actor.Status.Failure(e), getSelf());
 
-            LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID);
+            LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
+                    transactionID, e);
             shardMBean.incrementFailedTransactionsCount();
         } finally {
             commitCoordinator.currentTransactionComplete(transactionID, true);
@@ -445,7 +442,7 @@ public class Shard extends RaftActor {
 
                 @Override
                 public void onFailure(final Throwable t) {
-                    LOG.error(t, "{}: An exception happened during abort", persistenceId());
+                    LOG.error("{}: An exception happened during abort", persistenceId(), t);
 
                     if(sender != null) {
                         sender.tell(new akka.actor.Status.Failure(t), self);
@@ -580,7 +577,7 @@ public class Shard extends RaftActor {
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
         } catch (InterruptedException | ExecutionException e) {
             shardMBean.incrementFailedTransactionsCount();
-            LOG.error(e, "{}: Failed to commit", persistenceId());
+            LOG.error("{}: Failed to commit", persistenceId(), e);
         }
     }
 
@@ -667,7 +664,7 @@ public class Shard extends RaftActor {
             try {
                 currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
             } catch (ClassNotFoundException | IOException e) {
-                LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
+                LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
             }
         } else if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
@@ -722,7 +719,7 @@ public class Shard extends RaftActor {
                     shardMBean.incrementCommittedTransactionCount();
                 } catch (InterruptedException | ExecutionException e) {
                     shardMBean.incrementFailedTransactionsCount();
-                    LOG.error(e, "{}: Failed to commit", persistenceId());
+                    LOG.error("{}: Failed to commit", persistenceId(), e);
                 }
             }
         }
@@ -752,7 +749,7 @@ public class Shard extends RaftActor {
             try {
                 applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
             } catch (ClassNotFoundException | IOException e) {
-                LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
+                LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
             }
         }
         else if (data instanceof CompositeModificationPayload) {
@@ -835,7 +832,7 @@ public class Shard extends RaftActor {
             transaction.write(DATASTORE_ROOT, node);
             syncCommitTransaction(transaction);
         } catch (InterruptedException | ExecutionException e) {
-            LOG.error(e, "{}: An exception occurred when applying snapshot", persistenceId());
+            LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
         } finally {
             LOG.info("{}: Done applying snapshot", persistenceId());
         }
index 165e272..8b95404 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
-import akka.event.LoggingAdapter;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import java.util.LinkedList;
@@ -20,6 +19,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
 
 /**
  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
@@ -36,11 +36,11 @@ public class ShardCommitCoordinator {
 
     private final int queueCapacity;
 
-    private final LoggingAdapter log;
+    private final Logger log;
 
     private final String name;
 
-    public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log,
+    public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
             String name) {
         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
                 cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
index 22e2dbd..3dbac00 100644 (file)
@@ -15,8 +15,6 @@ import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.japi.Function;
 import akka.japi.Procedure;
@@ -54,6 +52,8 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 
 /**
@@ -67,8 +67,7 @@ import scala.concurrent.duration.Duration;
  */
 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
-    protected final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
 
     // Stores a mapping between a member name and the address of the member
     // Member names look like "member-1", "member-2" etc and are as specified
@@ -97,17 +96,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private final DataPersistenceProvider dataPersistenceProvider;
 
     /**
-     * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
-     *             configuration or operational
      */
-    protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+    protected ShardManager(ClusterWrapper cluster, Configuration configuration,
             DatastoreContext datastoreContext) {
 
-        this.type = Preconditions.checkNotNull(type, "type should not be null");
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
         this.datastoreContext = datastoreContext;
         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
+        this.type = datastoreContext.getDataStoreType();
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -119,16 +116,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
     }
 
-    public static Props props(final String type,
+    public static Props props(
         final ClusterWrapper cluster,
         final Configuration configuration,
         final DatastoreContext datastoreContext) {
 
-        Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
 
-        return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
+        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
     }
 
     @Override
@@ -186,7 +182,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 knownModules = ImmutableSet.copyOf(msg.getModules());
             } else if (message instanceof RecoveryFailure) {
                 RecoveryFailure failure = (RecoveryFailure) message;
-                LOG.error(failure.cause(), "Recovery failed");
+                LOG.error("Recovery failed", failure.cause());
             } else if (message instanceof RecoveryCompleted) {
                 LOG.info("Recovery complete : {}", persistenceId());
 
@@ -424,12 +420,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             new Function<Throwable, SupervisorStrategy.Directive>() {
                 @Override
                 public SupervisorStrategy.Directive apply(Throwable t) {
-                    StringBuilder sb = new StringBuilder();
-                    for(StackTraceElement element : t.getStackTrace()) {
-                       sb.append("\n\tat ")
-                         .append(element.toString());
-                    }
-                    LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
+                    LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
                     return SupervisorStrategy.resume();
                 }
             }
@@ -535,14 +526,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private static class ShardManagerCreator implements Creator<ShardManager> {
         private static final long serialVersionUID = 1L;
 
-        final String type;
         final ClusterWrapper cluster;
         final Configuration configuration;
         final DatastoreContext datastoreContext;
 
-        ShardManagerCreator(String type, ClusterWrapper cluster,
+        ShardManagerCreator(ClusterWrapper cluster,
                 Configuration configuration, DatastoreContext datastoreContext) {
-            this.type = type;
             this.cluster = cluster;
             this.configuration = configuration;
             this.datastoreContext = datastoreContext;
@@ -550,7 +539,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(type, cluster, configuration, datastoreContext);
+            return new ShardManager(cluster, configuration, datastoreContext);
         }
     }
 
index 2a97036..5052857 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.event.LoggingAdapter;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Collection;
@@ -22,6 +21,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
 
 /**
  * Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
@@ -40,10 +40,10 @@ class ShardRecoveryCoordinator {
     private final SchemaContext schemaContext;
     private final String shardName;
     private final ExecutorService executor;
-    private final LoggingAdapter log;
+    private final Logger log;
     private final String name;
 
-    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, LoggingAdapter log,
+    ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
             String name) {
         this.schemaContext = schemaContext;
         this.shardName = shardName;
index 0c3d33a..6dd0ab1 100644 (file)
@@ -10,16 +10,15 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Terminated;
 import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import org.opendaylight.controller.cluster.datastore.messages.Monitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TerminationMonitor extends UntypedActor{
-    protected final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
+    private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class);
 
     public TerminationMonitor(){
-        LOG.info("Created TerminationMonitor");
+        LOG.debug("Created TerminationMonitor");
     }
 
     @Override public void onReceive(Object message) throws Exception {
index 932c36f..4f47226 100644 (file)
@@ -11,12 +11,15 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -44,6 +47,19 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     private final List<Future<ActorSelection>> cohortFutures;
     private volatile List<ActorSelection> cohorts;
     private final String transactionId;
+    private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
+        @Override
+        public void run() {
+        }
+
+        @Override
+        public void success() {
+        }
+
+        @Override
+        public void failure() {
+        }
+    };
 
     public ThreePhaseCommitCohortProxy(ActorContext actorContext,
             List<Future<ActorSelection>> cohortFutures, String transactionId) {
@@ -151,8 +167,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
             }
-
-            futureList.add(actorContext.executeOperationAsync(cohort, message));
+            futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
         }
 
         return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
@@ -179,12 +194,20 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
     @Override
     public ListenableFuture<Void> commit() {
-        return voidOperation("commit",  new CommitTransaction(transactionId).toSerializable(),
-                CommitTransactionReply.SERIALIZABLE_CLASS, true);
+        OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
+                new CommitCallback(actorContext);
+
+        return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
+                CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
+    }
+
+    private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
+                                                 final Class<?> expectedResponseClass, final boolean propagateException) {
+        return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
     }
 
     private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
-            final Class<?> expectedResponseClass, final boolean propagateException) {
+                                                 final Class<?> expectedResponseClass, final boolean propagateException, final OperationCallback callback) {
 
         if(LOG.isDebugEnabled()) {
             LOG.debug("Tx {} {}", transactionId, operationName);
@@ -196,7 +219,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
 
         if(cohorts != null) {
             finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
-                    returnFuture);
+                    returnFuture, callback);
         } else {
             buildCohortList().onComplete(new OnComplete<Void>() {
                 @Override
@@ -213,7 +236,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                         }
                     } else {
                         finishVoidOperation(operationName, message, expectedResponseClass,
-                                propagateException, returnFuture);
+                                propagateException, returnFuture, callback);
                     }
                 }
             }, actorContext.getActorSystem().dispatcher());
@@ -223,11 +246,14 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     }
 
     private void finishVoidOperation(final String operationName, final Object message,
-            final Class<?> expectedResponseClass, final boolean propagateException,
-            final SettableFuture<Void> returnFuture) {
+                                     final Class<?> expectedResponseClass, final boolean propagateException,
+                                     final SettableFuture<Void> returnFuture, final OperationCallback callback) {
         if(LOG.isDebugEnabled()) {
             LOG.debug("Tx {} finish {}", transactionId, operationName);
         }
+
+        callback.run();
+
         Future<Iterable<Object>> combinedFuture = invokeCohorts(message);
 
         combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@@ -247,6 +273,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                 }
 
                 if(exceptionToPropagate != null) {
+
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("Tx {}: a {} cohort Future failed: {}", transactionId,
                             operationName, exceptionToPropagate);
@@ -265,11 +292,16 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
                         }
                         returnFuture.set(null);
                     }
+
+                    callback.failure();
                 } else {
+
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("Tx {}: {} succeeded", transactionId, operationName);
                     }
                     returnFuture.set(null);
+
+                    callback.success();
                 }
             }
         }, actorContext.getActorSystem().dispatcher());
@@ -279,4 +311,58 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
     List<Future<ActorSelection>> getCohortFutures() {
         return Collections.unmodifiableList(cohortFutures);
     }
+
+    private static interface OperationCallback {
+        void run();
+        void success();
+        void failure();
+    }
+
+    private static class CommitCallback implements OperationCallback{
+
+        private static final Logger LOG = LoggerFactory.getLogger(CommitCallback.class);
+        private static final String COMMIT = "commit";
+
+        private final Timer commitTimer;
+        private final ActorContext actorContext;
+        private Timer.Context timerContext;
+
+        CommitCallback(ActorContext actorContext){
+            this.actorContext = actorContext;
+            commitTimer = actorContext.getOperationTimer(COMMIT);
+        }
+
+        @Override
+        public void run() {
+            timerContext = commitTimer.time();
+        }
+
+        @Override
+        public void success() {
+            timerContext.stop();
+
+            Snapshot timerSnapshot = commitTimer.getSnapshot();
+            double allowedLatencyInNanos = timerSnapshot.get98thPercentile();
+
+            long commitTimeoutInSeconds = actorContext.getDatastoreContext()
+                    .getShardTransactionCommitTimeoutInSeconds();
+            long commitTimeoutInNanos = TimeUnit.SECONDS.toNanos(commitTimeoutInSeconds);
+
+            // Here we are trying to find out how many transactions per second are allowed
+            double newRateLimit = ((double) commitTimeoutInNanos / allowedLatencyInNanos) / commitTimeoutInSeconds;
+
+            LOG.debug("Data Store {} commit rateLimit adjusted to {} allowedLatencyInNanos = {}",
+                    actorContext.getDataStoreType(), newRateLimit, allowedLatencyInNanos);
+
+            actorContext.setTxCreationLimit(newRateLimit);
+        }
+
+        @Override
+        public void failure() {
+            // This would mean we couldn't get a transaction completed in 30 seconds which is
+            // the default transaction commit timeout. Using the timeout information to figure out the rate limit is
+            // not going to be useful - so we leave it as it is
+        }
+    }
+
 }
index 87959ef..ee3a5cc 100644 (file)
@@ -104,11 +104,13 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        actorContext.acquireTxCreationPermit();
         return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        actorContext.acquireTxCreationPermit();
         return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
index 30ab97c..f05ef91 100644 (file)
@@ -7,17 +7,17 @@
  */
 package org.opendaylight.controller.cluster.datastore.compat;
 
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.japi.Creator;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An actor to maintain backwards compatibility for the base Helium version where the 3-phase commit
@@ -28,7 +28,7 @@ import akka.japi.Creator;
  */
 public class BackwardsCompatibleThreePhaseCommitCohort extends AbstractUntypedActor {
 
-    private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
+    private static final Logger LOG = LoggerFactory.getLogger(BackwardsCompatibleThreePhaseCommitCohort.class);
 
     private final String transactionId;
 
index c9fdf38..cb06c89 100644 (file)
@@ -18,9 +18,13 @@ import akka.actor.PoisonPill;
 import akka.dispatch.Mapper;
 import akka.pattern.AskTimeoutException;
 import akka.util.Timeout;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.RateLimiter;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -54,11 +58,11 @@ import scala.concurrent.duration.FiniteDuration;
  * but should not be passed to actors especially remote actors
  */
 public class ActorContext {
-    private static final Logger
-        LOG = LoggerFactory.getLogger(ActorContext.class);
-
-    public static final String MAILBOX = "bounded-mailbox";
-
+    private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
+    private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
+    private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
+    private static final String METRIC_RATE = "rate";
+    private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
                                                               new Mapper<Throwable, Throwable>() {
         @Override
@@ -74,17 +78,23 @@ public class ActorContext {
             return actualFailure;
         }
     };
+    public static final String MAILBOX = "bounded-mailbox";
 
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
     private final DatastoreContext datastoreContext;
-    private volatile SchemaContext schemaContext;
     private final FiniteDuration operationDuration;
     private final Timeout operationTimeout;
     private final String selfAddressHostPort;
+    private final RateLimiter txRateLimiter;
+    private final MetricRegistry metricRegistry = new MetricRegistry();
+    private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
     private final int transactionOutstandingOperationLimit;
+    private final Timeout transactionCommitOperationTimeout;
+
+    private volatile SchemaContext schemaContext;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -100,10 +110,13 @@ public class ActorContext {
         this.clusterWrapper = clusterWrapper;
         this.configuration = configuration;
         this.datastoreContext = datastoreContext;
+        this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
 
-        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
-                TimeUnit.SECONDS);
+        operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
         operationTimeout = new Timeout(operationDuration);
+        transactionCommitOperationTimeout =  new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
+                TimeUnit.SECONDS));
+
 
         Address selfAddress = clusterWrapper.getSelfAddress();
         if (selfAddress != null && !selfAddress.host().isEmpty()) {
@@ -113,6 +126,7 @@ public class ActorContext {
         }
 
         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
+        jmxReporter.start();
     }
 
     public DatastoreContext getDatastoreContext() {
@@ -446,4 +460,59 @@ public class ActorContext {
     public int getTransactionOutstandingOperationLimit(){
         return transactionOutstandingOperationLimit;
     }
+
+    /**
+     * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
+     * us to create a timer for pretty much anything.
+     *
+     * @param operationName
+     * @return
+     */
+    public Timer getOperationTimer(String operationName){
+        final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
+        return metricRegistry.timer(rate);
+    }
+
+    /**
+     * Get the type of the data store to which this ActorContext belongs
+     *
+     * @return
+     */
+    public String getDataStoreType() {
+        return datastoreContext.getDataStoreType();
+    }
+
+    /**
+     * Set the number of transaction creation permits that are to be allowed
+     *
+     * @param permitsPerSecond
+     */
+    public void setTxCreationLimit(double permitsPerSecond){
+        txRateLimiter.setRate(permitsPerSecond);
+    }
+
+    /**
+     * Get the current transaction creation rate limit
+     * @return
+     */
+    public double getTxCreationLimit(){
+        return txRateLimiter.getRate();
+    }
+
+    /**
+     * Try to acquire a transaction creation permit. Will block if no permits are available.
+     */
+    public void acquireTxCreationPermit(){
+        txRateLimiter.acquire();
+    }
+
+    /**
+     * Return the operation timeout to be used when committing transactions
+     * @return
+     */
+    public Timeout getTransactionCommitOperationTimeout(){
+        return transactionCommitOperationTimeout;
+    }
+
+
 }
index 711c6a3..7e83074 100644 (file)
@@ -41,7 +41,7 @@ public class DistributedConfigDataStoreProviderModule extends
         }
 
         DatastoreContext datastoreContext = DatastoreContext.newBuilder()
-                .dataStoreMXBeanType("DistributedConfigDatastore")
+                .dataStoreType("config")
                 .dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
                         props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
                         props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
@@ -67,9 +67,10 @@ public class DistributedConfigDataStoreProviderModule extends
                 .shardIsolatedLeaderCheckIntervalInMillis(
                     props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+                .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
                 .build();
 
-        return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
+        return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
                 datastoreContext, bundleContext);
     }
 
index d9df06d..0655468 100644 (file)
@@ -41,7 +41,7 @@ public class DistributedOperationalDataStoreProviderModule extends
         }
 
         DatastoreContext datastoreContext = DatastoreContext.newBuilder()
-                .dataStoreMXBeanType("DistributedOperationalDatastore")
+                .dataStoreType("operational")
                 .dataStoreProperties(InMemoryDOMDataStoreConfigProperties.create(
                         props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue(),
                         props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue(),
@@ -67,10 +67,11 @@ public class DistributedOperationalDataStoreProviderModule extends
                 .shardIsolatedLeaderCheckIntervalInMillis(
                     props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
+                .transactionCreationInitialRateLimit(props.getTxCreationInitialRateLimit().getValue())
                 .build();
 
-        return DistributedDataStoreFactory.createInstance("operational",
-                getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
+        return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
+                datastoreContext, bundleContext);
     }
 
     public void setBundleContext(BundleContext bundleContext) {
index 46cd50d..e2ee737 100644 (file)
@@ -180,6 +180,14 @@ module distributed-datastore-provider {
             description "The interval at which the leader of the shard will check if its majority
                         followers are active and term itself as isolated";
         }
+
+        leaf tx-creation-initial-rate-limit {
+            default 100;
+            type non-zero-uint32-type;
+            description "The initial number of transactions per second that are allowed before the data store
+                         should begin applying back pressure. This number is only used as an initial guidance,
+                         subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit";
+        }
     }
 
     // Augments the 'configuration' choice node under modules/module.
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java
new file mode 100644 (file)
index 0000000..3e89823
--- /dev/null
@@ -0,0 +1,37 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DatastoreContextTest {
+
+    private DatastoreContext.Builder builder;
+
+    @Before
+    public void setUp(){
+        builder = new DatastoreContext.Builder();
+    }
+
+    @Test
+    public void testDefaults(){
+        DatastoreContext build = builder.build();
+
+        assertEquals(DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT , build.getShardTransactionIdleTimeout());
+        assertEquals(DatastoreContext.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS, build.getOperationTimeoutInSeconds());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_TIMEOUT_IN_SECONDS, build.getShardTransactionCommitTimeoutInSeconds());
+        assertEquals(DatastoreContext.DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE, build.getShardRaftConfig().getJournalRecoveryLogBatchSize());
+        assertEquals(DatastoreContext.DEFAULT_SNAPSHOT_BATCH_COUNT, build.getShardRaftConfig().getSnapshotBatchCount());
+        assertEquals(DatastoreContext.DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getHeartBeatInterval().length());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY, build.getShardTransactionCommitQueueCapacity());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_INITIALIZATION_TIMEOUT, build.getShardInitializationTimeout());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT, build.getShardLeaderElectionTimeout());
+        assertEquals(DatastoreContext.DEFAULT_PERSISTENT, build.isPersistent());
+        assertEquals(DatastoreContext.DEFAULT_CONFIGURATION_READER, build.getConfigurationReader());
+        assertEquals(DatastoreContext.DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS, build.getShardRaftConfig().getIsolatedCheckInterval().length());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_SNAPSHOT_DATA_THRESHOLD_PERCENTAGE, build.getShardRaftConfig().getSnapshotDataThresholdPercentage());
+        assertEquals(DatastoreContext.DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR, build.getShardRaftConfig().getElectionTimeoutFactor());
+        assertEquals(DatastoreContext.DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT, build.getTransactionCreationInitialRateLimit());
+    }
+
+}
\ No newline at end of file
index 9f5aded..1ad2be7 100644 (file)
@@ -790,8 +790,11 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
             ShardStrategyFactory.setConfiguration(config);
 
+            datastoreContextBuilder.dataStoreType(typeName);
+
             DatastoreContext datastoreContext = datastoreContextBuilder.build();
-            DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
+
+            DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
                     config, datastoreContext);
 
             SchemaContext schemaContext = SchemaContextHelper.full();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
new file mode 100644 (file)
index 0000000..66fa876
--- /dev/null
@@ -0,0 +1,60 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class DistributedDataStoreTest extends AbstractActorTest {
+
+    private SchemaContext schemaContext;
+
+    @Mock
+    private ActorContext actorContext;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        schemaContext = TestModel.createTestContext();
+
+        doReturn(schemaContext).when(actorContext).getSchemaContext();
+    }
+
+    @Test
+    public void testRateLimitingUsedInReadWriteTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newReadWriteTransaction();
+
+        verify(actorContext, times(1)).acquireTxCreationPermit();
+    }
+
+    @Test
+    public void testRateLimitingUsedInWriteOnlyTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newWriteOnlyTransaction();
+
+        verify(actorContext, times(1)).acquireTxCreationPermit();
+    }
+
+
+    @Test
+    public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        distributedDataStore.newReadOnlyTransaction();
+        distributedDataStore.newReadOnlyTransaction();
+        distributedDataStore.newReadOnlyTransaction();
+
+        verify(actorContext, times(0)).acquireTxCreationPermit();
+    }
+
+}
\ No newline at end of file
index 8c56efd..596761d 100644 (file)
@@ -1,5 +1,10 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
@@ -11,6 +16,13 @@ import akka.util.Timeout;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,20 +47,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
-import java.net.URI;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class ShardManagerTest extends AbstractActorTest {
     private static int ID_COUNTER = 1;
 
@@ -73,8 +71,10 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private Props newShardMgrProps() {
-        return ShardManager.props(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
-                DatastoreContext.newBuilder().build());
+
+        DatastoreContext.Builder builder = DatastoreContext.newBuilder();
+        builder.dataStoreType(shardMrgIDSuffix);
+        return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), builder.build());
     }
 
     @Test
@@ -351,10 +351,8 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRecoveryApplicable(){
         new JavaTestKit(getSystem()) {
             {
-                final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(true).build());
+                final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+                        DatastoreContext.newBuilder().persistent(true).dataStoreType(shardMrgIDSuffix).build());
                 final TestActorRef<ShardManager> persistentShardManager =
                         TestActorRef.create(getSystem(), persistentProps);
 
@@ -362,10 +360,8 @@ public class ShardManagerTest extends AbstractActorTest {
 
                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
 
-                final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
-                        new MockClusterWrapper(),
-                        new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(false).build());
+                final Props nonPersistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+                        DatastoreContext.newBuilder().persistent(false).dataStoreType(shardMrgIDSuffix).build());
                 final TestActorRef<ShardManager> nonPersistentShardManager =
                         TestActorRef.create(getSystem(), nonPersistentProps);
 
@@ -386,7 +382,8 @@ public class ShardManagerTest extends AbstractActorTest {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
+                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
+                        DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()) {
                     @Override
                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
@@ -426,8 +423,8 @@ public class ShardManagerTest extends AbstractActorTest {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
 
         TestShardManager(String shardMrgIDSuffix) {
-            super(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(),
-                    DatastoreContext.newBuilder().build());
+            super(new MockClusterWrapper(), new MockConfiguration(),
+                    DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build());
         }
 
         @Override
index 75c93dd..d2396e0 100644 (file)
@@ -3,14 +3,20 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorPath;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
+import akka.util.Timeout;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
@@ -43,11 +49,30 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     @Mock
     private ActorContext actorContext;
 
+    @Mock
+    private DatastoreContext datastoreContext;
+
+    @Mock
+    private Timer commitTimer;
+
+    @Mock
+    private Timer.Context commitTimerContext;
+
+    @Mock
+    private Snapshot commitSnapshot;
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
 
         doReturn(getSystem()).when(actorContext).getActorSystem();
+        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+        doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
+        doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
+        doReturn(commitTimerContext).when(commitTimer).time();
+        doReturn(commitSnapshot).when(commitTimer).getSnapshot();
+        doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get98thPercentile();
+        doReturn(10.0).when(actorContext).getTxCreationLimit();
     }
 
     private Future<ActorSelection> newCohort() {
@@ -86,12 +111,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         }
 
         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
-                isA(requestType));
+                isA(requestType), any(Timeout.class));
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
         verify(actorContext, times(nCohorts)).executeOperationAsync(
-                any(ActorSelection.class), isA(requestType));
+                any(ActorSelection.class), isA(requestType), any(Timeout.class));
     }
 
     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
@@ -276,8 +301,11 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         try {
             propagateExecutionExceptionCause(proxy.commit());
         } finally {
+
+            verify(actorContext, never()).setTxCreationLimit(anyLong());
             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
         }
+
     }
 
     @Test
@@ -294,11 +322,30 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
                 new CommitTransactionReply(), new CommitTransactionReply());
 
+        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
         proxy.canCommit().get(5, TimeUnit.SECONDS);
         proxy.preCommit().get(5, TimeUnit.SECONDS);
         proxy.commit().get(5, TimeUnit.SECONDS);
 
         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
+
+        // Verify that the creation limit was changed to 0.5 (based on setup)
+        verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
+    }
+
+    @Test
+    public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
+
+        ThreePhaseCommitCohortProxy proxy = setupProxy(0);
+
+        assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
+
+        proxy.canCommit().get(5, TimeUnit.SECONDS);
+        proxy.preCommit().get(5, TimeUnit.SECONDS);
+        proxy.commit().get(5, TimeUnit.SECONDS);
+
+        verify(actorContext, never()).setTxCreationLimit(anyLong());
     }
 }
index dd37371..23c3a82 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
@@ -29,10 +32,17 @@ public class TransactionChainProxyTest extends AbstractActorTest{
     ActorContext actorContext = null;
     SchemaContext schemaContext = mock(SchemaContext.class);
 
+    @Mock
+    ActorContext mockActorContext;
+
     @Before
     public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
         actorContext = new MockActorContext(getSystem());
         actorContext.setSchemaContext(schemaContext);
+
+        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
     }
 
     @SuppressWarnings("resource")
@@ -76,4 +86,32 @@ public class TransactionChainProxyTest extends AbstractActorTest{
 
         Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
     }
+
+    @Test
+    public void testRateLimitingUsedInReadWriteTxCreation(){
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        txChainProxy.newReadWriteTransaction();
+
+        verify(mockActorContext, times(1)).acquireTxCreationPermit();
+    }
+
+    @Test
+    public void testRateLimitingUsedInWriteOnlyTxCreation(){
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        txChainProxy.newWriteOnlyTransaction();
+
+        verify(mockActorContext, times(1)).acquireTxCreationPermit();
+    }
+
+
+    @Test
+    public void testRateLimitingNotUsedInReadOnlyTxCreation(){
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+
+        txChainProxy.newReadOnlyTransaction();
+
+        verify(mockActorContext, times(0)).acquireTxCreationPermit();
+    }
 }
index e4ab969..eae46da 100644 (file)
@@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
@@ -12,10 +13,12 @@ import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.time.StopWatch;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
@@ -265,4 +268,35 @@ public class ActorContextTest extends AbstractActorTest{
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void testRateLimiting(){
+        DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+        doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+        doReturn("config").when(mockDataStoreContext).getDataStoreType();
+
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class), mockDataStoreContext);
+
+        // Check that the initial value is being picked up from DataStoreContext
+        assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
+
+        actorContext.setTxCreationLimit(1.0);
+
+        assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
+
+
+        StopWatch watch = new StopWatch();
+
+        watch.start();
+
+        actorContext.acquireTxCreationPermit();
+        actorContext.acquireTxCreationPermit();
+        actorContext.acquireTxCreationPermit();
+
+        watch.stop();
+
+        assertTrue("did not take as much time as expected", watch.getTime() > 1000);
+    }
 }
index c8836d1..add889f 100644 (file)
       <groupId>org.opendaylight.controller.model</groupId>
       <artifactId>model-inventory</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools.model</groupId>
+      <artifactId>ietf-topology</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-broker-impl</artifactId>
index 97e2940..460e072 100644 (file)
@@ -26,7 +26,7 @@ import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.controller.sal.connect.netconf.NetconfDevice;
 import org.opendaylight.controller.sal.connect.netconf.NetconfStateSchemas;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceSalFacade;
 import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
@@ -50,7 +50,7 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
     private static final Logger logger = LoggerFactory.getLogger(NetconfConnectorModule.class);
 
     private BundleContext bundleContext;
-    private Optional<NetconfSessionCapabilities> userCapabilities;
+    private Optional<NetconfSessionPreferences> userCapabilities;
     private SchemaSourceRegistry schemaRegistry;
     private SchemaContextFactory schemaContextFactory;
 
@@ -97,14 +97,14 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-        final RemoteDeviceId id = new RemoteDeviceId(getIdentifier());
+        final RemoteDeviceId id = new RemoteDeviceId(getIdentifier(), getSocketAddress());
 
         final ExecutorService globalProcessingExecutor = getProcessingExecutorDependency().getExecutor();
 
         final Broker domBroker = getDomRegistryDependency();
         final BindingAwareBroker bindingBroker = getBindingRegistryDependency();
 
-        final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade
+        final RemoteDeviceHandler<NetconfSessionPreferences> salFacade
                 = new NetconfDeviceSalFacade(id, domBroker, bindingBroker, bundleContext, globalProcessingExecutor);
 
         final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO =
@@ -124,7 +124,7 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
         return new MyAutoCloseable(listener, salFacade);
     }
 
-    private Optional<NetconfSessionCapabilities> getUserCapabilities() {
+    private Optional<NetconfSessionPreferences> getUserCapabilities() {
         if(getYangModuleCapabilities() == null) {
             return Optional.absent();
         }
@@ -134,7 +134,7 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
             return Optional.absent();
         }
 
-        final NetconfSessionCapabilities parsedOverrideCapabilities = NetconfSessionCapabilities.fromStrings(capabilities);
+        final NetconfSessionPreferences parsedOverrideCapabilities = NetconfSessionPreferences.fromStrings(capabilities);
         JmxAttributeValidationException.checkCondition(
                 parsedOverrideCapabilities.getNonModuleCaps().isEmpty(),
                 "Capabilities to override can only contain module based capabilities, non-module capabilities will be retrieved from the device," +
@@ -170,11 +170,11 @@ public final class NetconfConnectorModule extends org.opendaylight.controller.co
     }
 
     private static final class MyAutoCloseable implements AutoCloseable {
-        private final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade;
+        private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
         private final NetconfDeviceCommunicator listener;
 
         public MyAutoCloseable(final NetconfDeviceCommunicator listener,
-                final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade) {
+                final RemoteDeviceHandler<NetconfSessionPreferences> salFacade) {
             this.listener = listener;
             this.salFacade = salFacade;
         }
index 31779a7..39340fa 100644 (file)
@@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -30,10 +31,12 @@ import org.opendaylight.controller.sal.connect.api.MessageTransformer;
 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
 import org.opendaylight.controller.sal.connect.netconf.schema.NetconfRemoteSchemaYangSourceProvider;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
@@ -51,7 +54,7 @@ import org.slf4j.LoggerFactory;
 /**
  *  This is a mediator between NetconfDeviceCommunicator and NetconfDeviceSalFacade
  */
-public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilities, NetconfMessage> {
+public final class NetconfDevice implements RemoteDevice<NetconfSessionPreferences, NetconfMessage> {
 
     private static final Logger logger = LoggerFactory.getLogger(NetconfDevice.class);
 
@@ -65,7 +68,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
     private final RemoteDeviceId id;
 
     private final SchemaContextFactory schemaContextFactory;
-    private final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade;
+    private final RemoteDeviceHandler<NetconfSessionPreferences> salFacade;
     private final ListeningExecutorService processingExecutor;
     private final SchemaSourceRegistry schemaRegistry;
     private final MessageTransformer<NetconfMessage> messageTransformer;
@@ -73,7 +76,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
     private final NotificationHandler notificationHandler;
     private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations = Lists.newArrayList();
 
-    public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade,
+    public NetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
                          final ExecutorService globalProcessingExecutor, final MessageTransformer<NetconfMessage> messageTransformer) {
         this.id = id;
         this.schemaRegistry = schemaResourcesDTO.getSchemaRegistry();
@@ -86,7 +89,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
     }
 
     @Override
-    public void onRemoteSessionUp(final NetconfSessionCapabilities remoteSessionCapabilities,
+    public void onRemoteSessionUp(final NetconfSessionPreferences remoteSessionCapabilities,
                                   final RemoteDeviceCommunicator<NetconfMessage> listener) {
         // SchemaContext setup has to be performed in a dedicated thread since
         // we are in a netty thread in this method
@@ -119,9 +122,10 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
         };
 
         Futures.addCallback(sourceResolverFuture, resolvedSourceCallback);
+
     }
 
-    private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionCapabilities remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) {
+    private void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc) {
         updateMessageTransformer(result);
         salFacade.onDeviceConnected(result, remoteSessionCapabilities, deviceRpc);
         notificationHandler.onRemoteSchemaUp();
@@ -173,6 +177,11 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
         resetMessageTransformer();
     }
 
+    @Override
+    public void onRemoteSessionFailed(Throwable throwable) {
+        salFacade.onDeviceFailed(throwable);
+    }
+
     @Override
     public void onNotification(final NetconfMessage notification) {
         notificationHandler.handleNotification(notification);
@@ -210,11 +219,11 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
      */
     private static class DeviceSourcesResolver implements Callable<DeviceSources> {
         private final NetconfDeviceRpc deviceRpc;
-        private final NetconfSessionCapabilities remoteSessionCapabilities;
+        private final NetconfSessionPreferences remoteSessionCapabilities;
         private final RemoteDeviceId id;
         private final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver;
 
-        public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
+        public DeviceSourcesResolver(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id, final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver) {
             this.deviceRpc = deviceRpc;
             this.remoteSessionCapabilities = remoteSessionCapabilities;
             this.id = id;
@@ -287,15 +296,17 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
      */
     private final class RecursiveSchemaSetup implements Runnable {
         private final DeviceSources deviceSources;
-        private final NetconfSessionCapabilities remoteSessionCapabilities;
+        private final NetconfSessionPreferences remoteSessionCapabilities;
         private final NetconfDeviceRpc deviceRpc;
         private final RemoteDeviceCommunicator<NetconfMessage> listener;
+        private NetconfDeviceCapabilities capabilities;
 
-        public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionCapabilities remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator<NetconfMessage> listener) {
+        public RecursiveSchemaSetup(final DeviceSources deviceSources, final NetconfSessionPreferences remoteSessionCapabilities, final NetconfDeviceRpc deviceRpc, final RemoteDeviceCommunicator<NetconfMessage> listener) {
             this.deviceSources = deviceSources;
             this.remoteSessionCapabilities = remoteSessionCapabilities;
             this.deviceRpc = deviceRpc;
             this.listener = listener;
+            this.capabilities = remoteSessionCapabilities.getNetconfDeviceCapabilities();
         }
 
         @Override
@@ -306,6 +317,7 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
         /**
          * Recursively build schema context, in case of success or final failure notify device
          */
+        // FIXME reimplement without recursion
         private void setUpSchema(final Collection<SourceIdentifier> requiredSources) {
             logger.trace("{}: Trying to build schema context from {}", id, requiredSources);
 
@@ -322,6 +334,9 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
                 @Override
                 public void onSuccess(final SchemaContext result) {
                     logger.debug("{}: Schema context built successfully from {}", id, requiredSources);
+                    Collection<QName> filteredQNames = Sets.difference(remoteSessionCapabilities.getModuleBasedCaps(), capabilities.getUnresolvedCapabilites().keySet());
+                    capabilities.addCapabilities(filteredQNames);
+                    capabilities.addNonModuleBasedCapabilities(remoteSessionCapabilities.getNonModuleCaps());
                     handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
                 }
 
@@ -331,12 +346,15 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
                     if (t instanceof MissingSchemaSourceException) {
                         final SourceIdentifier missingSource = ((MissingSchemaSourceException) t).getSourceId();
                         logger.warn("{}: Unable to build schema context, missing source {}, will reattempt without it", id, missingSource);
+                        capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(Sets.newHashSet(missingSource)), FailureReason.MissingSource);
                         setUpSchema(stripMissingSource(requiredSources, missingSource));
 
                     // In case resolution error, try only with resolved sources
                     } else if (t instanceof SchemaResolutionException) {
                         // TODO check for infinite loop
                         final SchemaResolutionException resolutionException = (SchemaResolutionException) t;
+                        final Set<SourceIdentifier> unresolvedSources = resolutionException.getUnsatisfiedImports().keySet();
+                        capabilities.addUnresolvedCapabilities(getQNameFromSourceIdentifiers(unresolvedSources), FailureReason.UnableToResolve);
                         logger.warn("{}: Unable to build schema context, unsatisfied imports {}, will reattempt with resolved only", id, resolutionException.getUnsatisfiedImports());
                         setUpSchema(resolutionException.getResolvedSources());
                     // unknown error, fail
@@ -355,5 +373,29 @@ public final class NetconfDevice implements RemoteDevice<NetconfSessionCapabilit
             Preconditions.checkState(removed, "{}: Trying to remove {} from {} failed", id, sIdToRemove, requiredSources);
             return sourceIdentifiers;
         }
+
+        private Collection<QName> getQNameFromSourceIdentifiers(Collection<SourceIdentifier> identifiers) {
+            Collection<QName> qNames = new HashSet<>();
+            for (SourceIdentifier source : identifiers) {
+                Optional<QName> qname = getQNameFromSourceIdentifier(source);
+                if (qname.isPresent()) {
+                    qNames.add(qname.get());
+                }
+            }
+            if (qNames.isEmpty()) {
+                logger.debug("Unable to map any source identfiers to a capability reported by device : " + identifiers);
+            }
+            return qNames;
+        }
+
+        private Optional<QName> getQNameFromSourceIdentifier(SourceIdentifier identifier) {
+            for (QName qname : remoteSessionCapabilities.getModuleBasedCaps()) {
+                if (qname.getLocalName().equals(identifier.getName())
+                        && qname.getFormattedRevision().equals(identifier.getRevision())) {
+                    return Optional.of(qname);
+                }
+            }
+            throw new IllegalArgumentException("Unable to map identifier to a devices reported capability: " + identifier);
+        }
     }
 }
index d758073..68c1a5c 100644 (file)
@@ -11,7 +11,7 @@ import java.net.URI;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
@@ -40,7 +40,7 @@ public final class NetconfStateSchemas {
      * Factory for NetconfStateSchemas
      */
     public interface NetconfStateSchemasResolver {
-        NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id);
+        NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id);
     }
 
     /**
@@ -49,7 +49,7 @@ public final class NetconfStateSchemas {
     public static final class NetconfStateSchemasResolverImpl implements NetconfStateSchemasResolver {
 
         @Override
-        public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id) {
+        public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id) {
             return NetconfStateSchemas.create(deviceRpc, remoteSessionCapabilities, id);
         }
     }
@@ -91,7 +91,7 @@ public final class NetconfStateSchemas {
     /**
      * Issue get request to remote device and parse response to find all schemas under netconf-state/schemas
      */
-    private static NetconfStateSchemas create(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id) {
+    private static NetconfStateSchemas create(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id) {
         if(remoteSessionCapabilities.isMonitoringSupported() == false) {
             logger.warn("{}: Netconf monitoring not supported on device, cannot detect provided schemas");
             return EMPTY;
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCapabilities.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCapabilities.java
new file mode 100644 (file)
index 0000000..8f30a5c
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connect.netconf.listener;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason;
+import org.opendaylight.yangtools.yang.common.QName;
+
+public final class NetconfDeviceCapabilities {
+    private final Map<QName, FailureReason> unresolvedCapabilites;
+    private final Set<QName> resolvedCapabilities;
+
+    private final Set<String> nonModuleBasedCapabilities;
+
+    public NetconfDeviceCapabilities() {
+        this.unresolvedCapabilites = new HashMap<>();
+        this.resolvedCapabilities = new HashSet<>();
+        this.nonModuleBasedCapabilities = new HashSet<>();
+    }
+
+    public void addUnresolvedCapability(QName source, FailureReason reason) {
+        unresolvedCapabilites.put(source, reason);
+    }
+
+    public void addUnresolvedCapabilities(Collection<QName> capabilities, FailureReason reason) {
+        for (QName s : capabilities) {
+            unresolvedCapabilites.put(s, reason);
+        }
+    }
+
+    public void addCapabilities(Collection<QName> availableSchemas) {
+        resolvedCapabilities.addAll(availableSchemas);
+    }
+
+    public void addNonModuleBasedCapabilities(Collection<String> nonModuleCapabilities) {
+        this.nonModuleBasedCapabilities.addAll(nonModuleCapabilities);
+    }
+
+    public Set<String> getNonModuleBasedCapabilities() {
+        return nonModuleBasedCapabilities;
+    }
+
+    public Map<QName, FailureReason> getUnresolvedCapabilites() {
+        return unresolvedCapabilites;
+    }
+
+    public Set<QName> getResolvedCapabilities() {
+        return resolvedCapabilities;
+    }
+
+}
index aadb911..556fc2f 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.List;
@@ -46,8 +47,8 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
 
     private static final Logger logger = LoggerFactory.getLogger(NetconfDeviceCommunicator.class);
 
-    private final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> remoteDevice;
-    private final Optional<NetconfSessionCapabilities> overrideNetconfCapabilities;
+    private final RemoteDevice<NetconfSessionPreferences, NetconfMessage> remoteDevice;
+    private final Optional<NetconfSessionPreferences> overrideNetconfCapabilities;
     private final RemoteDeviceId id;
     private final Lock sessionLock = new ReentrantLock();
 
@@ -56,18 +57,18 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
     private NetconfClientSession session;
     private Future<?> initFuture;
 
-    public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> remoteDevice,
-            final NetconfSessionCapabilities netconfSessionCapabilities) {
-        this(id, remoteDevice, Optional.of(netconfSessionCapabilities));
+    public NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage> remoteDevice,
+            final NetconfSessionPreferences netconfSessionPreferences) {
+        this(id, remoteDevice, Optional.of(netconfSessionPreferences));
     }
 
     public NetconfDeviceCommunicator(final RemoteDeviceId id,
-                                     final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> remoteDevice) {
-        this(id, remoteDevice, Optional.<NetconfSessionCapabilities>absent());
+                                     final RemoteDevice<NetconfSessionPreferences, NetconfMessage> remoteDevice) {
+        this(id, remoteDevice, Optional.<NetconfSessionPreferences>absent());
     }
 
-    private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> remoteDevice,
-            final Optional<NetconfSessionCapabilities> overrideNetconfCapabilities) {
+    private NetconfDeviceCommunicator(final RemoteDeviceId id, final RemoteDevice<NetconfSessionPreferences, NetconfMessage> remoteDevice,
+            final Optional<NetconfSessionPreferences> overrideNetconfCapabilities) {
         this.id = id;
         this.remoteDevice = remoteDevice;
         this.overrideNetconfCapabilities = overrideNetconfCapabilities;
@@ -80,16 +81,16 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
             logger.debug("{}: Session established", id);
             this.session = session;
 
-            NetconfSessionCapabilities netconfSessionCapabilities =
-                                             NetconfSessionCapabilities.fromNetconfSession(session);
-            logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionCapabilities);
+            NetconfSessionPreferences netconfSessionPreferences =
+                                             NetconfSessionPreferences.fromNetconfSession(session);
+            logger.trace("{}: Session advertised capabilities: {}", id, netconfSessionPreferences);
 
             if(overrideNetconfCapabilities.isPresent()) {
-                netconfSessionCapabilities = netconfSessionCapabilities.replaceModuleCaps(overrideNetconfCapabilities.get());
-                logger.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id, netconfSessionCapabilities);
+                netconfSessionPreferences = netconfSessionPreferences.replaceModuleCaps(overrideNetconfCapabilities.get());
+                logger.debug("{}: Session capabilities overridden, capabilities that will be used: {}", id, netconfSessionPreferences);
             }
 
-            remoteDevice.onRemoteSessionUp(netconfSessionCapabilities, this);
+            remoteDevice.onRemoteSessionUp(netconfSessionPreferences, this);
         }
         finally {
             sessionLock.unlock();
@@ -103,6 +104,17 @@ public class NetconfDeviceCommunicator implements NetconfClientSessionListener,
         } else {
             initFuture = dispatch.createClient(config);
         }
+
+        initFuture.addListener(new GenericFutureListener<Future<Object>>(){
+
+            @Override
+            public void operationComplete(Future<Object> future) throws Exception {
+                if (!future.isSuccess()) {
+                    logger.debug("{}: Connection failed", id, future.cause());
+                    NetconfDeviceCommunicator.this.remoteDevice.onRemoteSessionFailed(future.cause());
+                }
+            }
+        });
     }
 
     private void tearDown( String reason ) {
@@ -18,7 +18,7 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class NetconfSessionCapabilities {
+public final class NetconfSessionPreferences {
 
     private static final class ParameterMatcher {
         private final Predicate<String> predicate;
@@ -45,7 +45,7 @@ public final class NetconfSessionCapabilities {
         }
     }
 
-    private static final Logger LOG = LoggerFactory.getLogger(NetconfSessionCapabilities.class);
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfSessionPreferences.class);
     private static final ParameterMatcher MODULE_PARAM = new ParameterMatcher("module=");
     private static final ParameterMatcher REVISION_PARAM = new ParameterMatcher("revision=");
     private static final ParameterMatcher BROKEN_REVISON_PARAM = new ParameterMatcher("amp;revision=");
@@ -60,7 +60,7 @@ public final class NetconfSessionCapabilities {
     private final Set<QName> moduleBasedCaps;
     private final Set<String> nonModuleCaps;
 
-    private NetconfSessionCapabilities(final Set<String> nonModuleCaps, final Set<QName> moduleBasedCaps) {
+    private NetconfSessionPreferences(final Set<String> nonModuleCaps, final Set<QName> moduleBasedCaps) {
         this.nonModuleCaps = Preconditions.checkNotNull(nonModuleCaps);
         this.moduleBasedCaps = Preconditions.checkNotNull(moduleBasedCaps);
     }
@@ -110,17 +110,17 @@ public final class NetconfSessionCapabilities {
                 || containsNonModuleCapability(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING.getNamespace().toString());
     }
 
-    public NetconfSessionCapabilities replaceModuleCaps(final NetconfSessionCapabilities netconfSessionModuleCapabilities) {
+    public NetconfSessionPreferences replaceModuleCaps(final NetconfSessionPreferences netconfSessionModuleCapabilities) {
         final Set<QName> moduleBasedCaps = Sets.newHashSet(netconfSessionModuleCapabilities.getModuleBasedCaps());
 
         // Preserve monitoring module, since it indicates support for ietf-netconf-monitoring
         if(containsModuleCapability(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING)) {
             moduleBasedCaps.add(NetconfMessageTransformUtil.IETF_NETCONF_MONITORING);
         }
-        return new NetconfSessionCapabilities(getNonModuleCaps(), moduleBasedCaps);
+        return new NetconfSessionPreferences(getNonModuleCaps(), moduleBasedCaps);
     }
 
-    public static NetconfSessionCapabilities fromNetconfSession(final NetconfClientSession session) {
+    public static NetconfSessionPreferences fromNetconfSession(final NetconfClientSession session) {
         return fromStrings(session.getServerCapabilities());
     }
 
@@ -132,7 +132,7 @@ public final class NetconfSessionCapabilities {
         return QName.cachedReference(QName.create(URI.create(namespace), null, moduleName).withoutRevision());
     }
 
-    public static NetconfSessionCapabilities fromStrings(final Collection<String> capabilities) {
+    public static NetconfSessionPreferences fromStrings(final Collection<String> capabilities) {
         final Set<QName> moduleBasedCaps = new HashSet<>();
         final Set<String> nonModuleCaps = Sets.newHashSet(capabilities);
 
@@ -176,7 +176,7 @@ public final class NetconfSessionCapabilities {
             addModuleQName(moduleBasedCaps, nonModuleCaps, capability, cachedQName(namespace, moduleName));
         }
 
-        return new NetconfSessionCapabilities(ImmutableSet.copyOf(nonModuleCaps), ImmutableSet.copyOf(moduleBasedCaps));
+        return new NetconfSessionPreferences(ImmutableSet.copyOf(nonModuleCaps), ImmutableSet.copyOf(moduleBasedCaps));
     }
 
 
@@ -184,4 +184,12 @@ public final class NetconfSessionCapabilities {
         moduleBasedCaps.add(qName);
         nonModuleCaps.remove(capability);
     }
+
+    private NetconfDeviceCapabilities capabilities = new NetconfDeviceCapabilities();
+
+    public NetconfDeviceCapabilities getNetconfDeviceCapabilities() {
+        return capabilities;
+    }
+
+
 }
index aa22e87..87ca11d 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.netconf.sal.tx.ReadOnlyTx;
 import org.opendaylight.controller.sal.connect.netconf.sal.tx.ReadWriteTx;
 import org.opendaylight.controller.sal.connect.netconf.sal.tx.WriteCandidateTx;
@@ -33,10 +33,10 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 final class NetconfDeviceDataBroker implements DOMDataBroker {
     private final RemoteDeviceId id;
     private final NetconfBaseOps netconfOps;
-    private final NetconfSessionCapabilities netconfSessionPreferences;
+    private final NetconfSessionPreferences netconfSessionPreferences;
     private final DataNormalizer normalizer;
 
-    public NetconfDeviceDataBroker(final RemoteDeviceId id, final RpcImplementation rpc, final SchemaContext schemaContext, final NetconfSessionCapabilities netconfSessionPreferences) {
+    public NetconfDeviceDataBroker(final RemoteDeviceId id, final RpcImplementation rpc, final SchemaContext schemaContext, final NetconfSessionPreferences netconfSessionPreferences) {
         this.id = id;
         this.netconfOps = new NetconfBaseOps(rpc);
         this.netconfSessionPreferences = netconfSessionPreferences;
index fc69a7e..3715969 100644 (file)
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
  *
  * All data changes are submitted to an ExecutorService to avoid Thread blocking while sal is waiting for schema.
  */
+@Deprecated
 final class NetconfDeviceDatastoreAdapter implements AutoCloseable {
 
     private static final Logger logger  = LoggerFactory.getLogger(NetconfDeviceDatastoreAdapter.class);
index bdeb129..db8a238 100644 (file)
@@ -16,7 +16,8 @@ import java.util.concurrent.ExecutorService;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
@@ -36,7 +37,7 @@ import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessionCapabilities> {
+public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessionPreferences> {
 
     private static final Logger logger= LoggerFactory.getLogger(NetconfDeviceSalFacade.class);
 
@@ -63,7 +64,7 @@ public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDevice
 
     @Override
     public synchronized void onDeviceConnected(final SchemaContext schemaContext,
-                                               final NetconfSessionCapabilities netconfSessionPreferences, final RpcImplementation deviceRpc) {
+                                               final NetconfSessionPreferences netconfSessionPreferences, final RpcImplementation deviceRpc) {
 
         // TODO move SchemaAwareRpcBroker from sal-broker-impl, now we have depend on the whole sal-broker-impl
         final RpcProvisionRegistry rpcRegistry = new SchemaAwareRpcBroker(id.getPath().toString(), new SchemaContextProvider() {
@@ -93,12 +94,23 @@ public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDevice
 
         salProvider.getMountInstance().onDeviceConnected(schemaContext, domBroker, rpcRegistry, notificationService);
         salProvider.getDatastoreAdapter().updateDeviceState(true, netconfSessionPreferences.getModuleBasedCaps());
+        salProvider.getMountInstance().onTopologyDeviceConnected(schemaContext, domBroker, rpcRegistry, notificationService);
+        salProvider.getTopologyDatastoreAdapter().updateDeviceData(true, netconfSessionPreferences.getNetconfDeviceCapabilities());
     }
 
     @Override
     public synchronized void onDeviceDisconnected() {
         salProvider.getDatastoreAdapter().updateDeviceState(false, Collections.<QName>emptySet());
+        salProvider.getTopologyDatastoreAdapter().updateDeviceData(false, new NetconfDeviceCapabilities());
         salProvider.getMountInstance().onDeviceDisconnected();
+        salProvider.getMountInstance().onTopologyDeviceDisconnected();
+    }
+
+    @Override
+    public void onDeviceFailed(Throwable throwable) {
+        salProvider.getTopologyDatastoreAdapter().setDeviceAsFailed(throwable);
+        salProvider.getMountInstance().onDeviceDisconnected();
+        salProvider.getMountInstance().onTopologyDeviceDisconnected();
     }
 
     private void registerRpcsToSal(final SchemaContext schemaContext, final RpcProvisionRegistry rpcRegistry, final RpcImplementation deviceRpc) {
index 171f2f4..dfae165 100644 (file)
@@ -37,6 +37,8 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding
     private volatile NetconfDeviceDatastoreAdapter datastoreAdapter;
     private MountInstance mountInstance;
 
+    private volatile NetconfDeviceTopologyAdapter topologyDatastoreAdapter;
+
     public NetconfDeviceSalProvider(final RemoteDeviceId deviceId, final ExecutorService executor) {
         this.id = deviceId;
         this.executor = executor;
@@ -54,6 +56,12 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding
         return datastoreAdapter;
     }
 
+    public NetconfDeviceTopologyAdapter getTopologyDatastoreAdapter() {
+        Preconditions.checkState(topologyDatastoreAdapter != null,
+                "%s: Sal provider %s was not initialized by sal. Cannot get topology datastore adapter", id);
+        return topologyDatastoreAdapter;
+    }
+
     @Override
     public void onSessionInitiated(final Broker.ProviderSession session) {
         logger.debug("{}: (BI)Session with sal established {}", id, session);
@@ -75,6 +83,8 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding
 
         final DataBroker dataBroker = session.getSALService(DataBroker.class);
         datastoreAdapter = new NetconfDeviceDatastoreAdapter(id, dataBroker);
+
+        topologyDatastoreAdapter = new NetconfDeviceTopologyAdapter(id, dataBroker);
     }
 
     public void close() throws Exception {
@@ -90,11 +100,14 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding
         private ObjectRegistration<DOMMountPoint> registration;
         private NotificationPublishService notificationSerivce;
 
+        private ObjectRegistration<DOMMountPoint> topologyRegistration;
+
         MountInstance(final DOMMountPointService mountService, final RemoteDeviceId id) {
             this.mountService = Preconditions.checkNotNull(mountService);
             this.id = Preconditions.checkNotNull(id);
         }
 
+        @Deprecated
         synchronized void onDeviceConnected(final SchemaContext initialCtx,
                 final DOMDataBroker broker, final RpcProvisionRegistry rpc,
                 final NotificationPublishService notificationSerivce) {
@@ -113,6 +126,7 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding
             registration = mountBuilder.register();
         }
 
+        @Deprecated
         synchronized void onDeviceDisconnected() {
             if(registration == null) {
                 return;
@@ -128,10 +142,44 @@ final class NetconfDeviceSalProvider implements AutoCloseable, Provider, Binding
             }
         }
 
+        synchronized void onTopologyDeviceConnected(final SchemaContext initialCtx,
+                final DOMDataBroker broker, final RpcProvisionRegistry rpc,
+                final NotificationPublishService notificationSerivce) {
+
+            Preconditions.checkNotNull(mountService, "Closed");
+            Preconditions.checkState(topologyRegistration == null, "Already initialized");
+
+            final DOMMountPointService.DOMMountPointBuilder mountBuilder = mountService.createMountPoint(id.getTopologyPath());
+            mountBuilder.addInitialSchemaContext(initialCtx);
+
+            mountBuilder.addService(DOMDataBroker.class, broker);
+            mountBuilder.addService(RpcProvisionRegistry.class, rpc);
+            this.notificationSerivce = notificationSerivce;
+            mountBuilder.addService(NotificationPublishService.class, notificationSerivce);
+
+            topologyRegistration = mountBuilder.register();
+        }
+
+        synchronized void onTopologyDeviceDisconnected() {
+            if(topologyRegistration == null) {
+                return;
+            }
+
+            try {
+                topologyRegistration.close();
+            } catch (final Exception e) {
+                // Only log and ignore
+                logger.warn("Unable to unregister mount instance for {}. Ignoring exception", id.getTopologyPath(), e);
+            } finally {
+                topologyRegistration = null;
+            }
+        }
+
         @Override
         synchronized public void close() throws Exception {
             if(registration != null) {
                 onDeviceDisconnected();
+                onTopologyDeviceDisconnected();
             }
             mountService = null;
         }
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTopologyAdapter.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTopologyAdapter.java
new file mode 100644 (file)
index 0000000..83664e4
--- /dev/null
@@ -0,0 +1,219 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connect.netconf.sal;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.AvailableCapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.UnavailableCapabilities;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.UnavailableCapabilitiesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapability.FailureReason;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.fields.unavailable.capabilities.UnavailableCapabilityBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class NetconfDeviceTopologyAdapter implements AutoCloseable {
+
+    public static final Logger logger = LoggerFactory.getLogger(NetconfDeviceTopologyAdapter.class);
+    public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
+        @Override
+        public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
+            return new UnavailableCapabilityBuilder()
+                    .setCapability(input.getKey().toString())
+                    .setFailureReason(input.getValue()).build();
+        }
+    };
+    public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
+        @Override
+        public String apply(QName qName) {
+            return qName.toString();
+        }
+    };
+
+    private final RemoteDeviceId id;
+    private final DataBroker dataService;
+
+    private final InstanceIdentifier<NetworkTopology> networkTopologyPath;
+    private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
+    private static final String UNKNOWN_REASON = "Unknown reason";
+
+    NetconfDeviceTopologyAdapter(final RemoteDeviceId id, final DataBroker dataService) {
+        this.id = id;
+        this.dataService = dataService;
+
+        this.networkTopologyPath = InstanceIdentifier.builder(NetworkTopology.class).build();
+        this.topologyListPath = networkTopologyPath.child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
+
+        initDeviceData();
+    }
+
+     private void initDeviceData() {
+        final WriteTransaction writeTx = dataService.newWriteOnlyTransaction();
+
+        createNetworkTopologyIfNotPresent(writeTx);
+
+        final InstanceIdentifier<Node> path = id.getTopologyBindingPath();
+        NodeBuilder nodeBuilder = getNodeIdBuilder(id);
+        NetconfNodeBuilder netconfNodeBuilder = new NetconfNodeBuilder();
+        netconfNodeBuilder.setConnectionStatus(ConnectionStatus.Connecting);
+        netconfNodeBuilder.setHost(id.getHost());
+        netconfNodeBuilder.setPort(new PortNumber(id.getAddress().getPort()));
+        nodeBuilder.addAugmentation(NetconfNode.class, netconfNodeBuilder.build());
+        Node node = nodeBuilder.build();
+
+        logger.trace("{}: Init device state transaction {} putting if absent operational data started.", id, writeTx.getIdentifier());
+        writeTx.put(LogicalDatastoreType.OPERATIONAL, path, node);
+        logger.trace("{}: Init device state transaction {} putting operational data ended.", id, writeTx.getIdentifier());
+
+        logger.trace("{}: Init device state transaction {} putting if absent config data started.", id, writeTx.getIdentifier());
+        writeTx.put(LogicalDatastoreType.CONFIGURATION, path, getNodeWithId(id));
+        logger.trace("{}: Init device state transaction {} putting config data ended.", id, writeTx.getIdentifier());
+
+        commitTransaction(writeTx, "init");
+    }
+
+    public void updateDeviceData(boolean up, NetconfDeviceCapabilities capabilities) {
+        final Node data = buildDataForNetconfNode(up, capabilities);
+
+        final WriteTransaction writeTx = dataService.newWriteOnlyTransaction();
+        logger.trace("{}: Update device state transaction {} merging operational data started.", id, writeTx.getIdentifier());
+        writeTx.put(LogicalDatastoreType.OPERATIONAL, id.getTopologyBindingPath(), data);
+        logger.trace("{}: Update device state transaction {} merging operational data ended.", id, writeTx.getIdentifier());
+
+        commitTransaction(writeTx, "update");
+    }
+
+    public void setDeviceAsFailed(Throwable throwable) {
+        String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
+
+        final NetconfNode netconfNode = new NetconfNodeBuilder().setConnectionStatus(ConnectionStatus.UnableToConnect).setConnectedMessage(reason).build();
+        final Node data = getNodeIdBuilder(id).addAugmentation(NetconfNode.class, netconfNode).build();
+
+        final WriteTransaction writeTx = dataService.newWriteOnlyTransaction();
+        logger.trace("{}: Setting device state as failed {} putting operational data started.", id, writeTx.getIdentifier());
+        writeTx.put(LogicalDatastoreType.OPERATIONAL, id.getTopologyBindingPath(), data);
+        logger.trace("{}: Setting device state as failed {} putting operational data ended.", id, writeTx.getIdentifier());
+
+        commitTransaction(writeTx, "update-failed-device");
+    }
+
+    private Node buildDataForNetconfNode(boolean up, NetconfDeviceCapabilities capabilities) {
+        List<String> capabilityList = new ArrayList<>();
+        capabilityList.addAll(capabilities.getNonModuleBasedCapabilities());
+        capabilityList.addAll(FluentIterable.from(capabilities.getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
+        final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
+        avCapabalitiesBuilder.setAvailableCapability(capabilityList);
+
+        final UnavailableCapabilities unavailableCapabilities =
+                new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(capabilities.getUnresolvedCapabilites().entrySet())
+                        .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
+
+        final NetconfNodeBuilder netconfNodeBuilder = new NetconfNodeBuilder()
+                .setHost(id.getHost())
+                .setPort(new PortNumber(id.getAddress().getPort()))
+                .setConnectionStatus(up ? ConnectionStatus.Connected : ConnectionStatus.Connecting)
+                .setAvailableCapabilities(avCapabalitiesBuilder.build())
+                .setUnavailableCapabilities(unavailableCapabilities);
+
+        final NodeBuilder nodeBuilder = getNodeIdBuilder(id);
+        final Node node = nodeBuilder.addAugmentation(NetconfNode.class, netconfNodeBuilder.build()).build();
+
+        return node;
+    }
+
+    public void removeDeviceConfiguration() {
+        final WriteTransaction writeTx = dataService.newWriteOnlyTransaction();
+
+        logger.trace("{}: Close device state transaction {} removing all data started.", id, writeTx.getIdentifier());
+        writeTx.delete(LogicalDatastoreType.CONFIGURATION, id.getTopologyBindingPath());
+        writeTx.delete(LogicalDatastoreType.OPERATIONAL, id.getTopologyBindingPath());
+        logger.trace("{}: Close device state transaction {} removing all data ended.", id, writeTx.getIdentifier());
+
+        commitTransaction(writeTx, "close");
+    }
+
+    private void createNetworkTopologyIfNotPresent(final WriteTransaction writeTx) {
+
+        final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
+        logger.trace("{}: Merging {} container to ensure its presence", id, networkTopology.QNAME, writeTx.getIdentifier());
+        writeTx.merge(LogicalDatastoreType.CONFIGURATION, networkTopologyPath, networkTopology);
+        writeTx.merge(LogicalDatastoreType.OPERATIONAL, networkTopologyPath, networkTopology);
+
+        final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(TopologyNetconf.QNAME.getLocalName())).build();
+        logger.trace("{}: Merging {} container to ensure its presence", id, topology.QNAME, writeTx.getIdentifier());
+        writeTx.merge(LogicalDatastoreType.CONFIGURATION, topologyListPath, topology);
+        writeTx.merge(LogicalDatastoreType.OPERATIONAL, topologyListPath, topology);
+    }
+
+    private void commitTransaction(final WriteTransaction transaction, final String txType) {
+        logger.trace("{}: Committing Transaction {}:{}", id, txType, transaction.getIdentifier());
+        final CheckedFuture<Void, TransactionCommitFailedException> result = transaction.submit();
+
+        Futures.addCallback(result, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                logger.trace("{}: Transaction({}) {} SUCCESSFUL", id, txType, transaction.getIdentifier());
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                logger.error("{}: Transaction({}) {} FAILED!", id, txType, transaction.getIdentifier(), t);
+                throw new IllegalStateException(id + "  Transaction(" + txType + ") not committed correctly", t);
+            }
+        });
+
+    }
+
+    private static Node getNodeWithId(final RemoteDeviceId id) {
+        final NodeBuilder builder = getNodeIdBuilder(id);
+        return builder.build();
+    }
+
+    private static NodeBuilder getNodeIdBuilder(final RemoteDeviceId id) {
+        final NodeBuilder nodeBuilder = new NodeBuilder();
+        nodeBuilder.setKey(new NodeKey(new NodeId(id.getName())));
+        return nodeBuilder;
+    }
+
+    @Override
+    public void close() throws Exception {
+        removeDeviceConfiguration();
+    }
+}
index 165d9c4..435ef99 100644 (file)
@@ -14,7 +14,7 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -27,11 +27,11 @@ public abstract class AbstractWriteTx implements DOMDataWriteTransaction {
     protected final RemoteDeviceId id;
     protected final NetconfBaseOps netOps;
     protected final DataNormalizer normalizer;
-    protected final NetconfSessionCapabilities netconfSessionPreferences;
+    protected final NetconfSessionPreferences netconfSessionPreferences;
     // Allow commit to be called only once
     protected boolean finished = false;
 
-    public AbstractWriteTx(final NetconfBaseOps netOps, final RemoteDeviceId id, final DataNormalizer normalizer, final NetconfSessionCapabilities netconfSessionPreferences) {
+    public AbstractWriteTx(final NetconfBaseOps netOps, final RemoteDeviceId id, final DataNormalizer normalizer, final NetconfSessionPreferences netconfSessionPreferences) {
         this.netOps = netOps;
         this.id = id;
         this.normalizer = normalizer;
index 4a9a939..710700b 100644 (file)
@@ -12,7 +12,7 @@ import com.google.common.base.Function;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfRpcFutureCallback;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
@@ -32,7 +32,7 @@ public class WriteCandidateRunningTx extends WriteCandidateTx {
 
     private static final Logger LOG  = LoggerFactory.getLogger(WriteCandidateRunningTx.class);
 
-    public WriteCandidateRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, final DataNormalizer normalizer, final NetconfSessionCapabilities netconfSessionPreferences) {
+    public WriteCandidateRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps, final DataNormalizer normalizer, final NetconfSessionPreferences netconfSessionPreferences) {
         super(id, netOps, normalizer, netconfSessionPreferences);
     }
 
index 0ea6298..f9bf3c7 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfRpcFutureCallback;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
@@ -69,7 +69,7 @@ public class WriteCandidateTx extends AbstractWriteTx {
         }
     };
 
-    public WriteCandidateTx(final RemoteDeviceId id, final NetconfBaseOps rpc, final DataNormalizer normalizer, final NetconfSessionCapabilities netconfSessionPreferences) {
+    public WriteCandidateTx(final RemoteDeviceId id, final NetconfBaseOps rpc, final DataNormalizer normalizer, final NetconfSessionPreferences netconfSessionPreferences) {
         super(rpc, id, normalizer, netconfSessionPreferences);
     }
 
index 28173b1..f92e40f 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfRpcFutureCallback;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
@@ -50,7 +50,7 @@ public class WriteRunningTx extends AbstractWriteTx {
     private static final Logger LOG  = LoggerFactory.getLogger(WriteRunningTx.class);
 
     public WriteRunningTx(final RemoteDeviceId id, final NetconfBaseOps netOps,
-                          final DataNormalizer normalizer, final NetconfSessionCapabilities netconfSessionPreferences) {
+                          final DataNormalizer normalizer, final NetconfSessionPreferences netconfSessionPreferences) {
         super(netOps, id, normalizer, netconfSessionPreferences);
     }
 
index 333b42e..7f13a7a 100644 (file)
@@ -7,33 +7,67 @@
  */
 package org.opendaylight.controller.sal.connect.util;
 
+import com.google.common.base.Preconditions;
+import java.net.InetSocketAddress;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.HostBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-public class RemoteDeviceId {
+public final class RemoteDeviceId {
 
     private final String name;
     private final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier path;
     private final InstanceIdentifier<Node> bindingPath;
     private final NodeKey key;
+    private final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier topologyPath;
+    private final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> topologyBindingPath;
+    private InetSocketAddress address;
+    private Host host;
 
+    @Deprecated
     public RemoteDeviceId(final ModuleIdentifier identifier) {
         this(Preconditions.checkNotNull(identifier).getInstanceName());
     }
 
+    public RemoteDeviceId(final ModuleIdentifier identifier, Host host) {
+        this(identifier);
+        this.host = host;
+    }
+
+    public RemoteDeviceId(final ModuleIdentifier identifier, InetSocketAddress address) {
+        this(identifier);
+        this.address = address;
+        this.host = buildHost();
+    }
+
+    @Deprecated
     public RemoteDeviceId(final String name) {
         Preconditions.checkNotNull(name);
         this.name = name;
         this.key = new NodeKey(new NodeId(name));
         this.path = createBIPath(name);
         this.bindingPath = createBindingPath(key);
+        this.topologyPath = createBIPathForTopology(name);
+        this.topologyBindingPath = createBindingPathForTopology(key);
+    }
+
+    public RemoteDeviceId(final String name, InetSocketAddress address) {
+        this(name);
+        this.address = address;
+        this.host = buildHost();
     }
 
     private static InstanceIdentifier<Node> createBindingPath(final NodeKey key) {
@@ -48,6 +82,32 @@ public class RemoteDeviceId {
         return builder.build();
     }
 
+    private static InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> createBindingPathForTopology(final NodeKey key) {
+        final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.builder(NetworkTopology.class).build();
+        final KeyedInstanceIdentifier<Topology, TopologyKey> topology = networkTopology.child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
+        return topology
+                .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class,
+                        new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey
+                                (new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId(key.getId().getValue())));
+    }
+
+    private static org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier createBIPathForTopology(final String name) {
+        final org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.InstanceIdentifierBuilder builder =
+                org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.builder();
+        builder
+                .node(NetworkTopology.QNAME)
+                .nodeWithKey(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"), TopologyNetconf.QNAME.getLocalName())
+                .nodeWithKey(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.QNAME,
+                        QName.create(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.QNAME, "node-id"), name);
+        return builder.build();
+    }
+
+    private Host buildHost() {
+        return address.getAddress().getHostAddress() != null
+                ? HostBuilder.getDefaultInstance(address.getAddress().getHostAddress())
+                : HostBuilder.getDefaultInstance(address.getAddress().getHostName());
+    }
+
     public String getName() {
         return name;
     }
@@ -64,6 +124,22 @@ public class RemoteDeviceId {
         return key;
     }
 
+    public InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node> getTopologyBindingPath() {
+        return topologyBindingPath;
+    }
+
+    public YangInstanceIdentifier getTopologyPath() {
+        return topologyPath;
+    }
+
+    public InetSocketAddress getAddress() {
+        return address;
+    }
+
+    public Host getHost() {
+        return host;
+    }
+
     @Override
     public String toString() {
         return "RemoteDevice{" + name +'}';
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/yang/netconf-node-topology.yang b/opendaylight/md-sal/sal-netconf-connector/src/main/yang/netconf-node-topology.yang
new file mode 100644 (file)
index 0000000..11bf6a5
--- /dev/null
@@ -0,0 +1,75 @@
+module netconf-node-topology {
+    namespace "urn:opendaylight:netconf-node-topology";
+    prefix "nettop";
+
+    import network-topology { prefix nt; revision-date 2013-10-21; }
+    import yang-ext { prefix ext; revision-date "2013-07-09";}
+    import ietf-inet-types { prefix inet; revision-date "2010-09-24"; }
+
+    revision "2015-01-14" {
+        description "Initial revision of Topology model";
+    }
+
+    augment "/nt:network-topology/nt:topology/nt:topology-types" {
+        container topology-netconf {
+        }
+    }
+
+    grouping netconf-node-fields {
+        leaf connection-status {
+            type enumeration {
+                enum connecting;
+                enum connected;
+                enum unable-to-connect;
+            }
+        }
+
+        leaf host {
+            type inet:host;
+        }
+
+        leaf port {
+            type inet:port-number;
+        }
+
+        leaf connected-message {
+            type string;
+        }
+
+        container available-capabilities {
+            leaf-list available-capability {
+                type string;
+            }
+        }
+
+        container unavailable-capabilities {
+            list unavailable-capability {
+                leaf capability {
+                    type string;
+                }
+
+                leaf failure-reason {
+                    type enumeration {
+                        enum missing-source;
+                        enum unable-to-resolve;
+                    }
+                }
+            }
+        }
+
+        container pass-through {
+            when "../connection-status = connected";
+            description
+                "When the underlying node is connected, its NETCONF context
+                is available verbatim under this container through the
+                mount extension.";
+        }
+    }
+
+    augment "/nt:network-topology/nt:topology/nt:node" {
+        when "../../nt:topology-types/topology-netconf";
+        ext:augment-identifier "netconf-node";
+
+        uses netconf-node-fields;
+    }
+}
index 80ac4d7..0ddafa3 100644 (file)
@@ -17,6 +17,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+
 import com.google.common.base.Optional;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
@@ -39,7 +40,7 @@ import org.opendaylight.controller.sal.connect.api.MessageTransformer;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.controller.sal.connect.api.SchemaSourceProviderFactory;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.netconf.sal.NetconfDeviceRpc;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
@@ -90,7 +91,7 @@ public class NetconfDeviceTest {
     private static final NetconfStateSchemas.NetconfStateSchemasResolver stateSchemasResolver = new NetconfStateSchemas.NetconfStateSchemasResolver() {
 
         @Override
-        public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionCapabilities remoteSessionCapabilities, final RemoteDeviceId id) {
+        public NetconfStateSchemas resolve(final NetconfDeviceRpc deviceRpc, final NetconfSessionPreferences remoteSessionCapabilities, final RemoteDeviceId id) {
             return NetconfStateSchemas.EMPTY;
         }
     };
@@ -99,7 +100,7 @@ public class NetconfDeviceTest {
     public void testNetconfDeviceFailFirstSchemaFailSecondEmpty() throws Exception {
         final ArrayList<String> capList = Lists.newArrayList(TEST_CAPABILITY);
 
-        final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+        final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
         final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
 
         final SchemaContextFactory schemaFactory = getSchemaFactory();
@@ -116,7 +117,7 @@ public class NetconfDeviceTest {
                 = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaFactory, stateSchemasResolver);
         final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), getMessageTransformer());
         // Monitoring not supported
-        final NetconfSessionCapabilities sessionCaps = getSessionCaps(false, capList);
+        final NetconfSessionPreferences sessionCaps = getSessionCaps(false, capList);
         device.onRemoteSessionUp(sessionCaps, listener);
 
         Mockito.verify(facade, Mockito.timeout(5000)).onDeviceDisconnected();
@@ -126,7 +127,7 @@ public class NetconfDeviceTest {
 
     @Test
     public void testNetconfDeviceMissingSource() throws Exception {
-        final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+        final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
         final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
 
         final SchemaContextFactory schemaFactory = getSchemaFactory();
@@ -148,10 +149,10 @@ public class NetconfDeviceTest {
                 = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaFactory, stateSchemasResolver);
         final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), getMessageTransformer());
         // Monitoring supported
-        final NetconfSessionCapabilities sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_CAPABILITY, TEST_CAPABILITY2));
+        final NetconfSessionPreferences sessionCaps = getSessionCaps(true, Lists.newArrayList(TEST_CAPABILITY, TEST_CAPABILITY2));
         device.onRemoteSessionUp(sessionCaps, listener);
 
-        Mockito.verify(facade, Mockito.timeout(5000)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
+        Mockito.verify(facade, Mockito.timeout(5000)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(RpcImplementation.class));
         Mockito.verify(schemaFactory, times(2)).createSchemaContext(anyCollectionOf(SourceIdentifier.class));
     }
 
@@ -165,7 +166,7 @@ public class NetconfDeviceTest {
 
     @Test
     public void testNotificationBeforeSchema() throws Exception {
-        final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+        final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
         final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
 
         final MessageTransformer<NetconfMessage> messageTransformer = getMessageTransformer();
@@ -179,7 +180,7 @@ public class NetconfDeviceTest {
 
         verify(facade, times(0)).onNotification(any(CompositeNode.class));
 
-        final NetconfSessionCapabilities sessionCaps = getSessionCaps(true,
+        final NetconfSessionPreferences sessionCaps = getSessionCaps(true,
                 Lists.newArrayList(TEST_CAPABILITY));
 
         device.onRemoteSessionUp(sessionCaps, listener);
@@ -194,7 +195,7 @@ public class NetconfDeviceTest {
 
     @Test
     public void testNetconfDeviceReconnect() throws Exception {
-        final RemoteDeviceHandler<NetconfSessionCapabilities> facade = getFacade();
+        final RemoteDeviceHandler<NetconfSessionPreferences> facade = getFacade();
         final RemoteDeviceCommunicator<NetconfMessage> listener = getListener();
 
         final SchemaContextFactory schemaContextProviderFactory = getSchemaFactory();
@@ -203,13 +204,13 @@ public class NetconfDeviceTest {
         final NetconfDevice.SchemaResourcesDTO schemaResourcesDTO
                 = new NetconfDevice.SchemaResourcesDTO(getSchemaRegistry(), schemaContextProviderFactory, stateSchemasResolver);
         final NetconfDevice device = new NetconfDevice(schemaResourcesDTO, getId(), facade, getExecutor(), messageTransformer);
-        final NetconfSessionCapabilities sessionCaps = getSessionCaps(true,
+        final NetconfSessionPreferences sessionCaps = getSessionCaps(true,
                 Lists.newArrayList(TEST_NAMESPACE + "?module=" + TEST_MODULE + "&amp;revision=" + TEST_REVISION));
         device.onRemoteSessionUp(sessionCaps, listener);
 
         verify(schemaContextProviderFactory, timeout(5000)).createSchemaContext(any(Collection.class));
         verify(messageTransformer, timeout(5000)).onGlobalContextUpdated(any(SchemaContext.class));
-        verify(facade, timeout(5000)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
+        verify(facade, timeout(5000)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(RpcImplementation.class));
 
         device.onRemoteSessionDown();
         verify(facade, timeout(5000)).onDeviceDisconnected();
@@ -218,7 +219,7 @@ public class NetconfDeviceTest {
 
         verify(schemaContextProviderFactory, timeout(5000).times(2)).createSchemaContext(any(Collection.class));
         verify(messageTransformer, timeout(5000).times(3)).onGlobalContextUpdated(any(SchemaContext.class));
-        verify(facade, timeout(5000).times(2)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
+        verify(facade, timeout(5000).times(2)).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(RpcImplementation.class));
     }
 
     private SchemaContextFactory getSchemaFactory() {
@@ -236,9 +237,9 @@ public class NetconfDeviceTest {
         return parser.resolveSchemaContext(models);
     }
 
-    private RemoteDeviceHandler<NetconfSessionCapabilities> getFacade() throws Exception {
-        final RemoteDeviceHandler<NetconfSessionCapabilities> remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class);
-        doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionCapabilities.class), any(RpcImplementation.class));
+    private RemoteDeviceHandler<NetconfSessionPreferences> getFacade() throws Exception {
+        final RemoteDeviceHandler<NetconfSessionPreferences> remoteDeviceHandler = mockCloseableClass(RemoteDeviceHandler.class);
+        doNothing().when(remoteDeviceHandler).onDeviceConnected(any(SchemaContext.class), any(NetconfSessionPreferences.class), any(RpcImplementation.class));
         doNothing().when(remoteDeviceHandler).onDeviceDisconnected();
         doNothing().when(remoteDeviceHandler).onNotification(any(CompositeNode.class));
         return remoteDeviceHandler;
@@ -283,7 +284,7 @@ public class NetconfDeviceTest {
         return messageTransformer;
     }
 
-    public NetconfSessionCapabilities getSessionCaps(final boolean addMonitor, final Collection<String> additionalCapabilities) {
+    public NetconfSessionPreferences getSessionCaps(final boolean addMonitor, final Collection<String> additionalCapabilities) {
         final ArrayList<String> capabilities = Lists.newArrayList(
                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0,
                 XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1);
@@ -294,7 +295,7 @@ public class NetconfDeviceTest {
 
         capabilities.addAll(additionalCapabilities);
 
-        return NetconfSessionCapabilities.fromStrings(
+        return NetconfSessionPreferences.fromStrings(
                 capabilities);
     }
 
index a24034d..fad3d8e 100644 (file)
@@ -77,7 +77,7 @@ public class NetconfDeviceCommunicatorTest {
     NetconfClientSession mockSession;
 
     @Mock
-    RemoteDevice<NetconfSessionCapabilities, NetconfMessage> mockDevice;
+    RemoteDevice<NetconfSessionPreferences, NetconfMessage> mockDevice;
 
     NetconfDeviceCommunicator communicator;
 
@@ -92,7 +92,7 @@ public class NetconfDeviceCommunicatorTest {
     void setupSession()
     {
         doReturn( Collections.<String>emptySet() ).when( mockSession ).getServerCapabilities();
-        doNothing().when( mockDevice ).onRemoteSessionUp( any( NetconfSessionCapabilities.class ),
+        doNothing().when( mockDevice ).onRemoteSessionUp( any( NetconfSessionPreferences.class ),
                                                           any( RemoteDeviceCommunicator.class ) );
         communicator.onSessionUp( mockSession );
     }
@@ -130,8 +130,8 @@ public class NetconfDeviceCommunicatorTest {
                                  testCapability );
         doReturn( serverCapabilities ).when( mockSession ).getServerCapabilities();
 
-        ArgumentCaptor<NetconfSessionCapabilities> netconfSessionCapabilities =
-                                              ArgumentCaptor.forClass( NetconfSessionCapabilities.class );
+        ArgumentCaptor<NetconfSessionPreferences> netconfSessionCapabilities =
+                                              ArgumentCaptor.forClass( NetconfSessionPreferences.class );
         doNothing().when( mockDevice ).onRemoteSessionUp( netconfSessionCapabilities.capture(), eq( communicator ) );
 
         communicator.onSessionUp( mockSession );
@@ -139,7 +139,7 @@ public class NetconfDeviceCommunicatorTest {
         verify( mockSession ).getServerCapabilities();
         verify( mockDevice ).onRemoteSessionUp( netconfSessionCapabilities.capture(), eq( communicator ) );
 
-        NetconfSessionCapabilities actualCapabilites = netconfSessionCapabilities.getValue();
+        NetconfSessionPreferences actualCapabilites = netconfSessionCapabilities.getValue();
         assertEquals( "containsModuleCapability", true, actualCapabilites.containsNonModuleCapability(
                 NetconfMessageTransformUtil.NETCONF_ROLLBACK_ON_ERROR_URI.toString()) );
         assertEquals( "containsModuleCapability", false, actualCapabilites.containsNonModuleCapability(testCapability) );
@@ -340,7 +340,7 @@ public class NetconfDeviceCommunicatorTest {
      */
     @Test
     public void testNetconfDeviceReconnectInCommunicator() throws Exception {
-        final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> device = mock(RemoteDevice.class);
+        final RemoteDevice<NetconfSessionPreferences, NetconfMessage> device = mock(RemoteDevice.class);
 
         final TimedReconnectStrategy timedReconnectStrategy = new TimedReconnectStrategy(GlobalEventExecutor.INSTANCE, 10000, 0, 1.0, null, 100L, null);
         final ReconnectStrategy reconnectStrategy = spy(new ReconnectStrategy() {
@@ -10,7 +10,7 @@ import org.junit.Test;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.yangtools.yang.common.QName;
 
-public class NetconfSessionCapabilitiesTest {
+public class NetconfSessionPreferencesTest {
 
     @Test
     public void testMerge() throws Exception {
@@ -21,7 +21,7 @@ public class NetconfSessionCapabilitiesTest {
                 "urn:ietf:params:netconf:base:1.0",
                 "urn:ietf:params:netconf:capability:rollback-on-error:1.0"
         );
-        final NetconfSessionCapabilities sessionCaps1 = NetconfSessionCapabilities.fromStrings(caps1);
+        final NetconfSessionPreferences sessionCaps1 = NetconfSessionPreferences.fromStrings(caps1);
         assertCaps(sessionCaps1, 2, 3);
 
         final List<String> caps2 = Lists.newArrayList(
@@ -29,10 +29,10 @@ public class NetconfSessionCapabilitiesTest {
                 "namespace:4?module=module4&revision=2012-12-12",
                 "randomNonModuleCap"
         );
-        final NetconfSessionCapabilities sessionCaps2 = NetconfSessionCapabilities.fromStrings(caps2);
+        final NetconfSessionPreferences sessionCaps2 = NetconfSessionPreferences.fromStrings(caps2);
         assertCaps(sessionCaps2, 1, 2);
 
-        final NetconfSessionCapabilities merged = sessionCaps1.replaceModuleCaps(sessionCaps2);
+        final NetconfSessionPreferences merged = sessionCaps1.replaceModuleCaps(sessionCaps2);
         assertCaps(merged, 2, 2 + 1 /*Preserved monitoring*/);
         for (final QName qName : sessionCaps2.getModuleBasedCaps()) {
             assertThat(merged.getModuleBasedCaps(), hasItem(qName));
@@ -52,11 +52,11 @@ public class NetconfSessionCapabilitiesTest {
                 "namespace:2?module=module2&amp;RANDOMSTRING;revision=2013-12-12" // This one should be ignored(same as first), since revision is in wrong format
         );
 
-        final NetconfSessionCapabilities sessionCaps1 = NetconfSessionCapabilities.fromStrings(caps1);
+        final NetconfSessionPreferences sessionCaps1 = NetconfSessionPreferences.fromStrings(caps1);
         assertCaps(sessionCaps1, 0, 3);
     }
 
-    private void assertCaps(final NetconfSessionCapabilities sessionCaps1, final int nonModuleCaps, final int moduleCaps) {
+    private void assertCaps(final NetconfSessionPreferences sessionCaps1, final int nonModuleCaps, final int moduleCaps) {
         assertEquals(nonModuleCaps, sessionCaps1.getNonModuleCaps().size());
         assertEquals(moduleCaps, sessionCaps1.getModuleBasedCaps().size());
     }
diff --git a/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTopologyAdapterTest.java b/opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceTopologyAdapterTest.java
new file mode 100644 (file)
index 0000000..a1551b2
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connect.netconf.sal;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.Futures;
+import java.net.InetSocketAddress;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCapabilities;
+import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public class NetconfDeviceTopologyAdapterTest {
+
+    private RemoteDeviceId id = new RemoteDeviceId("test", new InetSocketAddress("localhost", 22));
+
+    @Mock
+    private DataBroker broker;
+    @Mock
+    private WriteTransaction writeTx;
+    @Mock
+    private Node data;
+
+    private String txIdent = "test transaction";
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        doReturn(writeTx).when(broker).newWriteOnlyTransaction();
+        doNothing().when(writeTx).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Node.class));
+        doNothing().when(writeTx).merge(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Node.class));
+
+        doReturn(txIdent).when(writeTx).getIdentifier();
+    }
+
+    @Test
+    public void testFailedDevice() throws Exception {
+        doReturn(Futures.immediateCheckedFuture(null)).when(writeTx).submit();
+
+        NetconfDeviceTopologyAdapter adapter = new NetconfDeviceTopologyAdapter(id, broker);
+        adapter.setDeviceAsFailed(null);
+
+        verify(broker, times(2)).newWriteOnlyTransaction();
+        verify(writeTx, times(3)).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Node.class));
+    }
+
+    @Test
+    public void testDeviceUpdate() throws Exception {
+        doReturn(Futures.immediateCheckedFuture(null)).when(writeTx).submit();
+
+        NetconfDeviceTopologyAdapter adapter = new NetconfDeviceTopologyAdapter(id, broker);
+        adapter.updateDeviceData(true, new NetconfDeviceCapabilities());
+
+        verify(broker, times(2)).newWriteOnlyTransaction();
+        verify(writeTx, times(3)).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(Node.class));
+    }
+
+}
\ No newline at end of file
index ce97541..a37fade 100644 (file)
@@ -21,7 +21,7 @@ import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfBaseOps;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
@@ -60,7 +60,7 @@ public class NetconfDeviceWriteOnlyTxTest {
     @Test
     public void testDiscardChanges() {
         final WriteCandidateTx tx = new WriteCandidateTx(id, new NetconfBaseOps(rpc), normalizer,
-                NetconfSessionCapabilities.fromStrings(Collections.<String>emptySet()));
+                NetconfSessionPreferences.fromStrings(Collections.<String>emptySet()));
         final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = tx.submit();
         try {
             submitFuture.checkedGet();
@@ -84,7 +84,7 @@ public class NetconfDeviceWriteOnlyTxTest {
                 .when(rpc).invokeRpc(any(QName.class), any(CompositeNode.class));
 
         final WriteRunningTx tx = new WriteRunningTx(id, new NetconfBaseOps(rpc), normalizer,
-                NetconfSessionCapabilities.fromStrings(Collections.<String>emptySet()));
+                NetconfSessionPreferences.fromStrings(Collections.<String>emptySet()));
         try {
             tx.delete(LogicalDatastoreType.CONFIGURATION, yangIId);
         } catch (final Exception e) {
index 48ccd82..13399f6 100644 (file)
@@ -10,16 +10,15 @@ package org.opendaylight.controller.remote.rpc;
 
 import akka.actor.Terminated;
 import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import org.opendaylight.controller.cluster.common.actor.Monitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TerminationMonitor extends UntypedActor{
-    protected final LoggingAdapter LOG =
-        Logging.getLogger(getContext().system(), this);
+    private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class);
 
     public TerminationMonitor(){
-        LOG.info("Created TerminationMonitor");
+        LOG.debug("Created TerminationMonitor");
     }
 
     @Override public void onReceive(Object message) throws Exception {
index 845c1c8..219646d 100644 (file)
@@ -8,8 +8,6 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import akka.japi.Option;
 import akka.japi.Pair;
 import com.google.common.base.Preconditions;
@@ -32,8 +30,6 @@ import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
  */
 public class RpcRegistry extends BucketStore<RoutingTable> {
 
-    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
     public RpcRegistry() {
         getLocalBucket().setData(new RoutingTable());
     }
index 934609b..628deb4 100644 (file)
@@ -13,8 +13,6 @@ import akka.actor.ActorRefProvider;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.cluster.ClusterActorRefProvider;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
@@ -29,6 +27,8 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
 import org.opendaylight.controller.utils.ConditionalProbe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A store that syncs its data across nodes in the cluster.
@@ -43,7 +43,7 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
 
     private static final Long NO_VERSION = -1L;
 
-    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+    protected final Logger log = LoggerFactory.getLogger(getClass());
 
     /**
      * Bucket owned by the node
index 1bbcc69..8af1c83 100644 (file)
@@ -17,14 +17,7 @@ import akka.cluster.ClusterActorRefProvider;
 import akka.cluster.ClusterEvent;
 import akka.cluster.Member;
 import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import akka.pattern.Patterns;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -32,15 +25,20 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Gossiper that syncs bucket store across nodes in the cluster.
@@ -61,7 +59,7 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go
 
 public class Gossiper extends AbstractUntypedActorWithMetering {
 
-    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
     private Cluster cluster;
 
@@ -121,30 +119,29 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
     @Override
     public void postStop(){
-        if (cluster != null)
+        if (cluster != null) {
             cluster.unsubscribe(getSelf());
-        if (gossipTask != null)
+        }
+        if (gossipTask != null) {
             gossipTask.cancel();
+        }
     }
 
     @Override
     protected void handleReceive(Object message) throws Exception {
         //Usually sent by self via gossip task defined above. But its not enforced.
         //These ticks can be sent by another actor as well which is esp. useful while testing
-        if (message instanceof GossipTick)
+        if (message instanceof GossipTick) {
             receiveGossipTick();
-
-            //Message from remote gossiper with its bucket versions
-        else if (message instanceof GossipStatus)
+        } else if (message instanceof GossipStatus) {
+            // Message from remote gossiper with its bucket versions
             receiveGossipStatus((GossipStatus) message);
-
-            //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
-            //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
-            //message with its local versions
-        else if (message instanceof GossipEnvelope)
+        } else if (message instanceof GossipEnvelope) {
+            // Message from remote gossiper with buckets. This is usually in response to GossipStatus
+            // message. The contained buckets are newer as determined by the remote gossiper by
+            // comparing the GossipStatus message with its local versions.
             receiveGossip((GossipEnvelope) message);
-
-        else if (message instanceof ClusterEvent.MemberUp) {
+        } else if (message instanceof ClusterEvent.MemberUp) {
             receiveMemberUp(((ClusterEvent.MemberUp) message).member());
 
         } else if (message instanceof ClusterEvent.MemberRemoved) {
@@ -153,8 +150,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         } else if ( message instanceof ClusterEvent.UnreachableMember){
             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
 
-        } else
+        } else {
             unhandled(message);
+        }
     }
 
     /**
@@ -181,11 +179,13 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     void receiveMemberUp(Member member) {
 
-        if (selfAddress.equals(member.address()))
+        if (selfAddress.equals(member.address())) {
             return; //ignore up notification for self
+        }
 
-        if (!clusterMembers.contains(member.address()))
+        if (!clusterMembers.contains(member.address())) {
             clusterMembers.add(member.address());
+        }
         if(log.isDebugEnabled()) {
             log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
         }
@@ -198,13 +198,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
      */
     void receiveGossipTick(){
-        if (clusterMembers.size() == 0) return; //no members to send gossip status to
+        if (clusterMembers.size() == 0) {
+            return; //no members to send gossip status to
+        }
 
         Address remoteMemberToGossipTo;
 
-        if (clusterMembers.size() == 1)
+        if (clusterMembers.size() == 1) {
             remoteMemberToGossipTo = clusterMembers.get(0);
-        else {
+        else {
             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
         }
@@ -229,8 +231,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     void receiveGossipStatus(GossipStatus status){
         //Don't accept messages from non-members
-        if (!clusterMembers.contains(status.from()))
+        if (!clusterMembers.contains(status.from())) {
             return;
+        }
 
         final ActorRef sender = getSender();
         Future<Object> futureReply =
@@ -385,19 +388,23 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
                     for (Address address : remoteVersions.keySet()){
 
-                        if (localVersions.get(address) == null || remoteVersions.get(address) == null)
+                        if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
                             continue; //this condition is taken care of by above diffs
-                        if (localVersions.get(address) <  remoteVersions.get(address))
+                        }
+                        if (localVersions.get(address) <  remoteVersions.get(address)) {
                             localIsOlder.add(address);
-                        else if (localVersions.get(address) > remoteVersions.get(address))
+                        } else if (localVersions.get(address) > remoteVersions.get(address)) {
                             localIsNewer.add(address);
+                        }
                     }
 
-                    if (!localIsOlder.isEmpty())
+                    if (!localIsOlder.isEmpty()) {
                         sendGossipStatusTo(sender, localVersions );
+                    }
 
-                    if (!localIsNewer.isEmpty())
+                    if (!localIsNewer.isEmpty()) {
                         sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+                    }
 
                 }
                 return null;
index c292d93..e1226a5 100644 (file)
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-parser-impl</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-netconf-connector</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
index 64397de..8c38ee2 100644 (file)
@@ -70,7 +70,7 @@ public class Main {
         }
         case SSH: {
             writeStatus(consoleIO, "Connecting to %s via SSH. Please wait.", cliArgs.getAddress());
-            connectionManager.connectBlocking(cliArgs.getAddress(), getClientSshConfig(cliArgs));
+            connectionManager.connectBlocking(cliArgs.getAddress(), cliArgs.getServerAddress(), getClientSshConfig(cliArgs));
             break;
         }
         case NONE: {/* Do not connect initially */
index d5c9dc6..bede549 100644 (file)
@@ -14,7 +14,7 @@ import org.opendaylight.controller.netconf.cli.commands.CommandDispatcher;
 import org.opendaylight.controller.netconf.cli.io.ConsoleContext;
 import org.opendaylight.controller.netconf.cli.io.ConsoleIO;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
-import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionCapabilities;
+import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -23,7 +23,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * Implementation of RemoteDeviceHandler. Integrates cli with
  * sal-netconf-connector.
  */
-public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler<NetconfSessionCapabilities> {
+public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler<NetconfSessionPreferences> {