Merge "Use ImmutableSet/Map in ServiceReferenceRegistry"
authorTony Tkacik <ttkacik@cisco.com>
Thu, 20 Nov 2014 08:41:16 +0000 (08:41 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 20 Nov 2014 08:41:16 +0000 (08:41 +0000)
64 files changed:
opendaylight/adsal/switchmanager/integrationtest/src/test/java/org/opendaylight/controller/switchmanager/internal/SwitchManagerIT.java
opendaylight/adsal/topologymanager/integrationtest/src/test/java/org/opendaylight/controller/topologymanager/TopologyManagerIT.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/IsolatedLeaderCheck.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/xml/codec/XmlUtilsTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/controller/md/sal/dom/xsql/jdbc/JDBCServer.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/mapping/NetconfMessageTransformer.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestCodec.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java
opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/cnsn/to/json/test/CnSnToJsonLeafrefType.java
opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/json/to/cnsn/test/JsonLeafrefToCnSnTest.java
opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/XmlAndJsonToCnSnLeafRefTest.java
opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/xml/to/cnsn/test/XmlToCnSnTest.java
opendaylight/md-sal/sal-rest-connector/src/test/resources/cnsn-to-json/leafref/cont-augment-module.yang
opendaylight/md-sal/sal-rest-connector/src/test/resources/cnsn-to-json/leafref/xml/data_ref_to_not_leaf.xml
opendaylight/md-sal/sal-rest-connector/src/test/resources/leafref/json/jsondata.json
opendaylight/md-sal/sal-rest-connector/src/test/resources/leafref/xml/xmldata.xml
opendaylight/md-sal/sal-rest-connector/src/test/resources/xml-to-cnsn/leafref/leafref-module
opendaylight/md-sal/sal-rest-connector/src/test/resources/xml-to-cnsn/leafref/xml/data.xml
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/Activator.java
opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/CapabilityStrippingConfigSnapshotHolder.java
opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPersisterNotificationHandler.java
opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusherImpl.java
opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/NoOpStorageAdapter.java
opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/PersisterAggregator.java
opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java
opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/PropertiesProviderBaseImpl.java
opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/CapabilityStrippingConfigSnapshotHolderTest.java
opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/ConfigPersisterNotificationHandlerTest.java
opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/ConfigPersisterNotificationListenerTest.java
opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/DummyAdapter.java
opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/PersisterAggregatorTest.java
opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterTest.java
opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/osgi/MockedBundleContext.java
opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/osgi/TestingExceptionHandler.java
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfDocumentedException.java
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfSession.java
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/jmx/CommitJMXNotification.java
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/jmx/DefaultCommitOperationMXBean.java
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/jmx/NetconfJMXNotification.java
opendaylight/netconf/netconf-api/src/test/java/org/opendaylight/controller/netconf/api/NetconfDocumentedExceptionTest.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/SubtreeFilter.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/SubtreeFilterTest.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/osgi/NetconfImplActivatorTest.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/osgi/NetconfOperationServiceFactoryTrackerTest.java
opendaylight/netconf/netconf-impl/src/test/resources/subtree/10/post-filter.xml [new file with mode: 0644]
opendaylight/netconf/netconf-impl/src/test/resources/subtree/10/pre-filter.xml [new file with mode: 0644]
opendaylight/netconf/netconf-impl/src/test/resources/subtree/10/request.xml [new file with mode: 0644]

index 64098ec5157b6b4c216393c2c00b615fce032806..3963abf26540c40229cff6079e77a6fa2f9ae94b 100644 (file)
@@ -155,7 +155,7 @@ public class SwitchManagerIT {
         assertFalse(debugit);
 
         // Now lets create a hosttracker for testing purpose
-        ServiceReference s = bc.getServiceReference(ISwitchManager.class
+        ServiceReference<?> s = bc.getServiceReference(ISwitchManager.class
                 .getName());
         if (s != null) {
             this.switchManager = (ISwitchManager) bc.getService(s);
index 08934f32c78c5640fe2926017f2cafee8e58a03b..c2fc8aaa8efd0b85886bf9a65b7ccb5ee5de0b52 100644 (file)
@@ -159,7 +159,7 @@ public class TopologyManagerIT {
 
         Set<Property> properties = new HashSet<Property>();
 
-        ServiceReference r = bc.getServiceReference(IPluginInTopologyService.class
+        ServiceReference<?> r = bc.getServiceReference(IPluginInTopologyService.class
                 .getName());
         TopologyServices topologyServices = null;
         if (r != null) {
index 8e4a44cf20f7a5b9b6474424ccba8e4d99e1dee3..6dfa4afd6b6951a351790d0fb87679b40d19dc90 100644 (file)
@@ -80,7 +80,7 @@ public class ExampleActor extends RaftActor {
         } else if (message instanceof PrintRole) {
             if(LOG.isDebugEnabled()) {
                 String followers = "";
-                if (getRaftState() == RaftState.Leader) {
+                if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
                     followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
                 } else {
index f202a8bb1d6bc9c12b71c013fc0fd5e9daf990bf..de6169791ed4cb4405e6f47e85e0ed3535155fd1 100644 (file)
@@ -44,6 +44,11 @@ public class TestDriver {
      *  stopLoggingForClient:{nodeName}
      *  printNodes
      *  printState
+     *
+     *  Note: when run on IDE and on debug log level, the debug logs in
+     *  AbstractUptypedActor and AbstractUptypedPersistentActor would need to be commented out.
+     *  Also RaftActor handleCommand(), debug log which prints for every command other than AE/AER
+     *
      * @param args
      * @throws Exception
      */
index bff2a2779733761bff220e7e1223e85c46d18a73..433c3f7e4b832d081de375e8b792f19f1ca4caa4 100644 (file)
@@ -62,4 +62,10 @@ public interface ConfigParams {
      * The number of journal log entries to batch on recovery before applying.
      */
     int getJournalRecoveryLogBatchSize();
+
+    /**
+     * The interval in which the leader needs to check itself if its isolated
+     * @return FiniteDuration
+     */
+    FiniteDuration getIsolatedCheckInterval();
 }
index dc4145358aa332febafa690baa549fbfccfa91b2..a2092234d54134fbe3de765d17041ad766d689c4 100644 (file)
@@ -44,6 +44,8 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
     private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
     private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
+    private FiniteDuration isolatedLeaderCheckInterval =
+        new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
 
     public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
@@ -57,6 +59,10 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
     }
 
+    public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
+        this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval;
+    }
+
     @Override
     public long getSnapshotBatchCount() {
         return snapshotBatchCount;
@@ -87,4 +93,9 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     public int getJournalRecoveryLogBatchSize() {
         return journalRecoveryLogBatchSize;
     }
+
+    @Override
+    public FiniteDuration getIsolatedCheckInterval() {
+        return isolatedLeaderCheckInterval;
+    }
 }
index 65114eb6591d471860c756f780fe9d65ffab1e0d..216ad4103dc4409182e0c590e63afdd2b34e2aa0 100644 (file)
@@ -3,5 +3,6 @@ package org.opendaylight.controller.cluster.raft;
 public enum RaftState {
     Candidate,
     Follower,
-    Leader
+    Leader,
+    IsolatedLeader;
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/IsolatedLeaderCheck.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/IsolatedLeaderCheck.java
new file mode 100644 (file)
index 0000000..36fd813
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+/**
+ * Message sent by the IsolatedLeaderCheck scheduler in the Leader to itself
+ * in order to check if its isolated.
+ */
+public class IsolatedLeaderCheck {
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
new file mode 100644 (file)
index 0000000..d85ac8e
--- /dev/null
@@ -0,0 +1,738 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
+import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * The behavior of a RaftActor when it is in the Leader state
+ * <p/>
+ * Leaders:
+ * <ul>
+ * <li> Upon election: send initial empty AppendEntries RPCs
+ * (heartbeat) to each server; repeat during idle periods to
+ * prevent election timeouts (§5.2)
+ * <li> If command received from client: append entry to local log,
+ * respond after entry applied to state machine (§5.3)
+ * <li> If last log index â‰¥ nextIndex for a follower: send
+ * AppendEntries RPC with log entries starting at nextIndex
+ * <ul>
+ * <li> If successful: update nextIndex and matchIndex for
+ * follower (§5.3)
+ * <li> If AppendEntries fails because of log inconsistency:
+ * decrement nextIndex and retry (§5.3)
+ * </ul>
+ * <li> If there exists an N such that N > commitIndex, a majority
+ * of matchIndex[i] â‰¥ N, and log[N].term == currentTerm:
+ * set commitIndex = N (§5.3, Â§5.4).
+ */
+public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+    protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
+    protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
+
+    protected final Set<String> followers;
+
+    private Cancellable heartbeatSchedule = null;
+
+    private List<ClientRequestTracker> trackerList = new ArrayList<>();
+
+    protected final int minReplicationCount;
+
+    protected final int minIsolatedLeaderPeerCount;
+
+    private Optional<ByteString> snapshot;
+
+    public AbstractLeader(RaftActorContext context) {
+        super(context);
+
+        followers = context.getPeerAddresses().keySet();
+
+        for (String followerId : followers) {
+            FollowerLogInformation followerLogInformation =
+                new FollowerLogInformationImpl(followerId,
+                    new AtomicLong(context.getCommitIndex()),
+                    new AtomicLong(-1),
+                    context.getConfigParams().getElectionTimeOutInterval());
+
+            followerToLog.put(followerId, followerLogInformation);
+        }
+
+        leaderId = context.getId();
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Election:Leader has following peers: {}", followers);
+        }
+
+        minReplicationCount = getMajorityVoteCount(followers.size());
+
+        // the isolated Leader peer count will be 1 less than the majority vote count.
+        // this is because the vote count has the self vote counted in it
+        // for e.g
+        // 0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
+        // 2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
+        // 4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
+        minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
+
+        snapshot = Optional.absent();
+
+        // Immediately schedule a heartbeat
+        // Upon election: send initial empty AppendEntries RPCs
+        // (heartbeat) to each server; repeat during idle periods to
+        // prevent election timeouts (§5.2)
+        scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+    }
+
+    private Optional<ByteString> getSnapshot() {
+        return snapshot;
+    }
+
+    @VisibleForTesting
+    void setSnapshot(Optional<ByteString> snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    @Override
+    protected RaftActorBehavior handleAppendEntries(ActorRef sender,
+        AppendEntries appendEntries) {
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(appendEntries.toString());
+        }
+
+        return this;
+    }
+
+    @Override
+    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
+        AppendEntriesReply appendEntriesReply) {
+
+        if(! appendEntriesReply.isSuccess()) {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug(appendEntriesReply.toString());
+            }
+        }
+
+        // Update the FollowerLogInformation
+        String followerId = appendEntriesReply.getFollowerId();
+        FollowerLogInformation followerLogInformation =
+            followerToLog.get(followerId);
+
+        if(followerLogInformation == null){
+            LOG.error("Unknown follower {}", followerId);
+            return this;
+        }
+
+        followerLogInformation.markFollowerActive();
+
+        if (appendEntriesReply.isSuccess()) {
+            followerLogInformation
+                .setMatchIndex(appendEntriesReply.getLogLastIndex());
+            followerLogInformation
+                .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
+        } else {
+
+            // TODO: When we find that the follower is out of sync with the
+            // Leader we simply decrement that followers next index by 1.
+            // Would it be possible to do better than this? The RAFT spec
+            // does not explicitly deal with it but may be something for us to
+            // think about
+
+            followerLogInformation.decrNextIndex();
+        }
+
+        // Now figure out if this reply warrants a change in the commitIndex
+        // If there exists an N such that N > commitIndex, a majority
+        // of matchIndex[i] â‰¥ N, and log[N].term == currentTerm:
+        // set commitIndex = N (§5.3, Â§5.4).
+        for (long N = context.getCommitIndex() + 1; ; N++) {
+            int replicatedCount = 1;
+
+            for (FollowerLogInformation info : followerToLog.values()) {
+                if (info.getMatchIndex().get() >= N) {
+                    replicatedCount++;
+                }
+            }
+
+            if (replicatedCount >= minReplicationCount) {
+                ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
+                if (replicatedLogEntry != null &&
+                    replicatedLogEntry.getTerm() == currentTerm()) {
+                    context.setCommitIndex(N);
+                }
+            } else {
+                break;
+            }
+        }
+
+        // Apply the change to the state machine
+        if (context.getCommitIndex() > context.getLastApplied()) {
+            applyLogToStateMachine(context.getCommitIndex());
+        }
+
+        return this;
+    }
+
+    protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+
+        ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
+        if(toRemove != null) {
+            trackerList.remove(toRemove);
+        }
+
+        return toRemove;
+    }
+
+    protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+        for (ClientRequestTracker tracker : trackerList) {
+            if (tracker.getIndex() == logIndex) {
+                return tracker;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
+        RequestVoteReply requestVoteReply) {
+        return this;
+    }
+
+    @Override
+    public RaftState state() {
+        return RaftState.Leader;
+    }
+
+    @Override
+    public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+        Preconditions.checkNotNull(sender, "sender should not be null");
+
+        Object message = fromSerializableMessage(originalMessage);
+
+        if (message instanceof RaftRPC) {
+            RaftRPC rpc = (RaftRPC) message;
+            // If RPC request or response contains term T > currentTerm:
+            // set currentTerm = T, convert to follower (§5.1)
+            // This applies to all RPC messages and responses
+            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+
+                return switchBehavior(new Follower(context));
+            }
+        }
+
+        try {
+            if (message instanceof SendHeartBeat) {
+                sendHeartBeat();
+                return this;
+
+            } else if(message instanceof InitiateInstallSnapshot) {
+                installSnapshotIfNeeded();
+
+            } else if(message instanceof SendInstallSnapshot) {
+                // received from RaftActor
+                setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+                sendInstallSnapshot();
+
+            } else if (message instanceof Replicate) {
+                replicate((Replicate) message);
+
+            } else if (message instanceof InstallSnapshotReply){
+                handleInstallSnapshotReply((InstallSnapshotReply) message);
+
+            }
+        } finally {
+            scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+        }
+
+        return super.handleMessage(sender, message);
+    }
+
+    private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
+        String followerId = reply.getFollowerId();
+        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+        followerLogInformation.markFollowerActive();
+
+        if (followerToSnapshot != null &&
+            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+
+            if (reply.isSuccess()) {
+                if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+                    //this was the last chunk reply
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("InstallSnapshotReply received, " +
+                                "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
+                            reply.getChunkIndex(), followerId,
+                            context.getReplicatedLog().getSnapshotIndex() + 1
+                        );
+                    }
+
+                    followerLogInformation.setMatchIndex(
+                        context.getReplicatedLog().getSnapshotIndex());
+                    followerLogInformation.setNextIndex(
+                        context.getReplicatedLog().getSnapshotIndex() + 1);
+                    mapFollowerToSnapshot.remove(followerId);
+
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
+                            followerToLog.get(followerId).getNextIndex().get());
+                    }
+
+                    if (mapFollowerToSnapshot.isEmpty()) {
+                        // once there are no pending followers receiving snapshots
+                        // we can remove snapshot from the memory
+                        setSnapshot(Optional.<ByteString>absent());
+                    }
+
+                } else {
+                    followerToSnapshot.markSendStatus(true);
+                }
+            } else {
+                LOG.info("InstallSnapshotReply received, " +
+                        "sending snapshot chunk failed, Will retry, Chunk:{}",
+                    reply.getChunkIndex()
+                );
+                followerToSnapshot.markSendStatus(false);
+            }
+
+        } else {
+            LOG.error("ERROR!!" +
+                    "FollowerId in InstallSnapshotReply not known to Leader" +
+                    " or Chunk Index in InstallSnapshotReply not matching {} != {}",
+                followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
+            );
+        }
+    }
+
+    private void replicate(Replicate replicate) {
+        long logIndex = replicate.getReplicatedLogEntry().getIndex();
+
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Replicate message {}", logIndex);
+        }
+
+        // Create a tracker entry we will use this later to notify the
+        // client actor
+        trackerList.add(
+            new ClientRequestTrackerImpl(replicate.getClientActor(),
+                replicate.getIdentifier(),
+                logIndex)
+        );
+
+        if (followers.size() == 0) {
+            context.setCommitIndex(logIndex);
+            applyLogToStateMachine(logIndex);
+        } else {
+            sendAppendEntries();
+        }
+    }
+
+    private void sendAppendEntries() {
+        // Send an AppendEntries to all followers
+        for (String followerId : followers) {
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+
+            if (followerActor != null) {
+                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+                long followerNextIndex = followerLogInformation.getNextIndex().get();
+                boolean isFollowerActive = followerLogInformation.isFollowerActive();
+                List<ReplicatedLogEntry> entries = null;
+
+                if (mapFollowerToSnapshot.get(followerId) != null) {
+                    // if install snapshot is in process , then sent next chunk if possible
+                    if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                        sendSnapshotChunk(followerActor, followerId);
+                    } else {
+                        // we send a heartbeat even if we have not received a reply for the last chunk
+                        sendAppendEntriesToFollower(followerActor, followerNextIndex,
+                            Collections.<ReplicatedLogEntry>emptyList());
+                    }
+
+                } else {
+                    long leaderLastIndex = context.getReplicatedLog().lastIndex();
+                    long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+
+                    if (isFollowerActive &&
+                        context.getReplicatedLog().isPresent(followerNextIndex)) {
+                        // FIXME : Sending one entry at a time
+                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+                    } else if (isFollowerActive && followerNextIndex >= 0 &&
+                        leaderLastIndex >= followerNextIndex ) {
+                        // if the followers next index is not present in the leaders log, and
+                        // if the follower is just not starting and if leader's index is more than followers index
+                        // then snapshot should be sent
+
+                        if(LOG.isDebugEnabled()) {
+                            LOG.debug("InitiateInstallSnapshot to follower:{}," +
+                                    "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                    "leader-last-index:{}", followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex
+                            );
+                        }
+                        actor().tell(new InitiateInstallSnapshot(), actor());
+
+                        // we would want to sent AE as the capture snapshot might take time
+                        entries =  Collections.<ReplicatedLogEntry>emptyList();
+
+                    } else {
+                        //we send an AppendEntries, even if the follower is inactive
+                        // in-order to update the followers timestamp, in case it becomes active again
+                        entries =  Collections.<ReplicatedLogEntry>emptyList();
+                    }
+
+                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
+
+                }
+            }
+        }
+    }
+
+    private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
+        List<ReplicatedLogEntry> entries) {
+        followerActor.tell(
+            new AppendEntries(currentTerm(), context.getId(),
+                prevLogIndex(followerNextIndex),
+                prevLogTerm(followerNextIndex), entries,
+                context.getCommitIndex()).toSerializable(),
+            actor()
+        );
+    }
+
+    /**
+     * An installSnapshot is scheduled at a interval that is a multiple of
+     * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+     * snapshots at every heartbeat.
+     *
+     * Install Snapshot works as follows
+     * 1. Leader sends a InitiateInstallSnapshot message to self
+     * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
+     * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
+     * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
+     * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
+     * 5. On complete, Follower sends back a InstallSnapshotReply.
+     * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
+     * and replenishes the memory by deleting the snapshot in Replicated log.
+     *
+     */
+    private void installSnapshotIfNeeded() {
+        for (String followerId : followers) {
+            ActorSelection followerActor =
+                context.getPeerActorSelection(followerId);
+
+            if(followerActor != null) {
+                FollowerLogInformation followerLogInformation =
+                    followerToLog.get(followerId);
+
+                long nextIndex = followerLogInformation.getNextIndex().get();
+
+                if (!context.getReplicatedLog().isPresent(nextIndex) &&
+                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                    LOG.info("{} follower needs a snapshot install", followerId);
+                    if (snapshot.isPresent()) {
+                        // if a snapshot is present in the memory, most likely another install is in progress
+                        // no need to capture snapshot
+                        sendSnapshotChunk(followerActor, followerId);
+
+                    } else {
+                        initiateCaptureSnapshot();
+                        //we just need 1 follower who would need snapshot to be installed.
+                        // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
+                        // who needs an install and send to all who need
+                        break;
+                    }
+
+                }
+            }
+        }
+    }
+
+    // on every install snapshot, we try to capture the snapshot.
+    // Once a capture is going on, another one issued will get ignored by RaftActor.
+    private void initiateCaptureSnapshot() {
+        LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+        ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
+        long lastAppliedIndex = -1;
+        long lastAppliedTerm = -1;
+
+        if (lastAppliedEntry != null) {
+            lastAppliedIndex = lastAppliedEntry.getIndex();
+            lastAppliedTerm = lastAppliedEntry.getTerm();
+        } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
+            lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
+            lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
+        }
+
+        boolean isInstallSnapshotInitiated = true;
+        actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
+                lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
+            actor());
+    }
+
+
+    private void sendInstallSnapshot() {
+        for (String followerId : followers) {
+            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+
+            if(followerActor != null) {
+                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+                long nextIndex = followerLogInformation.getNextIndex().get();
+
+                if (!context.getReplicatedLog().isPresent(nextIndex) &&
+                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                    sendSnapshotChunk(followerActor, followerId);
+                }
+            }
+        }
+    }
+
+    /**
+     *  Sends a snapshot chunk to a given follower
+     *  InstallSnapshot should qualify as a heartbeat too.
+     */
+    private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
+        try {
+            if (snapshot.isPresent()) {
+                followerActor.tell(
+                    new InstallSnapshot(currentTerm(), context.getId(),
+                        context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().getSnapshotTerm(),
+                        getNextSnapshotChunk(followerId,snapshot.get()),
+                        mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+                        mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                    ).toSerializable(),
+                    actor()
+                );
+                LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                    followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+                    mapFollowerToSnapshot.get(followerId).getTotalChunks());
+            }
+        } catch (IOException e) {
+            LOG.error(e, "InstallSnapshot failed for Leader.");
+        }
+    }
+
+    /**
+     * Acccepts snaphot as ByteString, enters into map for future chunks
+     * creates and return a ByteString chunk
+     */
+    private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        if (followerToSnapshot == null) {
+            followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+            mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+        }
+        ByteString nextChunk = followerToSnapshot.getNextChunk();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+        }
+        return nextChunk;
+    }
+
+    private void sendHeartBeat() {
+        if (followers.size() > 0) {
+            sendAppendEntries();
+        }
+    }
+
+    private void stopHeartBeat() {
+        if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
+            heartbeatSchedule.cancel();
+        }
+    }
+
+    private void scheduleHeartBeat(FiniteDuration interval) {
+        if(followers.size() == 0){
+            // Optimization - do not bother scheduling a heartbeat as there are
+            // no followers
+            return;
+        }
+
+        stopHeartBeat();
+
+        // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
+        // message is sent to itself.
+        // Scheduling the heartbeat only once here because heartbeats do not
+        // need to be sent if there are other messages being sent to the remote
+        // actor.
+        heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
+            interval, context.getActor(), new SendHeartBeat(),
+            context.getActorSystem().dispatcher(), context.getActor());
+    }
+
+    @Override
+    public void close() throws Exception {
+        stopHeartBeat();
+    }
+
+    @Override
+    public String getLeaderId() {
+        return context.getId();
+    }
+
+    protected boolean isLeaderIsolated() {
+        int minPresent = minIsolatedLeaderPeerCount;
+        for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
+            if (followerLogInformation.isFollowerActive()) {
+                --minPresent;
+                if (minPresent == 0) {
+                    break;
+                }
+            }
+        }
+        return (minPresent != 0);
+    }
+
+    /**
+     * Encapsulates the snapshot bytestring and handles the logic of sending
+     * snapshot chunks
+     */
+    protected class FollowerToSnapshot {
+        private ByteString snapshotBytes;
+        private int offset = 0;
+        // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
+        private int replyReceivedForOffset;
+        // if replyStatus is false, the previous chunk is attempted
+        private boolean replyStatus = false;
+        private int chunkIndex;
+        private int totalChunks;
+
+        public FollowerToSnapshot(ByteString snapshotBytes) {
+            this.snapshotBytes = snapshotBytes;
+            replyReceivedForOffset = -1;
+            chunkIndex = 1;
+            int size = snapshotBytes.size();
+            totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
+                ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Snapshot {} bytes, total chunks to send:{}",
+                    size, totalChunks);
+            }
+        }
+
+        public ByteString getSnapshotBytes() {
+            return snapshotBytes;
+        }
+
+        public int incrementOffset() {
+            if(replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                offset = offset + context.getConfigParams().getSnapshotChunkSize();
+            }
+            return offset;
+        }
+
+        public int incrementChunkIndex() {
+            if (replyStatus) {
+                // if prev chunk failed, we would want to sent the same chunk again
+                chunkIndex =  chunkIndex + 1;
+            }
+            return chunkIndex;
+        }
+
+        public int getChunkIndex() {
+            return chunkIndex;
+        }
+
+        public int getTotalChunks() {
+            return totalChunks;
+        }
+
+        public boolean canSendNextChunk() {
+            // we only send a false if a chunk is sent but we have not received a reply yet
+            return replyReceivedForOffset == offset;
+        }
+
+        public boolean isLastChunk(int chunkIndex) {
+            return totalChunks == chunkIndex;
+        }
+
+        public void markSendStatus(boolean success) {
+            if (success) {
+                // if the chunk sent was successful
+                replyReceivedForOffset = offset;
+                replyStatus = true;
+            } else {
+                // if the chunk sent was failure
+                replyReceivedForOffset = offset;
+                replyStatus = false;
+            }
+        }
+
+        public ByteString getNextChunk() {
+            int snapshotLength = getSnapshotBytes().size();
+            int start = incrementOffset();
+            int size = context.getConfigParams().getSnapshotChunkSize();
+            if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
+                size = snapshotLength;
+            } else {
+                if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+                    size = snapshotLength - start;
+                }
+            }
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("length={}, offset={},size={}",
+                    snapshotLength, start, size);
+            }
+            return getSnapshotBytes().substring(start, start + size);
+
+        }
+    }
+
+    // called from example-actor for printing the follower-states
+    public String printFollowerStates() {
+        StringBuilder sb = new StringBuilder();
+        for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
+            boolean isFollowerActive = followerLogInformation.isFollowerActive();
+            sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+
+        }
+        return "[" + sb.toString() + "]";
+    }
+
+    @VisibleForTesting
+    void markFollowerActive(String followerId) {
+        followerToLog.get(followerId).markFollowerActive();
+    }
+}
index eed74bba82fbf8a4f8e4c1b5431887451c49d59d..f235221da940d4cb21c491e78b9624c2afa18a80 100644 (file)
@@ -390,7 +390,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
-        LOG.info("Switching from behavior {} to {}", this.state(), behavior.state());
+        LOG.info("{} :- Switching from behavior {} to {}", context.getId(), this.state(), behavior.state());
         try {
             close();
         } catch (Exception e) {
@@ -399,4 +399,27 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
         return behavior;
     }
