assertFalse(debugit);
// Now lets create a hosttracker for testing purpose
- ServiceReference s = bc.getServiceReference(ISwitchManager.class
+ ServiceReference<?> s = bc.getServiceReference(ISwitchManager.class
.getName());
if (s != null) {
this.switchManager = (ISwitchManager) bc.getService(s);
Set<Property> properties = new HashSet<Property>();
- ServiceReference r = bc.getServiceReference(IPluginInTopologyService.class
+ ServiceReference<?> r = bc.getServiceReference(IPluginInTopologyService.class
.getName());
TopologyServices topologyServices = null;
if (r != null) {
package org.opendaylight.controller.config.manager.impl;
import static com.google.common.base.Preconditions.checkNotNull;
-
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
}
@Override
- public Set<ObjectName> lookupConfigBeans(String moduleName) {
+ public Set<ObjectName> lookupConfigBeans(final String moduleName) {
throw new UnsupportedOperationException();
}
@Override
- public Set<ObjectName> lookupConfigBeans(String moduleName, String instanceName) {
+ public Set<ObjectName> lookupConfigBeans(final String moduleName, final String instanceName) {
throw new UnsupportedOperationException();
}
@Override
- public ObjectName lookupConfigBean(String moduleName, String instanceName) throws InstanceNotFoundException {
+ public ObjectName lookupConfigBean(final String moduleName, final String instanceName) throws InstanceNotFoundException {
throw new UnsupportedOperationException();
}
@Override
- public void checkConfigBeanExists(ObjectName objectName) throws InstanceNotFoundException {
+ public void checkConfigBeanExists(final ObjectName objectName) throws InstanceNotFoundException {
throw new InstanceNotFoundException("Cannot find " + objectName + " - Tried to use mocking registry");
}
}
@Override
- public ServiceReferenceJMXRegistration registerMBean(ServiceReferenceMXBeanImpl object, ObjectName on) throws InstanceAlreadyExistsException {
+ public ServiceReferenceJMXRegistration registerMBean(final ServiceReferenceMXBeanImpl object, final ObjectName on) throws InstanceAlreadyExistsException {
throw new UnsupportedOperationException();
}
/**
* Static constructor for transaction controller. Take current state as seen by config registry, allow writing new data.
*/
- public static SearchableServiceReferenceWritableRegistry createSRWritableRegistry(ServiceReferenceReadableRegistry oldReadableRegistry,
- ConfigTransactionLookupRegistry txLookupRegistry,
- Map<String, Map.Entry<ModuleFactory, BundleContext>> currentlyRegisteredFactories) {
+ public static SearchableServiceReferenceWritableRegistry createSRWritableRegistry(final ServiceReferenceReadableRegistry oldReadableRegistry,
+ final ConfigTransactionLookupRegistry txLookupRegistry,
+ final Map<String, Map.Entry<ModuleFactory, BundleContext>> currentlyRegisteredFactories) {
if (txLookupRegistry == null) {
throw new IllegalArgumentException("txLookupRegistry is null");
/**
* Copy back state to config registry after commit.
*/
- public static CloseableServiceReferenceReadableRegistry createSRReadableRegistry(ServiceReferenceWritableRegistry oldWritableRegistry,
- LookupRegistry lookupRegistry, BaseJMXRegistrator baseJMXRegistrator) {
+ public static CloseableServiceReferenceReadableRegistry createSRReadableRegistry(final ServiceReferenceWritableRegistry oldWritableRegistry,
+ final LookupRegistry lookupRegistry, final BaseJMXRegistrator baseJMXRegistrator) {
ServiceReferenceRegistryImpl old = (ServiceReferenceRegistryImpl) oldWritableRegistry;
// even if factories do change, nothing in the mapping can change between transactions
/**
* Fill refNames and mBeans maps from old instance
*/
- private static void copy(ServiceReferenceRegistryImpl old, ServiceReferenceRegistryImpl newRegistry, String nullableDstTransactionName) {
+ private static void copy(final ServiceReferenceRegistryImpl old, final ServiceReferenceRegistryImpl newRegistry, final String nullableDstTransactionName) {
for (Entry<ServiceReference, Entry<ServiceReferenceMXBeanImpl, ServiceReferenceJMXRegistration>> refNameEntry : old.mBeans.entrySet()) {
ObjectName currentImplementation;
ObjectName currentImplementationSrc = refNameEntry.getValue().getKey().getCurrentImplementation();
}
}
- private static Map<String, ModuleFactory> extractFactoriesMap(Map<String, Map.Entry<ModuleFactory, BundleContext>> currentlyRegisteredFactories) {
+ private static Map<String, ModuleFactory> extractFactoriesMap(final Map<String, Map.Entry<ModuleFactory, BundleContext>> currentlyRegisteredFactories) {
Map<String, ModuleFactory> result = new HashMap<>();
for (Entry<String, Entry<ModuleFactory, BundleContext>> entry : currentlyRegisteredFactories.entrySet()) {
result.put(entry.getKey(), entry.getValue().getKey());
return result;
}
- private ServiceReferenceRegistryImpl(Map<String, ModuleFactory> factories, LookupRegistry lookupRegistry,
- ServiceReferenceTransactionRegistratorFactory serviceReferenceRegistratorFactory,
- boolean writable) {
+ private ServiceReferenceRegistryImpl(final Map<String, ModuleFactory> factories, final LookupRegistry lookupRegistry,
+ final ServiceReferenceTransactionRegistratorFactory serviceReferenceRegistratorFactory,
+ final boolean writable) {
this.factories = factories;
this.writable = writable;
this.lookupRegistry = lookupRegistry;
Set<String> qNames = InterfacesHelper.getQNames(siAnnotations);
allAnnotations.addAll(siAnnotations);
allQNameSet.addAll(qNames);
- modifiableFactoryNamesToQNames.put(entry.getKey(), Collections.unmodifiableSet(qNames));
+ modifiableFactoryNamesToQNames.put(entry.getKey(), qNames);
}
- this.factoryNamesToQNames = Collections.unmodifiableMap(modifiableFactoryNamesToQNames);
- this.allQNames = Collections.unmodifiableSet(allQNameSet);
+ this.factoryNamesToQNames = ImmutableMap.copyOf(modifiableFactoryNamesToQNames);
+ this.allQNames = ImmutableSet.copyOf(allQNameSet);
// fill namespacesToAnnotations
Map<String /* namespace */, Map<String /* localName */, ServiceInterfaceAnnotation>> modifiableNamespacesToAnnotations =
new HashMap<>();
ofNamespace.put(sia.localName(), sia);
modifiableServiceQNamesToAnnotations.put(sia.value(), sia);
}
- this.namespacesToAnnotations = Collections.unmodifiableMap(modifiableNamespacesToAnnotations);
- this.serviceQNamesToAnnotations = Collections.unmodifiableMap(modifiableServiceQNamesToAnnotations);
+ this.namespacesToAnnotations = ImmutableMap.copyOf(modifiableNamespacesToAnnotations);
+ this.serviceQNamesToAnnotations = ImmutableMap.copyOf(modifiableServiceQNamesToAnnotations);
LOGGER.trace("factoryNamesToQNames:{}", this.factoryNamesToQNames);
}
@Override
- public Map<ServiceInterfaceAnnotation, String /* service ref name */> findServiceInterfaces(ModuleIdentifier moduleIdentifier) {
+ public Map<ServiceInterfaceAnnotation, String /* service ref name */> findServiceInterfaces(final ModuleIdentifier moduleIdentifier) {
Map<ServiceInterfaceAnnotation, String /* service ref name */> result = modulesToServiceRef.get(moduleIdentifier);
if (result == null) {
return Collections.emptyMap();
}
@Override
- public synchronized Set<String> lookupServiceInterfaceNames(ObjectName objectName) throws InstanceNotFoundException {
+ public synchronized Set<String> lookupServiceInterfaceNames(final ObjectName objectName) throws InstanceNotFoundException {
lookupRegistry.checkConfigBeanExists(objectName);
String factoryName = ObjectNameUtil.getFactoryName(objectName);
}
@Override
- public synchronized String getServiceInterfaceName(String namespace, String localName) {
+ public synchronized String getServiceInterfaceName(final String namespace, final String localName) {
Map<String /* localName */, ServiceInterfaceAnnotation> ofNamespace = namespacesToAnnotations.get(namespace);
if (ofNamespace == null) {
LOGGER.error("Cannot find namespace {} in {}", namespace, namespacesToAnnotations);
return result;
}
- private ObjectName getObjectName(ModuleIdentifier moduleIdentifier) {
+ private ObjectName getObjectName(final ModuleIdentifier moduleIdentifier) {
ObjectName on;
try {
on = lookupRegistry.lookupConfigBean(moduleIdentifier.getFactoryName(), moduleIdentifier.getInstanceName());
}
@Override
- public synchronized ObjectName lookupConfigBeanByServiceInterfaceName(String serviceInterfaceQName, String refName) {
+ public synchronized ObjectName lookupConfigBeanByServiceInterfaceName(final String serviceInterfaceQName, final String refName) {
ServiceReference serviceReference = new ServiceReference(serviceInterfaceQName, refName);
ModuleIdentifier moduleIdentifier = refNames.get(serviceReference);
if (moduleIdentifier == null) {
}
@Override
- public synchronized Map<String /* refName */, ObjectName> lookupServiceReferencesByServiceInterfaceName(String serviceInterfaceQName) {
+ public synchronized Map<String /* refName */, ObjectName> lookupServiceReferencesByServiceInterfaceName(final String serviceInterfaceQName) {
Map<String, Map<String, ObjectName>> serviceMapping = getServiceMapping();
Map<String, ObjectName> innerMap = serviceMapping.get(serviceInterfaceQName);
if (innerMap == null) {
}
@Override
- public synchronized ObjectName getServiceReference(String serviceInterfaceQName, String refName) throws InstanceNotFoundException {
+ public synchronized ObjectName getServiceReference(final String serviceInterfaceQName, final String refName) throws InstanceNotFoundException {
ServiceReference serviceReference = new ServiceReference(serviceInterfaceQName, refName);
if (mBeans.containsKey(serviceReference) == false) {
throw new InstanceNotFoundException("Cannot find " + serviceReference);
}
@Override
- public synchronized void checkServiceReferenceExists(ObjectName objectName) throws InstanceNotFoundException {
+ public synchronized void checkServiceReferenceExists(final ObjectName objectName) throws InstanceNotFoundException {
String actualTransactionName = ObjectNameUtil.getTransactionName(objectName);
String expectedTransactionName = serviceReferenceRegistrator.getNullableTransactionName();
if (writable & actualTransactionName == null || (writable && actualTransactionName.equals(expectedTransactionName) == false)) {
}
@Override
- public synchronized ObjectName saveServiceReference(String serviceInterfaceName, String refName, ObjectName moduleON) throws InstanceNotFoundException {
+ public synchronized ObjectName saveServiceReference(final String serviceInterfaceName, final String refName, final ObjectName moduleON) throws InstanceNotFoundException {
assertWritable();
ServiceReference serviceReference = new ServiceReference(serviceInterfaceName, refName);
return saveServiceReference(serviceReference, moduleON);
}
- private synchronized ObjectName saveServiceReference(ServiceReference serviceReference, ObjectName moduleON)
+ private synchronized ObjectName saveServiceReference(final ServiceReference serviceReference, final ObjectName moduleON)
throws InstanceNotFoundException{
return saveServiceReference(serviceReference, moduleON, false);
}
- private synchronized ObjectName saveServiceReference(ServiceReference serviceReference, ObjectName moduleON,
- boolean skipChecks) throws InstanceNotFoundException {
+ private synchronized ObjectName saveServiceReference(final ServiceReference serviceReference, final ObjectName moduleON,
+ final boolean skipChecks) throws InstanceNotFoundException {
// make sure it is found
if (skipChecks == false) {
}
@Override
- public ServiceReferenceJMXRegistration setValue(ServiceReferenceJMXRegistration value) {
+ public ServiceReferenceJMXRegistration setValue(final ServiceReferenceJMXRegistration value) {
throw new UnsupportedOperationException();
}
};
}
- private ObjectName getServiceON(ServiceReference serviceReference) {
+ private ObjectName getServiceON(final ServiceReference serviceReference) {
if (writable) {
return ObjectNameUtil.createTransactionServiceON(serviceReferenceRegistrator.getNullableTransactionName(),
serviceReference.getServiceInterfaceQName(), serviceReference.getRefName());
}
@Override
- public synchronized void removeServiceReference(String serviceInterfaceName, String refName) throws InstanceNotFoundException{
+ public synchronized void removeServiceReference(final String serviceInterfaceName, final String refName) throws InstanceNotFoundException{
ServiceReference serviceReference = new ServiceReference(serviceInterfaceName, refName);
removeServiceReference(serviceReference);
}
- private synchronized void removeServiceReference(ServiceReference serviceReference) throws InstanceNotFoundException {
+ private synchronized void removeServiceReference(final ServiceReference serviceReference) throws InstanceNotFoundException {
LOGGER.debug("Removing service reference {} from {}", serviceReference, this);
assertWritable();
// is the qName known?
}
@Override
- public synchronized boolean removeServiceReferences(ObjectName moduleObjectName) throws InstanceNotFoundException {
+ public synchronized boolean removeServiceReferences(final ObjectName moduleObjectName) throws InstanceNotFoundException {
lookupRegistry.checkConfigBeanExists(moduleObjectName);
String factoryName = ObjectNameUtil.getFactoryName(moduleObjectName);
// check that service interface name exist
}
- private boolean removeServiceReferences(ObjectName moduleObjectName, Set<String> qNames) throws InstanceNotFoundException {
+ private boolean removeServiceReferences(final ObjectName moduleObjectName, final Set<String> qNames) throws InstanceNotFoundException {
ObjectNameUtil.checkType(moduleObjectName, ObjectNameUtil.TYPE_MODULE);
assertWritable();
Set<ServiceReference> serviceReferencesLinkingTo = findServiceReferencesLinkingTo(moduleObjectName, qNames);
return serviceReferencesLinkingTo.isEmpty() == false;
}
- private Set<ServiceReference> findServiceReferencesLinkingTo(ObjectName moduleObjectName, Set<String> serviceInterfaceQNames) {
+ private Set<ServiceReference> findServiceReferencesLinkingTo(final ObjectName moduleObjectName, final Set<String> serviceInterfaceQNames) {
String factoryName = ObjectNameUtil.getFactoryName(moduleObjectName);
if (serviceInterfaceQNames == null) {
LOGGER.warn("Possible error in code: cannot find factoryName {} in {}, object name {}", factoryName, factoryNamesToQNames, moduleObjectName);
*/
package org.opendaylight.controller.config.manager.impl.util;
+import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import org.opendaylight.controller.config.spi.Module;
import org.opendaylight.controller.config.spi.ModuleFactory;
-public class InterfacesHelper {
+public final class InterfacesHelper {
private InterfacesHelper() {
}
for (ServiceInterfaceAnnotation sia: siAnnotations) {
qNames.add(sia.value());
}
- return Collections.unmodifiableSet(qNames);
+ return ImmutableSet.copyOf(qNames);
}
public static Set<ServiceInterfaceAnnotation> getServiceInterfaceAnnotations(final ModuleFactory factory) {
} else if (message instanceof PrintRole) {
if(LOG.isDebugEnabled()) {
String followers = "";
- if (getRaftState() == RaftState.Leader) {
+ if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
} else {
* stopLoggingForClient:{nodeName}
* printNodes
* printState
+ *
+ * Note: when run on IDE and on debug log level, the debug logs in
+ * AbstractUptypedActor and AbstractUptypedPersistentActor would need to be commented out.
+ * Also RaftActor handleCommand(), debug log which prints for every command other than AE/AER
+ *
* @param args
* @throws Exception
*/
* The number of journal log entries to batch on recovery before applying.
*/
int getJournalRecoveryLogBatchSize();
+
+ /**
+ * The interval in which the leader needs to check itself if its isolated
+ * @return FiniteDuration
+ */
+ FiniteDuration getIsolatedCheckInterval();
}
private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
+ private FiniteDuration isolatedLeaderCheckInterval =
+ new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
}
+ public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
+ this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval;
+ }
+
@Override
public long getSnapshotBatchCount() {
return snapshotBatchCount;
public int getJournalRecoveryLogBatchSize() {
return journalRecoveryLogBatchSize;
}
+
+ @Override
+ public FiniteDuration getIsolatedCheckInterval() {
+ return isolatedLeaderCheckInterval;
+ }
}
public enum RaftState {
Candidate,
Follower,
- Leader
+ Leader,
+ IsolatedLeader;
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+/**
+ * Message sent by the IsolatedLeaderCheck scheduler in the Leader to itself
+ * in order to check if its isolated.
+ */
+public class IsolatedLeaderCheck {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
+import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * The behavior of a RaftActor when it is in the Leader state
+ * <p/>
+ * Leaders:
+ * <ul>
+ * <li> Upon election: send initial empty AppendEntries RPCs
+ * (heartbeat) to each server; repeat during idle periods to
+ * prevent election timeouts (§5.2)
+ * <li> If command received from client: append entry to local log,
+ * respond after entry applied to state machine (§5.3)
+ * <li> If last log index ≥ nextIndex for a follower: send
+ * AppendEntries RPC with log entries starting at nextIndex
+ * <ul>
+ * <li> If successful: update nextIndex and matchIndex for
+ * follower (§5.3)
+ * <li> If AppendEntries fails because of log inconsistency:
+ * decrement nextIndex and retry (§5.3)
+ * </ul>
+ * <li> If there exists an N such that N > commitIndex, a majority
+ * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
+ * set commitIndex = N (§5.3, §5.4).
+ */
+public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+ protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
+ protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
+
+ protected final Set<String> followers;
+
+ private Cancellable heartbeatSchedule = null;
+
+ private List<ClientRequestTracker> trackerList = new ArrayList<>();
+
+ protected final int minReplicationCount;
+
+ protected final int minIsolatedLeaderPeerCount;
+
+ private Optional<ByteString> snapshot;
+
+ public AbstractLeader(RaftActorContext context) {
+ super(context);
+
+ followers = context.getPeerAddresses().keySet();
+
+ for (String followerId : followers) {
+ FollowerLogInformation followerLogInformation =
+ new FollowerLogInformationImpl(followerId,
+ new AtomicLong(context.getCommitIndex()),
+ new AtomicLong(-1),
+ context.getConfigParams().getElectionTimeOutInterval());
+
+ followerToLog.put(followerId, followerLogInformation);
+ }
+
+ leaderId = context.getId();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Election:Leader has following peers: {}", followers);
+ }
+
+ minReplicationCount = getMajorityVoteCount(followers.size());
+
+ // the isolated Leader peer count will be 1 less than the majority vote count.
+ // this is because the vote count has the self vote counted in it
+ // for e.g
+ // 0 peers = 1 votesRequired , minIsolatedLeaderPeerCount = 0
+ // 2 peers = 2 votesRequired , minIsolatedLeaderPeerCount = 1
+ // 4 peers = 3 votesRequired, minIsolatedLeaderPeerCount = 2
+ minIsolatedLeaderPeerCount = minReplicationCount > 0 ? (minReplicationCount - 1) : 0;
+
+ snapshot = Optional.absent();
+
+ // Immediately schedule a heartbeat
+ // Upon election: send initial empty AppendEntries RPCs
+ // (heartbeat) to each server; repeat during idle periods to
+ // prevent election timeouts (§5.2)
+ scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+ }
+
+ private Optional<ByteString> getSnapshot() {
+ return snapshot;
+ }
+
+ @VisibleForTesting
+ void setSnapshot(Optional<ByteString> snapshot) {
+ this.snapshot = snapshot;
+ }
+
+ @Override
+ protected RaftActorBehavior handleAppendEntries(ActorRef sender,
+ AppendEntries appendEntries) {
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(appendEntries.toString());
+ }
+
+ return this;
+ }
+
+ @Override
+ protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
+ AppendEntriesReply appendEntriesReply) {
+
+ if(! appendEntriesReply.isSuccess()) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(appendEntriesReply.toString());
+ }
+ }
+
+ // Update the FollowerLogInformation
+ String followerId = appendEntriesReply.getFollowerId();
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ if(followerLogInformation == null){
+ LOG.error("Unknown follower {}", followerId);
+ return this;
+ }
+
+ followerLogInformation.markFollowerActive();
+
+ if (appendEntriesReply.isSuccess()) {
+ followerLogInformation
+ .setMatchIndex(appendEntriesReply.getLogLastIndex());
+ followerLogInformation
+ .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
+ } else {
+
+ // TODO: When we find that the follower is out of sync with the
+ // Leader we simply decrement that followers next index by 1.
+ // Would it be possible to do better than this? The RAFT spec
+ // does not explicitly deal with it but may be something for us to
+ // think about
+
+ followerLogInformation.decrNextIndex();
+ }
+
+ // Now figure out if this reply warrants a change in the commitIndex
+ // If there exists an N such that N > commitIndex, a majority
+ // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
+ // set commitIndex = N (§5.3, §5.4).
+ for (long N = context.getCommitIndex() + 1; ; N++) {
+ int replicatedCount = 1;
+
+ for (FollowerLogInformation info : followerToLog.values()) {
+ if (info.getMatchIndex().get() >= N) {
+ replicatedCount++;
+ }
+ }
+
+ if (replicatedCount >= minReplicationCount) {
+ ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(N);
+ if (replicatedLogEntry != null &&
+ replicatedLogEntry.getTerm() == currentTerm()) {
+ context.setCommitIndex(N);
+ }
+ } else {
+ break;
+ }
+ }
+
+ // Apply the change to the state machine
+ if (context.getCommitIndex() > context.getLastApplied()) {
+ applyLogToStateMachine(context.getCommitIndex());
+ }
+
+ return this;
+ }
+
+ protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+
+ ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
+ if(toRemove != null) {
+ trackerList.remove(toRemove);
+ }
+
+ return toRemove;
+ }
+
+ protected ClientRequestTracker findClientRequestTracker(long logIndex) {
+ for (ClientRequestTracker tracker : trackerList) {
+ if (tracker.getIndex() == logIndex) {
+ return tracker;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
+ RequestVoteReply requestVoteReply) {
+ return this;
+ }
+
+ @Override
+ public RaftState state() {
+ return RaftState.Leader;
+ }
+
+ @Override
+ public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ Preconditions.checkNotNull(sender, "sender should not be null");
+
+ Object message = fromSerializableMessage(originalMessage);
+
+ if (message instanceof RaftRPC) {
+ RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+
+ return switchBehavior(new Follower(context));
+ }
+ }
+
+ try {
+ if (message instanceof SendHeartBeat) {
+ sendHeartBeat();
+ return this;
+
+ } else if(message instanceof InitiateInstallSnapshot) {
+ installSnapshotIfNeeded();
+
+ } else if(message instanceof SendInstallSnapshot) {
+ // received from RaftActor
+ setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+ sendInstallSnapshot();
+
+ } else if (message instanceof Replicate) {
+ replicate((Replicate) message);
+
+ } else if (message instanceof InstallSnapshotReply){
+ handleInstallSnapshotReply((InstallSnapshotReply) message);
+
+ }
+ } finally {
+ scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+ }
+
+ return super.handleMessage(sender, message);
+ }
+
+ private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
+ String followerId = reply.getFollowerId();
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ followerLogInformation.markFollowerActive();
+
+ if (followerToSnapshot != null &&
+ followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+
+ if (reply.isSuccess()) {
+ if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+ //this was the last chunk reply
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("InstallSnapshotReply received, " +
+ "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
+ reply.getChunkIndex(), followerId,
+ context.getReplicatedLog().getSnapshotIndex() + 1
+ );
+ }
+
+ followerLogInformation.setMatchIndex(
+ context.getReplicatedLog().getSnapshotIndex());
+ followerLogInformation.setNextIndex(
+ context.getReplicatedLog().getSnapshotIndex() + 1);
+ mapFollowerToSnapshot.remove(followerId);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
+ followerToLog.get(followerId).getNextIndex().get());
+ }
+
+ if (mapFollowerToSnapshot.isEmpty()) {
+ // once there are no pending followers receiving snapshots
+ // we can remove snapshot from the memory
+ setSnapshot(Optional.<ByteString>absent());
+ }
+
+ } else {
+ followerToSnapshot.markSendStatus(true);
+ }
+ } else {
+ LOG.info("InstallSnapshotReply received, " +
+ "sending snapshot chunk failed, Will retry, Chunk:{}",
+ reply.getChunkIndex()
+ );
+ followerToSnapshot.markSendStatus(false);
+ }
+
+ } else {
+ LOG.error("ERROR!!" +
+ "FollowerId in InstallSnapshotReply not known to Leader" +
+ " or Chunk Index in InstallSnapshotReply not matching {} != {}",
+ followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
+ );
+ }
+ }
+
+ private void replicate(Replicate replicate) {
+ long logIndex = replicate.getReplicatedLogEntry().getIndex();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Replicate message {}", logIndex);
+ }
+
+ // Create a tracker entry we will use this later to notify the
+ // client actor
+ trackerList.add(
+ new ClientRequestTrackerImpl(replicate.getClientActor(),
+ replicate.getIdentifier(),
+ logIndex)
+ );
+
+ if (followers.size() == 0) {
+ context.setCommitIndex(logIndex);
+ applyLogToStateMachine(logIndex);
+ } else {
+ sendAppendEntries();
+ }
+ }
+
+ private void sendAppendEntries() {
+ // Send an AppendEntries to all followers
+ for (String followerId : followers) {
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+
+ if (followerActor != null) {
+ FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ long followerNextIndex = followerLogInformation.getNextIndex().get();
+ boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ List<ReplicatedLogEntry> entries = null;
+
+ if (mapFollowerToSnapshot.get(followerId) != null) {
+ // if install snapshot is in process , then sent next chunk if possible
+ if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+ sendSnapshotChunk(followerActor, followerId);
+ } else {
+ // we send a heartbeat even if we have not received a reply for the last chunk
+ sendAppendEntriesToFollower(followerActor, followerNextIndex,
+ Collections.<ReplicatedLogEntry>emptyList());
+ }
+
+ } else {
+ long leaderLastIndex = context.getReplicatedLog().lastIndex();
+ long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+
+ if (isFollowerActive &&
+ context.getReplicatedLog().isPresent(followerNextIndex)) {
+ // FIXME : Sending one entry at a time
+ entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+ } else if (isFollowerActive && followerNextIndex >= 0 &&
+ leaderLastIndex >= followerNextIndex ) {
+ // if the followers next index is not present in the leaders log, and
+ // if the follower is just not starting and if leader's index is more than followers index
+ // then snapshot should be sent
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("InitiateInstallSnapshot to follower:{}," +
+ "follower-nextIndex:{}, leader-snapshot-index:{}, " +
+ "leader-last-index:{}", followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex
+ );
+ }
+ actor().tell(new InitiateInstallSnapshot(), actor());
+
+ // we would want to sent AE as the capture snapshot might take time
+ entries = Collections.<ReplicatedLogEntry>emptyList();
+
+ } else {
+ //we send an AppendEntries, even if the follower is inactive
+ // in-order to update the followers timestamp, in case it becomes active again
+ entries = Collections.<ReplicatedLogEntry>emptyList();
+ }
+
+ sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
+
+ }
+ }
+ }
+ }
+
+ private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
+ List<ReplicatedLogEntry> entries) {
+ followerActor.tell(
+ new AppendEntries(currentTerm(), context.getId(),
+ prevLogIndex(followerNextIndex),
+ prevLogTerm(followerNextIndex), entries,
+ context.getCommitIndex()).toSerializable(),
+ actor()
+ );
+ }
+
+ /**
+ * An installSnapshot is scheduled at a interval that is a multiple of
+ * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
+ * snapshots at every heartbeat.
+ *
+ * Install Snapshot works as follows
+ * 1. Leader sends a InitiateInstallSnapshot message to self
+ * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
+ * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
+ * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
+ * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
+ * 5. On complete, Follower sends back a InstallSnapshotReply.
+ * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
+ * and replenishes the memory by deleting the snapshot in Replicated log.
+ *
+ */
+ private void installSnapshotIfNeeded() {
+ for (String followerId : followers) {
+ ActorSelection followerActor =
+ context.getPeerActorSelection(followerId);
+
+ if(followerActor != null) {
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ if (!context.getReplicatedLog().isPresent(nextIndex) &&
+ context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ LOG.info("{} follower needs a snapshot install", followerId);
+ if (snapshot.isPresent()) {
+ // if a snapshot is present in the memory, most likely another install is in progress
+ // no need to capture snapshot
+ sendSnapshotChunk(followerActor, followerId);
+
+ } else {
+ initiateCaptureSnapshot();
+ //we just need 1 follower who would need snapshot to be installed.
+ // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
+ // who needs an install and send to all who need
+ break;
+ }
+
+ }
+ }
+ }
+ }
+
+ // on every install snapshot, we try to capture the snapshot.
+ // Once a capture is going on, another one issued will get ignored by RaftActor.
+ private void initiateCaptureSnapshot() {
+ LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
+ ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
+ long lastAppliedIndex = -1;
+ long lastAppliedTerm = -1;
+
+ if (lastAppliedEntry != null) {
+ lastAppliedIndex = lastAppliedEntry.getIndex();
+ lastAppliedTerm = lastAppliedEntry.getTerm();
+ } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
+ lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
+ lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
+ }
+
+ boolean isInstallSnapshotInitiated = true;
+ actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
+ lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
+ actor());
+ }
+
+
+ private void sendInstallSnapshot() {
+ for (String followerId : followers) {
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+
+ if(followerActor != null) {
+ FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ if (!context.getReplicatedLog().isPresent(nextIndex) &&
+ context.getReplicatedLog().isInSnapshot(nextIndex)) {
+ sendSnapshotChunk(followerActor, followerId);
+ }
+ }
+ }
+ }
+
+ /**
+ * Sends a snapshot chunk to a given follower
+ * InstallSnapshot should qualify as a heartbeat too.
+ */
+ private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
+ try {
+ if (snapshot.isPresent()) {
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ getNextSnapshotChunk(followerId,snapshot.get()),
+ mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks()
+ ).toSerializable(),
+ actor()
+ );
+ LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
+ followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
+ mapFollowerToSnapshot.get(followerId).getTotalChunks());
+ }
+ } catch (IOException e) {
+ LOG.error(e, "InstallSnapshot failed for Leader.");
+ }
+ }
+
+ /**
+ * Acccepts snaphot as ByteString, enters into map for future chunks
+ * creates and return a ByteString chunk
+ */
+ private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
+ FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ if (followerToSnapshot == null) {
+ followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+ mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+ }
+ ByteString nextChunk = followerToSnapshot.getNextChunk();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
+ }
+ return nextChunk;
+ }
+
+ private void sendHeartBeat() {
+ if (followers.size() > 0) {
+ sendAppendEntries();
+ }
+ }
+
+ private void stopHeartBeat() {
+ if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
+ heartbeatSchedule.cancel();
+ }
+ }
+
+ private void scheduleHeartBeat(FiniteDuration interval) {
+ if(followers.size() == 0){
+ // Optimization - do not bother scheduling a heartbeat as there are
+ // no followers
+ return;
+ }
+
+ stopHeartBeat();
+
+ // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
+ // message is sent to itself.
+ // Scheduling the heartbeat only once here because heartbeats do not
+ // need to be sent if there are other messages being sent to the remote
+ // actor.
+ heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
+ interval, context.getActor(), new SendHeartBeat(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
+
+ @Override
+ public void close() throws Exception {
+ stopHeartBeat();
+ }
+
+ @Override
+ public String getLeaderId() {
+ return context.getId();
+ }
+
+ protected boolean isLeaderIsolated() {
+ int minPresent = minIsolatedLeaderPeerCount;
+ for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
+ if (followerLogInformation.isFollowerActive()) {
+ --minPresent;
+ if (minPresent == 0) {
+ break;
+ }
+ }
+ }
+ return (minPresent != 0);
+ }
+
+ /**
+ * Encapsulates the snapshot bytestring and handles the logic of sending
+ * snapshot chunks
+ */
+ protected class FollowerToSnapshot {
+ private ByteString snapshotBytes;
+ private int offset = 0;
+ // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
+ private int replyReceivedForOffset;
+ // if replyStatus is false, the previous chunk is attempted
+ private boolean replyStatus = false;
+ private int chunkIndex;
+ private int totalChunks;
+
+ public FollowerToSnapshot(ByteString snapshotBytes) {
+ this.snapshotBytes = snapshotBytes;
+ replyReceivedForOffset = -1;
+ chunkIndex = 1;
+ int size = snapshotBytes.size();
+ totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
+ ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Snapshot {} bytes, total chunks to send:{}",
+ size, totalChunks);
+ }
+ }
+
+ public ByteString getSnapshotBytes() {
+ return snapshotBytes;
+ }
+
+ public int incrementOffset() {
+ if(replyStatus) {
+ // if prev chunk failed, we would want to sent the same chunk again
+ offset = offset + context.getConfigParams().getSnapshotChunkSize();
+ }
+ return offset;
+ }
+
+ public int incrementChunkIndex() {
+ if (replyStatus) {
+ // if prev chunk failed, we would want to sent the same chunk again
+ chunkIndex = chunkIndex + 1;
+ }
+ return chunkIndex;
+ }
+
+ public int getChunkIndex() {
+ return chunkIndex;
+ }
+
+ public int getTotalChunks() {
+ return totalChunks;
+ }
+
+ public boolean canSendNextChunk() {
+ // we only send a false if a chunk is sent but we have not received a reply yet
+ return replyReceivedForOffset == offset;
+ }
+
+ public boolean isLastChunk(int chunkIndex) {
+ return totalChunks == chunkIndex;
+ }
+
+ public void markSendStatus(boolean success) {
+ if (success) {
+ // if the chunk sent was successful
+ replyReceivedForOffset = offset;
+ replyStatus = true;
+ } else {
+ // if the chunk sent was failure
+ replyReceivedForOffset = offset;
+ replyStatus = false;
+ }
+ }
+
+ public ByteString getNextChunk() {
+ int snapshotLength = getSnapshotBytes().size();
+ int start = incrementOffset();
+ int size = context.getConfigParams().getSnapshotChunkSize();
+ if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
+ size = snapshotLength;
+ } else {
+ if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
+ size = snapshotLength - start;
+ }
+ }
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("length={}, offset={},size={}",
+ snapshotLength, start, size);
+ }
+ return getSnapshotBytes().substring(start, start + size);
+
+ }
+ }
+
+ // called from example-actor for printing the follower-states
+ public String printFollowerStates() {
+ StringBuilder sb = new StringBuilder();
+ for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
+ boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+
+ }
+ return "[" + sb.toString() + "]";
+ }
+
+ @VisibleForTesting
+ void markFollowerActive(String followerId) {
+ followerToLog.get(followerId).markFollowerActive();
+ }
+}
}
protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
- LOG.info("Switching from behavior {} to {}", this.state(), behavior.state());
+ LOG.info("{} :- Switching from behavior {} to {}", context.getId(), this.state(), behavior.state());
try {
close();
} catch (Exception e) {
return behavior;
}
+
+ protected int getMajorityVoteCount(int numPeers) {
+ // Votes are required from a majority of the peers including self.
+ // The numMajority field therefore stores a calculated value
+ // of the number of votes required for this candidate to win an
+ // election based on it's known peers.
+ // If a peer was added during normal operation and raft replicas
+ // came to know about them then the new peer would also need to be
+ // taken into consideration when calculating this value.
+ // Here are some examples for what the numMajority would be for n
+ // peers
+ // 0 peers = 1 numMajority -: (0 + 1) / 2 + 1 = 1
+ // 2 peers = 2 numMajority -: (2 + 1) / 2 + 1 = 2
+ // 4 peers = 3 numMajority -: (4 + 1) / 2 + 1 = 3
+
+ int numMajority = 0;
+ if (numPeers > 0) {
+ int self = 1;
+ numMajority = (numPeers + self) / 2 + 1;
+ }
+ return numMajority;
+
+ }
}
LOG.debug("Election:Candidate has following peers: {}", peers);
}
- if(peers.size() > 0) {
- // Votes are required from a majority of the peers including self.
- // The votesRequired field therefore stores a calculated value
- // of the number of votes required for this candidate to win an
- // election based on it's known peers.
- // If a peer was added during normal operation and raft replicas
- // came to know about them then the new peer would also need to be
- // taken into consideration when calculating this value.
- // Here are some examples for what the votesRequired would be for n
- // peers
- // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1
- // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2
- // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3
- int noOfPeers = peers.size();
- int self = 1;
- votesRequired = (noOfPeers + self) / 2 + 1;
- } else {
- votesRequired = 0;
- }
+ votesRequired = getMajorityVoteCount(peers.size());
startNewTerm();
scheduleElection(electionDuration());
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+
+/**
+ * Leader which is termed as isolated.
+ * <p/>
+ * If the reply from the majority of the followers is not received then the leader changes its behavior
+ * to IsolatedLeader. An isolated leader may have followers and they would continue to receive replicated messages.
+ * <p/>
+ * A schedule is run, at an interval of (10 * Heartbeat-time-interval), in the Leader
+ * to check if its isolated or not.
+ * <p/>
+ * In the Isolated Leader , on every AppendEntriesReply, we aggressively check if the leader is isolated.
+ * If no, then the state is switched back to Leader.
+ *
+ */
+public class IsolatedLeader extends AbstractLeader {
+ public IsolatedLeader(RaftActorContext context) {
+ super(context);
+ }
+
+ // we received an Append Entries reply, we should switch the Behavior to Leader
+ @Override
+ protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
+ AppendEntriesReply appendEntriesReply) {
+ RaftActorBehavior ret = super.handleAppendEntriesReply(sender, appendEntriesReply);
+
+ // it can happen that this isolated leader interacts with a new leader in the cluster and
+ // changes its state to Follower, hence we only need to switch to Leader if the state is still Isolated
+ if (ret.state() == RaftState.IsolatedLeader && !isLeaderIsolated()) {
+ LOG.info("IsolatedLeader {} switching from IsolatedLeader to Leader", leaderId);
+ return switchBehavior(new Leader(context));
+ }
+ return ret;
+ }
+
+ @Override
+ public RaftState state() {
+ return RaftState.IsolatedLeader;
+ }
+}
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
-import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
-import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
-import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
-import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
-import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
import scala.concurrent.duration.FiniteDuration;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* The behavior of a RaftActor when it is in the Leader state
* <p/>
* of matchIndex[i] ≥ N, and log[N].term == currentTerm:
* set commitIndex = N (§5.3, §5.4).
*/
-public class Leader extends AbstractRaftActorBehavior {
-
-
- protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
- protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
-
- private final Set<String> followers;
-
- private Cancellable heartbeatSchedule = null;
+public class Leader extends AbstractLeader {
private Cancellable installSnapshotSchedule = null;
-
- private List<ClientRequestTracker> trackerList = new ArrayList<>();
-
- private final int minReplicationCount;
-
- private Optional<ByteString> snapshot;
+ private Cancellable isolatedLeaderCheckSchedule = null;
public Leader(RaftActorContext context) {
super(context);
- followers = context.getPeerAddresses().keySet();
-
- for (String followerId : followers) {
- FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl(followerId,
- new AtomicLong(context.getCommitIndex()),
- new AtomicLong(-1),
- context.getConfigParams().getElectionTimeOutInterval());
-
- followerToLog.put(followerId, followerLogInformation);
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Election:Leader has following peers: {}", followers);
- }
-
- if (followers.size() > 0) {
- minReplicationCount = (followers.size() + 1) / 2 + 1;
- } else {
- minReplicationCount = 0;
- }
-
- snapshot = Optional.absent();
-
- // Immediately schedule a heartbeat
- // Upon election: send initial empty AppendEntries RPCs
- // (heartbeat) to each server; repeat during idle periods to
- // prevent election timeouts (§5.2)
- scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
-
- scheduleInstallSnapshotCheck(
- new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
- context.getConfigParams().getHeartBeatInterval().unit())
- );
-
- }
-
- private Optional<ByteString> getSnapshot() {
- return snapshot;
- }
-
- @VisibleForTesting
- void setSnapshot(Optional<ByteString> snapshot) {
- this.snapshot = snapshot;
- }
-
- @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntries.toString());
- }
-
- return this;
- }
-
- @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
- AppendEntriesReply appendEntriesReply) {
-
- if(! appendEntriesReply.isSuccess()) {
- if(LOG.isDebugEnabled()) {
- LOG.debug(appendEntriesReply.toString());
- }
- }
-
- // Update the FollowerLogInformation
- String followerId = appendEntriesReply.getFollowerId();
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- if(followerLogInformation == null){
- LOG.error("Unknown follower {}", followerId);
- return this;
- }
-
- followerLogInformation.markFollowerActive();
-
- if (appendEntriesReply.isSuccess()) {
- followerLogInformation
- .setMatchIndex(appendEntriesReply.getLogLastIndex());
- followerLogInformation
- .setNextIndex(appendEntriesReply.getLogLastIndex() + 1);
- } else {
-
- // TODO: When we find that the follower is out of sync with the
- // Leader we simply decrement that followers next index by 1.
- // Would it be possible to do better than this? The RAFT spec
- // does not explicitly deal with it but may be something for us to
- // think about
-
- followerLogInformation.decrNextIndex();
- }
-
- // Now figure out if this reply warrants a change in the commitIndex
- // If there exists an N such that N > commitIndex, a majority
- // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
- // set commitIndex = N (§5.3, §5.4).
- for (long N = context.getCommitIndex() + 1; ; N++) {
- int replicatedCount = 1;
-
- for (FollowerLogInformation info : followerToLog.values()) {
- if (info.getMatchIndex().get() >= N) {
- replicatedCount++;
- }
- }
-
- if (replicatedCount >= minReplicationCount) {
- ReplicatedLogEntry replicatedLogEntry =
- context.getReplicatedLog().get(N);
- if (replicatedLogEntry != null
- && replicatedLogEntry.getTerm()
- == currentTerm()) {
- context.setCommitIndex(N);
- }
- } else {
- break;
- }
- }
-
- // Apply the change to the state machine
- if (context.getCommitIndex() > context.getLastApplied()) {
- applyLogToStateMachine(context.getCommitIndex());
- }
-
- return this;
- }
-
- protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
- ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
- if(toRemove != null) {
- trackerList.remove(toRemove);
- }
-
- return toRemove;
- }
-
- protected ClientRequestTracker findClientRequestTracker(long logIndex) {
- for (ClientRequestTracker tracker : trackerList) {
- if (tracker.getIndex() == logIndex) {
- return tracker;
- }
- }
-
- return null;
- }
-
- @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply) {
- return this;
- }
+ scheduleInstallSnapshotCheck(context.getConfigParams().getIsolatedCheckInterval());
- @Override public RaftState state() {
- return RaftState.Leader;
+ scheduleIsolatedLeaderCheck(
+ new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10,
+ context.getConfigParams().getHeartBeatInterval().unit()));
}
@Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Preconditions.checkNotNull(sender, "sender should not be null");
- Object message = fromSerializableMessage(originalMessage);
-
- if (message instanceof RaftRPC) {
- RaftRPC rpc = (RaftRPC) message;
- // If RPC request or response contains term T > currentTerm:
- // set currentTerm = T, convert to follower (§5.1)
- // This applies to all RPC messages and responses
- if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-
- return switchBehavior(new Follower(context));
- }
- }
-
- try {
- if (message instanceof SendHeartBeat) {
- sendHeartBeat();
- return this;
-
- } else if(message instanceof InitiateInstallSnapshot) {
- installSnapshotIfNeeded();
-
- } else if(message instanceof SendInstallSnapshot) {
- // received from RaftActor
- setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
- sendInstallSnapshot();
-
- } else if (message instanceof Replicate) {
- replicate((Replicate) message);
-
- } else if (message instanceof InstallSnapshotReply){
- handleInstallSnapshotReply(
- (InstallSnapshotReply) message);
- }
- } finally {
- scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
- }
-
- return super.handleMessage(sender, message);
- }
-
- private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
- String followerId = reply.getFollowerId();
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- followerLogInformation.markFollowerActive();
-
- if (followerToSnapshot != null &&
- followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
- if (reply.isSuccess()) {
- if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
- //this was the last chunk reply
- if(LOG.isDebugEnabled()) {
- LOG.debug("InstallSnapshotReply received, " +
- "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
- reply.getChunkIndex(), followerId,
- context.getReplicatedLog().getSnapshotIndex() + 1
- );
- }
-
- followerLogInformation.setMatchIndex(
- context.getReplicatedLog().getSnapshotIndex());
- followerLogInformation.setNextIndex(
- context.getReplicatedLog().getSnapshotIndex() + 1);
- mapFollowerToSnapshot.remove(followerId);
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
- followerToLog.get(followerId).getNextIndex().get());
- }
-
- if (mapFollowerToSnapshot.isEmpty()) {
- // once there are no pending followers receiving snapshots
- // we can remove snapshot from the memory
- setSnapshot(Optional.<ByteString>absent());
- }
-
- } else {
- followerToSnapshot.markSendStatus(true);
- }
- } else {
- LOG.info("InstallSnapshotReply received, " +
- "sending snapshot chunk failed, Will retry, Chunk:{}",
- reply.getChunkIndex()
- );
- followerToSnapshot.markSendStatus(false);
+ if (originalMessage instanceof IsolatedLeaderCheck) {
+ if (isLeaderIsolated()) {
+ LOG.info("At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ minIsolatedLeaderPeerCount, leaderId);
+ return switchBehavior(new IsolatedLeader(context));
}
-
- } else {
- LOG.error("ERROR!!" +
- "FollowerId in InstallSnapshotReply not known to Leader" +
- " or Chunk Index in InstallSnapshotReply not matching {} != {}",
- followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
- );
- }
- }
-
- private void replicate(Replicate replicate) {
- long logIndex = replicate.getReplicatedLogEntry().getIndex();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Replicate message {}", logIndex);
}
- // Create a tracker entry we will use this later to notify the
- // client actor
- trackerList.add(
- new ClientRequestTrackerImpl(replicate.getClientActor(),
- replicate.getIdentifier(),
- logIndex)
- );
-
- if (followers.size() == 0) {
- context.setCommitIndex(logIndex);
- applyLogToStateMachine(logIndex);
- } else {
- sendAppendEntries();
- }
+ return super.handleMessage(sender, originalMessage);
}
- private void sendAppendEntries() {
- // Send an AppendEntries to all followers
- for (String followerId : followers) {
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
-
- if (followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long followerNextIndex = followerLogInformation.getNextIndex().get();
- boolean isFollowerActive = followerLogInformation.isFollowerActive();
- List<ReplicatedLogEntry> entries = null;
-
- if (mapFollowerToSnapshot.get(followerId) != null) {
- // if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerId);
- } else {
- // we send a heartbeat even if we have not received a reply for the last chunk
- sendAppendEntriesToFollower(followerActor, followerNextIndex,
- Collections.<ReplicatedLogEntry>emptyList());
- }
-
- } else {
- long leaderLastIndex = context.getReplicatedLog().lastIndex();
- long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
-
- if (isFollowerActive &&
- context.getReplicatedLog().isPresent(followerNextIndex)) {
- // FIXME : Sending one entry at a time
- entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
- } else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex >= followerNextIndex ) {
- // if the followers next index is not present in the leaders log, and
- // if the follower is just not starting and if leader's index is more than followers index
- // then snapshot should be sent
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("InitiateInstallSnapshot to follower:{}," +
- "follower-nextIndex:{}, leader-snapshot-index:{}, " +
- "leader-last-index:{}", followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex
- );
- }
- actor().tell(new InitiateInstallSnapshot(), actor());
-
- // we would want to sent AE as the capture snapshot might take time
- entries = Collections.<ReplicatedLogEntry>emptyList();
-
- } else {
- //we send an AppendEntries, even if the follower is inactive
- // in-order to update the followers timestamp, in case it becomes active again
- entries = Collections.<ReplicatedLogEntry>emptyList();
- }
-
- sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
-
- }
- }
- }
- }
-
- private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
- List<ReplicatedLogEntry> entries) {
- followerActor.tell(
- new AppendEntries(currentTerm(), context.getId(),
- prevLogIndex(followerNextIndex),
- prevLogTerm(followerNextIndex), entries,
- context.getCommitIndex()).toSerializable(),
- actor()
- );
- }
-
- /**
- * An installSnapshot is scheduled at a interval that is a multiple of
- * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing
- * snapshots at every heartbeat.
- *
- * Install Snapshot works as follows
- * 1. Leader sends a InitiateInstallSnapshot message to self
- * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor
- * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
- * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
- * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
- * 5. On complete, Follower sends back a InstallSnapshotReply.
- * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
- * and replenishes the memory by deleting the snapshot in Replicated log.
- *
- */
- private void installSnapshotIfNeeded() {
- for (String followerId : followers) {
- ActorSelection followerActor =
- context.getPeerActorSelection(followerId);
-
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{} follower needs a snapshot install", followerId);
- if (snapshot.isPresent()) {
- // if a snapshot is present in the memory, most likely another install is in progress
- // no need to capture snapshot
- sendSnapshotChunk(followerActor, followerId);
-
- } else {
- initiateCaptureSnapshot();
- //we just need 1 follower who would need snapshot to be installed.
- // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
- // who needs an install and send to all who need
- break;
- }
-
- }
- }
- }
- }
-
- // on every install snapshot, we try to capture the snapshot.
- // Once a capture is going on, another one issued will get ignored by RaftActor.
- private void initiateCaptureSnapshot() {
- LOG.info("Initiating Snapshot Capture to Install Snapshot, Leader:{}", getLeaderId());
- ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
- lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
- lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
- }
-
- boolean isInstallSnapshotInitiated = true;
- actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
- lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
- actor());
- }
-
-
- private void sendInstallSnapshot() {
- for (String followerId : followers) {
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
-
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long nextIndex = followerLogInformation.getNextIndex().get();
-
- if (!context.getReplicatedLog().isPresent(nextIndex) &&
- context.getReplicatedLog().isInSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, followerId);
- }
- }
- }
- }
-
- /**
- * Sends a snapshot chunk to a given follower
- * InstallSnapshot should qualify as a heartbeat too.
- */
- private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
- try {
- if (snapshot.isPresent()) {
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- context.getReplicatedLog().getSnapshotIndex(),
- context.getReplicatedLog().getSnapshotTerm(),
- getNextSnapshotChunk(followerId,snapshot.get()),
- mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks()
- ).toSerializable(),
- actor()
- );
- LOG.info("InstallSnapshot sent to follower {}, Chunk: {}/{}",
- followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
- mapFollowerToSnapshot.get(followerId).getTotalChunks());
- }
- } catch (IOException e) {
- LOG.error(e, "InstallSnapshot failed for Leader.");
- }
- }
-
- /**
- * Acccepts snaphot as ByteString, enters into map for future chunks
- * creates and return a ByteString chunk
- */
- private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
- if (followerToSnapshot == null) {
- followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
- mapFollowerToSnapshot.put(followerId, followerToSnapshot);
- }
- ByteString nextChunk = followerToSnapshot.getNextChunk();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Leader's snapshot nextChunk size:{}", nextChunk.size());
- }
- return nextChunk;
- }
-
- private void sendHeartBeat() {
- if (followers.size() > 0) {
- sendAppendEntries();
- }
- }
-
- private void stopHeartBeat() {
- if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
- heartbeatSchedule.cancel();
- }
- }
-
- private void stopInstallSnapshotSchedule() {
+ protected void stopInstallSnapshotSchedule() {
if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
installSnapshotSchedule.cancel();
}
}
- private void scheduleHeartBeat(FiniteDuration interval) {
- if(followers.size() == 0){
- // Optimization - do not bother scheduling a heartbeat as there are
- // no followers
- return;
- }
-
- stopHeartBeat();
-
- // Schedule a heartbeat. When the scheduler triggers a SendHeartbeat
- // message is sent to itself.
- // Scheduling the heartbeat only once here because heartbeats do not
- // need to be sent if there are other messages being sent to the remote
- // actor.
- heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce(
- interval, context.getActor(), new SendHeartBeat(),
- context.getActorSystem().dispatcher(), context.getActor());
- }
-
- private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+ protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
if(followers.size() == 0){
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
context.getActorSystem().dispatcher(), context.getActor());
}
-
-
- @Override public void close() throws Exception {
- stopHeartBeat();
- }
-
- @Override public String getLeaderId() {
- return context.getId();
- }
-
- /**
- * Encapsulates the snapshot bytestring and handles the logic of sending
- * snapshot chunks
- */
- protected class FollowerToSnapshot {
- private ByteString snapshotBytes;
- private int offset = 0;
- // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
- private int replyReceivedForOffset;
- // if replyStatus is false, the previous chunk is attempted
- private boolean replyStatus = false;
- private int chunkIndex;
- private int totalChunks;
-
- public FollowerToSnapshot(ByteString snapshotBytes) {
- this.snapshotBytes = snapshotBytes;
- replyReceivedForOffset = -1;
- chunkIndex = 1;
- int size = snapshotBytes.size();
- totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
- ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot {} bytes, total chunks to send:{}",
- size, totalChunks);
- }
- }
-
- public ByteString getSnapshotBytes() {
- return snapshotBytes;
- }
-
- public int incrementOffset() {
- if(replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- offset = offset + context.getConfigParams().getSnapshotChunkSize();
- }
- return offset;
- }
-
- public int incrementChunkIndex() {
- if (replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- chunkIndex = chunkIndex + 1;
- }
- return chunkIndex;
- }
-
- public int getChunkIndex() {
- return chunkIndex;
- }
-
- public int getTotalChunks() {
- return totalChunks;
- }
-
- public boolean canSendNextChunk() {
- // we only send a false if a chunk is sent but we have not received a reply yet
- return replyReceivedForOffset == offset;
- }
-
- public boolean isLastChunk(int chunkIndex) {
- return totalChunks == chunkIndex;
- }
-
- public void markSendStatus(boolean success) {
- if (success) {
- // if the chunk sent was successful
- replyReceivedForOffset = offset;
- replyStatus = true;
- } else {
- // if the chunk sent was failure
- replyReceivedForOffset = offset;
- replyStatus = false;
- }
- }
-
- public ByteString getNextChunk() {
- int snapshotLength = getSnapshotBytes().size();
- int start = incrementOffset();
- int size = context.getConfigParams().getSnapshotChunkSize();
- if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
- size = snapshotLength;
- } else {
- if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
- }
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("length={}, offset={},size={}",
- snapshotLength, start, size);
- }
- return getSnapshotBytes().substring(start, start + size);
-
+ protected void stopIsolatedLeaderCheckSchedule() {
+ if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) {
+ isolatedLeaderCheckSchedule.cancel();
}
}
- // called from example-actor for printing the follower-states
- public String printFollowerStates() {
- StringBuilder sb = new StringBuilder();
- for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
- boolean isFollowerActive = followerLogInformation.isFollowerActive();
- sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+ protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) {
+ isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval,
+ context.getActor(), new IsolatedLeaderCheck(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
- }
- return "[" + sb.toString() + "]";
+ @Override public void close() throws Exception {
+ stopInstallSnapshotSchedule();
+ stopIsolatedLeaderCheckSchedule();
+ super.close();
}
@VisibleForTesting
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class IsolatedLeaderTest extends AbstractRaftActorBehaviorTest {
+
+ private ActorRef leaderActor =
+ getSystem().actorOf(Props.create(DoNothingActor.class));
+
+ private ActorRef senderActor =
+ getSystem().actorOf(Props.create(DoNothingActor.class));
+
+ @Override
+ protected RaftActorBehavior createBehavior(
+ RaftActorContext actorContext) {
+ return new Leader(actorContext);
+ }
+
+ @Override
+ protected RaftActorContext createActorContext() {
+ return createActorContext(leaderActor);
+ }
+
+
+ @Test
+ public void testHandleMessageWithThreeMembers() {
+ new JavaTestKit(getSystem()) {{
+ String followerAddress1 = "akka://test/user/$a";
+ String followerAddress2 = "akka://test/user/$b";
+
+ MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerAddress1);
+ peerAddresses.put("follower-2", followerAddress2);
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ IsolatedLeader isolatedLeader = new IsolatedLeader(leaderActorContext);
+ assertTrue(isolatedLeader.state() == RaftState.IsolatedLeader);
+
+ // in a 3 node cluster, even if 1 follower is returns a reply, the isolatedLeader is not isolated
+ RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
+ new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
+ isolatedLeader.lastIndex() - 1, isolatedLeader.lastTerm() - 1));
+
+ assertEquals(RaftState.Leader, behavior.state());
+
+ behavior = isolatedLeader.handleMessage(senderActor,
+ new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
+ isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+
+ assertEquals(RaftState.Leader, behavior.state());
+ }};
+ }
+
+ @Test
+ public void testHandleMessageWithFiveMembers() {
+ new JavaTestKit(getSystem()) {{
+
+ String followerAddress1 = "akka://test/user/$a";
+ String followerAddress2 = "akka://test/user/$b";
+ String followerAddress3 = "akka://test/user/$c";
+ String followerAddress4 = "akka://test/user/$d";
+
+ MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerAddress1);
+ peerAddresses.put("follower-2", followerAddress2);
+ peerAddresses.put("follower-3", followerAddress3);
+ peerAddresses.put("follower-4", followerAddress4);
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ IsolatedLeader isolatedLeader = new IsolatedLeader(leaderActorContext);
+ assertEquals(RaftState.IsolatedLeader, isolatedLeader.state());
+
+ // in a 5 member cluster, atleast 2 followers need to be active and return a reply
+ RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
+ new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() - 1, true,
+ isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+
+ assertEquals(RaftState.IsolatedLeader, behavior.state());
+
+ behavior = isolatedLeader.handleMessage(senderActor,
+ new AppendEntriesReply("follower-2", isolatedLeader.lastTerm() - 1, true,
+ isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+
+ assertEquals(RaftState.Leader, behavior.state());
+
+ behavior = isolatedLeader.handleMessage(senderActor,
+ new AppendEntriesReply("follower-3", isolatedLeader.lastTerm() - 1, true,
+ isolatedLeader.lastIndex() -1, isolatedLeader.lastTerm() -1 ));
+
+ assertEquals(RaftState.Leader, behavior.state());
+ }};
+ }
+
+ @Test
+ public void testHandleMessageFromAnotherLeader() {
+ new JavaTestKit(getSystem()) {{
+ String followerAddress1 = "akka://test/user/$a";
+ String followerAddress2 = "akka://test/user/$b";
+
+ MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerAddress1);
+ peerAddresses.put("follower-2", followerAddress2);
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ IsolatedLeader isolatedLeader = new IsolatedLeader(leaderActorContext);
+ assertTrue(isolatedLeader.state() == RaftState.IsolatedLeader);
+
+ // if an append-entries reply is received by the isolated-leader, and that reply
+ // has a term > than its own term, then IsolatedLeader switches to Follower
+ // bowing itself to another leader in the cluster
+ RaftActorBehavior behavior = isolatedLeader.handleMessage(senderActor,
+ new AppendEntriesReply("follower-1", isolatedLeader.lastTerm() + 1, true,
+ isolatedLeader.lastIndex() + 1, isolatedLeader.lastTerm() + 1));
+
+ assertEquals(RaftState.Follower, behavior.state());
+ }};
+
+ }
+}
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import scala.concurrent.duration.FiniteDuration;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
assertEquals(RaftState.Leader, raftActorBehavior.state());
+ }};
+ }
+ @Test
+ public void testIsolatedLeaderCheckNoFollowers() {
+ new JavaTestKit(getSystem()) {{
+ ActorRef leaderActor = getTestActor();
- }};
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ Leader leader = new Leader(leaderActorContext);
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue(behavior instanceof Leader);
+ }};
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef followerActor1 = getTestActor();
+ ActorRef followerActor2 = getTestActor();
+
+ MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerActor1.path().toString());
+ peerAddresses.put("follower-2", followerActor2.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ Leader leader = new Leader(leaderActorContext);
+ leader.stopIsolatedLeaderCheckSchedule();
+
+ leader.markFollowerActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when all followers are active",
+ behavior instanceof Leader);
+
+ // kill 1 follower and verify if that got killed
+ final JavaTestKit probe = new JavaTestKit(getSystem());
+ probe.watch(followerActor1);
+ followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg1.getActor(), followerActor1);
+
+ //sleep enough for all the follower stopwatches to lapse
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+ getElectionTimeOutInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.markFollowerActive("follower-2");
+ behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
+ behavior instanceof Leader);
+
+ // kill 2nd follower and leader should change to Isolated leader
+ followerActor2.tell(PoisonPill.getInstance(), null);
+ probe.watch(followerActor2);
+ followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg2.getActor(), followerActor2);
+
+ //sleep enough for the remaining the follower stopwatches to lapse
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+ getElectionTimeOutInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+ behavior instanceof IsolatedLeader);
+
+ }};
}
class MockLeader extends Leader {
ValueSerializer.serialize(builder, this, value);
} else if (value instanceof Iterable) {
- Iterable iterable = (Iterable) value;
+ Iterable<?> iterable = (Iterable<?>) value;
for (Object o : iterable) {
if (o instanceof NormalizedNode) {
YangInstanceIdentifier instance = (YangInstanceIdentifier) secondNode.getValue();
Iterable<YangInstanceIdentifier.PathArgument> iterable = instance.getPathArguments();
- Iterator it = iterable.iterator();
+ Iterator<YangInstanceIdentifier.PathArgument> it = iterable.iterator();
YangInstanceIdentifier.NodeIdentifier firstPath = (YangInstanceIdentifier.NodeIdentifier) it.next();
Assert.assertEquals("node", firstPath.getNodeType().getLocalName());
YangInstanceIdentifier.NodeIdentifierWithPredicates secondPath = (YangInstanceIdentifier.NodeIdentifierWithPredicates)it.next();
private Timeout shardLeaderElectionTimeout = new Timeout(30, TimeUnit.SECONDS);
private boolean persistent = true;
private ConfigurationReader configurationReader = new FileConfigurationReader();
+ private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
return this;
}
-
public Builder persistent(boolean persistent){
this.persistent = persistent;
return this;
}
+ public Builder shardIsolatedLeaderCheckIntervalInMillis(int shardIsolatedLeaderCheckIntervalInMillis) {
+ this.shardIsolatedLeaderCheckIntervalInMillis = shardIsolatedLeaderCheckIntervalInMillis;
+ return this;
+ }
+
public DatastoreContext build() {
DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
TimeUnit.MILLISECONDS));
raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+ raftConfig.setIsolatedLeaderCheckInterval(
+ new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
return new DatastoreContext(dataStoreProperties, raftConfig, dataStoreMXBeanType,
operationTimeoutInSeconds, shardTransactionIdleTimeout,
.shardTransactionCommitQueueCapacity(
props.getShardTransactionCommitQueueCapacity().getValue().intValue())
.persistent(props.getPersistent().booleanValue())
+ .shardIsolatedLeaderCheckIntervalInMillis(
+ props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.build();
return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
.shardTransactionCommitQueueCapacity(
props.getShardTransactionCommitQueueCapacity().getValue().intValue())
.persistent(props.getPersistent().booleanValue())
+ .shardIsolatedLeaderCheckIntervalInMillis(
+ props.getShardIsolatedLeaderCheckIntervalInMillis().getValue())
.build();
return DistributedDataStoreFactory.createInstance("operational",
type boolean;
description "Enable or disable data persistence";
}
+
+ leaf shard-isolated-leader-check-interval-in-millis {
+ default 5000;
+ type heartbeat-interval-type;
+ description "The interval at which the leader of the shard will check if its majority
+ followers are active and term itself as isolated";
+ }
}
// Augments the 'configuration' choice node under modules/module.
while (entry.getValue().next()) {
Map rec = entry.getValue().getCurrent();
Map newRec = new HashMap();
- for (Iterator iter = rec.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<?> iter = rec.entrySet().iterator(); iter.hasNext();) {
Map.Entry e = (Map.Entry) iter.next();
String key = (String) e.getKey();
Object value = e.getValue();
import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkCondition;
import static org.opendaylight.controller.config.api.JmxAttributeValidationException.checkNotNull;
-
import com.google.common.base.Optional;
+import io.netty.util.concurrent.EventExecutor;
+import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
final NetconfClientDispatcher dispatcher = getClientDispatcherDependency();
listener.initializeRemoteConnection(dispatcher, clientConfig);
- return new AutoCloseable() {
- @Override
- public void close() throws Exception {
- listener.close();
- salFacade.close();
- }
- };
+ return new MyAutoCloseable(listener, salFacade);
}
private Optional<NetconfSessionCapabilities> getUserCapabilities() {
public NetconfReconnectingClientConfiguration getClientConfig(final NetconfDeviceCommunicator listener) {
final InetSocketAddress socketAddress = getSocketAddress();
- final ReconnectStrategy strategy = getReconnectStrategy();
final long clientConnectionTimeoutMillis = getConnectionTimeoutMillis();
+ final ReconnectStrategyFactory sf = new MyReconnectStrategyFactory(
+ getEventExecutorDependency(), getMaxConnectionAttempts(), getBetweenAttemptsTimeoutMillis(), getSleepFactor());
+ final ReconnectStrategy strategy = sf.createReconnectStrategy();
+
return NetconfReconnectingClientConfigurationBuilder.create()
.withAddress(socketAddress)
.withConnectionTimeoutMillis(clientConnectionTimeoutMillis)
.withProtocol(getTcpOnly() ?
NetconfClientConfiguration.NetconfClientProtocol.TCP :
NetconfClientConfiguration.NetconfClientProtocol.SSH)
- .withConnectStrategyFactory(new ReconnectStrategyFactory() {
- @Override
- public ReconnectStrategy createReconnectStrategy() {
- return getReconnectStrategy();
- }
- })
+ .withConnectStrategyFactory(sf)
.build();
}
- private ReconnectStrategy getReconnectStrategy() {
- final Long connectionAttempts;
- if (getMaxConnectionAttempts() != null && getMaxConnectionAttempts() > 0) {
- connectionAttempts = getMaxConnectionAttempts();
- } else {
- logger.trace("Setting {} on {} to infinity", maxConnectionAttemptsJmxAttribute, this);
- connectionAttempts = null;
+ private static final class MyAutoCloseable implements AutoCloseable {
+ private final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade;
+ private final NetconfDeviceCommunicator listener;
+
+ public MyAutoCloseable(final NetconfDeviceCommunicator listener,
+ final RemoteDeviceHandler<NetconfSessionCapabilities> salFacade) {
+ this.listener = listener;
+ this.salFacade = salFacade;
}
- final double sleepFactor = getSleepFactor().doubleValue();
- final int minSleep = getBetweenAttemptsTimeoutMillis();
- final Long maxSleep = null;
- final Long deadline = null;
- return new TimedReconnectStrategy(getEventExecutorDependency(), getBetweenAttemptsTimeoutMillis(),
- minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
+ @Override
+ public void close() {
+ listener.close();
+ salFacade.close();
+ }
+ }
+
+ private static final class MyReconnectStrategyFactory implements ReconnectStrategyFactory {
+ private final Long connectionAttempts;
+ private final EventExecutor executor;
+ private final double sleepFactor;
+ private final int minSleep;
+
+ MyReconnectStrategyFactory(final EventExecutor executor, final Long maxConnectionAttempts, final int minSleep, final BigDecimal sleepFactor) {
+ if (maxConnectionAttempts != null && maxConnectionAttempts > 0) {
+ connectionAttempts = maxConnectionAttempts;
+ } else {
+ logger.trace("Setting {} on {} to infinity", maxConnectionAttemptsJmxAttribute, this);
+ connectionAttempts = null;
+ }
+
+ this.sleepFactor = sleepFactor.doubleValue();
+ this.executor = executor;
+ this.minSleep = minSleep;
+ }
+
+ @Override
+ public ReconnectStrategy createReconnectStrategy() {
+ final Long maxSleep = null;
+ final Long deadline = null;
+
+ return new TimedReconnectStrategy(executor, minSleep,
+ minSleep, sleepFactor, maxSleep, connectionAttempts, deadline);
+ }
}
private InetSocketAddress getSocketAddress() {
if(schemaContext.isPresent()) {
if (NetconfMessageTransformUtil.isDataEditOperation(rpc)) {
final DataNodeContainer schemaForEdit = NetconfMessageTransformUtil.createSchemaForEdit(schemaContext.get());
- w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaForEdit, codecProvider);
+ w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaContext.get(), schemaForEdit, codecProvider);
} else if (NetconfMessageTransformUtil.isGetOperation(rpc)) {
final DataNodeContainer schemaForGet = NetconfMessageTransformUtil.createSchemaForGet(schemaContext.get());
- w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaForGet, codecProvider);
+ w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaContext.get(), schemaForGet, codecProvider);
} else if (NetconfMessageTransformUtil.isGetConfigOperation(rpc)) {
final DataNodeContainer schemaForGetConfig = NetconfMessageTransformUtil.createSchemaForGetConfig(schemaContext.get());
- w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaForGetConfig, codecProvider);
+ w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaContext.get(), schemaForGetConfig, codecProvider);
} else {
final Optional<RpcDefinition> schemaForRpc = NetconfMessageTransformUtil.findSchemaForRpc(rpc, schemaContext.get());
if(schemaForRpc.isPresent()) {
final DataNodeContainer schemaForGetConfig = NetconfMessageTransformUtil.createSchemaForRpc(schemaForRpc.get());
- w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaForGetConfig, codecProvider);
+ w3cPayload = XmlDocumentUtils.toDocument(rpcPayload, schemaContext.get(), schemaForGetConfig, codecProvider);
} else {
w3cPayload = toRpcRequestWithoutSchema(rpcPayload, codecProvider);
}
"Value is not instance of IdentityrefTypeDefinition but is {}. Therefore NULL is used as translation of - {}",
input == null ? "null" : input.getClass(), String.valueOf(input));
return null;
- } else if (type instanceof LeafrefTypeDefinition) {
- if (input instanceof IdentityValuesDTO) {
- return LEAFREF_DEFAULT_CODEC.deserialize(((IdentityValuesDTO) input).getOriginValue());
- }
- return LEAFREF_DEFAULT_CODEC.deserialize(input);
} else if (type instanceof InstanceIdentifierTypeDefinition) {
if (input instanceof IdentityValuesDTO) {
return instanceIdentifier.deserialize(input);
IdentityValue valueWithNamespace = data.getValuesWithNamespaces().get(0);
Module module = getModuleByNamespace(valueWithNamespace.getNamespace(), mountPoint);
if (module == null) {
- logger.info("Module by namespace '{}' of first node in instance-identiefier was not found.",
+ logger.info("Module by namespace '{}' of first node in instance-identifier was not found.",
valueWithNamespace.getNamespace());
logger.info("Instance-identifier will be translated as NULL for data - {}",
String.valueOf(valueWithNamespace.getValue()));
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.opendaylight.yangtools.yang.model.api.TypeDefinition;
import org.opendaylight.yangtools.yang.model.api.type.IdentityrefTypeDefinition;
+import org.opendaylight.yangtools.yang.model.api.type.LeafrefTypeDefinition;
import org.opendaylight.yangtools.yang.model.util.EmptyType;
+import org.opendaylight.yangtools.yang.model.util.SchemaContextUtil;
import org.opendaylight.yangtools.yang.parser.builder.impl.ContainerSchemaNodeBuilder;
import org.opendaylight.yangtools.yang.parser.builder.impl.LeafSchemaNodeBuilder;
import org.slf4j.Logger;
try {
this.normalizeNode(nodeWrap, schema, null, mountPoint);
} catch (IllegalArgumentException e) {
- throw new RestconfDocumentedException(e.getMessage(), ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+ RestconfDocumentedException restconfDocumentedException = new RestconfDocumentedException(e.getMessage(), ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+ restconfDocumentedException.addSuppressed(e);
+ throw restconfDocumentedException;
}
if (nodeWrap instanceof CompositeNodeWrapper) {
return ((CompositeNodeWrapper) nodeWrap).unwrap();
final Object value = simpleNode.getValue();
Object inputValue = value;
TypeDefinition<? extends Object> typeDefinition = this.typeDefinition(schema);
- if ((typeDefinition instanceof IdentityrefTypeDefinition)) {
- if ((value instanceof String)) {
- inputValue = new IdentityValuesDTO(simpleNode.getNamespace().toString(), (String) value, null,
- (String) value);
- } // else value is already instance of IdentityValuesDTO
+
+ // For leafrefs, extract the type it is pointing to
+ if(typeDefinition instanceof LeafrefTypeDefinition) {
+ typeDefinition = SchemaContextUtil.getBaseTypeForLeafRef(((LeafrefTypeDefinition) typeDefinition), mountPoint == null ? this.controllerContext.getGlobalSchema() : mountPoint.getSchemaContext(), schema);
+ }
+
+ if (typeDefinition instanceof IdentityrefTypeDefinition) {
+ inputValue = parseToIdentityValuesDTO(simpleNode, value, inputValue);
}
Object outputValue = inputValue;
simpleNode.setValue(outputValue);
}
+ private Object parseToIdentityValuesDTO(final SimpleNodeWrapper simpleNode, final Object value, Object inputValue) {
+ if ((value instanceof String)) {
+ inputValue = new IdentityValuesDTO(simpleNode.getNamespace().toString(), (String) value, null,
+ (String) value);
+ } // else value is already instance of IdentityValuesDTO
+ return inputValue;
+ }
+
private void normalizeCompositeNode(final CompositeNodeWrapper compositeNodeBuilder,
final DataNodeContainer schema, final DOMMountPoint mountPoint, final QName currentAugment) {
final List<NodeWrapper<?>> children = compositeNodeBuilder.getValues();
@Test
public void leafrefToNotLeafTest() {
String json = toJson("/cnsn-to-json/leafref/xml/data_ref_to_not_leaf.xml");
- validateJson(".*\"cont-augment-module\\p{Blank}*:\\p{Blank}*lf6\":\\p{Blank}*\"44.33\".*", json);
+ validateJson(".*\"cont-augment-module\\p{Blank}*:\\p{Blank}*lf6\":\\p{Blank}*\"44\".*", json);
}
/**
}
assertNotNull(lf2);
- assertTrue(lf2.getValue() instanceof String);
- assertEquals("121", lf2.getValue());
-
+ assertEquals(121, lf2.getValue());
}
}
import java.io.IOException;
import java.net.URISyntaxException;
+
import javax.ws.rs.WebApplicationException;
+
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.sal.rest.impl.JsonToCompositeNodeProvider;
import org.opendaylight.controller.sal.rest.impl.XmlToCompositeNodeProvider;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.api.SimpleNode;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
public class XmlAndJsonToCnSnLeafRefTest extends YangAndXmlAndDataSchemaLoader {
+ final QName refContQName = QName.create("referenced:module", "2014-04-17", "cont");
+ final QName refLf1QName = QName.create(refContQName, "lf1");
+ final QName contQName = QName.create("leafref:module", "2014-04-17", "cont");
+ final QName lf1QName = QName.create(contQName, "lf1");
+ final QName lf2QName = QName.create(contQName, "lf2");
+ final QName lf3QName = QName.create(contQName, "lf3");
+
@BeforeClass
public static void initialize() {
dataLoad("/leafref/yang", 2, "leafref-module", "cont");
CompositeNode cnSn = (CompositeNode)node;
TestUtils.normalizeCompositeNode(cnSn, modules, schemaNodePath);
- verifyContPredicate(cnSn, "/ns:cont/ns:lf1", "/cont/lf1", "/ns:cont/ns:lf1", "../lf1");
+
+ verifyContPredicate(cnSn, "lf4", YangInstanceIdentifier.builder().node(refContQName).node(refLf1QName).build());
+ verifyContPredicate(cnSn, "lf2", YangInstanceIdentifier.builder().node(contQName).node(lf1QName).build());
+ verifyContPredicate(cnSn, "lf3", YangInstanceIdentifier.builder().node(contQName).node(lf2QName).build());
+ verifyContPredicate(cnSn, "lf5", YangInstanceIdentifier.builder().node(contQName).node(lf3QName).build());
}
@Test
CompositeNode cnSn = (CompositeNode)node;
TestUtils.normalizeCompositeNode(cnSn, modules, schemaNodePath);
- verifyContPredicate(cnSn, "/leafref-module:cont/leafref-module:lf1", "/leafref-module:cont/leafref-module:lf1",
- "/referenced-module:cont/referenced-module:lf1", "/leafref-module:cont/leafref-module:lf1");
+
+ verifyContPredicate(cnSn, "lf4", YangInstanceIdentifier.builder().node(refContQName).node(refLf1QName).build());
+ verifyContPredicate(cnSn, "lf2", YangInstanceIdentifier.builder().node(contQName).node(lf1QName).build());
+ verifyContPredicate(cnSn, "lf3", YangInstanceIdentifier.builder().node(contQName).node(lf2QName).build());
+ verifyContPredicate(cnSn, "lf5", YangInstanceIdentifier.builder().node(contQName).node(lf3QName).build());
}
- private void verifyContPredicate(CompositeNode cnSn, String... values) throws URISyntaxException {
- Object lf2Value = null;
- Object lf3Value = null;
- Object lf4Value = null;
- Object lf5Value = null;
-
- for (Node<?> node : cnSn.getValue()) {
- if (node.getNodeType().getLocalName().equals("lf2")) {
- lf2Value = ((SimpleNode<?>) node).getValue();
- } else if (node.getNodeType().getLocalName().equals("lf3")) {
- lf3Value = ((SimpleNode<?>) node).getValue();
- } else if (node.getNodeType().getLocalName().equals("lf4")) {
- lf4Value = ((SimpleNode<?>) node).getValue();
- } else if (node.getNodeType().getLocalName().equals("lf5")) {
- lf5Value = ((SimpleNode<?>) node).getValue();
+ private void verifyContPredicate(CompositeNode cnSn, String leafName, Object value) throws URISyntaxException {
+ Object parsed = null;
+
+ for (final Node<?> node : cnSn.getValue()) {
+ if (node.getNodeType().getLocalName().equals(leafName)) {
+ parsed = node.getValue();
}
}
- assertEquals(values[0], lf2Value);
- assertEquals(values[1], lf3Value);
- assertEquals(values[2], lf4Value);
- assertEquals(values[3], lf5Value);
+
+ assertEquals(value, parsed);
}
}
}
assertNotNull(lf2);
- assertTrue(lf2.getValue() instanceof String);
- assertEquals("121", lf2.getValue());
+ assertEquals(121, lf2.getValue());
}
@Test
module cont-augment-module {
- namespace "cont:augment:module";
+ namespace "cont:augment:module";
prefix "cntaugmod";
-
+
import main-module {prefix mamo; revision-date 2013-12-2;}
-
+
revision 2013-12-2 {
-
+
}
-
+
augment "/mamo:cont" {
leaf-list lflst1 {
type leafref {
- path "../lf1";
+ path "../mamo:lf1";
}
- }
-
+ }
+
leaf lf4 {
type leafref {
- path "../lf1";
+ path "../mamo:lf1";
}
}
-
+
/* reference to not leaf element */
leaf lf6 {
type leafref {
path "../lflst1";
}
}
-
+
leaf lf7 {
type leafref {
path "../lf4";
}
}
}
-
-
-
+
+
+
}
\ No newline at end of file
<cont>
- <lf6>44.33</lf6>
+ <lf6>44</lf6>
</cont>
\ No newline at end of file
"leafref-module:cont" : {
"lf4" : "/referenced-module:cont/referenced-module:lf1",
"lf2" : "/leafref-module:cont/leafref-module:lf1",
- "lf3" : "/leafref-module:cont/leafref-module:lf1",
- "lf5" : "/leafref-module:cont/leafref-module:lf1"
+ "lf3" : "/leafref-module:cont/leafref-module:lf2",
+ "lf5" : "/leafref-module:cont/leafref-module:lf3"
}
}
\ No newline at end of file
-<cont xmlns="leafref:module">
- <lf4 xmlns:ns="referenced:module">/ns:cont/ns:lf1</lf4>
- <lf2 xmlns:ns="leafref:module">/ns:cont/ns:lf1</lf2>
- <lf3 xmlns:ns="leafref:module">/cont/lf1</lf3>
- <lf5 xmlns:ns="leafref:module">../lf1</lf5>
+<cont xmlnsa="leafref:module">
+ <lf4 xmlns:nsa="referenced:module">/nsa:cont/nsa:lf1</lf4>
+ <lf2 xmlns:nsa="leafref:module">/nsa:cont/nsa:lf1</lf2>
+ <lf3 xmlns:ns="leafref:module">/ns:cont/ns:lf2</lf3>
+ <lf5 xmlns:nsa="leafref:module">/nsa:cont/nsa:lf3</lf5>
</cont>
module leafref-module {
- namespace "leafref:module";
+ namespace "leafref:module";
prefix "lfrfmo";
- revision 2013-11-18 {
+ revision 2013-11-18 {
}
+ identity base {}
+
container cont {
leaf lf1 {
type int32;
}
leaf lf2 {
type leafref {
- path "/cont/lf1";
+ path "/cont/lf1";
+ }
+ }
+
+ leaf lf-ident {
+ type identityref {
+ base "lfrfmo:base";
}
}
+
+ leaf lf-ident-ref {
+ type leafref {
+ path "/cont/lf-ident";
+ }
+ }
+
+ leaf lf-ident-ref-relative {
+ type leafref {
+ path "../lf-ident";
+ }
+ }
+
+ leaf lf-ident-ref-relative-cnd {
+ type leafref {
+ path "/lfrfmo:cont/lfrfmo:lis[lfrfmo:id='abc']/lf-ident-ref";
+ }
+ }
+
+
+ list lis {
+ key "id";
+
+ leaf id {
+ type string;
+ }
+
+ leaf lf-ident-ref {
+ type leafref {
+ path "/cont/lf-ident";
+ }
+ }
+ }
+
}
-
+
}
\ No newline at end of file
<cont>
<lf1>121</lf1>
<lf2>121</lf2>
+ <lf-ident xmlns:a="leafref:module">a:base</lf-ident>
+ <lf-ident-ref xmlns:a="leafref:module">a:base</lf-ident-ref>
+ <lf-ident-ref-relative xmlns:a="leafref:module">a:base</lf-ident-ref-relative>
+ <lf-ident-ref-relative-cnd xmlns:a="leafref:module">a:base</lf-ident-ref-relative-cnd>
</cont>
\ No newline at end of file
private static final Logger logger = LoggerFactory.getLogger(Activator.class);
private BundleContext context;
- private ServiceRegistration osgiRegistration;
+ private ServiceRegistration<?> osgiRegistration;
private ConfigRegistryLookupThread configRegistryLookup = null;
@Override
package org.opendaylight.controller.netconf.persist.impl;
import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.w3c.dom.Attr;
import org.w3c.dom.Element;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
/**
* Inspects snapshot xml to be stored, remove all capabilities that are not referenced by it.
* Useful when persisting current configuration.
*/
public class CapabilityStrippingConfigSnapshotHolder implements ConfigSnapshotHolder {
- private static final Logger logger = LoggerFactory.getLogger(CapabilityStrippingConfigSnapshotHolder.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CapabilityStrippingConfigSnapshotHolder.class);
private final String configSnapshot;
private final StripCapabilitiesResult stripCapabilitiesResult;
static StripCapabilitiesResult stripCapabilities(XmlElement configElement, Set<String> allCapabilitiesFromHello) {
// collect all namespaces
Set<String> foundNamespacesInXML = getNamespaces(configElement);
- logger.trace("All capabilities {}\nFound namespaces in XML {}", allCapabilitiesFromHello, foundNamespacesInXML);
+ LOG.trace("All capabilities {}\nFound namespaces in XML {}", allCapabilitiesFromHello, foundNamespacesInXML);
// required are referenced both in xml and hello
SortedSet<String> requiredCapabilities = new TreeSet<>();
// can be removed
}
}
- logger.trace("Required capabilities {}, \nObsolete capabilities {}",
+ LOG.trace("Required capabilities {}, \nObsolete capabilities {}",
requiredCapabilities, obsoleteCapabilities);
return new StripCapabilitiesResult(requiredCapabilities, obsoleteCapabilities);
package org.opendaylight.controller.netconf.persist.impl;
-import org.opendaylight.controller.config.persist.api.Persister;
-import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification;
-import org.opendaylight.controller.netconf.api.jmx.DefaultCommitOperationMXBean;
-import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.io.Closeable;
+import java.io.IOException;
import javax.annotation.concurrent.ThreadSafe;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.ObjectName;
-import java.io.Closeable;
-import java.io.IOException;
+import org.opendaylight.controller.config.persist.api.Persister;
+import org.opendaylight.controller.netconf.api.jmx.CommitJMXNotification;
+import org.opendaylight.controller.netconf.api.jmx.DefaultCommitOperationMXBean;
+import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Responsible for listening for notifications from netconf (via JMX) containing latest
@ThreadSafe
public class ConfigPersisterNotificationHandler implements Closeable {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigPersisterNotificationHandler.class);
private final MBeanServerConnection mBeanServerConnection;
private final NotificationListener listener;
}
private static void registerAsJMXListener(final MBeanServerConnection mBeanServerConnection, final NotificationListener listener) {
- logger.trace("Called registerAsJMXListener");
+ LOG.trace("Called registerAsJMXListener");
try {
mBeanServerConnection.addNotificationListener(DefaultCommitOperationMXBean.OBJECT_NAME, listener, null, null);
} catch (InstanceNotFoundException | IOException e) {
mBeanServerConnection.removeNotificationListener(on, listener);
}
} catch (final Exception e) {
- logger.warn("Unable to unregister {} as listener for {}", listener, on, e);
+ LOG.warn("Unable to unregister {} as listener for {}", listener, on, e);
}
}
}
class ConfigPersisterNotificationListener implements NotificationListener {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterNotificationListener.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigPersisterNotificationListener.class);
private final Persister persisterAggregator;
// Socket should not be closed at this point
// Activator unregisters this as JMX listener before close is called
- logger.trace("Received notification {}", notification);
+ LOG.trace("Received notification {}", notification);
if (notification instanceof CommitJMXNotification) {
try {
handleAfterCommitNotification((CommitJMXNotification) notification);
} catch (final Exception e) {
// log exceptions from notification Handler here since
// notificationBroadcastSupport logs only DEBUG level
- logger.warn("Failed to handle notification {}", notification, e);
+ LOG.warn("Failed to handle notification {}", notification, e);
throw e;
}
} else {
try {
persisterAggregator.persistConfig(new CapabilityStrippingConfigSnapshotHolder(notification.getConfigSnapshot(),
notification.getCapabilities()));
- logger.trace("Configuration persisted successfully");
+ LOG.trace("Configuration persisted successfully");
} catch (final IOException e) {
throw new RuntimeException("Unable to persist configuration snapshot", e);
}
@Immutable
public class ConfigPusherImpl implements ConfigPusher {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPusherImpl.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigPusherImpl.class);
private final long maxWaitForCapabilitiesMillis;
private final long conflictingVersionTimeoutMillis;
* it is good idea to perform garbage collection to prune
* any garbage we have accumulated during startup.
*/
- logger.debug("Running post-initialization garbage collection...");
+ LOG.debug("Running post-initialization garbage collection...");
System.gc();
- logger.debug("Post-initialization garbage collection completed.");
- logger.debug("ConfigPusher has pushed configs {}, gc completed", configs);
+ LOG.debug("Post-initialization garbage collection completed.");
+ LOG.debug("ConfigPusher has pushed configs {}, gc completed", configs);
}
catch (NetconfDocumentedException e) {
- logger.error("Error pushing configs {}",configs);
+ LOG.error("Error pushing configs {}",configs);
throw new IllegalStateException(e);
}
}
}
public void pushConfigs(List<? extends ConfigSnapshotHolder> configs) throws InterruptedException {
- logger.debug("Requested to push configs {}", configs);
+ LOG.debug("Requested to push configs {}", configs);
this.queue.put(configs);
}
private LinkedHashMap<? extends ConfigSnapshotHolder, EditAndCommitResponse> internalPushConfigs(List<? extends ConfigSnapshotHolder> configs) throws NetconfDocumentedException {
- logger.debug("Last config snapshots to be pushed to netconf: {}", configs);
+ LOG.debug("Last config snapshots to be pushed to netconf: {}", configs);
LinkedHashMap<ConfigSnapshotHolder, EditAndCommitResponse> result = new LinkedHashMap<>();
// start pushing snapshots:
for (ConfigSnapshotHolder configSnapshotHolder : configs) {
if(configSnapshotHolder != null) {
EditAndCommitResponse editAndCommitResponseWithRetries = pushConfigWithConflictingVersionRetries(configSnapshotHolder);
- logger.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
+ LOG.debug("Config snapshot pushed successfully: {}, result: {}", configSnapshotHolder, result);
result.put(configSnapshotHolder, editAndCommitResponseWithRetries);
}
}
- logger.debug("All configuration snapshots have been pushed successfully.");
+ LOG.debug("All configuration snapshots have been pushed successfully.");
return result;
}
return pushConfig(configSnapshotHolder, operationService);
} catch (ConflictingVersionException e) {
lastException = e;
- logger.debug("Conflicting version detected, will retry after timeout");
+ LOG.debug("Conflicting version detected, will retry after timeout");
sleep();
}
} while (stopwatch.elapsed(TimeUnit.MILLISECONDS) < conflictingVersionTimeoutMillis);
try {
return getOperationService(expectedCapabilities, idForReporting);
} catch (NotEnoughCapabilitiesException e) {
- logger.debug("Not enough capabilities: " + e.toString());
+ LOG.debug("Not enough capabilities: {}", e.toString());
lastException = e;
sleep();
}
return serviceCandidate;
} else {
serviceCandidate.close();
- logger.trace("Netconf server did not provide required capabilities for {} " +
+ LOG.trace("Netconf server did not provide required capabilities for {} ", idForReporting,
"Expected but not found: {}, all expected {}, current {}",
- idForReporting, notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
+ notFoundDiff, expectedCapabilities, serviceCandidate.getCapabilities()
);
throw new NotEnoughCapabilitiesException("Not enough capabilities for " + idForReporting + ". Expected but not found: " + notFoundDiff);
}
} catch (SAXException | IOException e) {
throw new IllegalStateException("Cannot parse " + configSnapshotHolder);
}
- logger.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
+ LOG.trace("Pushing last configuration to netconf: {}", configSnapshotHolder);
Stopwatch stopwatch = new Stopwatch().start();
NetconfMessage editConfigMessage = createEditConfigMessage(xmlToBePersisted);
Document commitResponseMessage = sendRequestGetResponseCheckIsOK(getCommitMessage(), operationService,
"commit", configSnapshotHolder.toString());
- if (logger.isTraceEnabled()) {
+ if (LOG.isTraceEnabled()) {
StringBuilder response = new StringBuilder("editConfig response = {");
response.append(XmlUtil.toString(editResponseMessage));
response.append("}");
response.append("commit response = {");
response.append(XmlUtil.toString(commitResponseMessage));
response.append("}");
- logger.trace("Last configuration loaded successfully");
- logger.trace("Detailed message {}", response);
- logger.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ LOG.trace("Last configuration loaded successfully");
+ LOG.trace("Detailed message {}", response);
+ LOG.trace("Total time spent {} ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
return new EditAndCommitResponse(editResponseMessage, commitResponseMessage);
}
package org.opendaylight.controller.netconf.persist.impl;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.config.persist.api.PropertiesProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
public class NoOpStorageAdapter implements StorageAdapter, Persister {
- private static final Logger logger = LoggerFactory.getLogger(NoOpStorageAdapter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NoOpStorageAdapter.class);
@Override
public Persister instantiate(PropertiesProvider propertiesProvider) {
- logger.debug("instantiate called with {}", propertiesProvider);
+ LOG.debug("instantiate called with {}", propertiesProvider);
return this;
}
@Override
public void persistConfig(ConfigSnapshotHolder holder) throws IOException {
- logger.debug("persistConfig called with {}", holder);
+ LOG.debug("persistConfig called with {}", holder);
}
@Override
public List<ConfigSnapshotHolder> loadLastConfigs() throws IOException {
- logger.debug("loadLastConfig called");
+ LOG.debug("loadLastConfig called");
return Collections.emptyList();
}
@Override
public void close() {
- logger.debug("close called");
+ LOG.debug("close called");
}
}
*
*/
public final class PersisterAggregator implements Persister {
- private static final Logger logger = LoggerFactory.getLogger(PersisterAggregator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PersisterAggregator.class);
public static class PersisterWithConfiguration {
persisterWithConfigurations.add(PersisterAggregator.loadConfiguration(index, propertiesProvider));
}
}
- logger.debug("Initialized persister with following adapters {}", persisterWithConfigurations);
+ LOG.debug("Initialized persister with following adapters {}", persisterWithConfigurations);
return new PersisterAggregator(persisterWithConfigurations);
}
public void persistConfig(ConfigSnapshotHolder holder) throws IOException {
for (PersisterWithConfiguration persisterWithConfiguration: persisterWithConfigurations){
if (!persisterWithConfiguration.readOnly){
- logger.debug("Calling {}.persistConfig", persisterWithConfiguration.getStorage());
+ LOG.debug("Calling {}.persistConfig", persisterWithConfiguration.getStorage());
persisterWithConfiguration.getStorage().persistConfig(holder);
}
}
throw new RuntimeException("Error while calling loadLastConfig on " + persisterWithConfiguration, e);
}
if (!configs.isEmpty()) {
- logger.debug("Found non empty configs using {}:{}", persisterWithConfiguration, configs);
+ LOG.debug("Found non empty configs using {}:{}", persisterWithConfiguration, configs);
return configs;
}
}
// no storage had an answer
- logger.debug("No non-empty list of configuration snapshots found");
+ LOG.debug("No non-empty list of configuration snapshots found");
return Collections.emptyList();
}
try{
persisterWithConfiguration.storage.close();
}catch(RuntimeException e) {
- logger.error("Error while closing {}", persisterWithConfiguration.storage, e);
+ LOG.error("Error while closing {}", persisterWithConfiguration.storage, e);
if (lastException == null){
lastException = e;
} else {
package org.opendaylight.controller.netconf.persist.impl.osgi;
+import com.google.common.annotations.VisibleForTesting;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
-
import javax.management.MBeanServer;
-
import org.opendaylight.controller.config.persist.api.ConfigPusher;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.netconf.mapping.api.NetconfOperationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-
public class ConfigPersisterActivator implements BundleActivator {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterActivator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigPersisterActivator.class);
private static final MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
public static final String MAX_WAIT_FOR_CAPABILITIES_MILLIS_PROPERTY = "maxWaitForCapabilitiesMillis";
@Override
public void start(final BundleContext context) throws Exception {
- logger.debug("ConfigPersister starting");
+ LOG.debug("ConfigPersister starting");
this.context = context;
autoCloseables = new ArrayList<>();
long maxWaitForCapabilitiesMillis = getMaxWaitForCapabilitiesMillis(propertiesProvider);
List<ConfigSnapshotHolder> configs = persisterAggregator.loadLastConfigs();
long conflictingVersionTimeoutMillis = getConflictingVersionTimeoutMillis(propertiesProvider);
- logger.debug("Following configs will be pushed: {}", configs);
+ LOG.debug("Following configs will be pushed: {}", configs);
InnerCustomizer innerCustomizer = new InnerCustomizer(configs, maxWaitForCapabilitiesMillis,
conflictingVersionTimeoutMillis, persisterAggregator);
@Override
public NetconfOperationProvider addingService(ServiceReference<NetconfOperationProvider> reference) {
- logger.trace("Got OuterCustomizer.addingService {}", reference);
+ LOG.trace("Got OuterCustomizer.addingService {}", reference);
// JMX was registered, track config-netconf-connector
Filter filter;
try {
@Override
public NetconfOperationServiceFactory addingService(ServiceReference<NetconfOperationServiceFactory> reference) {
- logger.trace("Got InnerCustomizer.addingService {}", reference);
+ LOG.trace("Got InnerCustomizer.addingService {}", reference);
NetconfOperationServiceFactory service = reference.getBundle().getBundleContext().getService(reference);
- logger.debug("Creating new job queue");
+ LOG.debug("Creating new job queue");
final ConfigPusherImpl configPusher = new ConfigPusherImpl(service, maxWaitForCapabilitiesMillis, conflictingVersionTimeoutMillis);
- logger.debug("Configuration Persister got {}", service);
- logger.debug("Context was {}", context);
- logger.debug("Registration was {}", registration);
+ LOG.debug("Configuration Persister got {}", service);
+ LOG.debug("Context was {}", context);
+ LOG.debug("Registration was {}", registration);
final Thread pushingThread = new Thread(new Runnable() {
@Override
registration = context.registerService(ConfigPusher.class.getName(), configPusher, null);
configPusher.process(autoCloseables, platformMBeanServer, persisterAggregator);
} else {
- logger.warn("Unable to process configs as BundleContext is null");
+ LOG.warn("Unable to process configs as BundleContext is null");
}
} catch (InterruptedException e) {
- logger.info("ConfigPusher thread stopped",e);
+ LOG.info("ConfigPusher thread stopped",e);
}
- logger.info("Configuration Persister initialization completed.");
+ LOG.info("Configuration Persister initialization completed.");
}
}, "config-pusher");
synchronized (autoCloseables) {
public class PropertiesProviderBaseImpl implements PropertiesProvider {
- private static final Logger logger = LoggerFactory.getLogger(PropertiesProviderBaseImpl.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PropertiesProviderBaseImpl.class);
private final BundleContext bundleContext;
public PropertiesProviderBaseImpl(BundleContext bundleContext) {
}
public String getPropertyWithoutPrefix(String fullKey){
- logger.trace("Full key {}", fullKey);
+ LOG.trace("Full key {}", fullKey);
return bundleContext.getProperty(fullKey);
}
*/
package org.opendaylight.controller.netconf.persist.impl;
+import static org.junit.Assert.assertEquals;
+
import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.w3c.dom.Element;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
public class CapabilityStrippingConfigSnapshotHolderTest {
@Test
import static org.mockito.Mockito.verify;
import javax.management.MBeanServerConnection;
-
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;
package org.opendaylight.controller.netconf.persist.impl;
+import com.google.common.collect.Lists;
import java.util.Collections;
-
import javax.management.Notification;
-
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.netconf.api.jmx.NetconfJMXNotification;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import com.google.common.collect.Lists;
-
public class ConfigPersisterNotificationListenerTest {
@Mock
*/
package org.opendaylight.controller.netconf.persist.impl;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.config.persist.api.PropertiesProvider;
import org.opendaylight.controller.config.persist.api.StorageAdapter;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
public class DummyAdapter implements StorageAdapter, Persister {
static int persist = 0;
package org.opendaylight.controller.netconf.persist.impl;
-import com.google.common.collect.Lists;
-
-import org.junit.Test;
-import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
-import org.opendaylight.controller.config.persist.api.Persister;
-import org.opendaylight.controller.config.persist.storage.file.xml.XmlFileStorageAdapter;
-import org.opendaylight.controller.netconf.persist.impl.osgi.ConfigPersisterActivator;
-import org.opendaylight.controller.netconf.persist.impl.osgi.PropertiesProviderBaseImpl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.opendaylight.controller.netconf.persist.impl.PersisterAggregator.PersisterWithConfiguration;
-import static org.opendaylight.controller.netconf.persist.impl.PersisterAggregatorTest.TestingPropertiesProvider.loadFile;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.Test;
+import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
+import org.opendaylight.controller.config.persist.api.Persister;
+import org.opendaylight.controller.config.persist.storage.file.xml.XmlFileStorageAdapter;
+import org.opendaylight.controller.netconf.persist.impl.osgi.ConfigPersisterActivator;
+import org.opendaylight.controller.netconf.persist.impl.osgi.PropertiesProviderBaseImpl;
public class PersisterAggregatorTest {
@Test
public void testDummyAdapter() throws Exception {
- PersisterAggregator persisterAggregator = PersisterAggregator.createFromProperties(loadFile("test1.properties"));
+ PersisterAggregator persisterAggregator = PersisterAggregator.createFromProperties(TestingPropertiesProvider.loadFile("test1.properties"));
List<PersisterWithConfiguration> persisters = persisterAggregator.getPersisterWithConfigurations();
assertEquals(1, persisters.size());
PersisterWithConfiguration persister = persisters.get(0);
@Test
public void testLoadFromPropertyFile() throws Exception {
- PersisterAggregator persisterAggregator = PersisterAggregator.createFromProperties(loadFile("test2.properties"));
+ PersisterAggregator persisterAggregator = PersisterAggregator.createFromProperties(TestingPropertiesProvider.loadFile("test2.properties"));
List<PersisterWithConfiguration> persisters = persisterAggregator.getPersisterWithConfigurations();
assertEquals(1, persisters.size());
PersisterWithConfiguration persister = persisters.get(0);
@Test
public void testFileStorageNumberOfBackups() throws Exception {
try {
- PersisterAggregator.createFromProperties(loadFile("test3.properties"));
+ PersisterAggregator.createFromProperties(TestingPropertiesProvider.loadFile("test3.properties"));
fail();
} catch (RuntimeException e) {
assertThat(
*/
package org.opendaylight.controller.netconf.persist.impl.osgi;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
import com.google.common.collect.Sets;
+import java.io.IOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.w3c.dom.Document;
import org.xml.sax.SAXException;
-import java.io.IOException;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-
public class ConfigPersisterTest {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterTest.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigPersisterTest.class);
private MockedBundleContext ctx;
private ConfigPersisterActivator configPersisterActivator;
doReturn(getConflictingService()).when(ctx.serviceFactory).createService(anyString());
Thread.sleep(500);
// working service:
- logger.info("Switching to working service **");
+ LOG.info("Switching to working service **");
doReturn(getWorkingService(getOKDocument())).when(ctx.serviceFactory).createService(anyString());
Thread.sleep(1000);
assertCannotRegisterAsJMXListener_pushWasSuccessful();
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
-
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.config.persist.api.ConfigPusher;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
final class MockedBundleContext {
@Mock
private BundleContext context;
final class TestingExceptionHandler implements Thread.UncaughtExceptionHandler {
- private static final Logger logger = LoggerFactory.getLogger(ConfigPersisterTest.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TestingExceptionHandler.class);
private Throwable t;
@Override
public void uncaughtException(Thread t, Throwable e) {
- logger.debug("Uncaught exception in thread {}", t, e);
+ LOG.debug("Uncaught exception in thread {}", t, e);
this.t = e;
}
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
-
import org.opendaylight.controller.config.api.ConflictingVersionException;
import org.opendaylight.controller.config.api.ValidationException;
import org.slf4j.Logger;
package org.opendaylight.controller.netconf.api;
import io.netty.channel.ChannelFuture;
-
import org.opendaylight.protocol.framework.ProtocolSession;
public interface NetconfSession extends ProtocolSession<NetconfMessage> {
package org.opendaylight.controller.netconf.api.jmx;
-import org.w3c.dom.Element;
-
-import javax.management.NotificationBroadcasterSupport;
import java.util.Set;
+import javax.management.NotificationBroadcasterSupport;
+import org.w3c.dom.Element;
public class CommitJMXNotification extends NetconfJMXNotification {
package org.opendaylight.controller.netconf.api.jmx;
-import org.opendaylight.controller.config.api.jmx.ObjectNameUtil;
-
import javax.management.ObjectName;
+import org.opendaylight.controller.config.api.jmx.ObjectNameUtil;
public interface DefaultCommitOperationMXBean {
package org.opendaylight.controller.netconf.api.jmx;
import java.util.Set;
-
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
-
import org.w3c.dom.Element;
public abstract class NetconfJMXNotification extends Notification {
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.Iterator;
-
import javax.xml.namespace.NamespaceContext;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
-
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.netconf.api.NetconfDocumentedException.ErrorSeverity;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
-import com.google.common.collect.ImmutableMap;
-
/**
* Unit tests for NetconfDocumentedException.
return false;
}
- final String unprefixedFilterContent = filter.getTextContent().substring(prefix.length());
- final String unprefixedSrcCOntnet = src.getTextContent().substring(prefix.length());
+ final String unprefixedFilterContent = filter.getTextContent().substring(prefixToNamespaceOfFilter.getKey().length() + 1);
+ final String unprefixedSrcContnet = src.getTextContent().substring(prefixToNamespaceOfSrc.getKey().length() + 1);
// Finally compare unprefixed content
- return unprefixedFilterContent.equals(unprefixedSrcCOntnet);
+ return unprefixedFilterContent.equals(unprefixedSrcContnet);
}
enum MatchingResult {
@Parameters
public static Collection<Object[]> data() {
List<Object[]> result = new ArrayList<>();
- for (int i = 0; i <= 9; i++) {
+ for (int i = 0; i <= 10; i++) {
result.add(new Object[]{i});
}
return result;
@Mock
private Filter filter;
@Mock
- private ServiceReference reference;
+ private ServiceReference<?> reference;
@Mock
- private ServiceRegistration registration;
+ private ServiceRegistration<?> registration;
@Before
public void setUp() throws Exception {
doReturn(filter).when(bundle).createFilter(anyString());
doNothing().when(bundle).addServiceListener(any(ServiceListener.class), anyString());
- ServiceReference[] refs = new ServiceReference[0];
+ ServiceReference<?>[] refs = new ServiceReference[0];
doReturn(refs).when(bundle).getServiceReferences(anyString(), anyString());
doReturn(Arrays.asList(refs)).when(bundle).getServiceReferences(any(Class.class), anyString());
doReturn("").when(bundle).getProperty(anyString());
@Mock
private NetconfOperationServiceFactory factory;
@Mock
- private ServiceReference reference;
+ private ServiceReference<NetconfOperationServiceFactory> reference;
private NetconfOperationServiceFactoryTracker tracker;
--- /dev/null
+<!--
+ ~ Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" message-id="m-10">
+ <data>
+ <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">prefix:sal-netconf-connector</type>
+ <name>controller-config</name>
+ <port xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">1830</port>
+ <connection-timeout-millis xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">20000</connection-timeout-millis>
+ <between-attempts-timeout-millis xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">2000</between-attempts-timeout-millis>
+ <sleep-factor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">1.5</sleep-factor>
+ <password xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">admin</password>
+ <dom-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">prefix:dom-broker-osgi-registry</type>
+ <name>dom-broker</name>
+ </dom-registry>
+ <client-dispatcher xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:netconf">prefix:netconf-client-dispatcher</type>
+ <name>global-netconf-dispatcher</name>
+ </client-dispatcher>
+ <username xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">admin</username>
+ <address xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">127.0.0.1</address>
+ <processing-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadpool</type>
+ <name>global-netconf-processing-executor</name>
+ </processing-executor>
+ <tcp-only xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">false</tcp-only>
+ <binding-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">prefix:binding-broker-osgi-registry</type>
+ <name>binding-osgi-broker</name>
+ </binding-registry>
+ <max-connection-attempts xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">0</max-connection-attempts>
+ <event-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netty">prefix:netty-event-executor</type>
+ <name>global-event-executor</name>
+ </event-executor>
+ </module>
+ </modules>
+ </data>
+</rpc-reply>
\ No newline at end of file
--- /dev/null
+<!--
+ ~ Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ ~
+ ~ This program and the accompanying materials are made available under the
+ ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+ -->
+
+<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" message-id="m-10">
+ <data>
+ <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">prefix:sal-netconf-connector</type>
+ <name>controller-config</name>
+ <port xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">1830</port>
+ <connection-timeout-millis xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">20000</connection-timeout-millis>
+ <between-attempts-timeout-millis xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">2000</between-attempts-timeout-millis>
+ <sleep-factor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">1.5</sleep-factor>
+ <password xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">admin</password>
+ <dom-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">prefix:dom-broker-osgi-registry</type>
+ <name>dom-broker</name>
+ </dom-registry>
+ <client-dispatcher xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:netconf">prefix:netconf-client-dispatcher</type>
+ <name>global-netconf-dispatcher</name>
+ </client-dispatcher>
+ <username xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">admin</username>
+ <address xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">127.0.0.1</address>
+ <processing-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadpool</type>
+ <name>global-netconf-processing-executor</name>
+ </processing-executor>
+ <tcp-only xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">false</tcp-only>
+ <binding-registry xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">prefix:binding-broker-osgi-registry</type>
+ <name>binding-osgi-broker</name>
+ </binding-registry>
+ <max-connection-attempts xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">0</max-connection-attempts>
+ <event-executor xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:netty">prefix:netty-event-executor</type>
+ <name>global-event-executor</name>
+ </event-executor>
+ </module>
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:shutdown:impl">prefix:shutdown</type>
+ <name>shutdown</name>
+ <secret xmlns="urn:opendaylight:params:xml:ns:yang:controller:shutdown:impl"/>
+ </module>
+ </modules>
+ </data>
+</rpc-reply>
\ No newline at end of file
--- /dev/null
+<rpc xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" message-id="m-9">
+ <get-config>
+ <source>
+ <running/>
+ </source>
+ <filter xmlns:a="urn:ietf:params:xml:ns:netconf:base:1.0" a:type="subtree">
+ <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <module>
+ <type xmlns:x="urn:opendaylight:params:xml:ns:yang:controller:md:sal:connector:netconf">x:sal-netconf-connector</type>
+ <name>controller-config</name>
+ </module>
+ </modules>
+ </filter>
+ </get-config>
+</rpc>