+
+    protected int getMajorityVoteCount(int numPeers) {
+        // Votes are required from a majority of the peers including self.
+        // The numMajority field therefore stores a calculated value
+        // of the number of votes required for this candidate to win an
+        // election based on it's known peers.
+        // If a peer was added during normal operation and raft replicas
+        // came to know about them then the new peer would also need to be
+        // taken into consideration when calculating this value.
+        // Here are some examples for what the numMajority would be for n
+        // peers
+        // 0 peers = 1 numMajority -: (0 + 1) / 2 + 1 = 1
+        // 2 peers = 2 numMajority -: (2 + 1) / 2 + 1 = 2
+        // 4 peers = 3 numMajority -: (4 + 1) / 2 + 1 = 3
+
+        int numMajority = 0;
+        if (numPeers > 0) {
+            int self = 1;
+            numMajority = (numPeers + self) / 2 + 1;
+        }
+        return numMajority;
+
+    }
 }
index 4a3e2c5d664406844edaddee6308abf112b0f79c..702417273ff586fc635ebcb65e15fa4c8bd64885 100644 (file)
@@ -56,25 +56,7 @@ public class Candidate extends AbstractRaftActorBehavior {
             LOG.debug("Election:Candidate has following peers: {}", peers);
         }
 
-        if(peers.size() > 0) {
-            // Votes are required from a majority of the peers including self.
-            // The votesRequired field therefore stores a calculated value
-            // of the number of votes required for this candidate to win an
-            // election based on it's known peers.
-            // If a peer was added during normal operation and raft replicas
-            // came to know about them then the new peer would also need to be
-            // taken into consideration when calculating this value.
-            // Here are some examples for what the votesRequired would be for n
-            // peers
-            // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1
-            // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2
-            // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3
-            int noOfPeers = peers.size();
-            int self = 1;
-            votesRequired = (noOfPeers + self) / 2 + 1;
-        } else {
-            votesRequired = 0;
-        }
+        votesRequired = getMajorityVoteCount(peers.size());
 
         startNewTerm();
         scheduleElection(electionDuration());
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java
new file mode 100644 (file)
index 0000000..4f77711
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+
+/**
+ * Leader which is termed as isolated.
+ * <p/>
+ * If the reply from the majority of the followers  is not received then the leader changes its behavior
+ * to IsolatedLeader. An isolated leader may have followers and they would continue to receive replicated messages.
+ * <p/>
+ * A schedule is run, at an interval of (10 * Heartbeat-time-interval),  in the Leader
+ * to check if its isolated or not.
+ * <p/>
+ * In the Isolated Leader , on every AppendEntriesReply, we aggressively check if the leader is isolated.
+ * If no, then the state is switched back to Leader.
+ *
+ */
+public class IsolatedLeader extends AbstractLeader {
+    public IsolatedLeader(RaftActorContext context) {
+        super(context);
+    }
+
+    // we received an Append Entries reply, we should switch the Behavior to Leader
+    @Override
+    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
+        AppendEntriesReply appendEntriesReply) {
+        RaftActorBehavior ret = super.handleAppendEntriesReply(sender, appendEntriesReply);
+
+        // it can happen that this isolated leader interacts with a new leader in the cluster and
+        // changes its state to Follower, hence we only need to switch to Leader if the state is still Isolated
+        if (ret.state() == RaftState.IsolatedLeader && !isLeaderIsolated()) {
+            LOG.info("IsolatedLeader {} switching from IsolatedLeader to Leader", leaderId);
+            return switchBehavior(new Leader(context));
+        }
+        return ret;
+    }
+
+    @Override
+    public RaftState state() {
+        return RaftState.IsolatedLeader;
+    }
+}
index d83362b58081c0e4c4576a848bf10ca29d8fc7da..0dd39001136a25416c241eb8fb69085266d3752c 100644 (file)
@@ -9,42 +9,14 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
-import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
-import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
-import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
-import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * The behavior of a RaftActor when it is in the Leader state
  * <p/>
@@ -67,546 +39,41 @@ import java.util.concurrent.atomic.AtomicLong;
  * of matchIndex[i] â‰¥ N, and log[N].term == currentTerm:
  * set commitIndex = N (§5.3, Â§5.4).
  */
-public class Leader extends AbstractRaftActorBehavior {
-
-
-    protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
-    protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
-
-    private final Set<String> followers;
-
-    private Cancellable heartbeatSchedule = null;
+public class Leader extends AbstractLeader {
     private Cancellable installSnapshotSchedule = null;
-
-    private List<ClientRequestTracker> trackerList = new ArrayList<>();
-
-    private final int minReplicationCount;
-
-    private Optional<ByteString> snapshot;
+    private Cancellable isolatedLeaderCheckSchedule = null;
 
     public Leader(RaftActorContext context) {
         super(context);
 
-        followers = context.getPeerAddresses().keySet();
-
-        for (String followerId : followers) {
-            FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerId,
-                    new AtomicLong(context.getCommitIndex()),
-                    new AtomicLong(-1),
-                    context.getConfigParams().getElectionTimeOutInterval());
-
-            followerToLog.put(followerId, followerLogInformation);
-        }
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Election:Leader has following peers: {}", followers);
-        }
-
-        if (followers.size() > 0) {
-            minReplicationCount = (followers.size() + 1) / 2 + 1;
-        } else {
-            minReplicationCount = 0;
-        }
-
-        snapshot = Optional.absent();
-
-        // Immediately schedule a heartbeat
-        // Upon election: send initial empty AppendEntries RPCs
-        // (heartbeat) to each server; repeat during idle periods to
-        // prevent election timeouts (§5.2)
-        scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
-
-        scheduleInstallSnapshotCheck(
-            new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
-                context.getConfigParams().getHeartBeatInterval().unit())
-        );
-
-    }
-
-    private Optional<ByteString> getSnapshot() {
-        return snapshot;
-    }
-
-    @VisibleForTesting
-    void setSnapshot(Optional<ByteString> snapshot) {
-        this.snapshot = snapshot;
-    }
-
-    @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries) {
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug(appendEntries.toString());
-        }
-
-        return this;
-    }
-
-    @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply) {
-
-        if(! appendEntriesReply.isSuccess()) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug(appendEntriesReply.toString());
-            }
-        }
-
-        // Update the FollowerLogInformation
-        String followerId = appendEntriesReply.getFollowerId();
-        FollowerLogInformation followerLogInformation =
-            followerToLog.get(followerId);
-
-        if(followerLogInformation == null){
-            LOG.error("Unknown follower {}", followerId);
-            return this;
-        }
-
-        followerLogInformation.markFollowerActive();
-
-        if (appendEntriesReply.isSuccess()) {
-            followerLogInformation
-                .setMatchIndex(appendEntriesReply.getLogLastIndex());
-            followerLogInformation
-                .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
-        } else {
-
-            // TODO: When we find that the follower is out of sync with the
-            // Leader we simply decrement that followers next index by 1.
-            // Would it be possible to do better than this? The RAFT spec
-            // does not explicitly deal with it but may be something for us to
-            // think about
-
-            followerLogInformation.decrNextIndex();
-        }
-
-        // Now figure out if this reply warrants a change in the commitIndex
-        // If there exists an N such that N > commitIndex, a majority
-        // of matchIndex[i] â‰¥ N, and log[N].term == currentTerm:
-        // set commitIndex = N (§5.3, Â§5.4).
-        for (long N = context.getCommitIndex() + 1; ; N++) {
-            int replicatedCount = 1;
-
-            for (FollowerLogInformation info : followerToLog.values()) {
-                if (info.getMatchIndex().get() >= N) {
-                    replicatedCount++;
-                }
-            }
-
-            if (replicatedCount >= minReplicationCount) {
-                ReplicatedLogEntry replicatedLogEntry =
-                    context.getReplicatedLog().get(N);
-                if (replicatedLogEntry != null
-                    && replicatedLogEntry.getTerm()
-                    == currentTerm()) {
-                    context.setCommitIndex(N);
-                }
-            } else {
-                break;
-            }
-        }
-
-        // Apply the change to the state machine
-        if (context.getCommitIndex() > context.getLastApplied()) {
-            applyLogToStateMachine(context.getCommitIndex());
-        }
-
-        return this;
-    }
-
-    protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
-        ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
-        if(toRemove != null) {
-            trackerList.remove(toRemove);
-        }
-
-        return toRemove;
-    }
-
-    protected ClientRequestTracker findClientRequestTracker(long logIndex) {
-        for (ClientRequestTracker tracker : trackerList) {
-            if (tracker.getIndex() == logIndex) {
-                return tracker;
-            }
-        }
-
-        return null;
-    }
-
-    @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply) {
-        return this;
-    }
+        scheduleInstallSnapshotCheck(context.getConfigParams().getIsolatedCheckInterval());
 
-    @Override public RaftState state() {
-        return RaftState.Leader;
+        scheduleIsolatedLeaderCheck(
+            new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10,
+                context.getConfigParams().getHeartBeatInterval().unit()));
     }
 
     @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
-        Object message = fromSerializableMessage(originalMessage);
-
-        if (message instanceof RaftRPC) {
-            RaftRPC rpc = (RaftRPC) message;
-            // If RPC request or response contains term T > currentTerm:
-            // set currentTerm = T, convert to follower (§5.1)
-            // This applies to all RPC messages and responses
-            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-
-                return switchBehavior(new Follower(context));
-            }
-        }
-
-        try {
-            if (message instanceof SendHeartBeat) {
-                sendHeartBeat();
-                return this;
-
-            } else if(message instanceof InitiateInstallSnapshot) {
-                installSnapshotIfNeeded();
-
-            } else if(message instanceof SendInstallSnapshot) {
-                // received from RaftActor
-                setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
-                sendInstallSnapshot();
-
-            } else if (message instanceof Replicate) {
-                replicate((Replicate) message);
-
-            } else if (message instanceof InstallSnapshotReply){
-                handleInstallSnapshotReply(
-                    (InstallSnapshotReply) message);
-            }
-        } finally {
-            scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
-        }
-
-        return super.handleMessage(sender, message);
-    }
-
-    private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
-        String followerId = reply.getFollowerId();
-        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
-        FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-        followerLogInformation.markFollowerActive();
-
-        if (followerToSnapshot != null &&
-            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
-            if (reply.isSuccess()) {
-                if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
-                    //this was the last chunk reply
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("InstallSnapshotReply received, " +
-                                "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
-                            reply.getChunkIndex(), followerId,
-                            context.getReplicatedLog().getSnapshotIndex() + 1
-                        );
-                    }
-
-                    followerLogInformation.setMatchIndex(
-                        context.getReplicatedLog().getSnapshotIndex());
-                    followerLogInformation.setNextIndex(
-                        context.getReplicatedLog().getSnapshotIndex() + 1);
-                    mapFollowerToSnapshot.remove(followerId);
-
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
-                            followerToLog.get(followerId).getNextIndex().get());
-                    }
-
-                    if (mapFollowerToSnapshot.isEmpty()) {
-                        // once there are no pending followers receiving snapshots
-                        // we can remove snapshot from the memory
-                        setSnapshot(Optional.<ByteString>absent());
-                    }
-
-                } else {
-                    followerToSnapshot.markSendStatus(true);
-                }
-            } else {
-                LOG.info("InstallSnapshotReply received, " +
-                        "sending snapshot chunk failed, Will retry, Chunk:{}",
-                    reply.getChunkIndex()
-                );
-                followerToSnapshot.markSendStatus(false);
+        if (originalMessage instanceof IsolatedLeaderCheck) {
+            if (isLeaderIsolated()) {
+                LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+                    minIsolatedLeaderPeerCount, leaderId);
+                return switchBehavior(new IsolatedLeader(context));
             }
-
-        } else {
-            LOG.error("ERROR!!" +
-                    "FollowerId in InstallSnapshotReply not known to Leader" +
-                    " or Chunk Index in InstallSnapshotReply not matching {} != {}",
-                followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
-            );
-        }
-    }
-
-    private void replicate(Replicate replicate) {
-        long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Replicate message {}", logIndex);
         }
 
-        // Create a tracker entry we will use this later to notify the
-        // client actor
-        trackerList.add(
-            new ClientRequestTrackerImpl(replicate.getClientActor(),
-                replicate.getIdentifier(),
-                logIndex)
-        );
-
-        if (followers.size() == 0) {
-            context.setCommitIndex(logIndex);
-            applyLogToStateMachine(logIndex);
-        } else {
-            sendAppendEntries();
-        }
+        return super.handleMessage(sender, originalMessage);
     }
 
-    private void sendAppendEntries() {
-        // Send an AppendEntries to all followers
-        for (String followerId : followers) {
-            ActorSelection followerActor = context.getPeerActorSelection(followerId);
-
-            if (followerActor != null) {
-                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long followerNextIndex = followerLogInformation.getNextIndex().get();
-                boolean isFollowerActive = followerLogInformation.isFollowerActive();
-                List<ReplicatedLogEntry> entries = null;
-
-                if (mapFollowerToSnapshot.get(followerId) != null) {
-                    // if install snapshot is in process , then sent next chunk if possible
-                    if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
-                        sendSnapshotChunk(followerActor, followerId);
-                    } else {
-                        // we send a heartbeat even if we have not received a reply for the last chunk
-                        sendAppendEntriesToFollower(followerActor, followerNextIndex,
-                            Collections.<ReplicatedLogEntry>emptyList());
-                    }
-
-                } else {
-                    long leaderLastIndex = context.getReplicatedLog().lastIndex();
-                    long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
-
-                    if (isFollowerActive &&
-                        context.getReplicatedLog().isPresent(followerNextIndex)) {
-                        // FIXME : Sending one entry at a time
-                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
-                    } else if (isFollowerActive && followerNextIndex >= 0 &&
-                        leaderLastIndex >= followerNextIndex ) {
-                        // if the followers next index is not present in the leaders log, and
-                        // if the follower is just not starting and if leader's index is more than followers index
-                        // then snapshot should be sent
-
-                        if(LOG.isDebugEnabled()) {
-                            LOG.debug("InitiateInstallSnapshot to follower:{}," +
-                                    "follower-nextIndex:{}, leader-snapshot-index:{},  " +
-                                    "leader-last-index:{}", followerId,
-                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex
-                            );
-                        }
-                        actor().tell(new InitiateInstallSnapshot(), actor());
-
-                        // we would want to sent AE as the capture snapshot might take time
-                        entries =  Collections.<ReplicatedLogEntry>emptyList();
-
-                    } else {
-                        //we send an AppendEntries, even if the follower is inactive
-                        // in-order to update the followers timestamp, in case it becomes active again
-                        entries =  Collections.<ReplicatedLogEntry>emptyList();
-                    }
-
-                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
-
-                }
-            }
-        }
-    }
-
-    private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
-        List<ReplicatedLogEntry> entries) {
-        followerActor.tell(
-            new AppendEntries(currentTerm(), context.getId(),
-                prevLogIndex(followerNextIndex),
-                prevLogTerm(followerNextIndex), entries,
-                context.getCommitIndex()).toSerializable(),
-            actor()
-        );
-    }
-
-    /**
-     * An installSnapshot is scheduled at a interval that is a multiple of
-     * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
-     * snapshots at every heartbeat.
-     *
-     * Install Snapshot works as follows
-     * 1. Leader sends a InitiateInstallSnapshot message to self
-     * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
-     * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
-     * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
-     * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
-     * 5. On complete, Follower sends back a InstallSnapshotReply.
-     * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
-     * and replenishes the memory by deleting the snapshot in Replicated log.
-     *
-     */
-    private void installSnapshotIfNeeded() {
-        for (String followerId : followers) {
-            ActorSelection followerActor =
-                context.getPeerActorSelection(followerId);
-
-            if(followerActor != null) {
-                FollowerLogInformation followerLogInformation =
-                    followerToLog.get(followerId);
-
-                long nextIndex = followerLogInformation.getNextIndex().get();
-
-                if (!context.getReplicatedLog().isPresent(nextIndex) &&
-                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    LOG.info("{} follower needs a snapshot install", followerId);
-                    if (snapshot.isPresent()) {
-                        // if a snapshot is present in the memory, most likely another install is in progress
-                        // no need to capture snapshot
-                        sendSnapshotChunk(followerActor, followerId);
-
-                    } else {
-                        initiateCaptureSnapshot();
-                        //we just need 1 follower who would need snapshot to be installed.
-                        // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
-                        // who needs an install and send to all who need
-                        break;
-                    }
-
-                }
-            }
-        }
-    }
-
-    // on every install snapshot, we try to capture the snapshot.
-    // Once a capture is going on, another one issued will get ignored by RaftActor.
-    private void initiateCaptureSnapshot() {
-        LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
-        ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
-        long lastAppliedIndex = -1;
-        long lastAppliedTerm = -1;
-
-        if (lastAppliedEntry != null) {
-            lastAppliedIndex = lastAppliedEntry.getIndex();
-            lastAppliedTerm = lastAppliedEntry.getTerm();
-        } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
-            lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
-            lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
-        }
-
-        boolean isInstallSnapshotInitiated = true;
-        actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
-                lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
-            actor());
-    }
-
-
-    private void sendInstallSnapshot() {
-        for (String followerId : followers) {
-            ActorSelection followerActor = context.getPeerActorSelection(followerId);
-
-            if(followerActor != null) {
-                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long nextIndex = followerLogInformation.getNextIndex().get();
-
-                if (!context.getReplicatedLog().isPresent(nextIndex) &&
-                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
-                    sendSnapshotChunk(followerActor, followerId);
-                }
-            }
-        }
-    }
-
-    /**
-     *  Sends a snapshot chunk to a given follower
-     *  InstallSnapshot should qualify as a heartbeat too.
-     */
-    private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
-        try {
-            if (snapshot.isPresent()) {
-                followerActor.tell(
-                    new InstallSnapshot(currentTerm(), context.getId(),
-                        context.getReplicatedLog().getSnapshotIndex(),
-                        context.getReplicatedLog().getSnapshotTerm(),
-                        getNextSnapshotChunk(followerId,snapshot.get()),
-                        mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks()
-                    ).toSerializable(),
-                    actor()
-                );
-                LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
-                    followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
-                    mapFollowerToSnapshot.get(followerId).getTotalChunks());
-            }
-        } catch (IOException e) {
-            LOG.error(e, "InstallSnapshot failed for Leader.");
-        }
-    }
-
-    /**
-     * Acccepts snaphot as ByteString, enters into map for future chunks
-     * creates and return a ByteString chunk
-     */
-    private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
-        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
-        if (followerToSnapshot == null) {
-            followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
-            mapFollowerToSnapshot.put(followerId, followerToSnapshot);
-        }
-        ByteString nextChunk = followerToSnapshot.getNextChunk();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
-        }
-        return nextChunk;
-    }
-
-    private void sendHeartBeat() {
-        if (followers.size() > 0) {
-            sendAppendEntries();
-        }
-    }
-
-    private void stopHeartBeat() {
-        if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
-            heartbeatSchedule.cancel();
-        }
-    }
-
-    private void stopInstallSnapshotSchedule() {
+    protected void stopInstallSnapshotSchedule() {
         if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
             installSnapshotSchedule.cancel();
         }
     }
 
-    private void scheduleHeartBeat(FiniteDuration interval) {
-        if(followers.size() == 0){
-            // Optimization - do not bother scheduling a heartbeat as there are
-            // no followers
-            return;
-        }
-
-        stopHeartBeat();
-
-        // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
-        // message is sent to itself.
-        // Scheduling the heartbeat only once here because heartbeats do not
-        // need to be sent if there are other messages being sent to the remote
-        // actor.
-        heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
-            interval, context.getActor(), new SendHeartBeat(),
-            context.getActorSystem().dispatcher(), context.getActor());
-    }
-
-    private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+    protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
         if(followers.size() == 0){
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
@@ -624,122 +91,22 @@ public class Leader extends AbstractRaftActorBehavior {
                 context.getActorSystem().dispatcher(), context.getActor());
     }
 
-
-
-    @Override public void close() throws Exception {
-        stopHeartBeat();
-    }
-
-    @Override public String getLeaderId() {
-        return context.getId();
-    }
-
-    /**
-     * Encapsulates the snapshot bytestring and handles the logic of sending
-     * snapshot chunks
-     */
-    protected class FollowerToSnapshot {
-        private ByteString snapshotBytes;
-        private int offset = 0;
-        // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
-        private int replyReceivedForOffset;
-        // if replyStatus is false, the previous chunk is attempted
-        private boolean replyStatus = false;
-        private int chunkIndex;
-        private int totalChunks;
-
-        public FollowerToSnapshot(ByteString snapshotBytes) {
-            this.snapshotBytes = snapshotBytes;
-            replyReceivedForOffset = -1;
-            chunkIndex = 1;
-            int size = snapshotBytes.size();
-            totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
-                ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Snapshot {} bytes, total chunks to send:{}",
-                    size, totalChunks);
-            }
-        }
-
-        public ByteString getSnapshotBytes() {
-            return snapshotBytes;
-        }
-
-        public int incrementOffset() {
-            if(replyStatus) {
-                // if prev chunk failed, we would want to sent the same chunk again
-                offset = offset + context.getConfigParams().getSnapshotChunkSize();
-            }
-            return offset;
-        }
-
-        public int incrementChunkIndex() {
-            if (replyStatus) {
-                // if prev chunk failed, we would want to sent the same chunk again
-                chunkIndex =  chunkIndex + 1;
-            }
-            return chunkIndex;
-        }
-
-        public int getChunkIndex() {
-            return chunkIndex;
-        }
-
-        public int getTotalChunks() {
-            return totalChunks;
-        }
-
-        public boolean canSendNextChunk() {
-            // we only send a false if a chunk is sent but we have not received a reply yet
-            return replyReceivedForOffset == offset;
-        }
-
-        public boolean isLastChunk(int chunkIndex) {
-            return totalChunks == chunkIndex;
-        }
-
-        public void markSendStatus(boolean success) {
-            if (success) {
-                // if the chunk sent was successful
-                replyReceivedForOffset = offset;
-                replyStatus = true;
-            } else {
-                // if the chunk sent was failure
-                replyReceivedForOffset = offset;
-                replyStatus = false;
-            }
-        }
-
-        public ByteString getNextChunk() {
-            int snapshotLength = getSnapshotBytes().size();
-            int start = incrementOffset();
-            int size = context.getConfigParams().getSnapshotChunkSize();
-            if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
-                size = snapshotLength;
-            } else {
-                if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
-                    size = snapshotLength - start;
-                }
-            }
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("length={}, offset={},size={}",
-                    snapshotLength, start, size);
-            }
-            return getSnapshotBytes().substring(start, start + size);
-
+    protected void stopIsolatedLeaderCheckSchedule() {
+        if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) {
+            isolatedLeaderCheckSchedule.cancel();
         }
     }
 
-    // called from example-actor for printing the follower-states
-    public String printFollowerStates() {
-        StringBuilder sb = new StringBuilder();
-        for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
-            boolean isFollowerActive = followerLogInformation.isFollowerActive();
-            sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+    protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) {
+        isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval,
+            context.getActor(), new IsolatedLeaderCheck(),
+            context.getActorSystem().dispatcher(), context.getActor());
+    }
 
-        }
-        return "[" + sb.toString() + "]";
+    @Override public void close() throws Exception {
+        stopInstallSnapshotSchedule();
+        stopIsolatedLeaderCheckSchedule();
+        super.close();
     }
 
     @VisibleForTesting
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java
new file mode 100644 (file)
index 0000000..708068a
--- /dev/null
@@ -0,0 +1,141 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class IsolatedLeaderTest  extends AbstractRaftActorBehaviorTest {
+
+    private ActorRef leaderActor =
+        getSystem().actorOf(Props.create(DoNothingActor.class));
+
+    private ActorRef senderActor =
+        getSystem().actorOf(Props.create(DoNothingActor.class));
+
+    @Override
+    protected RaftActorBehavior createBehavior(
+        RaftActorContext actorContext) {
+        return new Leader(actorContext);
+    }
+
+    @Override
+    protected RaftActorContext createActorContext() {
+        return createActorContext(leaderActor);
+    }
+
+
+    @Test
+    public void testHandleMessageWithThreeMembers() {
+        new JavaTestKit(getSystem()) {{
+            String followerAddress1 = "akka://test/user/$a";
+            String followerAddress2 = "akka://test/user/$b";
+
+            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-1", followerAddress1);
+            peerAddresses.put("follower-2", followerAddress2);
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            IsolatedLeader isolatedLeader = new IsolatedLeader(leaderActorContext);
+            assertTrue(isolatedLeader.state() == RaftState.IsolatedLeader);
+
+            // in a 3 node cluster, even if 1 follower is returns a reply, the isolatedLeader is not isolated
+            RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
+                new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
+                    isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1));
+
+            assertEquals(RaftState.Leader, behavior.state());
+
+            behavior = isolatedLeader.handleMessage(senderActor,
+                new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
+                    isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+
+            assertEquals(RaftState.Leader, behavior.state());
+        }};
+    }
+
+    @Test
+    public void testHandleMessageWithFiveMembers() {
+        new JavaTestKit(getSystem()) {{
+
+            String followerAddress1 = "akka://test/user/$a";
+            String followerAddress2 = "akka://test/user/$b";
+            String followerAddress3 = "akka://test/user/$c";
+            String followerAddress4 = "akka://test/user/$d";
+
+            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-1", followerAddress1);
+            peerAddresses.put("follower-2", followerAddress2);
+            peerAddresses.put("follower-3", followerAddress3);
+            peerAddresses.put("follower-4", followerAddress4);
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            IsolatedLeader isolatedLeader = new IsolatedLeader(leaderActorContext);
+            assertEquals(RaftState.IsolatedLeader, isolatedLeader.state());
+
+            // in a 5 member cluster, atleast 2 followers need to be active and return a reply
+            RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
+                new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
+                    isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+
+            assertEquals(RaftState.IsolatedLeader, behavior.state());
+
+            behavior = isolatedLeader.handleMessage(senderActor,
+                new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
+                    isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+
+            assertEquals(RaftState.Leader, behavior.state());
+
+            behavior = isolatedLeader.handleMessage(senderActor,
+                new AppendEntriesReply("follower-3", isolatedLeader.lastTerm() - 1, true,
+                    isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+
+            assertEquals(RaftState.Leader, behavior.state());
+        }};
+    }
+
+    @Test
+    public void testHandleMessageFromAnotherLeader() {
+        new JavaTestKit(getSystem()) {{
+            String followerAddress1 = "akka://test/user/$a";
+            String followerAddress2 = "akka://test/user/$b";
+
+            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-1", followerAddress1);
+            peerAddresses.put("follower-2", followerAddress2);
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            IsolatedLeader isolatedLeader = new IsolatedLeader(leaderActorContext);
+            assertTrue(isolatedLeader.state() == RaftState.IsolatedLeader);
+
+            // if an append-entries reply is received by the isolated-leader, and that reply
+            // has a term  > than its own term, then IsolatedLeader switches to Follower
+            // bowing itself to another leader in the cluster
+            RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
+                new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() + 1, true,
+                    isolatedLeader.lastIndex() + 1, isolatedLeader.lastTerm() + 1));
+
+            assertEquals(RaftState.Follower, behavior.state());
+        }};
+
+    }
+}
index 168eb3e5f22c9752dcbe089fe2e87393713d2650..6b534deb1f5db6e8f7b773ddd2004f6b0f44ee71 100644 (file)
@@ -1,10 +1,20 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
@@ -18,6 +28,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -31,15 +42,6 @@ import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 import scala.concurrent.duration.FiniteDuration;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -941,10 +943,82 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                 raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
 
                 assertEquals(RaftState.Leader, raftActorBehavior.state());
+            }};
+    }
 
+    @Test
+    public void testIsolatedLeaderCheckNoFollowers() {
+        new JavaTestKit(getSystem()) {{
+            ActorRef leaderActor = getTestActor();
 
-            }};
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            leaderActorContext.setPeerAddresses(peerAddresses);
 
+            Leader leader = new Leader(leaderActorContext);
+            RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue(behavior instanceof Leader);
+        }};
+    }
+
+    @Test
+    public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef followerActor1 = getTestActor();
+            ActorRef followerActor2 = getTestActor();
+
+            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-1", followerActor1.path().toString());
+            peerAddresses.put("follower-2", followerActor2.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            Leader leader = new Leader(leaderActorContext);
+            leader.stopIsolatedLeaderCheckSchedule();
+
+            leader.markFollowerActive("follower-1");
+            leader.markFollowerActive("follower-2");
+            RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue("Behavior not instance of Leader when all followers are active",
+                behavior instanceof Leader);
+
+            // kill 1 follower and verify if that got killed
+            final JavaTestKit probe = new JavaTestKit(getSystem());
+            probe.watch(followerActor1);
+            followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+            assertEquals(termMsg1.getActor(), followerActor1);
+
+            //sleep enough for all the follower stopwatches to lapse
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+                getElectionTimeOutInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+            leader.markFollowerActive("follower-2");
+            behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
+                behavior instanceof Leader);
+
+            // kill 2nd follower and leader should change to Isolated leader
+            followerActor2.tell(PoisonPill.getInstance(), null);
+            probe.watch(followerActor2);
+            followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+            assertEquals(termMsg2.getActor(), followerActor2);
+
+            //sleep enough for the remaining the follower stopwatches to lapse
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+                getElectionTimeOutInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+            behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+                behavior instanceof IsolatedLeader);
+
+        }};
     }
 
     class MockLeader extends Leader {
index fae7eb1a33434bdc52fc3534fe3ee45eee553870..c7bf7d1f7ac2d90d9eb548b5a73a05701f5bab92 100644 (file)
@@ -179,7 +179,7 @@ public class NormalizedNodeSerializer {
                 ValueSerializer.serialize(builder, this, value);
 
             } else if (value instanceof Iterable) {
-                Iterable iterable = (Iterable) value;
+                Iterable<?> iterable = (Iterable<?>) value;
 
                 for (Object o : iterable) {
                     if (o instanceof NormalizedNode) {
index 2574bd681eeaf72762a8ff8c047fd0d5567f3390..cac58587a5ca3acac688c923d47af3846484e4b7 100644 (file)
@@ -92,7 +92,7 @@ public class XmlUtilsTest {
 
     YangInstanceIdentifier instance = (YangInstanceIdentifier) secondNode.getValue();
     Iterable<YangInstanceIdentifier.PathArgument> iterable = instance.getPathArguments();
-    Iterator it = iterable.iterator();
+    Iterator<YangInstanceIdentifier.PathArgument> it = iterable.iterator();
     YangInstanceIdentifier.NodeIdentifier firstPath = (YangInstanceIdentifier.NodeIdentifier) it.next();
     Assert.assertEquals("node", firstPath.getNodeType().getLocalName());
     YangInstanceIdentifier.NodeIdentifierWithPredicates secondPath = (YangInstanceIdentifier.NodeIdentifierWithPredicates)it.next();
index 2048bde613868ed7bb6ea30e8fa25776d7ecc9a4..e18c00ec4b0b5449ddc071175b379372528796f4 100644 (file)
@@ -119,6 +119,7 @@ public class DatastoreContext {
         private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS);
         private boolean persistent = true;
         private ConfigurationReader configurationReader = new FileConfigurationReader();
+        private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
 
         public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
             this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
@@ -180,18 +181,24 @@ public class DatastoreContext {
             return this;
         }
 
-
         public Builder persistent(boolean persistent){
             this.persistent = persistent;
             return this;
         }
 
+        public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
+            this.shardIsolatedLeaderCheckIntervalInMillis = shardIsolatedLeaderCheckIntervalInMillis;
+            return this;
+        }
+
         public DatastoreContext build() {
             DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
             raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
                     TimeUnit.MILLISECONDS));
             raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
             raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+            raftConfig.setIsolatedLeaderCheckInterval(
+                new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
 
             return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType,
                     operationTimeoutInSeconds, shardTransactionIdleTimeout,
index 2f3fbdcef17ee9b6fc86226ad80d93c2e36ab42b..8eb653a44feb54731c840549d54ca24cafacc292 100644 (file)
@@ -63,6 +63,8 @@ public class DistributedConfigDataStoreProviderModule extends
                 .shardTransactionCommitQueueCapacity(
                         props.getShardTransactionCommitQueueCapacity().getValue().intValue())
                 .persistent(props.getPersistent().booleanValue())
+                .shardIsolatedLeaderCheckIntervalInMillis(
+                    props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
index ecb3a910178484af829cba489c7f33ec30b33662..2a12aff4ef321b35c79c9ca491ac532045f56a36 100644 (file)
@@ -63,6 +63,8 @@ public class DistributedOperationalDataStoreProviderModule extends
                 .shardTransactionCommitQueueCapacity(
                         props.getShardTransactionCommitQueueCapacity().getValue().intValue())
                 .persistent(props.getPersistent().booleanValue())
+                .shardIsolatedLeaderCheckIntervalInMillis(
+                    props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance("operational",
index 995e98f38fff30f1340df40ad0162e19b9d7c264..4d3d438b3267b857457e4f7a5c767c52a9338c6a 100644 (file)
@@ -153,6 +153,13 @@ module distributed-datastore-provider {
             type boolean;
             description "Enable or disable data persistence";
          }
+
+        leaf shard-isolated-leader-check-interval-in-millis {
+            default 5000;
+            type heartbeat-interval-type;
+            description "The interval at which the leader of the shard will check if its majority
+                        followers are active and term itself as isolated";
+        }
     }
 
     // Augments the 'configuration' choice node under modules/module.
index 5be701f82e64497fea4ef863dfd7a147bd360499..5979771d28c88e1ad765bfa5d83053b5974a724c 100644 (file)
@@ -121,7 +121,7 @@ public class JDBCServer extends Thread {
             while (entry.getValue().next()) {
                 Map rec = entry.getValue().getCurrent();
                 Map newRec = new HashMap();
-                for (Iterator iter = rec.entrySet().iterator(); iter.hasNext();) {
+                for (Iterator<?> iter = rec.entrySet().iterator(); iter.hasNext();) {
                     Map.Entry e = (Map.Entry) iter.next();
                     String key = (String) e.getKey();
                     Object value = e.getValue();
index 02819c15c78b4408c91ff56a725e013017a0b55b..2971865a70fb50f65e7c032295bf5bf5963a33df 100644 (file)
@@ -67,18 +67,18 @@ public class NetconfMessageTransformer implements MessageTransformer<NetconfMess
             if(schemaContext.isPresent()) {
                 if (NetconfMessageTransformUtil.isDataEditOperation(rpc)) {
                     final DataNodeContainer schemaForEdit = NetconfMessageTransformUtil.createSchemaForEdit(schemaContext.get());
-                    w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaForEdit, codecProvider);
+                    w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaContext.get(), schemaForEdit, codecProvider);
                 } else if (NetconfMessageTransformUtil.isGetOperation(rpc)) {
                     final DataNodeContainer schemaForGet = NetconfMessageTransformUtil.createSchemaForGet(schemaContext.get());
-                    w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaForGet, codecProvider);
+                    w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaContext.get(), schemaForGet, codecProvider);
                 } else if (NetconfMessageTransformUtil.isGetConfigOperation(rpc)) {
                     final DataNodeContainer schemaForGetConfig = NetconfMessageTransformUtil.createSchemaForGetConfig(schemaContext.get());
-                    w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaForGetConfig, codecProvider);
+                    w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaContext.get(), schemaForGetConfig, codecProvider);
                 } else {
                     final Optional<RpcDefinition> schemaForRpc = NetconfMessageTransformUtil.findSchemaForRpc(rpc, schemaContext.get());
                     if(schemaForRpc.isPresent()) {
                         final DataNodeContainer schemaForGetConfig = NetconfMessageTransformUtil.createSchemaForRpc(schemaForRpc.get());
-                        w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaForGetConfig, codecProvider);
+                        w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaContext.get(), schemaForGetConfig, codecProvider);
                     } else {
                         w3cPayload = toRpcRequestWithoutSchema(rpcPayload, codecProvider);
                     }
index 665fafacc893b7b05235daaa34f8e709d6f2702f..ceac03e3d97eccabca05633652179ce1bb3acb1c 100644 (file)
@@ -88,11 +88,6 @@ public class RestCodec {
                             "Value is not instance of IdentityrefTypeDefinition but is {}. Therefore NULL is used as translation of  - {}",
                             input == null ? "null" : input.getClass(), String.valueOf(input));
                     return null;
-                } else if (type instanceof LeafrefTypeDefinition) {
-                    if (input instanceof IdentityValuesDTO) {
-                        return LEAFREF_DEFAULT_CODEC.deserialize(((IdentityValuesDTO) input).getOriginValue());
-                    }
-                    return LEAFREF_DEFAULT_CODEC.deserialize(input);
                 } else if (type instanceof InstanceIdentifierTypeDefinition) {
                     if (input instanceof IdentityValuesDTO) {
                         return instanceIdentifier.deserialize(input);
@@ -232,7 +227,7 @@ public class RestCodec {
             IdentityValue valueWithNamespace = data.getValuesWithNamespaces().get(0);
             Module module = getModuleByNamespace(valueWithNamespace.getNamespace(), mountPoint);
             if (module == null) {
-                logger.info("Module by namespace '{}' of first node in instance-identiefier was not found.",
+                logger.info("Module by namespace '{}' of first node in instance-identifier was not found.",
                         valueWithNamespace.getNamespace());
                 logger.info("Instance-identifier will be translated as NULL for data - {}",
                         String.valueOf(valueWithNamespace.getValue()));
index cd860efab75073e13cf0cd2cfd93e12fd7200a5e..ded398a33d0f0390beab6f8e723412182a8172ce 100644 (file)
@@ -84,7 +84,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.opendaylight.yangtools.yang.model.api.TypeDefinition;
 import org.opendaylight.yangtools.yang.model.api.type.IdentityrefTypeDefinition;
+import org.opendaylight.yangtools.yang.model.api.type.LeafrefTypeDefinition;
 import org.opendaylight.yangtools.yang.model.util.EmptyType;
+import org.opendaylight.yangtools.yang.model.util.SchemaContextUtil;
 import org.opendaylight.yangtools.yang.parser.builder.impl.ContainerSchemaNodeBuilder;
 import org.opendaylight.yangtools.yang.parser.builder.impl.LeafSchemaNodeBuilder;
 import org.slf4j.Logger;
@@ -1241,7 +1243,9 @@ public class RestconfImpl implements RestconfService {
                 try {
                     this.normalizeNode(nodeWrap, schema, null, mountPoint);
                 } catch (IllegalArgumentException e) {
-                    throw new RestconfDocumentedException(e.getMessage(), ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+                    RestconfDocumentedException restconfDocumentedException = new RestconfDocumentedException(e.getMessage(), ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+                    restconfDocumentedException.addSuppressed(e);
+                    throw restconfDocumentedException;
                 }
                 if (nodeWrap instanceof CompositeNodeWrapper) {
                     return ((CompositeNodeWrapper) nodeWrap).unwrap();
@@ -1319,11 +1323,14 @@ public class RestconfImpl implements RestconfService {
         final Object value = simpleNode.getValue();
         Object inputValue = value;
         TypeDefinition<? extends Object> typeDefinition = this.typeDefinition(schema);
-        if ((typeDefinition instanceof IdentityrefTypeDefinition)) {
-            if ((value instanceof String)) {
-                inputValue = new IdentityValuesDTO(simpleNode.getNamespace().toString(), (String) value, null,
-                        (String) value);
-            } // else value is already instance of IdentityValuesDTO
+
+        // For leafrefs, extract the type it is pointing to
+        if(typeDefinition instanceof LeafrefTypeDefinition) {
+            typeDefinition = SchemaContextUtil.getBaseTypeForLeafRef(((LeafrefTypeDefinition) typeDefinition), mountPoint == null ? this.controllerContext.getGlobalSchema() : mountPoint.getSchemaContext(), schema);
+        }
+
+        if (typeDefinition instanceof IdentityrefTypeDefinition) {
+            inputValue = parseToIdentityValuesDTO(simpleNode, value, inputValue);
         }
 
         Object outputValue = inputValue;
@@ -1336,6 +1343,14 @@ public class RestconfImpl implements RestconfService {
         simpleNode.setValue(outputValue);
     }
 
+    private Object parseToIdentityValuesDTO(final SimpleNodeWrapper simpleNode, final Object value, Object inputValue) {
+        if ((value instanceof String)) {
+            inputValue = new IdentityValuesDTO(simpleNode.getNamespace().toString(), (String) value, null,
+                    (String) value);
+        } // else value is already instance of IdentityValuesDTO
+        return inputValue;
+    }
+
     private void normalizeCompositeNode(final CompositeNodeWrapper compositeNodeBuilder,
             final DataNodeContainer schema, final DOMMountPoint mountPoint, final QName currentAugment) {
         final List<NodeWrapper<?>> children = compositeNodeBuilder.getValues();
index b5d3528e95857cd95c9956522bf633f99edc6763..fa79fb7677d264969a6473ef8dfcc5d37c317a32 100644 (file)
@@ -63,7 +63,7 @@ public class CnSnToJsonLeafrefType extends YangAndXmlAndDataSchemaLoader {
     @Test
     public void leafrefToNotLeafTest() {
         String json = toJson("/cnsn-to-json/leafref/xml/data_ref_to_not_leaf.xml");
-        validateJson(".*\"cont-augment-module\\p{Blank}*:\\p{Blank}*lf6\":\\p{Blank}*\"44.33\".*", json);
+        validateJson(".*\"cont-augment-module\\p{Blank}*:\\p{Blank}*lf6\":\\p{Blank}*\"44\".*", json);
     }
 
     /**
index 59696bc534803935ab5c0c0126d0a43fc2c75fa5..bdd74e8f9617a634235814304f21a3362eefeb6f 100644 (file)
@@ -51,9 +51,7 @@ public class JsonLeafrefToCnSnTest extends YangAndXmlAndDataSchemaLoader {
         }
 
         assertNotNull(lf2);
-        assertTrue(lf2.getValue() instanceof String);
-        assertEquals("121", lf2.getValue());
-
+        assertEquals(121, lf2.getValue());
     }
 
 }
index 1c8e53e69ff5091d9a9013947d09cb6381d5723a..7b216ef1ba54f276628349a534871f4694852c00 100644 (file)
@@ -12,17 +12,27 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+
 import javax.ws.rs.WebApplicationException;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.sal.rest.impl.JsonToCompositeNodeProvider;
 import org.opendaylight.controller.sal.rest.impl.XmlToCompositeNodeProvider;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class XmlAndJsonToCnSnLeafRefTest extends YangAndXmlAndDataSchemaLoader {
 
+    final QName refContQName = QName.create("referenced:module", "2014-04-17", "cont");
+    final QName refLf1QName = QName.create(refContQName, "lf1");
+    final QName contQName = QName.create("leafref:module", "2014-04-17", "cont");
+    final QName lf1QName = QName.create(contQName, "lf1");
+    final QName lf2QName = QName.create(contQName, "lf2");
+    final QName lf3QName = QName.create(contQName, "lf3");
+
     @BeforeClass
     public static void initialize() {
         dataLoad("/leafref/yang", 2, "leafref-module", "cont");
@@ -36,7 +46,11 @@ public class XmlAndJsonToCnSnLeafRefTest extends YangAndXmlAndDataSchemaLoader {
         CompositeNode cnSn = (CompositeNode)node;
 
         TestUtils.normalizeCompositeNode(cnSn, modules, schemaNodePath);
-        verifyContPredicate(cnSn, "/ns:cont/ns:lf1", "/cont/lf1", "/ns:cont/ns:lf1", "../lf1");
+
+        verifyContPredicate(cnSn, "lf4", YangInstanceIdentifier.builder().node(refContQName).node(refLf1QName).build());
+        verifyContPredicate(cnSn, "lf2", YangInstanceIdentifier.builder().node(contQName).node(lf1QName).build());
+        verifyContPredicate(cnSn, "lf3", YangInstanceIdentifier.builder().node(contQName).node(lf2QName).build());
+        verifyContPredicate(cnSn, "lf5", YangInstanceIdentifier.builder().node(contQName).node(lf3QName).build());
     }
 
     @Test
@@ -47,31 +61,23 @@ public class XmlAndJsonToCnSnLeafRefTest extends YangAndXmlAndDataSchemaLoader {
         CompositeNode cnSn = (CompositeNode)node;
 
         TestUtils.normalizeCompositeNode(cnSn, modules, schemaNodePath);
-        verifyContPredicate(cnSn, "/leafref-module:cont/leafref-module:lf1", "/leafref-module:cont/leafref-module:lf1",
-                "/referenced-module:cont/referenced-module:lf1", "/leafref-module:cont/leafref-module:lf1");
+
+        verifyContPredicate(cnSn, "lf4", YangInstanceIdentifier.builder().node(refContQName).node(refLf1QName).build());
+        verifyContPredicate(cnSn, "lf2", YangInstanceIdentifier.builder().node(contQName).node(lf1QName).build());
+        verifyContPredicate(cnSn, "lf3", YangInstanceIdentifier.builder().node(contQName).node(lf2QName).build());
+        verifyContPredicate(cnSn, "lf5", YangInstanceIdentifier.builder().node(contQName).node(lf3QName).build());
     }
 
-    private void verifyContPredicate(CompositeNode cnSn, String... values) throws URISyntaxException {
-        Object lf2Value = null;
-        Object lf3Value = null;
-        Object lf4Value = null;
-        Object lf5Value = null;
-
-        for (Node<?> node : cnSn.getValue()) {
-            if (node.getNodeType().getLocalName().equals("lf2")) {
-                lf2Value = ((SimpleNode<?>) node).getValue();
-            } else if (node.getNodeType().getLocalName().equals("lf3")) {
-                lf3Value = ((SimpleNode<?>) node).getValue();
-            } else if (node.getNodeType().getLocalName().equals("lf4")) {
-                lf4Value = ((SimpleNode<?>) node).getValue();
-            } else if (node.getNodeType().getLocalName().equals("lf5")) {
-                lf5Value = ((SimpleNode<?>) node).getValue();
+    private void verifyContPredicate(CompositeNode cnSn, String leafName, Object value) throws URISyntaxException {
+        Object parsed = null;
+
+        for (final Node<?> node : cnSn.getValue()) {
+            if (node.getNodeType().getLocalName().equals(leafName)) {
+                parsed = node.getValue();
             }
         }
-        assertEquals(values[0], lf2Value);
-        assertEquals(values[1], lf3Value);
-        assertEquals(values[2], lf4Value);
-        assertEquals(values[3], lf5Value);
+
+        assertEquals(value, parsed);
     }
 
 }
index d0af29e913fa633381c176ff1796a2fbd3f41c29..64568da769cad13132806df5bb551edd51bcaf20 100644 (file)
@@ -54,8 +54,7 @@ public class XmlToCnSnTest extends YangAndXmlAndDataSchemaLoader {
         }
 
         assertNotNull(lf2);
-        assertTrue(lf2.getValue() instanceof String);
-        assertEquals("121", lf2.getValue());
+        assertEquals(121, lf2.getValue());
     }
 
     @Test
index afc23b7946cf93b4e73b4c1c31255986562db1da..27b2dae2434dc63e1fb464de0cb62ea88b9157cc 100644 (file)
@@ -1,42 +1,42 @@
 module cont-augment-module {
-  namespace "cont:augment:module";  
+  namespace "cont:augment:module";
 
   prefix "cntaugmod";
-  
+
   import main-module {prefix mamo; revision-date 2013-12-2;}
-         
+
   revision 2013-12-2 {
-  
+
   }
-  
+
   augment "/mamo:cont" {
        leaf-list lflst1 {
                type leafref {
-                       path "../lf1";
+                       path "../mamo:lf1";
                }
-       }       
-       
+       }
+
        leaf lf4 {
                type leafref {
-                       path "../lf1";
+                       path "../mamo:lf1";
                }
        }
-       
+
        /* reference to not leaf element */
        leaf lf6 {
                type leafref {
                        path "../lflst1";
                }
        }
-       
+
        leaf lf7 {
                type leafref {
                        path "../lf4";
                }
        }
   }
-       
 
-         
+
+
+
 }
\ No newline at end of file
index cbe455b33b09beaa3b152d76edaefff716e87dbe..f4a435e3bf6b94e8e70ff46d08adf2f9fa4456e6 100644 (file)
@@ -2,7 +2,7 @@
     "leafref-module:cont" : {
         "lf4" : "/referenced-module:cont/referenced-module:lf1",
         "lf2" : "/leafref-module:cont/leafref-module:lf1",
-        "lf3" : "/leafref-module:cont/leafref-module:lf1",
-        "lf5" : "/leafref-module:cont/leafref-module:lf1"
+        "lf3" : "/leafref-module:cont/leafref-module:lf2",
+        "lf5" : "/leafref-module:cont/leafref-module:lf3"
     }
 }
\ No newline at end of file
index 01bf092d27f005605def37d43dda00f2c4450e72..1b5ce835fc0e9a44e94311df8f6c9f9d65827922 100644 (file)
@@ -1,6 +1,6 @@
-<cont xmlns="leafref:module">
-    <lf4 xmlns:ns="referenced:module">/ns:cont/ns:lf1</lf4>
-    <lf2 xmlns:ns="leafref:module">/ns:cont/ns:lf1</lf2>
-    <lf3 xmlns:ns="leafref:module">/cont/lf1</lf3>
-    <lf5 xmlns:ns="leafref:module">../lf1</lf5>
+<cont xmlnsa="leafref:module">
+    <lf4 xmlns:nsa="referenced:module">/nsa:cont/nsa:lf1</lf4>
+    <lf2 xmlns:nsa="leafref:module">/nsa:cont/nsa:lf1</lf2>
+    <lf3 xmlns:ns="leafref:module">/ns:cont/ns:lf2</lf3>
+    <lf5 xmlns:nsa="leafref:module">/nsa:cont/nsa:lf3</lf5>
 </cont>
index 8ca9f090968fa8ed25d4f69b5243f011f1f709c5..6fe770b40baebc9bdef35be75504c0c731461e87 100644 (file)
@@ -1,19 +1,61 @@
 module leafref-module {
-  namespace "leafref:module";  
+  namespace "leafref:module";
 
   prefix "lfrfmo";
-  revision 2013-11-18 {    
+  revision 2013-11-18 {
   }
 
+  identity base {}
+
     container cont {
         leaf lf1 {
             type int32;
         }
         leaf lf2 {
             type leafref {
-                path "/cont/lf1"; 
+                path "/cont/lf1";
+            }
+        }
+
+        leaf lf-ident {
+            type identityref {
+                base "lfrfmo:base";
             }
         }
+
+        leaf lf-ident-ref {
+            type leafref {
+                path "/cont/lf-ident";
+            }
+        }
+
+        leaf lf-ident-ref-relative {
+            type leafref {
+                path "../lf-ident";
+            }
+        }
+
+        leaf lf-ident-ref-relative-cnd {
+            type leafref {
+                path "/lfrfmo:cont/lfrfmo:lis[lfrfmo:id='abc']/lf-ident-ref";
+            }
+        }
+
+
+        list lis {
+            key "id";
+
+            leaf id {
+                type string;
+            }
+
+            leaf lf-ident-ref {
+                type leafref {
+                    path "/cont/lf-ident";
+                }
+            }
+        }
+
     }
-  
+
 }
\ No newline at end of file
index 06200a69b577088d98fe9252ee5252931e62672b..c3071e5610bcacbafe0863ccc6c5f34766661781 100644 (file)
@@ -1,4 +1,8 @@
 <cont>
     <lf1>121</lf1>
     <lf2>121</lf2>
+    <lf-ident xmlns:a="leafref:module">a:base</lf-ident>
+    <lf-ident-ref xmlns:a="leafref:module">a:base</lf-ident-ref>
+    <lf-ident-ref-relative xmlns:a="leafref:module">a:base</lf-ident-ref-relative>
+    <lf-ident-ref-relative-cnd xmlns:a="leafref:module">a:base</lf-ident-ref-relative-cnd>
 </cont>
\ No newline at end of file
index 3ba92b092ed2f5ccb067a8ee25aae3efd08a4d82..99f9c04fcc91d32c60640f0878ab289a6961358f 100644 (file)
@@ -29,7 +29,7 @@ public class Activator implements BundleActivator {
     private static final Logger logger = LoggerFactory.getLogger(Activator.class);
 
     private BundleContext context;
-    private ServiceRegistration osgiRegistration;
+    private ServiceRegistration<?> osgiRegistration;
     private ConfigRegistryLookupThread configRegistryLookup = null;
 
     @Override
index ab353e349b2908b67ade7f583f3601b7aa16ce3b..eac58cbd7f298b97d02603b972e8f3ff4c004000 100644 (file)
@@ -9,6 +9,12 @@
 package org.opendaylight.controller.netconf.persist.impl;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
@@ -17,19 +23,12 @@ import org.slf4j.LoggerFactory;
 import org.w3c.dom.Attr;
 import org.w3c.dom.Element;
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
 /**
  * Inspects snapshot xml to be stored, remove all capabilities that are not referenced by it.
  * Useful when persisting current configuration.
  */
 public class CapabilityStrippingConfigSnapshotHolder implements ConfigSnapshotHolder {
-    private static final Logger logger = LoggerFactory.getLogger(CapabilityStrippingConfigSnapshotHolder.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CapabilityStrippingConfigSnapshotHolder.class);
 
     private final String configSnapshot;
     private final StripCapabilitiesResult stripCapabilitiesResult;
@@ -54,7 +53,7 @@ public class CapabilityStrippingConfigSnapshotHolder implements ConfigSnapshotHo
     static StripCapabilitiesResult stripCapabilities(XmlElement configElement, Set<String> allCapabilitiesFromHello) {
         // collect all namespaces
         Set<String> foundNamespacesInXML = getNamespaces(configElement);
-        logger.trace("All capabilities {}\nFound namespaces in XML {}", allCapabilitiesFromHello, foundNamespacesInXML);
+        LOG.trace("All capabilities {}\nFound namespaces in XML {}", allCapabilitiesFromHello, foundNamespacesInXML);
         // required are referenced both in xml and hello
         SortedSet<String> requiredCapabilities = new TreeSet<>();
         // can be removed
@@ -68,7 +67,7 @@ public class CapabilityStrippingConfigSnapshotHolder implements ConfigSnapshotHo
             }
         }
 
-        logger.trace("Required capabilities {}, \nObsolete capabilities {}",
+        LOG.trace("Required capabilities {}, \nObsolete capabilities {}",
                 requiredCapabilities, obsoleteCapabilities);
 
         return new StripCapabilitiesResult(requiredCapabilities, obsoleteCapabilities);
index d72c26cb775dd76f34645d1d5bc533eb4bc082fe..7618807a0e2b23808cb93e73e99d190e93fdeca4 100644 (file)
@@ -8,21 +8,20 @@
 
 package org.opendaylight.controller.netconf.persist.impl;
 
-import org.opendaylight.controller.config.persist.api.Persister;
-import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification;
-import org.opendaylight.controller.netconf.api.jmx.DefaultCommitOperationMXBean;
-import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.io.Closeable;
+import java.io.IOException;
 import javax.annotation.concurrent.ThreadSafe;
 import javax.management.InstanceNotFoundException;
 import javax.management.MBeanServerConnection;
 import javax.management.Notification;
 import javax.management.NotificationListener;
 import javax.management.ObjectName;
-import java.io.Closeable;
-import java.io.IOException;
+import org.opendaylight.controller.config.persist.api.Persister;
+import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification;
+import org.opendaylight.controller.netconf.api.jmx.DefaultCommitOperationMXBean;
+import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Responsible for listening for notifications from netconf (via JMX) containing latest
@@ -32,7 +31,7 @@ import java.io.IOException;
 @ThreadSafe
 public class ConfigPersisterNotificationHandler implements Closeable {
 
-    private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
     private final MBeanServerConnection mBeanServerConnection;
     private final NotificationListener listener;
 
@@ -48,7 +47,7 @@ public class ConfigPersisterNotificationHandler implements Closeable {
     }
 
     private static void registerAsJMXListener(final MBeanServerConnection mBeanServerConnection, final NotificationListener listener) {
-        logger.trace("Called registerAsJMXListener");
+        LOG.trace("Called registerAsJMXListener");
         try {
             mBeanServerConnection.addNotificationListener(DefaultCommitOperationMXBean.OBJECT_NAME, listener, null, null);
         } catch (InstanceNotFoundException | IOException e) {
@@ -65,13 +64,13 @@ public class ConfigPersisterNotificationHandler implements Closeable {
                 mBeanServerConnection.removeNotificationListener(on, listener);
             }
         } catch (final Exception e) {
-            logger.warn("Unable to unregister {} as listener for {}", listener, on, e);
+            LOG.warn("Unable to unregister {} as listener for {}", listener, on, e);
         }
     }
 }
 
 class ConfigPersisterNotificationListener implements NotificationListener {
-    private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationListener.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigPersisterNotificationListener.class);
 
     private final Persister persisterAggregator;
 
@@ -87,14 +86,14 @@ class ConfigPersisterNotificationListener implements NotificationListener {
         // Socket should not be closed at this point
         // Activator unregisters this as JMX listener before close is called
 
-        logger.trace("Received notification {}", notification);
+        LOG.trace("Received notification {}", notification);
         if (notification instanceof CommitJMXNotification) {
             try {
                 handleAfterCommitNotification((CommitJMXNotification) notification);
             } catch (final Exception e) {
                 // log exceptions from notification Handler here since
                 // notificationBroadcastSupport logs only DEBUG level
-                logger.warn("Failed to handle notification {}", notification, e);
+                LOG.warn("Failed to handle notification {}", notification, e);
                 throw e;
             }
         } else {
@@ -106,7 +105,7 @@ class ConfigPersisterNotificationListener implements NotificationListener {
         try {
             persisterAggregator.persistConfig(new CapabilityStrippingConfigSnapshotHolder(notification.getConfigSnapshot(),
                     notification.getCapabilities()));
-            logger.trace("Configuration persisted successfully");
+            LOG.trace("Configuration persisted successfully");
         } catch (final IOException e) {
             throw new RuntimeException("Unable to persist configuration snapshot", e);
         }
index b346522f4498b587c5550f7898f6a9639b48f395..b06219c978558cb69c92bb353d279cb0f8c480cb 100644 (file)
@@ -53,7 +53,7 @@ import org.xml.sax.SAXException;
 
 @Immutable
 public class ConfigPusherImpl implements ConfigPusher {
-    private static final Logger logger = LoggerFactory.getLogger(ConfigPusherImpl.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigPusherImpl.class);
 
     private final long maxWaitForCapabilitiesMillis;
     private final long conflictingVersionTimeoutMillis;
@@ -83,35 +83,35 @@ public class ConfigPusherImpl implements ConfigPusher {
                  * it is good idea to perform garbage collection to prune
                  * any garbage we have accumulated during startup.
                  */
-                logger.debug("Running post-initialization garbage collection...");
+                LOG.debug("Running post-initialization garbage collection...");
                 System.gc();
-                logger.debug("Post-initialization garbage collection completed.");
-                logger.debug("ConfigPusher has pushed configs {}, gc completed", configs);
+                LOG.debug("Post-initialization garbage collection completed.");
+                LOG.debug("ConfigPusher has pushed configs {}, gc completed", configs);
             }
             catch (NetconfDocumentedException e) {
-                logger.error("Error pushing configs {}",configs);
+                LOG.error("Error pushing configs {}",configs);
                 throw new IllegalStateException(e);
             }
         }
     }
 
     public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
-        logger.debug("Requested to push configs {}", configs);
+        LOG.debug("Requested to push configs {}", configs);
         this.queue.put(configs);
     }
 
     private LinkedHashMap<? extends ConfigSnapshotHolder, EditAndCommitResponse> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
-        logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
+        LOG.debug("Last config snapshots to be pushed to netconf: {}", configs);
         LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
         // start pushing snapshots:
         for (ConfigSnapshotHolder configSnapshotHolder : configs) {
             if(configSnapshotHolder != null) {
                 EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
-                logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
+                LOG.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
                 result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
             }
         }
-        logger.debug("All configuration snapshots have been pushed successfully.");
+        LOG.debug("All configuration snapshots have been pushed successfully.");
         return result;
     }
 
@@ -133,7 +133,7 @@ public class ConfigPusherImpl implements ConfigPusher {
                 return pushConfig(configSnapshotHolder, operationService);
             } catch (ConflictingVersionException e) {
                 lastException = e;
-                logger.debug("Conflicting version detected, will retry after timeout");
+                LOG.debug("Conflicting version detected, will retry after timeout");
                 sleep();
             }
         } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
@@ -148,7 +148,7 @@ public class ConfigPusherImpl implements ConfigPusher {
             try {
                 return getOperationService(expectedCapabilities, idForReporting);
             } catch (NotEnoughCapabilitiesException e) {
-                logger.debug("Not enough capabilities: " + e.toString());
+                LOG.debug("Not enough capabilities: {}", e.toString());
                 lastException = e;
                 sleep();
             }
@@ -187,9 +187,9 @@ public class ConfigPusherImpl implements ConfigPusher {
             return serviceCandidate;
         } else {
             serviceCandidate.close();
-            logger.trace("Netconf server did not provide required capabilities for {} " +
+            LOG.trace("Netconf server did not provide required capabilities for {} ", idForReporting,
                     "Expected but not found: {}, all expected {}, current {}",
-                    idForReporting, notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
+                     notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
             );
             throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff);
         }
@@ -234,7 +234,7 @@ public class ConfigPusherImpl implements ConfigPusher {
         } catch (SAXException | IOException e) {
             throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
         }
-        logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
+        LOG.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
         Stopwatch stopwatch = new Stopwatch().start();
         NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
 
@@ -244,16 +244,16 @@ public class ConfigPusherImpl implements ConfigPusher {
         Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
                 "commit", configSnapshotHolder.toString());
 
-        if (logger.isTraceEnabled()) {
+        if (LOG.isTraceEnabled()) {
             StringBuilder response = new StringBuilder("editConfig response = {");
             response.append(XmlUtil.toString(editResponseMessage));
             response.append("}");
             response.append("commit response = {");
             response.append(XmlUtil.toString(commitResponseMessage));
             response.append("}");
-            logger.trace("Last configuration loaded successfully");
-            logger.trace("Detailed message {}", response);
-            logger.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+            LOG.trace("Last configuration loaded successfully");
+            LOG.trace("Detailed message {}", response);
+            LOG.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
         }
         return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
     }
index 27f930990dd22149e17fb8092a2ae30bbaa93a6e..26e497387a012c3a6c91103659c0cebc47a325f7 100644 (file)
@@ -8,6 +8,9 @@
 
 package org.opendaylight.controller.netconf.persist.impl;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
 import org.opendaylight.controller.config.persist.api.Persister;
 import org.opendaylight.controller.config.persist.api.PropertiesProvider;
@@ -15,32 +18,28 @@ import org.opendaylight.controller.config.persist.api.StorageAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
 public class NoOpStorageAdapter implements StorageAdapter, Persister {
-    private static final Logger logger = LoggerFactory.getLogger(NoOpStorageAdapter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(NoOpStorageAdapter.class);
 
     @Override
     public Persister instantiate(PropertiesProvider propertiesProvider) {
-        logger.debug("instantiate called with {}", propertiesProvider);
+        LOG.debug("instantiate called with {}", propertiesProvider);
         return this;
     }
 
     @Override
     public void persistConfig(ConfigSnapshotHolder holder) throws IOException {
-        logger.debug("persistConfig called with {}", holder);
+        LOG.debug("persistConfig called with {}", holder);
     }
 
     @Override
     public List<ConfigSnapshotHolder> loadLastConfigs() throws IOException {
-        logger.debug("loadLastConfig called");
+        LOG.debug("loadLastConfig called");
         return Collections.emptyList();
     }
 
     @Override
     public void close() {
-        logger.debug("close called");
+        LOG.debug("close called");
     }
 }
index 7e68ac18757e194d3c4b405f69c6a7f2453c9044..0c51166fe4ddedac16302cdc2b622d57d47b7c61 100644 (file)
@@ -56,7 +56,7 @@ import org.slf4j.LoggerFactory;
  *
  */
 public final class PersisterAggregator implements Persister {
-    private static final Logger logger = LoggerFactory.getLogger(PersisterAggregator.class);
+    private static final Logger LOG = LoggerFactory.getLogger(PersisterAggregator.class);
 
     public static class PersisterWithConfiguration {
 
@@ -139,7 +139,7 @@ public final class PersisterAggregator implements Persister {
                 persisterWithConfigurations.add(PersisterAggregator.loadConfiguration(index, propertiesProvider));
             }
         }
-        logger.debug("Initialized persister with following adapters {}", persisterWithConfigurations);
+        LOG.debug("Initialized persister with following adapters {}", persisterWithConfigurations);
         return new PersisterAggregator(persisterWithConfigurations);
     }
 
@@ -147,7 +147,7 @@ public final class PersisterAggregator implements Persister {
     public void persistConfig(ConfigSnapshotHolder holder) throws IOException {
         for (PersisterWithConfiguration persisterWithConfiguration: persisterWithConfigurations){
             if (!persisterWithConfiguration.readOnly){
-                logger.debug("Calling {}.persistConfig", persisterWithConfiguration.getStorage());
+                LOG.debug("Calling {}.persistConfig", persisterWithConfiguration.getStorage());
                 persisterWithConfiguration.getStorage().persistConfig(holder);
             }
         }
@@ -169,12 +169,12 @@ public final class PersisterAggregator implements Persister {
                 throw new RuntimeException("Error while calling loadLastConfig on " +  persisterWithConfiguration, e);
             }
             if (!configs.isEmpty()) {
-                logger.debug("Found non empty configs using {}:{}", persisterWithConfiguration, configs);
+                LOG.debug("Found non empty configs using {}:{}", persisterWithConfiguration, configs);
                 return configs;
             }
         }
         // no storage had an answer
-        logger.debug("No non-empty list of configuration snapshots found");
+        LOG.debug("No non-empty list of configuration snapshots found");
         return Collections.emptyList();
     }
 
@@ -190,7 +190,7 @@ public final class PersisterAggregator implements Persister {
             try{
                 persisterWithConfiguration.storage.close();
             }catch(RuntimeException e) {
-                logger.error("Error while closing {}", persisterWithConfiguration.storage, e);
+                LOG.error("Error while closing {}", persisterWithConfiguration.storage, e);
                 if (lastException == null){
                     lastException = e;
                 } else {
index 8e9f9978c4dd837334b07db2d1f6c96764e0ede9..135d5ff9be8c765d4e52432e9f69a14179b0a51d 100644 (file)
@@ -8,13 +8,12 @@
 
 package org.opendaylight.controller.netconf.persist.impl.osgi;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-
 import javax.management.MBeanServer;
-
 import org.opendaylight.controller.config.persist.api.ConfigPusher;
 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider;
@@ -34,11 +33,9 @@ import org.osgi.util.tracker.ServiceTrackerCustomizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 public class ConfigPersisterActivator implements BundleActivator {
 
-    private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterActivator.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigPersisterActivator.class);
     private static final MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
 
     public static final String MAX_WAIT_FOR_CAPABILITIES_MILLIS_PROPERTY = "maxWaitForCapabilitiesMillis";
@@ -57,7 +54,7 @@ public class ConfigPersisterActivator implements BundleActivator {
 
     @Override
     public void start(final BundleContext context) throws Exception {
-        logger.debug("ConfigPersister starting");
+        LOG.debug("ConfigPersister starting");
         this.context = context;
 
         autoCloseables = new ArrayList<>();
@@ -68,7 +65,7 @@ public class ConfigPersisterActivator implements BundleActivator {
         long maxWaitForCapabilitiesMillis = getMaxWaitForCapabilitiesMillis(propertiesProvider);
         List<ConfigSnapshotHolder> configs = persisterAggregator.loadLastConfigs();
         long conflictingVersionTimeoutMillis = getConflictingVersionTimeoutMillis(propertiesProvider);
-        logger.debug("Following configs will be pushed: {}", configs);
+        LOG.debug("Following configs will be pushed: {}", configs);
 
         InnerCustomizer innerCustomizer = new InnerCustomizer(configs, maxWaitForCapabilitiesMillis,
                 conflictingVersionTimeoutMillis, persisterAggregator);
@@ -117,7 +114,7 @@ public class ConfigPersisterActivator implements BundleActivator {
 
         @Override
         public NetconfOperationProvider addingService(ServiceReference<NetconfOperationProvider> reference) {
-            logger.trace("Got OuterCustomizer.addingService {}", reference);
+            LOG.trace("Got OuterCustomizer.addingService {}", reference);
             // JMX was registered, track config-netconf-connector
             Filter filter;
             try {
@@ -156,15 +153,15 @@ public class ConfigPersisterActivator implements BundleActivator {
 
         @Override
         public NetconfOperationServiceFactory addingService(ServiceReference<NetconfOperationServiceFactory> reference) {
-            logger.trace("Got InnerCustomizer.addingService {}", reference);
+            LOG.trace("Got InnerCustomizer.addingService {}", reference);
             NetconfOperationServiceFactory service = reference.getBundle().getBundleContext().getService(reference);
 
-            logger.debug("Creating new job queue");
+            LOG.debug("Creating new job queue");
 
             final ConfigPusherImpl configPusher = new ConfigPusherImpl(service, maxWaitForCapabilitiesMillis, conflictingVersionTimeoutMillis);
-            logger.debug("Configuration Persister got {}", service);
-            logger.debug("Context was {}", context);
-            logger.debug("Registration was {}", registration);
+            LOG.debug("Configuration Persister got {}", service);
+            LOG.debug("Context was {}", context);
+            LOG.debug("Registration was {}", registration);
 
             final Thread pushingThread = new Thread(new Runnable() {
                 @Override
@@ -177,12 +174,12 @@ public class ConfigPersisterActivator implements BundleActivator {
                             registration = context.registerService(ConfigPusher.class.getName(), configPusher, null);
                             configPusher.process(autoCloseables, platformMBeanServer, persisterAggregator);
                         } else {
-                            logger.warn("Unable to process configs as BundleContext is null");
+                            LOG.warn("Unable to process configs as BundleContext is null");
                         }
                     } catch (InterruptedException e) {
-                        logger.info("ConfigPusher thread stopped",e);
+                        LOG.info("ConfigPusher thread stopped",e);
                     }
-                    logger.info("Configuration Persister initialization completed.");
+                    LOG.info("Configuration Persister initialization completed.");
                 }
             }, "config-pusher");
             synchronized (autoCloseables) {
index 2a95cca937ddbee88a7f0723b47f2991d0c64a5f..d73f3b801e62c24a9fd63a2a47c1631bd049974e 100644 (file)
@@ -14,7 +14,7 @@ import org.slf4j.LoggerFactory;
 
 public class PropertiesProviderBaseImpl implements PropertiesProvider {
 
-    private static final Logger logger = LoggerFactory.getLogger(PropertiesProviderBaseImpl.class);
+    private static final Logger LOG = LoggerFactory.getLogger(PropertiesProviderBaseImpl.class);
     private final BundleContext bundleContext;
 
     public PropertiesProviderBaseImpl(BundleContext bundleContext) {
@@ -28,7 +28,7 @@ public class PropertiesProviderBaseImpl implements PropertiesProvider {
     }
 
     public String getPropertyWithoutPrefix(String fullKey){
-        logger.trace("Full key {}", fullKey);
+        LOG.trace("Full key {}", fullKey);
         return bundleContext.getProperty(fullKey);
     }
 
index 7e9d80abc0a8814f655fdf964c946f98fe956058..b22924a722878ca455ee1631793cb4aa9466cbe7 100644 (file)
@@ -7,18 +7,17 @@
  */
 package org.opendaylight.controller.netconf.persist.impl;
 
+import static org.junit.Assert.assertEquals;
+
 import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.commons.io.IOUtils;
 import org.junit.Test;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.w3c.dom.Element;
 
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
 public class CapabilityStrippingConfigSnapshotHolderTest {
 
     @Test
index f16083e3f37071ffab6385a7b2d58e99b201c48a..e96b547169c6dda0abbe5a4c1a40ff57e9a2ac70 100644 (file)
@@ -17,7 +17,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import javax.management.MBeanServerConnection;
-
 import javax.management.NotificationFilter;
 import javax.management.NotificationListener;
 import javax.management.ObjectName;
index bf031f192ab7126e5c5fecddf7294f27b9aac4f9..f0cd267dd6a77c03487616e03abd5a01151a5bb6 100644 (file)
@@ -8,10 +8,9 @@
 
 package org.opendaylight.controller.netconf.persist.impl;
 
+import com.google.common.collect.Lists;
 import java.util.Collections;
-
 import javax.management.Notification;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -25,8 +24,6 @@ import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification;
 import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 
-import com.google.common.collect.Lists;
-
 public class ConfigPersisterNotificationListenerTest {
 
     @Mock
index e824b588324e25a232ff03f6faef2f54be7f3839..792f8cd1c0390ee4746c79a7c41f94df430a1a3a 100644 (file)
@@ -7,15 +7,14 @@
  */
 package org.opendaylight.controller.netconf.persist.impl;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
 import org.opendaylight.controller.config.persist.api.Persister;
 import org.opendaylight.controller.config.persist.api.PropertiesProvider;
 import org.opendaylight.controller.config.persist.api.StorageAdapter;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
 public class DummyAdapter implements StorageAdapter, Persister {
 
     static int persist = 0;
index bef1237f980116f84f0f0ba8dc0f58b10028850b..e6464f8403b57859ce6ce2798a4c20a97cbe85f5 100644 (file)
@@ -8,21 +8,6 @@
 
 package org.opendaylight.controller.netconf.persist.impl;
 
-import com.google.common.collect.Lists;
-
-import org.junit.Test;
-import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
-import org.opendaylight.controller.config.persist.api.Persister;
-import org.opendaylight.controller.config.persist.storage.file.xml.XmlFileStorageAdapter;
-import org.opendaylight.controller.netconf.persist.impl.osgi.ConfigPersisterActivator;
-import org.opendaylight.controller.netconf.persist.impl.osgi.PropertiesProviderBaseImpl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -31,7 +16,19 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.opendaylight.controller.netconf.persist.impl.PersisterAggregator.PersisterWithConfiguration;
-import static org.opendaylight.controller.netconf.persist.impl.PersisterAggregatorTest.TestingPropertiesProvider.loadFile;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.Test;
+import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
+import org.opendaylight.controller.config.persist.api.Persister;
+import org.opendaylight.controller.config.persist.storage.file.xml.XmlFileStorageAdapter;
+import org.opendaylight.controller.netconf.persist.impl.osgi.ConfigPersisterActivator;
+import org.opendaylight.controller.netconf.persist.impl.osgi.PropertiesProviderBaseImpl;
 
 public class PersisterAggregatorTest {
 
@@ -72,7 +69,7 @@ public class PersisterAggregatorTest {
 
     @Test
     public void testDummyAdapter() throws Exception {
-        PersisterAggregator persisterAggregator = PersisterAggregator.createFromProperties(loadFile("test1.properties"));
+        PersisterAggregator persisterAggregator = PersisterAggregator.createFromProperties(TestingPropertiesProvider.loadFile("test1.properties"));
         List<PersisterWithConfiguration> persisters = persisterAggregator.getPersisterWithConfigurations();
         assertEquals(1, persisters.size());
         PersisterWithConfiguration persister = persisters.get(0);
@@ -107,7 +104,7 @@ public class PersisterAggregatorTest {
 
     @Test
     public void testLoadFromPropertyFile() throws Exception {
-        PersisterAggregator persisterAggregator = PersisterAggregator.createFromProperties(loadFile("test2.properties"));
+        PersisterAggregator persisterAggregator = PersisterAggregator.createFromProperties(TestingPropertiesProvider.loadFile("test2.properties"));
         List<PersisterWithConfiguration> persisters = persisterAggregator.getPersisterWithConfigurations();
         assertEquals(1, persisters.size());
         PersisterWithConfiguration persister = persisters.get(0);
@@ -118,7 +115,7 @@ public class PersisterAggregatorTest {
     @Test
     public void testFileStorageNumberOfBackups() throws Exception {
         try {
-            PersisterAggregator.createFromProperties(loadFile("test3.properties"));
+            PersisterAggregator.createFromProperties(TestingPropertiesProvider.loadFile("test3.properties"));
             fail();
         } catch (RuntimeException e) {
             assertThat(
index e02e27a745a032e1f36414c63e9e93dfd55e41c1..142d8f5226d0f7141336dcfc181428f59bd0bf42 100644 (file)
@@ -7,7 +7,15 @@
  */
 package org.opendaylight.controller.netconf.persist.impl.osgi;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
 import com.google.common.collect.Sets;
+import java.io.IOException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -25,17 +33,8 @@ import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.xml.sax.SAXException;
 
-import java.io.IOException;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-
 public class ConfigPersisterTest {
-    private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigPersisterTest.class);
 
     private MockedBundleContext ctx;
     private ConfigPersisterActivator configPersisterActivator;
@@ -136,7 +135,7 @@ public class ConfigPersisterTest {
         doReturn(getConflictingService()).when(ctx.serviceFactory).createService(anyString());
         Thread.sleep(500);
         // working service:
-        logger.info("Switching to working service **");
+        LOG.info("Switching to working service **");
         doReturn(getWorkingService(getOKDocument())).when(ctx.serviceFactory).createService(anyString());
         Thread.sleep(1000);
         assertCannotRegisterAsJMXListener_pushWasSuccessful();
index 3e5249468da703051ed8fe436843d37c316ab7c5..0d866ecda7ee7349c1f470be5b110a3ab4a9376b 100644 (file)
@@ -13,6 +13,8 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
@@ -20,7 +22,6 @@ import java.util.Dictionary;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
-
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.config.persist.api.ConfigPusher;
@@ -38,9 +39,6 @@ import org.osgi.framework.ServiceListener;
 import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 final class MockedBundleContext {
     @Mock
     private BundleContext context;
index c8140973eb9d7ee0c890cbba77ffaa6b8c689327..fcd39d6ae674df5320d0a5a429c3ab25b49e4613 100644 (file)
@@ -19,13 +19,13 @@ import org.slf4j.LoggerFactory;
 
 final class TestingExceptionHandler implements Thread.UncaughtExceptionHandler {
 
-    private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(TestingExceptionHandler.class);
 
     private Throwable t;
 
     @Override
     public void uncaughtException(Thread t, Throwable e) {
-        logger.debug("Uncaught exception in thread {}", t, e);
+        LOG.debug("Uncaught exception in thread {}", t, e);
         this.t = e;
     }
 
index 0c365b9d30ebcb5463440404b0c9d10a3a015d5c..e5f32653c53d1a9cfd6bdd19dde5ce4d22bf339f 100644 (file)
@@ -21,10 +21,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
-
 import org.opendaylight.controller.config.api.ConflictingVersionException;
 import org.opendaylight.controller.config.api.ValidationException;
 import org.slf4j.Logger;
index 0bd54979f8cab1c86a080b8bd9a2689625a122c0..ca91589d74eaf92f4ba1ed4bbb72d4955b6109e5 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.netconf.api;
 
 import io.netty.channel.ChannelFuture;
-
 import org.opendaylight.protocol.framework.ProtocolSession;
 
 public interface NetconfSession extends ProtocolSession<NetconfMessage> {
index 74cd4a4e05a5186cf6c4df5543b03a106c61fa3e..4e3954a24d1bd9cd64d172fcce2946e7a0ed4067 100644 (file)
@@ -8,10 +8,9 @@
 
 package org.opendaylight.controller.netconf.api.jmx;
 
-import org.w3c.dom.Element;
-
-import javax.management.NotificationBroadcasterSupport;
 import java.util.Set;
+import javax.management.NotificationBroadcasterSupport;
+import org.w3c.dom.Element;
 
 public class CommitJMXNotification extends NetconfJMXNotification {
 
index e45d3c38a209ee26f0ba2b92e63637ad155f6c01..edc0bc99c5464fbbf10452ec367a2320b5650cdc 100644 (file)
@@ -8,9 +8,8 @@
 
 package org.opendaylight.controller.netconf.api.jmx;
 
-import org.opendaylight.controller.config.api.jmx.ObjectNameUtil;
-
 import javax.management.ObjectName;
+import org.opendaylight.controller.config.api.jmx.ObjectNameUtil;
 
 public interface DefaultCommitOperationMXBean {
 
index 5aa4bffce3a48710afffe51776eb5f07712a1372..cba98ee435f0e5ab44a2abea780a4ca415b6556f 100644 (file)
@@ -9,10 +9,8 @@
 package org.opendaylight.controller.netconf.api.jmx;
 
 import java.util.Set;
-
 import javax.management.Notification;
 import javax.management.NotificationBroadcasterSupport;
-
 import org.w3c.dom.Element;
 
 public abstract class NetconfJMXNotification extends Notification {
index cdf8b913ecd807deb9de6b1e39e7727716e523fc..dc18b106bf23ea6208bb28afd4c911d2bb7a9cc4 100644 (file)
@@ -11,15 +11,14 @@ package org.opendaylight.controller.netconf.api;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.Iterator;
-
 import javax.xml.namespace.NamespaceContext;
 import javax.xml.xpath.XPath;
 import javax.xml.xpath.XPathConstants;
 import javax.xml.xpath.XPathExpressionException;
 import javax.xml.xpath.XPathFactory;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorSeverity;
@@ -29,8 +28,6 @@ import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
 import org.w3c.dom.Document;
 import org.w3c.dom.Node;
 
-import com.google.common.collect.ImmutableMap;
-
 
 /**
  * Unit tests for NetconfDocumentedException.
index 42a8bae4484b93a3ed4689223cbf34a9aacbd986..8b2ca86010a06aa5c1b509bd617a08de72fd684a 100644 (file)
@@ -178,10 +178,10 @@ public class SubtreeFilter {
             return false;
         }
 
-        final String unprefixedFilterContent = filter.getTextContent().substring(prefix.length());
-        final String unprefixedSrcCOntnet = src.getTextContent().substring(prefix.length());
+        final String unprefixedFilterContent = filter.getTextContent().substring(prefixToNamespaceOfFilter.getKey().length() + 1);
+        final String unprefixedSrcContnet = src.getTextContent().substring(prefixToNamespaceOfSrc.getKey().length() + 1);
         // Finally compare unprefixed content
-        return unprefixedFilterContent.equals(unprefixedSrcCOntnet);
+        return unprefixedFilterContent.equals(unprefixedSrcContnet);
     }
 
     enum MatchingResult {
index 5d9470750e54053423b8bb871192fafedf9bf53b..51dfa4b1a8df3ed9a785c468ec38b8ddfc5136f7 100644 (file)
@@ -36,7 +36,7 @@ public class SubtreeFilterTest {
     @Parameters
     public static Collection<Object[]> data() {
         List<Object[]> result = new ArrayList<>();
-        for (int i = 0; i <= 9; i++) {
+        for (int i = 0; i <= 10; i++) {
             result.add(new Object[]{i});
         }
         return result;
index 9470e6d09c9f0c4d24d989b77188e6d79f7db6bd..fd9295a4b365fe7908b8a330cd205ae5ba2fa62c 100644 (file)
@@ -34,9 +34,9 @@ public class NetconfImplActivatorTest {
     @Mock
     private Filter filter;
     @Mock
-    private ServiceReference reference;
+    private ServiceReference<?> reference;
     @Mock
-    private ServiceRegistration registration;
+    private ServiceRegistration<?> registration;
 
     @Before
     public void setUp() throws Exception {
@@ -44,7 +44,7 @@ public class NetconfImplActivatorTest {
         doReturn(filter).when(bundle).createFilter(anyString());
         doNothing().when(bundle).addServiceListener(any(ServiceListener.class), anyString());
 
-        ServiceReference[] refs = new ServiceReference[0];
+        ServiceReference<?>[] refs = new ServiceReference[0];
         doReturn(refs).when(bundle).getServiceReferences(anyString(), anyString());
         doReturn(Arrays.asList(refs)).when(bundle).getServiceReferences(any(Class.class), anyString());
         doReturn("").when(bundle).getProperty(anyString());
index 374e8aeb9fe78977f9b7c7dd4175938cb1e23668..0d7158aa21de7fc02d69d85697f6031abbba6cfa 100644 (file)
@@ -32,7 +32,7 @@ public class NetconfOperationServiceFactoryTrackerTest {
     @Mock
     private NetconfOperationServiceFactory factory;
     @Mock
-    private ServiceReference reference;
+    private ServiceReference<NetconfOperationServiceFactory> reference;
 
     private NetconfOperationServiceFactoryTracker tracker;
 
diff --git a/opendaylight/netconf/netconf-impl/src/test/resources/subtree/10/post-filter.xml b/opendaylight/netconf/netconf-impl/src/test/resources/subtree/10/post-filter.xml
new file mode 100644 (file)
index 0000000..3331cb8
--- /dev/null
@@ -0,0 +1,47 @@
+<!--
+  ~ Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+  ~
+  ~ This program and the accompanying materials are made available under the
+  ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+  ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+  -->
+
+<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" message-id="m-10">
+    <data>
+        <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+            <module>
+                <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">prefix:sal-netconf-connector</type>
+                <name>controller-config</name>
+                <port xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">1830</port>
+                <connection-timeout-millis xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">20000</connection-timeout-millis>
+                <between-attempts-timeout-millis xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">2000</between-attempts-timeout-millis>
+                <sleep-factor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">1.5</sleep-factor>
+                <password xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">admin</password>
+                <dom-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">prefix:dom-broker-osgi-registry</type>
+                    <name>dom-broker</name>
+                </dom-registry>
+                <client-dispatcher xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:netconf">prefix:netconf-client-dispatcher</type>
+                    <name>global-netconf-dispatcher</name>
+                </client-dispatcher>
+                <username xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">admin</username>
+                <address xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">127.0.0.1</address>
+                <processing-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadpool</type>
+                    <name>global-netconf-processing-executor</name>
+                </processing-executor>
+                <tcp-only xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">false</tcp-only>
+                <binding-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">prefix:binding-broker-osgi-registry</type>
+                    <name>binding-osgi-broker</name>
+                </binding-registry>
+                <max-connection-attempts xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">0</max-connection-attempts>
+                <event-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netty">prefix:netty-event-executor</type>
+                    <name>global-event-executor</name>
+                </event-executor>
+            </module>
+        </modules>
+    </data>
+</rpc-reply>
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-impl/src/test/resources/subtree/10/pre-filter.xml b/opendaylight/netconf/netconf-impl/src/test/resources/subtree/10/pre-filter.xml
new file mode 100644 (file)
index 0000000..f2620bb
--- /dev/null
@@ -0,0 +1,52 @@
+<!--
+  ~ Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+  ~
+  ~ This program and the accompanying materials are made available under the
+  ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+  ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+  -->
+
+<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" message-id="m-10">
+    <data>
+        <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+            <module>
+                <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">prefix:sal-netconf-connector</type>
+                <name>controller-config</name>
+                <port xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">1830</port>
+                <connection-timeout-millis xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">20000</connection-timeout-millis>
+                <between-attempts-timeout-millis xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">2000</between-attempts-timeout-millis>
+                <sleep-factor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">1.5</sleep-factor>
+                <password xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">admin</password>
+                <dom-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">prefix:dom-broker-osgi-registry</type>
+                    <name>dom-broker</name>
+                </dom-registry>
+                <client-dispatcher xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:netconf">prefix:netconf-client-dispatcher</type>
+                    <name>global-netconf-dispatcher</name>
+                </client-dispatcher>
+                <username xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">admin</username>
+                <address xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">127.0.0.1</address>
+                <processing-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadpool</type>
+                    <name>global-netconf-processing-executor</name>
+                </processing-executor>
+                <tcp-only xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">false</tcp-only>
+                <binding-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">prefix:binding-broker-osgi-registry</type>
+                    <name>binding-osgi-broker</name>
+                </binding-registry>
+                <max-connection-attempts xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">0</max-connection-attempts>
+                <event-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netty">prefix:netty-event-executor</type>
+                    <name>global-event-executor</name>
+                </event-executor>
+            </module>
+            <module>
+                <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:shutdown:impl">prefix:shutdown</type>
+                <name>shutdown</name>
+                <secret xmlns="urn:opendaylight:params:xml:ns:yang:controller:shutdown:impl"/>
+            </module>
+        </modules>
+    </data>
+</rpc-reply>
\ No newline at end of file
diff --git a/opendaylight/netconf/netconf-impl/src/test/resources/subtree/10/request.xml b/opendaylight/netconf/netconf-impl/src/test/resources/subtree/10/request.xml
new file mode 100644 (file)
index 0000000..259b123
--- /dev/null
@@ -0,0 +1,15 @@
+<rpc xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" message-id="m-9">
+    <get-config>
+        <source>
+            <running/>
+        </source>
+        <filter xmlns:a="urn:ietf:params:xml:ns:netconf:base:1.0" a:type="subtree">
+            <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+                <module>
+                    <type xmlns:x="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">x:sal-netconf-connector</type>
+                    <name>controller-config</name>
+                </module>
+            </modules>
+        </filter>
+    </get-config>
+</rpc>