Merge "Created Sample Feature Test Class for Base Feature Repository"
authorMadhu Venugopal <mavenugo@gmail.com>
Sat, 2 Aug 2014 08:34:32 +0000 (08:34 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 2 Aug 2014 08:34:32 +0000 (08:34 +0000)
213 files changed:
.gitignore
features/config-netty/src/main/resources/features.xml
features/nsf/src/main/resources/features.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/commons/protocol-framework/pom.xml
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/AbstractDispatcher.java
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ProtocolSessionPromise.java
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/SessionListener.java
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/SimpleDispatcher.java
opendaylight/config/config-manager/src/main/java/org/opendaylight/controller/config/manager/impl/osgi/mapping/CodecRegistryProvider.java
opendaylight/config/config-netty-config/pom.xml [new file with mode: 0644]
opendaylight/config/config-netty-config/src/main/resources/initial/00-netty.xml [moved from opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/00-netty.xml with 100% similarity]
opendaylight/config/pom.xml
opendaylight/distribution/opendaylight/pom.xml
opendaylight/hosttracker/api/src/main/java/org/opendaylight/controller/hosttracker/IHostTrackerShell.java [new file with mode: 0644]
opendaylight/hosttracker/implementation/src/main/java/org/opendaylight/controller/hosttracker/internal/Activator.java
opendaylight/hosttracker/implementation/src/main/java/org/opendaylight/controller/hosttracker/internal/HostTracker.java
opendaylight/hosttracker/shell/pom.xml [new file with mode: 0644]
opendaylight/hosttracker/shell/src/main/java/org/opendaylight/controller/hosttracker/shell/DumpFailedARPReqList.java [new file with mode: 0644]
opendaylight/hosttracker/shell/src/main/java/org/opendaylight/controller/hosttracker/shell/DumpPendingARPReqList.java [new file with mode: 0644]
opendaylight/hosttracker/shell/src/main/resources/OSGI-INF/blueprint/blueprint.xml [new file with mode: 0644]
opendaylight/hosttracker/shell/src/test/java/org/opendaylight/controller/hosttracker/shell/HostTrackerShellTest.java [new file with mode: 0644]
opendaylight/md-sal/md-sal-config/pom.xml [new file with mode: 0644]
opendaylight/md-sal/md-sal-config/src/main/resources/initial/01-md-sal.xml [moved from opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml with 99% similarity]
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-akka-raft/README-FIRST [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/pom.xml
opendaylight/md-sal/sal-akka-raft/run.sh [new file with mode: 0755]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/Main.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/TestDriver.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValueSaved.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintRole.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/PrintState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/protobuff/messages/KeyValueMessages.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplySnapshot.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplySnapshot.java with 77% similarity]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyState.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ApplyState.java with 87% similarity]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/CommitEntry.java with 71% similarity]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ElectionTimeout.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/ElectionTimeout.java with 67% similarity]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/PersistEntry.java with 72% similarity]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/Replicate.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/Replicate.java with 87% similarity]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SaveSnapshot.java with 74% similarity]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendHeartBeat.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendHeartBeat.java with 75% similarity]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java [moved from opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/internal/messages/SendInstallSnapshot.java with 67% similarity]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AbstractRaftRPC.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RaftRPC.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVote.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/AppendEntriesMessages.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/VotingMessages.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/resources/AppendEntriesMessages.proto [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/resources/KeyValueMessages.proto [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/resources/VotingMessages.proto [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/resources/application.conf
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/protobuff/messages/MockPayloadMessages.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/resources/MockPayload.proto [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/resources/application.conf
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/RuntimeMappingModule.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedDataBroker.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/MountPointManagerImpl.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/impl/test/Bug1333DataChangeListenerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/DataBrokerTestCustomizer.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/MockSchemaService.java
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Configuration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/PrimaryNotFoundException.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/TimeoutException.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CarsModel.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/PeopleModel.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/SampleModelsTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards.conf
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/modules.conf
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/model/SchemaService.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomBrokerImplModule.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/SchemaServiceImplSingletonModule.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/compat/BackwardsCompatibleDataBroker.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BackwardsCompatibleMountPoint.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/GlobalBundleScanningSchemaServiceImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/SchemaServiceProxy.java
opendaylight/md-sal/sal-dom-xsql/src/main/java/org/opendaylight/yang/gen/v1/http/netconfcentral/org/ns/xsql/rev140626/XSQLModule.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicator.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilities.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceDataBroker.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceReadOnlyTx.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceWriteOnlyTx.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/util/NetconfMessageTransformUtil.java
opendaylight/md-sal/sal-netconf-connector/src/main/yang/odl-sal-netconf-connector-cfg.yang
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfSessionCapabilitiesTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/sal/tx/NetconfDeviceWriteOnlyTxTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-protocolbuffer-encoding/pom.xml
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/mdsal/CompositeModificationPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/common/NormalizedNodeMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/common/SimpleNormalizedNodeMessage.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/datachange/notification/DataChangeListenerMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/persistent/PersistentMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/registration/ListenerRegistrationMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/shard/ShardManagerMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionChainMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/resources/CompositeModificationPayload.proto [new file with mode: 0644]
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/resources/Persistent.proto
opendaylight/md-sal/sal-remoterpc-connector/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModule.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/config/yang/config/remote_rpc_connector/RemoteRPCBrokerModuleFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorConstants.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcImplementation.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProvider.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderFactory.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RouteIdentifierImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RoutedRpcListener.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcListener.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/XmlUtils.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRoutedRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/AddRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ErrorResponse.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRoutedRpcReply.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/GetRpcReply.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRoutedRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRoutedRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RemoveRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RoutingTableData.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/UpdateSchemaContext.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java [moved from opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/ActorUtil.java with 97% similarity]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/InstanceIdentifierForXmlCodec.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/RandomPrefix.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/XmlDocumentUtils.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/XmlStreamUtils.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/XmlUtils.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcBrokerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RoutingTableTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/utils/XmlUtilsTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/org/opendaylight/controller/remote/rpc/utils/rpcTest.yang [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProviderImpl.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java
opendaylight/md-sal/samples/pom.xml
opendaylight/md-sal/samples/toaster-config/pom.xml [new file with mode: 0644]
opendaylight/md-sal/samples/toaster-config/src/main/resources/initial/03-toaster-sample.xml [moved from opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/03-toaster-sample.xml with 99% similarity]
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/writer/impl/CliOutputFromNormalizedNodeSerializerFactory.java
opendaylight/netconf/netconf-config/pom.xml [new file with mode: 0644]
opendaylight/netconf/netconf-config/src/main/resources/initial/01-netconf.xml [moved from opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-netconf.xml with 100% similarity]
opendaylight/netconf/netconf-connector-config/pom.xml [new file with mode: 0644]
opendaylight/netconf/netconf-connector-config/src/main/resources/initial/99-netconf-connector.xml [moved from opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/99-netconf-connector.xml with 100% similarity]
opendaylight/netconf/pom.xml
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronSubnet.java
opendaylight/northbound/commons/pom.xml
opendaylight/northbound/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/northbound/NeutronRoutersNorthbound.java
opendaylight/topologymanager/implementation/pom.xml
opendaylight/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/ITopologyManagerShell.java [new file with mode: 0644]
opendaylight/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/Activator.java
opendaylight/topologymanager/implementation/src/main/java/org/opendaylight/controller/topologymanager/internal/TopologyManagerImpl.java
opendaylight/topologymanager/shell/pom.xml [new file with mode: 0644]
opendaylight/topologymanager/shell/src/main/java/org/opendaylight/controller/topologymanager/shell/AddUserLink.java [new file with mode: 0644]
opendaylight/topologymanager/shell/src/main/java/org/opendaylight/controller/topologymanager/shell/DeleteUserLink.java [new file with mode: 0644]
opendaylight/topologymanager/shell/src/main/java/org/opendaylight/controller/topologymanager/shell/PrintNodeEdges.java [new file with mode: 0644]
opendaylight/topologymanager/shell/src/main/java/org/opendaylight/controller/topologymanager/shell/PrintUserLink.java [new file with mode: 0644]
opendaylight/topologymanager/shell/src/main/resources/OSGI-INF/blueprint/blueprint.xml [new file with mode: 0644]

index f8cf74f..9144cda 100644 (file)
@@ -27,3 +27,5 @@ out/
 maven-eclipse.xml
 .DS_STORE
 .metadata
+opendaylight/md-sal/sal-distributed-datastore/journal
+
index 3121ca0..f1b2d1f 100644 (file)
@@ -12,5 +12,6 @@
     <bundle>mvn:org.opendaylight.controller/threadpool-config-api/${project.version}</bundle>
     <bundle>mvn:org.opendaylight.controller/threadpool-config-impl/${project.version}</bundle>
     <feature version='${project.version}'>odl-config-startup</feature>
+    <configfile finalname="configuration/initial/00-netty.xml">mvn:org.opendaylight.controller/config-netty-config/${config.version}/xml/config</configfile>
   </feature>
 </features>
\ No newline at end of file
index fb61f33..130d72e 100644 (file)
         <bundle>mvn:org.opendaylight.controller/forwardingrulesmanager.implementation/${forwardingrulesmanager.implementation.version}</bundle>
 
         <bundle>mvn:org.opendaylight.controller/topologymanager/${topologymanager.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version}</bundle>
 
         <bundle>mvn:org.opendaylight.controller/networkconfig.neutron/${networkconfig.neutron.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/networkconfig.neutron.implementation/${networkconfig.neutron.implementation.version}</bundle>
 
         <bundle>mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version}</bundle>
 
         <bundle>mvn:org.opendaylight.controller/forwarding.staticrouting</bundle>
 
@@ -70,4 +72,4 @@
         <bundle>mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}</bundle>
     </feature>
-</features>
\ No newline at end of file
+</features>
index ce014fc..c4ab2ed 100644 (file)
@@ -95,6 +95,7 @@
     <hosttracker.api.version>0.5.2-SNAPSHOT</hosttracker.api.version>
     <hosttracker.implementation.version>0.5.2-SNAPSHOT</hosttracker.implementation.version>
     <hosttracker.northbound.version>0.4.2-SNAPSHOT</hosttracker.northbound.version>
+    <hosttracker.shell.version>1.0.0-SNAPSHOT</hosttracker.shell.version>
     <hosttracker_new.api.version>0.4.2-SNAPSHOT</hosttracker_new.api.version>
     <hosttracker_new.implementation.version>0.4.2-SNAPSHOT</hosttracker_new.implementation.version>
     <httpservice-bridge.northbound.version>0.0.2-SNAPSHOT</httpservice-bridge.northbound.version>
     <topology.northbound.version>0.4.2-SNAPSHOT</topology.northbound.version>
     <topology.web.version>0.4.2-SNAPSHOT</topology.web.version>
     <topologymanager.version>0.4.2-SNAPSHOT</topologymanager.version>
+    <topologymanager.shell.version>1.0.0-SNAPSHOT</topologymanager.shell.version>
     <troubleshoot.web.version>0.4.2-SNAPSHOT</troubleshoot.web.version>
     <typesafe.config.version>1.2.0</typesafe.config.version>
     <uncommons.maths.version>1.2.2</uncommons.maths.version>
         <artifactId>sal-rest-connector-config</artifactId>
         <version>${mdsal.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>config-netty-config</artifactId>
+        <version>${config.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>md-sal-config</artifactId>
+        <version>${mdsal.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>netconf-config</artifactId>
+        <version>${netconf.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>netconf-connector-config</artifactId>
+        <version>${netconf.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>sal-rest-docgen</artifactId>
         <artifactId>sample-toaster-provider</artifactId>
         <version>${mdsal.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.opendaylight.controller.samples</groupId>
+        <artifactId>toaster-config</artifactId>
+        <version>${mdsal.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.opendaylight.controller.thirdparty</groupId>
         <artifactId>com.sun.jersey.jersey-servlet</artifactId>
           <includeTestSourceDirectory>true</includeTestSourceDirectory>
           <sourceDirectory>${project.basedir}</sourceDirectory>
           <includes>**\/*.java,**\/*.xml,**\/*.ini,**\/*.sh,**\/*.bat</includes>
-          <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/,**\/xtend-gen\/</excludes>
+          <excludes>**\/target\/,**\/bin\/,**\/target-ide\/,**\/${jmxGeneratorPath}\/,**\/${salGeneratorPath}\/,**\/xtend-gen\/,**\/protobuff\/</excludes>
         </configuration>
         <dependencies>
           <dependency>
index f706987..774bc7c 100644 (file)
       <artifactId>netty-event-executor-config</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
index a62bd7d..a05d02c 100644 (file)
@@ -7,12 +7,19 @@
  */
 package org.opendaylight.protocol.framework;
 
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Preconditions;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
@@ -28,13 +35,6 @@ import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
 
-import java.io.Closeable;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * Dispatcher class for creating servers and clients. The idea is to first create servers and clients and the run the
  * start method that will handle sockets in different thread.
@@ -155,7 +155,7 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
      */
     protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final PipelineInitializer<S> initializer) {
         final Bootstrap b = new Bootstrap();
-        final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<S>(executor, address, strategy, b);
+        final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<>(executor, address, strategy, b);
         b.option(ChannelOption.SO_KEEPALIVE, true).handler(
                 new ChannelInitializer<SocketChannel>() {
                     @Override
@@ -165,18 +165,36 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
                 });
 
         customizeBootstrap(b);
+        setWorkerGroup(b);
+        setChannelFactory(b);
+
+        p.connect();
+        LOG.debug("Client created.");
+        return p;
+    }
 
+    private void setWorkerGroup(final Bootstrap b) {
         if (b.group() == null) {
             b.group(workerGroup);
         }
+    }
 
-        // There is no way to detect if this was already set by
-        // customizeBootstrap()
-        try {
-            b.channel(NioSocketChannel.class);
-        } catch (IllegalStateException e) {
-            LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
-        }
+    /**
+     * Create a client but use a pre-configured bootstrap.
+     * This method however replaces the ChannelInitializer in the bootstrap. All other configuration is preserved.
+     *
+     * @param address remote address
+     */
+    protected Future<S> createClient(final InetSocketAddress address, final ReconnectStrategy strategy, final Bootstrap bootstrap, final PipelineInitializer<S> initializer) {
+        final ProtocolSessionPromise<S> p = new ProtocolSessionPromise<>(executor, address, strategy, bootstrap);
+
+        bootstrap.handler(
+                new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    protected void initChannel(final SocketChannel ch) {
+                        initializer.initializeChannel(ch, p);
+                    }
+                });
 
         p.connect();
         LOG.debug("Client created.");
@@ -195,6 +213,9 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
     }
 
     /**
+     *
+     * @deprecated use {@link org.opendaylight.protocol.framework.AbstractDispatcher#createReconnectingClient(java.net.InetSocketAddress, ReconnectStrategyFactory, org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer)} with only one reconnectStrategyFactory instead.
+     *
      * Creates a client.
      *
      * @param address remote address
@@ -204,15 +225,47 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
      * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
      *         success if it indicates no further attempts should be made and failure if it reports an error
      */
+    @Deprecated
     protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
             final ReconnectStrategy reestablishStrategy, final PipelineInitializer<S> initializer) {
+        return createReconnectingClient(address, connectStrategyFactory, initializer);
+    }
 
-        final ReconnectPromise<S, L> p = new ReconnectPromise<S, L>(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, reestablishStrategy, initializer);
-        p.connect();
+    /**
+     * Creates a reconnecting client.
+     *
+     * @param address remote address
+     * @param connectStrategyFactory Factory for creating reconnection strategy for every reconnect attempt
+     *
+     * @return Future representing the reconnection task. It will report completion based on reestablishStrategy, e.g.
+     *         success if it indicates no further attempts should be made and failure if it reports an error
+     */
+    protected Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory connectStrategyFactory,
+            final PipelineInitializer<S> initializer) {
+        final Bootstrap b = new Bootstrap();
+
+        final ReconnectPromise<S, L> p = new ReconnectPromise<>(GlobalEventExecutor.INSTANCE, this, address, connectStrategyFactory, b, initializer);
+
+        b.option(ChannelOption.SO_KEEPALIVE, true);
 
+        customizeBootstrap(b);
+        setWorkerGroup(b);
+        setChannelFactory(b);
+
+        p.connect();
         return p;
     }
 
+    private void setChannelFactory(final Bootstrap b) {
+        // There is no way to detect if this was already set by
+        // customizeBootstrap()
+        try {
+            b.channel(NioSocketChannel.class);
+        } catch (final IllegalStateException e) {
+            LOG.trace("Not overriding channelFactory on bootstrap {}", b, e);
+        }
+    }
+
     /**
      * @deprecated Should only be used with {@link AbstractDispatcher#AbstractDispatcher()}
      */
@@ -225,5 +278,4 @@ public abstract class AbstractDispatcher<S extends ProtocolSession<?>, L extends
             this.bossGroup.shutdownGracefully();
         }
     }
-
 }
index a78274c..a38db61 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.protocol.framework;
 
+import com.google.common.base.Preconditions;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -16,17 +17,12 @@ import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
-
 import java.net.InetSocketAddress;
-
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 @ThreadSafe
 final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends DefaultPromise<S> {
     private static final Logger LOG = LoggerFactory.getLogger(ProtocolSessionPromise.class);
@@ -54,72 +50,12 @@ final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends Default
             LOG.debug("Promise {} attempting connect for {}ms", lock, timeout);
 
             this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
-            this.pending = this.b.connect(this.address).addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(final ChannelFuture cf) throws Exception {
-                    synchronized (lock) {
-
-                        LOG.debug("Promise {} connection resolved", lock);
-
-                        // Triggered when a connection attempt is resolved.
-                        Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(cf));
-
-                        /*
-                         * The promise we gave out could have been cancelled,
-                         * which cascades to the connect getting cancelled,
-                         * but there is a slight race window, where the connect
-                         * is already resolved, but the listener has not yet
-                         * been notified -- cancellation at that point won't
-                         * stop the notification arriving, so we have to close
-                         * the race here.
-                         */
-                        if (isCancelled()) {
-                            if (cf.isSuccess()) {
-                                LOG.debug("Closing channel for cancelled promise {}", lock);
-                                cf.channel().close();
-                            }
-                            return;
-                        }
-
-                        if (!cf.isSuccess()) {
-                            LOG.debug("Attempt to connect to {} failed", ProtocolSessionPromise.this.address, cf.cause());
-
-                            final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
-                            rf.addListener(new FutureListener<Void>() {
-                                @Override
-                                public void operationComplete(final Future<Void> sf) {
-                                    synchronized (lock) {
-                                        // Triggered when a connection attempt is to be made.
-                                        Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(sf));
-
-                                        /*
-                                         * The promise we gave out could have been cancelled,
-                                         * which cascades to the reconnect attempt getting
-                                         * cancelled, but there is a slight race window, where
-                                         * the reconnect attempt is already enqueued, but the
-                                         * listener has not yet been notified -- if cancellation
-                                         * happens at that point, we need to catch it here.
-                                         */
-                                        if (!isCancelled()) {
-                                            if (sf.isSuccess()) {
-                                                connect();
-                                            } else {
-                                                setFailure(sf.cause());
-                                            }
-                                        }
-                                    }
-                                }
-                            });
-
-                            ProtocolSessionPromise.this.pending = rf;
-                        } else {
-                            LOG.debug("Promise {} connection successful", lock);
-                        }
-                    }
-                }
-            });
+            final ChannelFuture connectFuture = this.b.connect(this.address);
+            // Add listener that attempts reconnect by invoking this method again.
+            connectFuture.addListener(new BootstrapConnectListener(lock));
+            this.pending = connectFuture;
         } catch (final Exception e) {
-            LOG.info("Failed to connect to {}", e);
+            LOG.info("Failed to connect to {}", address, e);
             setFailure(e);
         }
     }
@@ -140,4 +76,79 @@ final class ProtocolSessionPromise<S extends ProtocolSession<?>> extends Default
         this.strategy.reconnectSuccessful();
         return super.setSuccess(result);
     }
+
+    private class BootstrapConnectListener implements ChannelFutureListener {
+        private final Object lock;
+
+        public BootstrapConnectListener(final Object lock) {
+            this.lock = lock;
+        }
+
+        @Override
+        public void operationComplete(final ChannelFuture cf) throws Exception {
+            synchronized (lock) {
+
+                LOG.debug("Promise {} connection resolved", lock);
+
+                // Triggered when a connection attempt is resolved.
+                Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(cf));
+
+                /*
+                 * The promise we gave out could have been cancelled,
+                 * which cascades to the connect getting cancelled,
+                 * but there is a slight race window, where the connect
+                 * is already resolved, but the listener has not yet
+                 * been notified -- cancellation at that point won't
+                 * stop the notification arriving, so we have to close
+                 * the race here.
+                 */
+                if (isCancelled()) {
+                    if (cf.isSuccess()) {
+                        LOG.debug("Closing channel for cancelled promise {}", lock);
+                        cf.channel().close();
+                    }
+                    return;
+                }
+
+                if(cf.isSuccess()) {
+                    LOG.debug("Promise {} connection successful", lock);
+                    return;
+                }
+
+                LOG.debug("Attempt to connect to {} failed", ProtocolSessionPromise.this.address, cf.cause());
+
+                final Future<Void> rf = ProtocolSessionPromise.this.strategy.scheduleReconnect(cf.cause());
+                rf.addListener(new ReconnectingStrategyListener());
+                ProtocolSessionPromise.this.pending = rf;
+            }
+        }
+
+        private class ReconnectingStrategyListener implements FutureListener<Void> {
+            @Override
+            public void operationComplete(final Future<Void> sf) {
+                synchronized (lock) {
+                    // Triggered when a connection attempt is to be made.
+                    Preconditions.checkState(ProtocolSessionPromise.this.pending.equals(sf));
+
+                    /*
+                     * The promise we gave out could have been cancelled,
+                     * which cascades to the reconnect attempt getting
+                     * cancelled, but there is a slight race window, where
+                     * the reconnect attempt is already enqueued, but the
+                     * listener has not yet been notified -- if cancellation
+                     * happens at that point, we need to catch it here.
+                     */
+                    if (!isCancelled()) {
+                        if (sf.isSuccess()) {
+                            connect();
+                        } else {
+                            setFailure(sf.cause());
+                        }
+                    }
+                }
+            }
+        }
+
+    }
+
 }
index 1fa6a81..fe1012f 100644 (file)
  */
 package org.opendaylight.protocol.framework;
 
-import io.netty.channel.ChannelFuture;
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
-
-import java.io.Closeable;
 import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.opendaylight.protocol.framework.AbstractDispatcher.PipelineInitializer;
-
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(ReconnectPromise.class);
+
     private final AbstractDispatcher<S, L> dispatcher;
     private final InetSocketAddress address;
     private final ReconnectStrategyFactory strategyFactory;
-    private final ReconnectStrategy strategy;
-    private final PipelineInitializer<S> initializer;
+    private final Bootstrap b;
+    private final AbstractDispatcher.PipelineInitializer<S> initializer;
     private Future<?> pending;
 
-    private final AtomicBoolean negotiationFinished = new AtomicBoolean(false);
-
     public ReconnectPromise(final EventExecutor executor, final AbstractDispatcher<S, L> dispatcher, final InetSocketAddress address,
-            final ReconnectStrategyFactory connectStrategyFactory, final ReconnectStrategy reestablishStrategy,
-            final PipelineInitializer<S> initializer) {
+                            final ReconnectStrategyFactory connectStrategyFactory, final Bootstrap b, final AbstractDispatcher.PipelineInitializer<S> initializer) {
         super(executor);
+        this.b = b;
+        this.initializer = Preconditions.checkNotNull(initializer);
         this.dispatcher = Preconditions.checkNotNull(dispatcher);
         this.address = Preconditions.checkNotNull(address);
         this.strategyFactory = Preconditions.checkNotNull(connectStrategyFactory);
-        this.strategy = Preconditions.checkNotNull(reestablishStrategy);
-        this.initializer = Preconditions.checkNotNull(initializer);
     }
 
-    // FIXME: BUG-190: refactor
-
     synchronized void connect() {
-        negotiationFinished.set(false);
-
         final ReconnectStrategy cs = this.strategyFactory.createReconnectStrategy();
-        final ReconnectStrategy rs = new ReconnectStrategy() {
-            @Override
-            public Future<Void> scheduleReconnect(final Throwable cause) {
-                return cs.scheduleReconnect(cause);
-            }
 
-            @Override
-            public void reconnectSuccessful() {
-                cs.reconnectSuccessful();
-            }
-
-            @Override
-            public int getConnectTimeout() throws Exception {
-                final int cst = cs.getConnectTimeout();
-                final int rst = ReconnectPromise.this.strategy.getConnectTimeout();
-
-                if (cst == 0) {
-                    return rst;
-                }
-                if (rst == 0) {
-                    return cst;
-                }
-                return Math.min(cst, rst);
-            }
-        };
-
-        final Future<S> cf = this.dispatcher.createClient(this.address, rs, new PipelineInitializer<S>() {
+        // Set up a client with pre-configured bootstrap, but add a closed channel handler into the pipeline to support reconnect attempts
+        pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer<S>() {
             @Override
             public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
-                addChannelClosedListener(channel.closeFuture());
                 initializer.initializeChannel(channel, promise);
+
+                // add closed channel handler
+                channel.pipeline().addFirst(new ClosedChannelHandler(ReconnectPromise.this));
             }
         });
+    }
 
-        final Object lock = this;
-        this.pending = cf;
+    /**
+     *
+     * @return true if initial connection was established successfully, false if initial connection failed due to e.g. Connection refused, Negotiation failed
+     */
+    private boolean isInitialConnectFinished() {
+        Preconditions.checkNotNull(pending);
+        return pending.isDone() && pending.isSuccess();
+    }
 
-        cf.addListener(new FutureListener<S>() {
+    @Override
+    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+        if (super.cancel(mayInterruptIfRunning)) {
+            Preconditions.checkNotNull(pending);
+            this.pending.cancel(mayInterruptIfRunning);
+            return true;
+        }
 
-            @Override
-            public void operationComplete(final Future<S> future) {
-                synchronized (lock) {
-                    if (!future.isSuccess()) {
-                        final Future<Void> rf = ReconnectPromise.this.strategy.scheduleReconnect(cf.cause());
-
-                        if(rf == null) {
-                            // This should reflect: no more reconnecting strategies, enough
-                            // Currently all reconnect strategies fail with exception, should return null
-                            return;
-                        }
-
-                        ReconnectPromise.this.pending = rf;
-
-                        rf.addListener(new FutureListener<Void>() {
-                            @Override
-                            public void operationComplete(final Future<Void> sf) {
-                                synchronized (lock) {
-                                    /*
-                                     * The promise we gave out could have been cancelled,
-                                     * which cascades to the reconnect attempt getting
-                                     * cancelled, but there is a slight race window, where
-                                     * the reconnect attempt is already enqueued, but the
-                                     * listener has not yet been notified -- if cancellation
-                                     * happens at that point, we need to catch it here.
-                                     */
-                                    if (!isCancelled()) {
-                                        if (sf.isSuccess()) {
-                                            connect();
-                                        } else {
-                                            setFailure(sf.cause());
-                                        }
-                                    }
-                                }
-                            }
-                        });
-                    } else {
-                        /*
-                         *  FIXME: BUG-190: we have a slight race window with cancellation
-                         *         here. Analyze and define its semantics.
-                         */
-                        ReconnectPromise.this.strategy.reconnectSuccessful();
-                        negotiationFinished.set(true);
-                    }
-                }
-            }
-        });
+        return false;
     }
 
-    private final ClosedChannelListener closedChannelListener = new ClosedChannelListener();
-
-    class ClosedChannelListener implements Closeable, FutureListener<Void> {
+    /**
+     * Channel handler that responds to channelInactive event and reconnects the session.
+     * Only if the initial connection was successfully established and promise was not canceled.
+     */
+    private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
+        private final ReconnectPromise<?, ?> promise;
 
-        private final AtomicBoolean stop = new AtomicBoolean(false);
+        public ClosedChannelHandler(final ReconnectPromise<?, ?> promise) {
+            this.promise = promise;
+        }
 
         @Override
-        public void operationComplete(final Future<Void> future) throws Exception {
-            if (stop.get()) {
+        public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+            if (promise.isCancelled()) {
                 return;
             }
 
-            // Start reconnecting crashed session after negotiation was successful
-            if (!negotiationFinished.get()) {
+            // Check if initial connection was fully finished. If the session was dropped during negotiation, reconnect will not happen.
+            // Session can be dropped during negotiation on purpose by the client side and would make no sense to initiate reconnect
+            if (promise.isInitialConnectFinished() == false) {
                 return;
             }
 
-            connect();
-        }
-
-        @Override
-        public void close() {
-            this.stop.set(true);
+            LOG.debug("Reconnecting after connection to {} was dropped", promise.address);
+            promise.connect();
         }
     }
 
-    private void addChannelClosedListener(final ChannelFuture channelFuture) {
-        channelFuture.addListener(closedChannelListener);
-    }
-
-    @Override
-    public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
-        closedChannelListener.close();
-
-        if (super.cancel(mayInterruptIfRunning)) {
-            this.pending.cancel(mayInterruptIfRunning);
-            return true;
-        }
-
-        return false;
-    }
 }
index 3c429fc..a756a0d 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.protocol.framework;
 import java.util.EventListener;
 
 /**
- * Listener that receives session state informations. This interface should be
+ * Listener that receives session state information. This interface should be
  * implemented by a protocol specific abstract class, that is extended by
  * a final class that implements the methods.
  */
index bead1ee..63026e3 100644 (file)
@@ -9,6 +9,14 @@ package org.opendaylight.protocol.framework;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -16,50 +24,139 @@ import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
-
+import io.netty.util.concurrent.SucceededFuture;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class ServerTest {
     SimpleDispatcher clientDispatcher, dispatcher;
 
-    final SimpleSessionListener pce = new SimpleSessionListener();
-
     SimpleSession session = null;
 
     ChannelFuture server = null;
 
     InetSocketAddress serverAddress;
     private NioEventLoopGroup eventLoopGroup;
-
+    // Dedicated loop group for server, needed for testing reconnection client
+    // With dedicated server group we can simulate session drop by shutting only the server group down
+    private NioEventLoopGroup serverLoopGroup;
 
     @Before
     public void setUp() {
         final int port = 10000 + (int)(10000 * Math.random());
         serverAddress = new InetSocketAddress("127.0.0.1", port);
         eventLoopGroup = new NioEventLoopGroup();
+        serverLoopGroup = new NioEventLoopGroup();
+    }
+
+    @After
+    public void tearDown() throws IOException, InterruptedException, ExecutionException {
+        if(server != null) {
+            this.server.channel().close();
+        }
+        this.eventLoopGroup.shutdownGracefully().get();
+        this.serverLoopGroup.shutdownGracefully().get();
+        try {
+            Thread.sleep(500);
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Test
-    public void testConnectionEstablished() throws Exception {
+    public void testConnectionRefused() throws Exception {
+        this.clientDispatcher = getClientDispatcher();
+
+        final ReconnectStrategy mockReconnectStrategy = getMockedReconnectStrategy();
+
+        this.clientDispatcher.createClient(this.serverAddress,
+                mockReconnectStrategy, new SessionListenerFactory<SimpleSessionListener>() {
+                    @Override
+                    public SimpleSessionListener getSessionListener() {
+                        return new SimpleSessionListener();
+                    }
+                });
+
+        Mockito.verify(mockReconnectStrategy, timeout(5000).atLeast(2)).scheduleReconnect(any(Throwable.class));
+    }
+
+    @Test
+    public void testConnectionReestablishInitial() throws Exception {
+        this.clientDispatcher = getClientDispatcher();
+
+        final ReconnectStrategy mockReconnectStrategy = getMockedReconnectStrategy();
+
+        this.clientDispatcher.createClient(this.serverAddress,
+                mockReconnectStrategy, new SessionListenerFactory<SimpleSessionListener>() {
+                    @Override
+                    public SimpleSessionListener getSessionListener() {
+                        return new SimpleSessionListener();
+                    }
+                });
+
+        Mockito.verify(mockReconnectStrategy, timeout(5000).atLeast(2)).scheduleReconnect(any(Throwable.class));
+
+        final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+        this.dispatcher = getServerDispatcher(p);
+
+        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
+            @Override
+            public SimpleSessionListener getSessionListener() {
+                return new SimpleSessionListener();
+            }
+        });
+
+        this.server.get();
+
+        assertEquals(true, p.get(3, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testConnectionDrop() throws Exception {
         final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
 
-        this.dispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+        this.dispatcher = getServerDispatcher(p);
 
+        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
             @Override
-            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
-                    final Channel channel, final Promise<SimpleSession> promise) {
-                p.setSuccess(true);
-                return new SimpleSessionNegotiator(promise, channel);
+            public SimpleSessionListener getSessionListener() {
+                return new SimpleSessionListener();
             }
-        }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+        });
+
+        this.server.get();
+
+        this.clientDispatcher = getClientDispatcher();
+
+        final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
+        this.session = this.clientDispatcher.createClient(this.serverAddress,
+                reconnectStrategy, new SessionListenerFactory<SimpleSessionListener>() {
+                    @Override
+                    public SimpleSessionListener getSessionListener() {
+                        return new SimpleSessionListener();
+                    }
+                }).get(6, TimeUnit.SECONDS);
+
+        assertEquals(true, p.get(3, TimeUnit.SECONDS));
+
+        shutdownServer();
+
+        // No reconnect should be scheduled after server drops connection with not-reconnecting client
+        verify(reconnectStrategy, times(0)).scheduleReconnect(any(Throwable.class));
+    }
+
+    @Test
+    public void testConnectionReestablishAfterDrop() throws Exception {
+        final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+
+        this.dispatcher = getServerDispatcher(p);
 
         this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
             @Override
@@ -70,13 +167,42 @@ public class ServerTest {
 
         this.server.get();
 
-        this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+        this.clientDispatcher = getClientDispatcher();
+
+        final ReconnectStrategyFactory reconnectStrategyFactory = mock(ReconnectStrategyFactory.class);
+        final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
+        doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy();
+
+        this.clientDispatcher.createReconnectingClient(this.serverAddress,
+                reconnectStrategyFactory, new SessionListenerFactory<SimpleSessionListener>() {
+                    @Override
+                    public SimpleSessionListener getSessionListener() {
+                        return new SimpleSessionListener();
+                    }
+                });
+
+        assertEquals(true, p.get(3, TimeUnit.SECONDS));
+        shutdownServer();
+
+        verify(reconnectStrategyFactory, timeout(20000).atLeast(2)).createReconnectStrategy();
+    }
+
+    @Test
+    public void testConnectionEstablished() throws Exception {
+        final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+
+        this.dispatcher = getServerDispatcher(p);
+
+        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
             @Override
-            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
-                    final Channel channel, final Promise<SimpleSession> promise) {
-                return new SimpleSessionNegotiator(promise, channel);
+            public SimpleSessionListener getSessionListener() {
+                return new SimpleSessionListener();
             }
-        }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+        });
+
+        this.server.get();
+
+        this.clientDispatcher = getClientDispatcher();
 
         this.session = this.clientDispatcher.createClient(this.serverAddress,
                 new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
@@ -93,15 +219,7 @@ public class ServerTest {
     public void testConnectionFailed() throws IOException, InterruptedException, ExecutionException, TimeoutException {
         final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
 
-        this.dispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
-
-            @Override
-            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
-                    final Channel channel, final Promise<SimpleSession> promise) {
-                p.setSuccess(true);
-                return new SimpleSessionNegotiator(promise, channel);
-            }
-        }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+        this.dispatcher = getServerDispatcher(p);
 
         this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
             @Override
@@ -112,13 +230,7 @@ public class ServerTest {
 
         this.server.get();
 
-        this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
-            @Override
-            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
-                    final Channel channel, final Promise<SimpleSession> promise) {
-                return new SimpleSessionNegotiator(promise, channel);
-            }
-        }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+        this.clientDispatcher = getClientDispatcher();
 
         this.session = this.clientDispatcher.createClient(this.serverAddress,
                 new NeverReconnectStrategy(GlobalEventExecutor.INSTANCE, 5000), new SessionListenerFactory<SimpleSessionListener>() {
@@ -138,14 +250,89 @@ public class ServerTest {
         assertFalse(session.isSuccess());
     }
 
-    @After
-    public void tearDown() throws IOException, InterruptedException {
-        this.server.channel().close();
-        this.eventLoopGroup.shutdownGracefully();
-        try {
-            Thread.sleep(500);
-        } catch (final InterruptedException e) {
-            throw new RuntimeException(e);
-        }
+    @Test
+    public void testNegotiationFailedNoReconnect() throws Exception {
+        final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+
+        this.dispatcher = getServerDispatcher(p);
+
+        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
+            @Override
+            public SimpleSessionListener getSessionListener() {
+                return new SimpleSessionListener();
+            }
+        });
+
+        this.server.get();
+
+        this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+            @Override
+            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+                                                                         final Channel channel, final Promise<SimpleSession> promise) {
+
+                return new SimpleSessionNegotiator(promise, channel) {
+                    @Override
+                    protected void startNegotiation() throws Exception {
+                        negotiationFailed(new IllegalStateException("Negotiation failed"));
+                    }
+                };
+            }
+        }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+
+        final ReconnectStrategyFactory reconnectStrategyFactory = mock(ReconnectStrategyFactory.class);
+        final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
+        doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy();
+
+        this.clientDispatcher.createReconnectingClient(this.serverAddress,
+                reconnectStrategyFactory, new SessionListenerFactory<SimpleSessionListener>() {
+                    @Override
+                    public SimpleSessionListener getSessionListener() {
+                        return new SimpleSessionListener();
+                    }
+                });
+
+
+        // Only one strategy should be created for initial connect, no more = no reconnects
+        verify(reconnectStrategyFactory, times(1)).createReconnectStrategy();
     }
+
+    private SimpleDispatcher getClientDispatcher() {
+        return new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+            @Override
+            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+                                                                         final Channel channel, final Promise<SimpleSession> promise) {
+                return new SimpleSessionNegotiator(promise, channel);
+            }
+        }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+    }
+
+    private ReconnectStrategy getMockedReconnectStrategy() throws Exception {
+        final ReconnectStrategy mockReconnectStrategy = mock(ReconnectStrategy.class);
+        final Future<Void> future = new SucceededFuture<>(GlobalEventExecutor.INSTANCE, null);
+        doReturn(future).when(mockReconnectStrategy).scheduleReconnect(any(Throwable.class));
+        doReturn(5000).when(mockReconnectStrategy).getConnectTimeout();
+        doNothing().when(mockReconnectStrategy).reconnectSuccessful();
+        return mockReconnectStrategy;
+    }
+
+
+    private void shutdownServer() throws InterruptedException, ExecutionException {
+        // Shutdown server
+        server.channel().close().get();
+        // Closing server channel does not close established connections, eventLoop has to be closed as well to simulate dropped session
+        serverLoopGroup.shutdownGracefully().get();
+    }
+
+    private SimpleDispatcher getServerDispatcher(final Promise<Boolean> p) {
+        return new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+
+            @Override
+            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+                                                                         final Channel channel, final Promise<SimpleSession> promise) {
+                p.setSuccess(true);
+                return new SimpleSessionNegotiator(promise, channel);
+            }
+        }, null, serverLoopGroup);
+    }
+
 }
index 12aac9e..d837385 100644 (file)
@@ -54,6 +54,10 @@ public class SimpleDispatcher extends AbstractDispatcher<SimpleSession, SimpleSe
         return super.createClient(address, strategy, new SimplePipelineInitializer(listenerFactory));
     }
 
+    public Future<Void> createReconnectingClient(final InetSocketAddress address, final ReconnectStrategyFactory strategy, final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
+        return super.createReconnectingClient(address, strategy, new SimplePipelineInitializer(listenerFactory));
+    }
+
     public ChannelFuture createServer(final InetSocketAddress address, final SessionListenerFactory<SimpleSessionListener> listenerFactory) {
         return super.createServer(address, new SimplePipelineInitializer(listenerFactory));
     }
index ec46219..6050f7c 100644 (file)
@@ -14,7 +14,7 @@ import org.opendaylight.yangtools.sal.binding.generator.api.ClassLoadingStrategy
 import org.opendaylight.yangtools.sal.binding.generator.impl.RuntimeGeneratedMappingServiceImpl;
 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
 import org.opendaylight.yangtools.yang.data.impl.codec.CodecRegistry;
-import org.opendaylight.yangtools.yang.model.api.SchemaServiceListener;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.osgi.framework.BundleContext;
 
 /**
@@ -31,7 +31,7 @@ public class CodecRegistryProvider implements AutoCloseable {
     public CodecRegistryProvider(final ClassLoadingStrategy classLoadingStrategy, final BundleContext context) {
         service = new RuntimeGeneratedMappingServiceImpl(CLASS_POOL, classLoadingStrategy);
         registration = OsgiRegistrationUtil.registerService(context, service,
-                SchemaServiceListener.class, BindingIndependentMappingService.class);
+                SchemaContextListener.class, BindingIndependentMappingService.class);
     }
 
     public CodecRegistry getCodecRegistry() {
diff --git a/opendaylight/config/config-netty-config/pom.xml b/opendaylight/config/config-netty-config/pom.xml
new file mode 100644 (file)
index 0000000..8dc31dc
--- /dev/null
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>config-subsystem</artifactId>
+    <version>0.2.5-SNAPSHOT</version>
+  </parent>
+  <artifactId>config-netty-config</artifactId>
+  <description>Configuration files for sal-rest-connector</description>
+  <packaging>jar</packaging>
+  <build>
+       <plugins>
+               <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>attach-artifacts</id>
+            <goals>
+              <goal>attach-artifact</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              <artifacts>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/00-netty.xml</file>
+                  <type>xml</type>
+                  <classifier>config</classifier>
+                </artifact>
+              </artifacts>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
index 66bb01f..26fac47 100644 (file)
@@ -39,6 +39,7 @@
     <module>shutdown-impl</module>
     <module>netconf-config-dispatcher</module>
     <module>config-module-archetype</module>
+    <module>config-netty-config</module>
   </modules>
 
   <dependencies>
index 541c130..4d0770f 100644 (file)
             <phase>generate-resources</phase>
             <configuration>
                <outputDirectory>${project.build.directory}/configuration</outputDirectory>
-               <includeArtifactIds>sal-rest-connector-config</includeArtifactIds>
+               <includeArtifactIds>sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config</includeArtifactIds>
                <includes>**\/*.xml</includes>
                <excludeTransitive>true</excludeTransitive>
                <ignorePermissions>false</ignorePermissions>
           <groupId>org.opendaylight.controller</groupId>
           <artifactId>sal-rest-connector-config</artifactId>
         </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>config-netty-config</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>md-sal-config</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>netconf-config</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>netconf-connector-config</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller.samples</groupId>
+          <artifactId>toaster-config</artifactId>
+        </dependency>
         <dependency>
           <groupId>org.opendaylight.controller</groupId>
           <artifactId>sal-rest-docgen</artifactId>
                 <phase>generate-resources</phase>
                 <configuration>
                    <outputDirectory>${project.build.directory}/configuration</outputDirectory>
-                   <includeArtifactIds>sal-rest-connector-config</includeArtifactIds>
+                   <includeArtifactIds>sal-rest-connector-config,config-netty-config,md-sal-config,netconf-config,toaster-config,netconf-connector-config</includeArtifactIds>
                    <includes>**\/*.xml</includes>
                    <excludeTransitive>true</excludeTransitive>
                    <ignorePermissions>false</ignorePermissions>
diff --git a/opendaylight/hosttracker/api/src/main/java/org/opendaylight/controller/hosttracker/IHostTrackerShell.java b/opendaylight/hosttracker/api/src/main/java/org/opendaylight/controller/hosttracker/IHostTrackerShell.java
new file mode 100644 (file)
index 0000000..5d11be4
--- /dev/null
@@ -0,0 +1,9 @@
+package org.opendaylight.controller.hosttracker;
+
+import java.util.List;
+
+public interface IHostTrackerShell{
+
+    public List<String> dumpPendingArpReqList();
+    public List<String> dumpFailedArpReqList();
+}
\ No newline at end of file
index 825b245..7e52771 100644 (file)
@@ -16,6 +16,7 @@ import java.util.Set;
 import org.apache.felix.dm.Component;
 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
+import org.opendaylight.controller.hosttracker.IHostTrackerShell;
 import org.opendaylight.controller.hosttracker.IfHostListener;
 import org.opendaylight.controller.hosttracker.IfIptoHost;
 import org.opendaylight.controller.hosttracker.IfNewHostNotify;
@@ -78,6 +79,7 @@ public class Activator extends ComponentActivatorAbstractBase {
                             IInventoryListener.class.getName(),
                             IfIptoHost.class.getName(),
                             IfHostListener.class.getName(),
+                            IHostTrackerShell.class.getName(),
                             ITopologyManagerAware.class.getName(),
                             ICacheUpdateAware.class.getName() }, props);
 
index ce49b59..f728b35 100644 (file)
@@ -39,6 +39,7 @@ import org.opendaylight.controller.clustering.services.IClusterContainerServices
 import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.hosttracker.HostIdFactory;
 import org.opendaylight.controller.hosttracker.IHostId;
+import org.opendaylight.controller.hosttracker.IHostTrackerShell;
 import org.opendaylight.controller.hosttracker.IPHostId;
 import org.opendaylight.controller.hosttracker.IPMacHostId;
 import org.opendaylight.controller.hosttracker.IfHostListener;
@@ -100,7 +101,7 @@ import org.slf4j.LoggerFactory;
  *
  */
 
-public class HostTracker implements IfIptoHost, IfHostListener, ISwitchManagerAware, IInventoryListener,
+public class HostTracker implements IfIptoHost, IfHostListener, IHostTrackerShell, ISwitchManagerAware, IInventoryListener,
         ITopologyManagerAware, ICacheUpdateAware<IHostId, HostNodeConnector>, CommandProvider {
     static final String ACTIVE_HOST_CACHE = "hosttracker.ActiveHosts";
     static final String INACTIVE_HOST_CACHE = "hosttracker.InactiveHosts";
@@ -1618,4 +1619,24 @@ public class HostTracker implements IfIptoHost, IfHostListener, ISwitchManagerAw
         IHostId id = HostIdFactory.create(addr, null);
         return getHostNetworkHierarchy(id);
     }
+
+    @Override
+    public List<String> dumpPendingArpReqList() {
+        ARPPending arphost;
+        List<String> arpReq = new ArrayList<String>();
+        for (Entry<IHostId, ARPPending> entry : ARPPendingList.entrySet()) {
+            arpReq.add(entry.getValue().getHostId().toString());
+        }
+        return arpReq;
+    }
+
+    @Override
+    public List<String> dumpFailedArpReqList() {
+        ARPPending arphost;
+        List<String> arpReq = new ArrayList<String>();
+        for (Entry<IHostId, ARPPending> entry : failedARPReqList.entrySet()) {
+            arpReq.add(entry.getValue().getHostId().toString());
+        }
+        return arpReq;
+    }
 }
diff --git a/opendaylight/hosttracker/shell/pom.xml b/opendaylight/hosttracker/shell/pom.xml
new file mode 100644 (file)
index 0000000..3f73303
--- /dev/null
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>commons.opendaylight</artifactId>
+    <version>1.4.2-SNAPSHOT</version>
+    <relativePath>../../commons/opendaylight</relativePath>
+  </parent>
+  <artifactId>hosttracker.shell</artifactId>
+  <version>${hosttracker.shell.version}</version>
+  <packaging>bundle</packaging>
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.karaf.shell</groupId>
+      <artifactId>org.apache.karaf.shell.console</artifactId>
+      <version>3.0.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>hosttracker.implementation</artifactId>
+      <version>${hosttracker.implementation.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <version>${bundle.plugin.version}</version>
+        <configuration>
+          <instructions>
+            <Import-Package>org.apache.felix.service.command,
+              org.apache.karaf.shell.commands,
+              org.apache.karaf.shell.console,
+              *</Import-Package>
+          </instructions>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/opendaylight/hosttracker/shell/src/main/java/org/opendaylight/controller/hosttracker/shell/DumpFailedARPReqList.java b/opendaylight/hosttracker/shell/src/main/java/org/opendaylight/controller/hosttracker/shell/DumpFailedARPReqList.java
new file mode 100644 (file)
index 0000000..ec9971f
--- /dev/null
@@ -0,0 +1,28 @@
+package org.opendaylight.controller.hosttracker.shell;
+/**
+* Copyright (c) 2014 Inocybe Technologies, and others. All rights reserved.
+*
+* This program and the accompanying materials are made available under the
+* terms of the Eclipse Public License v1.0 which accompanies this distribution,
+* and is available at http://www.eclipse.org/legal/epl-v10.html
+*/
+
+import org.apache.felix.gogo.commands.Command;
+import org.apache.karaf.shell.console.OsgiCommandSupport;
+import org.opendaylight.controller.hosttracker.IHostTrackerShell;
+
+@Command(scope = "hosttracker", name = "dumpFailedARPReqList", description="Display the dump failed ARPReqList")
+public class DumpFailedARPReqList extends OsgiCommandSupport{
+
+    private IHostTrackerShell hostTracker;
+
+    @Override
+    protected Object doExecute() throws Exception {
+        System.out.print(hostTracker.dumpFailedArpReqList());
+        return null;
+    }
+
+    public void setHostTracker(IHostTrackerShell hostTracker){
+        this.hostTracker = hostTracker;
+    }
+}
diff --git a/opendaylight/hosttracker/shell/src/main/java/org/opendaylight/controller/hosttracker/shell/DumpPendingARPReqList.java b/opendaylight/hosttracker/shell/src/main/java/org/opendaylight/controller/hosttracker/shell/DumpPendingARPReqList.java
new file mode 100644 (file)
index 0000000..7f52a55
--- /dev/null
@@ -0,0 +1,28 @@
+package org.opendaylight.controller.hosttracker.shell;
+/**
+* Copyright (c) 2014 Inocybe Technologies, and others. All rights reserved.
+*
+* This program and the accompanying materials are made available under the
+* terms of the Eclipse Public License v1.0 which accompanies this distribution,
+* and is available at http://www.eclipse.org/legal/epl-v10.html
+*/
+
+import org.apache.felix.gogo.commands.Command;
+import org.apache.karaf.shell.console.OsgiCommandSupport;
+import org.opendaylight.controller.hosttracker.IHostTrackerShell;
+
+@Command(scope = "hosttracker", name = "dumpPendingARPReqList", description="Display the dump pending ARPReqList")
+public class DumpPendingARPReqList extends OsgiCommandSupport{
+
+    private IHostTrackerShell hostTracker;
+
+    @Override
+    protected Object doExecute() throws Exception {
+        System.out.print(hostTracker.dumpPendingArpReqList());
+        return null;
+    }
+
+    public void setHostTracker(IHostTrackerShell hostTracker){
+        this.hostTracker = hostTracker;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/hosttracker/shell/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/opendaylight/hosttracker/shell/src/main/resources/OSGI-INF/blueprint/blueprint.xml
new file mode 100644 (file)
index 0000000..ba79b5d
--- /dev/null
@@ -0,0 +1,17 @@
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
+    <reference id="HostTrackerRef" interface="org.opendaylight.controller.hosttracker.IHostTrackerShell"/>
+
+    <command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.1.0">
+
+    <command>
+       <action class="org.opendaylight.controller.hosttracker.shell.DumpFailedARPReqList">
+          <property name="hostTracker" ref="HostTrackerRef"/>
+       </action>
+    </command>
+    <command>
+       <action class="org.opendaylight.controller.hosttracker.shell.DumpPendingARPReqList">
+          <property name="hostTracker" ref="HostTrackerRef"/>
+       </action>
+    </command>
+    </command-bundle>
+</blueprint>
diff --git a/opendaylight/hosttracker/shell/src/test/java/org/opendaylight/controller/hosttracker/shell/HostTrackerShellTest.java b/opendaylight/hosttracker/shell/src/test/java/org/opendaylight/controller/hosttracker/shell/HostTrackerShellTest.java
new file mode 100644 (file)
index 0000000..a0a5a2e
--- /dev/null
@@ -0,0 +1,47 @@
+package org.opendaylight.controller.hosttracker.shell;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.opendaylight.controller.hosttracker.IHostTrackerShell;
+
+public class HostTrackerShellTest {
+
+    private final long COMMAND_TIMEOUT = 1000;
+    private IHostTrackerShell hostTracker;
+
+    @Test
+    public void testDumpPendingARPReqList() throws Exception {
+        DumpPendingARPReqList dumpPendTest = new DumpPendingARPReqList();
+        hostTracker = mock(IHostTrackerShell.class);
+        List<String> failedList = new ArrayList<String>(Arrays.asList("a", "b", "c"));
+        when(hostTracker.dumpPendingArpReqList()).thenReturn(failedList);
+        dumpPendTest.setHostTracker(hostTracker);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(baos));
+        dumpPendTest.doExecute();
+        //Assert.assertTrue(true);
+        Assert.assertEquals("[a, b, c]", baos.toString());
+    }
+
+    @Test
+    public void testDumpFailedARPReqList() throws Exception {
+        DumpFailedARPReqList dumpFailTest = new DumpFailedARPReqList();
+        hostTracker = mock(IHostTrackerShell.class);
+        List<String> failedList = new ArrayList<String>(Arrays.asList("a", "b", "c"));
+        when(hostTracker.dumpFailedArpReqList()).thenReturn(failedList);
+        dumpFailTest.setHostTracker(hostTracker);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(baos));
+        dumpFailTest.doExecute();
+        //Assert.assertTrue(true);
+        Assert.assertEquals("[a, b, c]", baos.toString());
+    }
+}
diff --git a/opendaylight/md-sal/md-sal-config/pom.xml b/opendaylight/md-sal/md-sal-config/pom.xml
new file mode 100644 (file)
index 0000000..2e19b5a
--- /dev/null
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+
+ This program and the accompanying materials are made available under the
+ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>sal-parent</artifactId>
+    <version>1.1-SNAPSHOT</version>
+  </parent>
+  <artifactId>md-sal-config</artifactId>
+  <description>Configuration files for md-sal</description>
+  <packaging>jar</packaging>
+  <build>
+    <plugins>
+        <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>attach-artifacts</id>
+            <goals>
+              <goal>attach-artifact</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              <artifacts>
+                <artifact>
+                  <file>${project.build.directory}/classes/initial/01-md-sal.xml</file>
+                  <type>xml</type>
+                  <classifier>config</classifier>
+                </artifact>
+              </artifacts>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
                     </schema-service>
                 </module>
                 -->
-                <!-- Cluster RPC -->
-                <!-- Enable the following module if you want to use remote rpc connector
-                <module>
-                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">prefix:remote-rpc-connector</type>
-                    <name>remote-rpc-connector</name>
-                    <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">
-                        <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
-                        <name>dom-broker</name>
-                    </dom-broker>
-                </module>
-                -->
+
                 <module>
                     <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:inmemory-datastore-provider">prefix:inmemory-operational-datastore-provider</type>
                     <name>operational-store-service</name>
                         </binding-mapping-service>
                     </binding-forwarded-data-broker>
                 </module>
+                <!-- Cluster RPC -->
+                <!-- Enable the following module if you want to use remote rpc connector
+                <module>
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">prefix:remote-rpc-connector</type>
+                    <name>remote-rpc-connector</name>
+                    <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">
+                        <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
+                        <name>dom-broker</name>
+                    </dom-broker>
+                </module>
+                -->
             </modules>
             <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
                     <service>
index bfa7e26..02fbde8 100644 (file)
@@ -32,6 +32,9 @@
 
     <module>sal-binding-util</module>
 
+    <!-- Configuration File -->
+    <module>md-sal-config</module>
+
     <!-- Samples -->
     <module>samples</module>
 
     <!-- Documentation -->
     <module>sal-rest-docgen</module>
 
+    <module>sal-akka-raft</module>
+
     <!--InMemory DOM DataStore-->
     <module>sal-inmemory-datastore</module>
 
     <!--sal-protocolbuffer-encoding-->
     <module>sal-protocolbuffer-encoding</module>
 
-    <!--sal-distributed-datastore-->
+    <!-- sal-distributed-datastore -->
     <module>sal-distributed-datastore</module>
 
+    <!-- XSQL -->
+    <module>sal-dom-xsql</module>
+
     <!-- Yang Test Models for MD-SAL -->
     <module>sal-test-model</module>
+
+    <!-- Clustering -->
+    <module>sal-remoterpc-connector</module>
   </modules>
 
   <build>
diff --git a/opendaylight/md-sal/sal-akka-raft/README-FIRST b/opendaylight/md-sal/sal-akka-raft/README-FIRST
new file mode 100644 (file)
index 0000000..d0be2cb
--- /dev/null
@@ -0,0 +1,28 @@
+Instructions on generating the protocol buffer Java source files
+
+These instructions are developers who are planning to generate the protocolbuffer java source files.
+
+1. We are using protocol buffer version 2.5.0 - you need to install the exact same version on your box.
+The download link is https://code.google.com/p/protobuf/downloads/list. Download .tar/zip based on
+your OS.
+
+2. Once downloaded the tar/zip and extracted follow the README instructions to compile protoc on your
+machine
+
+3. Create your .proto (IDL) file in resources folder. Give appropriate package name so that the source
+   get generation in proper packages. For more information  check
+   https://developers.google.com/protocol-buffers/docs/javatutorial
+
+   For detailed information https://developers.google.com/protocol-buffers/docs/reference/java-generated
+
+4. To generate the java source files execute in sal-protocolbuffer-encoding directory
+   just run.sh in sal-protocolbuffer-encoding or execute the following command
+
+ protoc --proto_path=src/main/resources --java_out=src/main/java src/main/resources/*.proto
+
+5. Run mvn clean install  & build the .jar
+
+6. !!!WARNING!!! - never edit the generated sources files of protocol buffer
+
+7. !!!NOTE!!! if you are planning to add new .proto file  option java_package should begin with
+   org.opendaylight.controller.protobuff.messages to properly exclude from sonar.
index 50442bd..3dcc546 100644 (file)
   <packaging>bundle</packaging>
 
   <dependencies>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.5.0</version>
+    </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
           </execution>
         </executions>
       </plugin>
+
       <plugin>
         <groupId>org.jacoco</groupId>
         <artifactId>jacoco-maven-plugin</artifactId>
           </execution>
         </executions>
       </plugin>
+
+     <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+            <excludes>**/protobuff/**/*</excludes>
+        </configuration>
+      </plugin>
+
     </plugins>
   </build>
   <scm>
diff --git a/opendaylight/md-sal/sal-akka-raft/run.sh b/opendaylight/md-sal/sal-akka-raft/run.sh
new file mode 100755 (executable)
index 0000000..7c588f0
--- /dev/null
@@ -0,0 +1,42 @@
+#!/bin/bash
+
+#####################################################################################################
+#Instructions on generating the protocol buffer Java source files
+#
+#These instructions are developers who are planning to generate the protocolbuffer java source files.
+#
+#1. We are using protocol buffer version 2.5.0 - you need to install the exact same version on your box.
+#The download link is https://code.google.com/p/protobuf/downloads/list. Download .tar/zip based on
+#your OS.
+#
+#2. Once downloaded the tar/zip and extracted follow the README instructions to compile protoc on your
+#machine
+#
+#3. Create your .proto (IDL) file in resources folder. Give appropriate package name so that the source
+#   get generation in proper packages. For more information  check
+#   https://developers.google.com/protocol-buffers/docs/javatutorial
+#
+#   For detailed information https://developers.google.com/protocol-buffers/docs/reference/java-generated
+#
+#4. To generate the java source files execute in sal-protocolbuffer-encoding execute ./run.sh i.e. this script
+# or run command
+#       protoc --proto_path=src/main/resources --java_out=src/main/java src/main/resources/*.proto
+#
+#5. Run mvn clean install & build the .jar
+#
+#6  !!!WARNING!!!! never edit the source files generated
+#
+#7. !!!NOTE!!! if you are planning to add new .proto file  option java_package should begin with
+#   org.opendaylight.controller.protobuff.messages to properly exclude from sonar.
+########################################################################################################
+
+protoc --proto_path=src/main/resources --java_out=src/main/java src/main/resources/*.proto
+
+protoc --proto_path=src/main/resources --proto_path=src/test/resources --java_out=src/test/java src/test/resources/*.proto
+
+echo "Done generating Java source files."
+
+#to remove trailing spaces in the code files
+find src/main/java -type f -name '*.java' -exec sed --in-place 's/[[:space:]]\+$//' {} \+
+
+find src/test/java -type f -name '*.java' -exec sed --in-place 's/[[:space:]]\+$//' {} \+
index 8d4d5e4..aa100df 100644 (file)
@@ -11,11 +11,14 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
+import com.google.common.base.Optional;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -30,25 +33,26 @@ public class ExampleActor extends RaftActor {
     private long persistIdentifier = 1;
 
 
-    public ExampleActor(String id, Map<String, String> peerAddresses) {
-        super(id, peerAddresses);
+    public ExampleActor(String id, Map<String, String> peerAddresses,
+        Optional<ConfigParams> configParams) {
+        super(id, peerAddresses, configParams);
     }
 
-    public static Props props(final String id, final Map<String, String> peerAddresses){
+    public static Props props(final String id, final Map<String, String> peerAddresses,
+        final Optional<ConfigParams> configParams){
         return Props.create(new Creator<ExampleActor>(){
 
             @Override public ExampleActor create() throws Exception {
-                return new ExampleActor(id, peerAddresses);
+                return new ExampleActor(id, peerAddresses, configParams);
             }
         });
     }
 
     @Override public void onReceiveCommand(Object message){
         if(message instanceof KeyValue){
-
             if(isLeader()) {
                 String persistId = Long.toString(persistIdentifier++);
-                persistData(getSender(), persistId, message);
+                persistData(getSender(), persistId, (Payload) message);
             } else {
                 if(getLeader() != null) {
                     getLeader().forward(message, getContext());
@@ -56,12 +60,15 @@ public class ExampleActor extends RaftActor {
             }
 
         } else if (message instanceof PrintState) {
-            LOG.debug("State of the node:"+getId() + " is="+state.size());
+            LOG.debug("State of the node:{} has entries={}, {}",
+                getId(), state.size(), getReplicatedLogState());
 
         } else if (message instanceof PrintRole) {
-            LOG.debug(getId() + " = " + getRaftState());
+            LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),getPeers());
+
+        } else {
+            super.onReceiveCommand(message);
         }
-        super.onReceiveCommand(message);
     }
 
     @Override protected void applyState(ActorRef clientActor, String identifier,
@@ -82,6 +89,7 @@ public class ExampleActor extends RaftActor {
     @Override protected void applySnapshot(Object snapshot) {
         state.clear();
         state.putAll((HashMap) snapshot);
+        LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size());
     }
 
     @Override public void onReceiveRecover(Object message) {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleConfigParamsImpl.java
new file mode 100644 (file)
index 0000000..d11377d
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.example;
+
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+
+/**
+ * Implementation of ConfigParams for Example
+ */
+public class ExampleConfigParamsImpl extends DefaultConfigParamsImpl {
+    @Override
+    public long getSnapshotBatchCount() {
+        return 50;
+    }
+}
index a148ed4..0e5d643 100644 (file)
@@ -11,7 +11,9 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
+import com.google.common.base.Optional;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
@@ -34,15 +36,15 @@ public class Main {
     public static void main(String[] args) throws Exception{
         ActorRef example1Actor =
             actorSystem.actorOf(ExampleActor.props("example-1",
-                withoutPeer("example-1")), "example-1");
+                withoutPeer("example-1"), Optional.<ConfigParams>absent()), "example-1");
 
         ActorRef example2Actor =
             actorSystem.actorOf(ExampleActor.props("example-2",
-                withoutPeer("example-2")), "example-2");
+                withoutPeer("example-2"), Optional.<ConfigParams>absent()), "example-2");
 
         ActorRef example3Actor =
             actorSystem.actorOf(ExampleActor.props("example-3",
-                withoutPeer("example-3")), "example-3");
+                withoutPeer("example-3"), Optional.<ConfigParams>absent()), "example-3");
 
 
         List<ActorRef> examples = Arrays.asList(example1Actor, example2Actor, example3Actor);
@@ -74,7 +76,8 @@ public class Main {
                         String actorName = "example-" + i;
                         examples.add(i - 1,
                             actorSystem.actorOf(ExampleActor.props(actorName,
-                                withoutPeer(actorName)), actorName));
+                                withoutPeer(actorName), Optional.<ConfigParams>absent()),
+                                actorName));
                         System.out.println("Created actor : " + actorName);
                         continue;
                     }
index c8a7835..fd6e192 100644 (file)
@@ -2,8 +2,10 @@ package org.opendaylight.controller.cluster.example;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import com.google.common.base.Optional;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
 
@@ -11,17 +13,23 @@ import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
-
+/**
+ * This is a test driver for testing akka-raft implementation
+ * Its uses ExampleActors and threads to push content(key-vals) to these actors
+ * Each ExampleActor can have one or more ClientActors. Each ClientActor spawns
+ * a thread and starts push logs to the actor its assigned to.
+ */
 public class TestDriver {
 
     private static final ActorSystem actorSystem = ActorSystem.create();
     private static Map<String, String> allPeers = new HashMap<>();
     private static Map<String, ActorRef> clientActorRefs  = new HashMap<String, ActorRef>();
     private static Map<String, ActorRef> actorRefs = new HashMap<String, ActorRef>();
-    private static LogGenerator logGenerator = new LogGenerator();;
+    private static LogGenerator logGenerator = new LogGenerator();
+    private int nameCounter = 0;
+    private static ConfigParams configParams = new ExampleConfigParamsImpl();
 
     /**
      * Create nodes, add clients and start logging.
@@ -30,6 +38,7 @@ public class TestDriver {
      *  createNodes:{num}
      *  addNodes:{num}
      *  stopNode:{nodeName}
+     *  reinstateNode:{nodeName}
      *  addClients:{num}
      *  addClientsToNode:{nodeName, num}
      *  startLogging
@@ -78,6 +87,10 @@ public class TestDriver {
                 String[] arr = command.split(":");
                 td.stopNode(arr[1]);
 
+            } else if (command.startsWith("reinstateNode")) {
+                String[] arr = command.split(":");
+                td.reinstateNode(arr[1]);
+
             } else if (command.startsWith("startLogging")) {
                 td.startAllLogging();
 
@@ -101,15 +114,19 @@ public class TestDriver {
         }
     }
 
+    public static ActorRef createExampleActor(String name) {
+        return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
+            Optional.of(configParams)), name);
+    }
+
     public void createNodes(int num) {
         for (int i=0; i < num; i++)  {
-            int rand = getUnusedRandom(num);
-            allPeers.put("example-"+rand, "akka://default/user/example-"+rand);
+            nameCounter = nameCounter + 1;
+            allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
         }
 
         for (String s : allPeers.keySet())  {
-            ActorRef exampleActor = actorSystem.actorOf(
-                ExampleActor.props(s, withoutPeer(s)), s);
+            ActorRef exampleActor = createExampleActor(s);
             actorRefs.put(s, exampleActor);
             System.out.println("Created node:"+s);
 
@@ -120,15 +137,14 @@ public class TestDriver {
     public void addNodes(int num) {
         Map<String, String> newPeers = new HashMap<>();
         for (int i=0; i < num; i++)  {
-            int rand = getUnusedRandom(num);
-            newPeers.put("example-"+rand, "akka://default/user/example-"+rand);
-            allPeers.put("example-"+rand, "akka://default/user/example-"+rand);
+            nameCounter = nameCounter + 1;
+            newPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
+            allPeers.put("example-"+nameCounter, "akka://default/user/example-"+nameCounter);
 
         }
         Map<String, ActorRef> newActorRefs = new HashMap<String, ActorRef>(num);
         for (Map.Entry<String, String> entry : newPeers.entrySet())  {
-            ActorRef exampleActor = actorSystem.actorOf(
-                ExampleActor.props(entry.getKey(), withoutPeer(entry.getKey())), entry.getKey());
+            ActorRef exampleActor = createExampleActor(entry.getKey());
             newActorRefs.put(entry.getKey(), exampleActor);
 
             //now also add these new nodes as peers from the previous nodes
@@ -160,7 +176,7 @@ public class TestDriver {
     public void addClientsToNode(String actorName, int num) {
         ActorRef actorRef = actorRefs.get(actorName);
         for (int i=0; i < num; i++) {
-            String clientName = "client-" + i + "-" + actorRef;
+            String clientName = "client-" + i + "-" + actorName;
             clientActorRefs.put(clientName,
                 actorSystem.actorOf(ClientActor.props(actorRef), clientName));
             System.out.println("Added client-node:" + clientName);
@@ -169,11 +185,13 @@ public class TestDriver {
 
     public void stopNode(String actorName) {
         ActorRef actorRef = actorRefs.get(actorName);
-        String clientName = "client-"+actorName;
-        if(clientActorRefs.containsKey(clientName)) {
-            actorSystem.stop(clientActorRefs.get(clientName));
-            clientActorRefs.remove(clientName);
+
+        for (Map.Entry<String,ActorRef> entry : clientActorRefs.entrySet()) {
+            if (entry.getKey().endsWith(actorName)) {
+                actorSystem.stop(entry.getValue());
+            }
         }
+
         actorSystem.stop(actorRef);
         actorRefs.remove(actorName);
 
@@ -182,7 +200,21 @@ public class TestDriver {
         }
 
         allPeers.remove(actorName);
+    }
 
+    public void reinstateNode(String actorName) {
+        String address = "akka://default/user/"+actorName;
+        allPeers.put(actorName, address);
+
+        ActorRef exampleActor = createExampleActor(actorName);
+
+        for (ActorRef actor : actorRefs.values()) {
+            actor.tell(new AddRaftPeer(actorName, address), null);
+        }
+
+        actorRefs.put(actorName, exampleActor);
+
+        addClientsToNode(actorName, 1);
     }
 
     public void startAllLogging() {
@@ -227,14 +259,6 @@ public class TestDriver {
         return null;
     }
 
-    private int getUnusedRandom(int num) {
-        int rand = -1;
-        do {
-            rand = (new Random()).nextInt(num * num);
-        } while (allPeers.keySet().contains("example-"+rand));
-
-        return rand;
-    }
 
     private static Map<String, String> withoutPeer(String peerId) {
         Map<String, String> without = new ConcurrentHashMap<>(allPeers);
index 00cc09a..560d5fc 100644 (file)
@@ -8,11 +8,21 @@
 
 package org.opendaylight.controller.cluster.example.messages;
 
+import com.google.protobuf.GeneratedMessage;
+import org.opendaylight.controller.cluster.example.protobuff.messages.KeyValueMessages;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
+
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class KeyValue extends Payload implements Serializable {
+    private String key;
+    private String value;
 
-public class KeyValue implements Serializable{
-    private final String key;
-    private final String value;
+    public KeyValue() {
+    }
 
     public KeyValue(String key, String value){
         this.key = key;
@@ -27,10 +37,37 @@ public class KeyValue implements Serializable{
         return value;
     }
 
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+
     @Override public String toString() {
         return "KeyValue{" +
             "key='" + key + '\'' +
             ", value='" + value + '\'' +
             '}';
     }
+
+    // override this method to return  the protobuff related extension fields and their values
+    @Override public Map<GeneratedMessage.GeneratedExtension, String> encode() {
+        Map<GeneratedMessage.GeneratedExtension, String> map = new HashMap<>();
+        map.put(KeyValueMessages.key, getKey());
+        map.put(KeyValueMessages.value, getValue());
+        return map;
+    }
+
+    // override this method to assign the values from protobuff
+    @Override public Payload decode(
+        AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payloadProtoBuff) {
+        String key = payloadProtoBuff.getExtension(KeyValueMessages.key);
+        String value = payloadProtoBuff.getExtension(KeyValueMessages.value);
+        this.setKey(key);
+        this.setValue(value);
+        return this;
+    }
+
 }
index c9d4bfa..a5105f0 100644 (file)
@@ -1,7 +1,14 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.cluster.example.messages;
 
-/**
- * Created by kramesha on 7/17/14.
- */
-public class PrintRole {
+import java.io.Serializable;
+
+public class PrintRole implements Serializable {
 }
index dbf863d..20ed142 100644 (file)
@@ -1,7 +1,14 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.cluster.example.messages;
 
-/**
- * Created by kramesha on 7/17/14.
- */
-public class PrintState {
+import java.io.Serializable;
+
+public class PrintState implements Serializable {
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/protobuff/messages/KeyValueMessages.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/protobuff/messages/KeyValueMessages.java
new file mode 100644 (file)
index 0000000..c3d4bbf
--- /dev/null
@@ -0,0 +1,73 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: KeyValueMessages.proto
+
+package org.opendaylight.controller.cluster.example.protobuff.messages;
+
+public final class KeyValueMessages {
+  private KeyValueMessages() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+    registry.add(org.opendaylight.controller.cluster.example.protobuff.messages.KeyValueMessages.key);
+    registry.add(org.opendaylight.controller.cluster.example.protobuff.messages.KeyValueMessages.value);
+  }
+  public static final int KEY_FIELD_NUMBER = 2;
+  /**
+   * <code>extend .org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry.Payload { ... }</code>
+   */
+  public static final
+    com.google.protobuf.GeneratedMessage.GeneratedExtension<
+      org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload,
+      java.lang.String> key = com.google.protobuf.GeneratedMessage
+          .newFileScopedGeneratedExtension(
+        java.lang.String.class,
+        null);
+  public static final int VALUE_FIELD_NUMBER = 3;
+  /**
+   * <code>extend .org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry.Payload { ... }</code>
+   */
+  public static final
+    com.google.protobuf.GeneratedMessage.GeneratedExtension<
+      org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload,
+      java.lang.String> value = com.google.protobuf.GeneratedMessage
+          .newFileScopedGeneratedExtension(
+        java.lang.String.class,
+        null);
+
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() {
+    return descriptor;
+  }
+  private static com.google.protobuf.Descriptors.FileDescriptor
+      descriptor;
+  static {
+    java.lang.String[] descriptorData = {
+      "\n\026KeyValueMessages.proto\022(org.opendaylig" +
+      "ht.controller.cluster.raft\032\033AppendEntrie" +
+      "sMessages.proto:_\n\003key\022R.org.opendayligh" +
+      "t.controller.cluster.raft.AppendEntries." +
+      "ReplicatedLogEntry.Payload\030\002 \001(\t:a\n\005valu" +
+      "e\022R.org.opendaylight.controller.cluster." +
+      "raft.AppendEntries.ReplicatedLogEntry.Pa" +
+      "yload\030\003 \001(\tBT\n>org.opendaylight.controll" +
+      "er.cluster.example.protobuff.messagesB\020K" +
+      "eyValueMessagesH\001"
+    };
+    com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+      new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+        public com.google.protobuf.ExtensionRegistry assignDescriptors(
+            com.google.protobuf.Descriptors.FileDescriptor root) {
+          descriptor = root;
+          key.internalInit(descriptor.getExtensions().get(0));
+          value.internalInit(descriptor.getExtensions().get(1));
+          return null;
+        }
+      };
+    com.google.protobuf.Descriptors.FileDescriptor
+      .internalBuildGeneratedFileFrom(descriptorData,
+        new com.google.protobuf.Descriptors.FileDescriptor[] {
+          org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.getDescriptor(),
+        }, assigner);
+  }
+
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
new file mode 100644 (file)
index 0000000..24bfa3d
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract class handling the mapping of
+ * logical LogEntry Index and the physical list index.
+ */
+public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
+
+    protected final List<ReplicatedLogEntry> journal;
+    protected final Object snapshot;
+    protected long snapshotIndex = -1;
+    protected long snapshotTerm = -1;
+
+    public AbstractReplicatedLogImpl(Object state, long snapshotIndex,
+        long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+        this.snapshot = state;
+        this.snapshotIndex = snapshotIndex;
+        this.snapshotTerm = snapshotTerm;
+        this.journal = new ArrayList<>(unAppliedEntries);
+    }
+
+
+    public AbstractReplicatedLogImpl() {
+        this.snapshot = null;
+        this.journal = new ArrayList<>();
+    }
+
+    protected int adjustedIndex(long logEntryIndex) {
+        if(snapshotIndex < 0){
+            return (int) logEntryIndex;
+        }
+        return (int) (logEntryIndex - (snapshotIndex + 1));
+    }
+
+    @Override
+    public ReplicatedLogEntry get(long logEntryIndex) {
+        int adjustedIndex = adjustedIndex(logEntryIndex);
+
+        if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+            // physical index should be less than list size and >= 0
+            return null;
+        }
+
+        return journal.get(adjustedIndex);
+    }
+
+    @Override
+    public ReplicatedLogEntry last() {
+        if (journal.isEmpty()) {
+            return null;
+        }
+        // get the last entry directly from the physical index
+        return journal.get(journal.size() - 1);
+    }
+
+    @Override
+    public long lastIndex() {
+        if (journal.isEmpty()) {
+            // it can happen that after snapshot, all the entries of the
+            // journal are trimmed till lastApplied, so lastIndex = snapshotIndex
+            return snapshotIndex;
+        }
+        return last().getIndex();
+    }
+
+    @Override
+    public long lastTerm() {
+        if (journal.isEmpty()) {
+            // it can happen that after snapshot, all the entries of the
+            // journal are trimmed till lastApplied, so lastTerm = snapshotTerm
+            return snapshotTerm;
+        }
+        return last().getTerm();
+    }
+
+    @Override
+    public void removeFrom(long logEntryIndex) {
+        int adjustedIndex = adjustedIndex(logEntryIndex);
+        if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+            // physical index should be less than list size and >= 0
+            return;
+        }
+        journal.subList(adjustedIndex , journal.size()).clear();
+    }
+
+    @Override
+    public void append(ReplicatedLogEntry replicatedLogEntry) {
+        journal.add(replicatedLogEntry);
+    }
+
+    @Override
+    public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
+        int adjustedIndex = adjustedIndex(logEntryIndex);
+        int size = journal.size();
+        List<ReplicatedLogEntry> entries = new ArrayList<>(100);
+        if (adjustedIndex >= 0 && adjustedIndex < size) {
+            // physical index should be less than list size and >= 0
+            entries.addAll(journal.subList(adjustedIndex, size));
+        }
+        return entries;
+    }
+
+    @Override
+    public long size() {
+       return journal.size();
+    }
+
+    @Override
+    public boolean isPresent(long logEntryIndex) {
+        if (logEntryIndex > lastIndex()) {
+            // if the request logical index is less than the last present in the list
+            return false;
+        }
+        int adjustedIndex = adjustedIndex(logEntryIndex);
+        return (adjustedIndex >= 0);
+    }
+
+    @Override
+    public boolean isInSnapshot(long logEntryIndex) {
+        return logEntryIndex <= snapshotIndex;
+    }
+
+    @Override
+    public Object getSnapshot() {
+        return snapshot;
+    }
+
+    @Override
+    public long getSnapshotIndex() {
+        return snapshotIndex;
+    }
+
+    @Override
+    public long getSnapshotTerm() {
+        return snapshotTerm;
+    }
+
+    @Override
+    public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
+
+    @Override
+    public abstract void removeFromAndPersist(long index);
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
new file mode 100644 (file)
index 0000000..4c6434a
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Configuration Parameter interface for configuring the Raft consensus system
+ * <p/>
+ * Any component using this implementation might want to provide an implementation of
+ * this interface to configure
+ *
+ * A default implementation will be used if none is provided.
+ *
+ * @author Kamal Rameshan
+ */
+public interface ConfigParams {
+    /**
+     * The minimum number of entries to be present in the in-memory Raft log
+     * for a snapshot to be taken
+     *
+     * @return long
+     */
+    public long getSnapshotBatchCount();
+
+    /**
+     * The interval at which a heart beat message will be sent to the remote
+     * RaftActor
+     *
+     * @return FiniteDuration
+     */
+    public FiniteDuration getHeartBeatInterval();
+
+    /**
+     * The interval in which a new election would get triggered if no leader is found
+     *
+     * Normally its set to atleast twice the heart beat interval
+     *
+     * @return FiniteDuration
+     */
+    public FiniteDuration getElectionTimeOutInterval();
+
+    /**
+     * The maximum election time variance. The election is scheduled using both
+     * the Election Timeout and Variance
+     *
+     * @return int
+     */
+    public int getElectionTimeVariance();
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
new file mode 100644 (file)
index 0000000..c633337
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default implementation of the ConfigParams
+ *
+ * If no implementation is provided for ConfigParams, then this will be used.
+ */
+public class DefaultConfigParamsImpl implements ConfigParams {
+
+    private static final int SNAPSHOT_BATCH_COUNT = 100000;
+
+    /**
+     * The maximum election time variance
+     */
+    private static final int ELECTION_TIME_MAX_VARIANCE = 100;
+
+
+    /**
+     * The interval at which a heart beat message will be sent to the remote
+     * RaftActor
+     * <p/>
+     * Since this is set to 100 milliseconds the Election timeout should be
+     * at least 200 milliseconds
+     */
+    protected static final FiniteDuration HEART_BEAT_INTERVAL =
+        new FiniteDuration(100, TimeUnit.MILLISECONDS);
+
+
+    @Override
+    public long getSnapshotBatchCount() {
+        return SNAPSHOT_BATCH_COUNT;
+    }
+
+    @Override
+    public FiniteDuration getHeartBeatInterval() {
+        return HEART_BEAT_INTERVAL;
+    }
+
+
+    @Override
+    public FiniteDuration getElectionTimeOutInterval() {
+        // returns 2 times the heart beat interval
+        return HEART_BEAT_INTERVAL.$times(2);
+    }
+
+    @Override
+    public int getElectionTimeVariance() {
+        return ELECTION_TIME_MAX_VARIANCE;
+    }
+}
index 0ff2341..70b85b4 100644 (file)
@@ -19,6 +19,10 @@ import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.UntypedPersistentActor;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
+import com.google.common.base.Optional;
 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -26,13 +30,10 @@ import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -99,14 +100,22 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
+        this(id, peerAddresses, Optional.<ConfigParams>absent());
+    }
+
+    public RaftActor(String id, Map<String, String> peerAddresses,
+         Optional<ConfigParams> configParams) {
+
         context = new RaftActorContextImpl(this.getSelf(),
-            this.getContext(),
-            id, new ElectionTermImpl(),
-            -1, -1, replicatedLog, peerAddresses, LOG);
+            this.getContext(), id, new ElectionTermImpl(),
+            -1, -1, replicatedLog, peerAddresses,
+            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
+            LOG);
     }
 
     @Override public void onReceiveRecover(Object message) {
         if (message instanceof SnapshotOffer) {
+            LOG.debug("SnapshotOffer called..");
             SnapshotOffer offer = (SnapshotOffer) message;
             Snapshot snapshot = (Snapshot) offer.snapshot();
 
@@ -115,6 +124,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
             // when we need to install it on a peer
             replicatedLog = new ReplicatedLogImpl(snapshot);
 
+            context.setReplicatedLog(replicatedLog);
+
+            LOG.debug("Applied snapshot to replicatedLog. " +
+                "snapshotIndex={}, snapshotTerm={}, journal-size={}",
+                replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
+                replicatedLog.size());
+
             // Apply the snapshot to the actors state
             applySnapshot(snapshot.getState());
 
@@ -126,7 +142,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
             context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
         } else if (message instanceof RecoveryCompleted) {
             LOG.debug(
-                "Last index in log : " + replicatedLog.lastIndex());
+                "RecoveryCompleted - Switching actor to Follower - " +
+                    "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+                    "journal-size={}",
+                replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+                replicatedLog.snapshotTerm, replicatedLog.size());
             currentBehavior = switchBehavior(RaftState.Follower);
         }
     }
@@ -141,14 +161,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
             applyState(applyState.getClientActor(), applyState.getIdentifier(),
                 applyState.getReplicatedLogEntry().getData());
+
         } else if(message instanceof ApplySnapshot ) {
             applySnapshot(((ApplySnapshot) message).getSnapshot());
+
         } else if (message instanceof FindLeader) {
             getSender().tell(
                 new FindLeaderReply(
                     context.getPeerAddress(currentBehavior.getLeaderId())),
                 getSelf()
             );
+
         } else if (message instanceof SaveSnapshotSuccess) {
             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
 
@@ -158,7 +181,12 @@ public abstract class RaftActor extends UntypedPersistentActor {
         } else if (message instanceof SaveSnapshotFailure) {
 
             // TODO: Handle failure in saving the snapshot
-            // Maybe do retries on failure
+
+        } else if (message instanceof FindLeader){
+
+            getSender().tell(new FindLeaderReply(
+                context.getPeerAddress(currentBehavior.getLeaderId())),
+                getSelf());
 
         } else if (message instanceof AddRaftPeer){
 
@@ -182,6 +210,15 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
     }
 
+    public java.util.Set<String> getPeers() {
+        return context.getPeerAddresses().keySet();
+    }
+
+    protected String getReplicatedLogState() {
+        return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
+            + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
+            + ", im-mem journal size=" + context.getReplicatedLog().size();
+    }
 
 
     /**
@@ -193,7 +230,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * @param data
      */
     protected void persistData(ActorRef clientActor, String identifier,
-        Object data) {
+        Payload data) {
 
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
@@ -241,6 +278,24 @@ public abstract class RaftActor extends UntypedPersistentActor {
         return currentBehavior.state();
     }
 
+    /**
+     * setPeerAddress sets the address of a known peer at a later time.
+     * <p>
+     * This is to account for situations where a we know that a peer
+     * exists but we do not know an address up-front. This may also be used in
+     * situations where a known peer starts off in a different location and we
+     * need to change it's address
+     * <p>
+     * Note that if the peerId does not match the list of peers passed to
+     * this actor during construction an IllegalStateException will be thrown.
+     *
+     * @param peerId
+     * @param peerAddress
+     */
+    protected void setPeerAddress(String peerId, String peerAddress){
+        context.setPeerAddress(peerId, peerAddress);
+    }
+
 
 
     /**
@@ -316,85 +371,33 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
     private void trimPersistentData(long sequenceNumber) {
-        // Trim snapshots
+        // Trim akka snapshots
         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
         // For now guessing that it is ANDed.
         deleteSnapshots(new SnapshotSelectionCriteria(
-            sequenceNumber - 100000, 43200000));
+            sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
-        // Trim journal
+        // Trim akka journal
         deleteMessages(sequenceNumber);
     }
 
 
-    private class ReplicatedLogImpl implements ReplicatedLog {
-        private final List<ReplicatedLogEntry> journal;
-        private final Object snapshot;
-        private long snapshotIndex = -1;
-        private long snapshotTerm = -1;
+    private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
         public ReplicatedLogImpl(Snapshot snapshot) {
-            this.snapshot = snapshot.getState();
-            this.snapshotIndex = snapshot.getLastAppliedIndex();
-            this.snapshotTerm = snapshot.getLastAppliedTerm();
-
-            this.journal = new ArrayList<>(snapshot.getUnAppliedEntries());
+            super(snapshot.getState(),
+                snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+                snapshot.getUnAppliedEntries());
         }
 
         public ReplicatedLogImpl() {
-            this.snapshot = null;
-            this.journal = new ArrayList<>();
-        }
-
-        @Override public ReplicatedLogEntry get(long index) {
-            int adjustedIndex = adjustedIndex(index);
-
-            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
-                return null;
-            }
-
-            return journal.get(adjustedIndex);
+            super();
         }
 
-        @Override public ReplicatedLogEntry last() {
-            if (journal.size() == 0) {
-                return null;
-            }
-            return get(journal.size() - 1);
-        }
-
-        @Override public long lastIndex() {
-            if (journal.size() == 0) {
-                return -1;
-            }
-
-            return last().getIndex();
-        }
-
-        @Override public long lastTerm() {
-            if (journal.size() == 0) {
-                return -1;
-            }
-
-            return last().getTerm();
-        }
-
-
-        @Override public void removeFrom(long index) {
-            int adjustedIndex = adjustedIndex(index);
+        @Override public void removeFromAndPersist(long logEntryIndex) {
+            int adjustedIndex = adjustedIndex(logEntryIndex);
 
-            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
-                return;
-            }
-
-            journal.subList(adjustedIndex , journal.size()).clear();
-        }
-
-
-        @Override public void removeFromAndPersist(long index) {
-            int adjustedIndex = adjustedIndex(index);
-
-            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+            if (adjustedIndex < 0) {
                 return;
             }
 
@@ -408,29 +411,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
                     //FIXME : Doing nothing for now
                 }
             });
-
-
-        }
-
-        @Override public void append(
-            final ReplicatedLogEntry replicatedLogEntry) {
-            journal.add(replicatedLogEntry);
-        }
-
-        @Override public List<ReplicatedLogEntry> getFrom(long index) {
-            int adjustedIndex = adjustedIndex(index);
-
-            List<ReplicatedLogEntry> entries = new ArrayList<>(100);
-            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
-                return entries;
-            }
-
-
-            for (int i = adjustedIndex;
-                 i < journal.size(); i++) {
-                entries.add(journal.get(i));
-            }
-            return entries;
         }
 
         @Override public void appendAndPersist(
@@ -442,7 +422,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             final String identifier,
             final ReplicatedLogEntry replicatedLogEntry) {
             context.getLogger().debug(
-                "Append log entry and persist " + replicatedLogEntry);
+                "Append log entry and persist {} ", replicatedLogEntry);
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
             journal.add(replicatedLogEntry);
 
@@ -455,20 +435,42 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 new Procedure<ReplicatedLogEntry>() {
                     public void apply(ReplicatedLogEntry evt) throws Exception {
                         // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
-                        if (size() > 100000) {
-                            ReplicatedLogEntry lastAppliedEntry =
-                                get(context.getLastApplied());
+                        if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) {
+                            LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
+
+                            ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
                             if (lastAppliedEntry != null) {
                                 lastAppliedIndex = lastAppliedEntry.getIndex();
                                 lastAppliedTerm = lastAppliedEntry.getTerm();
                             }
 
-                            saveSnapshot(Snapshot.create(createSnapshot(),
+                            LOG.debug("Snapshot Capture logSize: {}", journal.size());
+                            LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied());
+                            LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
+                            LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+
+                            // create a snapshot object from the state provided and save it
+                            // when snapshot is saved async, SaveSnapshotSuccess is raised.
+                            Snapshot sn = Snapshot.create(createSnapshot(),
                                 getFrom(context.getLastApplied() + 1),
                                 lastIndex(), lastTerm(), lastAppliedIndex,
-                                lastAppliedTerm));
+                                lastAppliedTerm);
+                            saveSnapshot(sn);
+
+                            LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+
+                            //be greedy and remove entries from in-mem journal which are in the snapshot
+                            // and update snapshotIndex and snapshotTerm without waiting for the success,
+                            // TODO: damage-recovery to be done on failure
+                            journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear();
+                            snapshotIndex = lastAppliedIndex;
+                            snapshotTerm = lastAppliedTerm;
+
+                            LOG.info("Removed in-memory snapshotted entries, " +
+                                "adjusted snaphsotIndex:{}" +
+                                "and term:{}", snapshotIndex, lastAppliedTerm);
                         }
                         // Send message for replication
                         if (clientActor != null) {
@@ -482,76 +484,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
             );
         }
 
-        @Override public long size() {
-            return journal.size() + snapshotIndex + 1;
-        }
-
-        @Override public boolean isPresent(long index) {
-            int adjustedIndex = adjustedIndex(index);
-
-            if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
-                return false;
-            }
-            return true;
-        }
-
-        @Override public boolean isInSnapshot(long index) {
-            return index <= snapshotIndex;
-        }
-
-        @Override public Object getSnapshot() {
-            return snapshot;
-        }
-
-        @Override public long getSnapshotIndex() {
-            return snapshotIndex;
-        }
-
-        @Override public long getSnapshotTerm() {
-            return snapshotTerm;
-        }
-
-        private int adjustedIndex(long index) {
-            if(snapshotIndex < 0){
-                return (int) index;
-            }
-            return (int) (index - snapshotIndex);
-        }
-    }
-
-
-    private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
-        Serializable {
-
-        private final long index;
-        private final long term;
-        private final Object payload;
-
-        public ReplicatedLogImplEntry(long index, long term, Object payload) {
-
-            this.index = index;
-            this.term = term;
-            this.payload = payload;
-        }
-
-        @Override public Object getData() {
-            return payload;
-        }
-
-        @Override public long getTerm() {
-            return term;
-        }
-
-        @Override public long getIndex() {
-            return index;
-        }
-
-        @Override public String toString() {
-            return "Entry{" +
-                "index=" + index +
-                ", term=" + term +
-                '}';
-        }
     }
 
     private static class DeleteEntries implements Serializable {
@@ -614,6 +546,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
         public long getLastAppliedTerm() {
             return lastAppliedTerm;
         }
+
+        public String getLogMessage() {
+            StringBuilder sb = new StringBuilder();
+            return sb.append("Snapshot={")
+                .append("lastTerm:" + this.getLastTerm()  + ", ")
+                .append("LastAppliedIndex:" + this.getLastAppliedIndex()  + ", ")
+                .append("LastAppliedTerm:" + this.getLastAppliedTerm()  + ", ")
+                .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size()  + "}")
+                .toString();
+
+        }
     }
 
     private class ElectionTermImpl implements ElectionTerm {
index 7150ec0..0eb4b73 100644 (file)
@@ -85,6 +85,12 @@ public interface RaftActorContext {
      */
     void setLastApplied(long lastApplied);
 
+    /**
+     *
+     * @param replicatedLog
+     */
+    public void setReplicatedLog(ReplicatedLog replicatedLog);
+
     /**
      * @return A representation of the log
      */
@@ -96,21 +102,27 @@ public interface RaftActorContext {
     ActorSystem getActorSystem();
 
     /**
+     * Get the logger to be used for logging messages
      *
      * @return
      */
     LoggingAdapter getLogger();
 
     /**
-     * Get a mapping of peer id's their addresses
+     * Get a mapping of peerId's to their addresses
+     *
      * @return
+     *
      */
     Map<String, String> getPeerAddresses();
 
     /**
+     * Get the address of the peer as a String. This is the same format in
+     * which a consumer would provide the address
      *
      * @param peerId
-     * @return
+     * @return The address of the peer or null if the address has not yet been
+     *         resolved
      */
     String getPeerAddress(String peerId);
 
@@ -126,4 +138,32 @@ public interface RaftActorContext {
      * @param name
      */
     public void removePeer(String name);
+
+    /**
+     * Given a peerId return the corresponding actor
+     * <p>
+     *
+     *
+     * @param peerId
+     * @return The actorSelection corresponding to the peer or null if the
+     *         address has not yet been resolved
+     */
+    ActorSelection getPeerActorSelection(String peerId);
+
+    /**
+     * Set Peer Address can be called at a later time to change the address of
+     * a known peer.
+     *
+     * <p>
+     * Throws an IllegalStateException if the peer is unknown
+     *
+     * @param peerId
+     * @param peerAddress
+     */
+    void setPeerAddress(String peerId, String peerAddress);
+
+    /**
+     * @return ConfigParams
+     */
+    public ConfigParams getConfigParams();
 }
index a0f1328..25da371 100644 (file)
@@ -17,7 +17,9 @@ import akka.event.LoggingAdapter;
 
 import java.util.Map;
 
-public class RaftActorContextImpl implements RaftActorContext{
+import static com.google.common.base.Preconditions.checkState;
+
+public class RaftActorContextImpl implements RaftActorContext {
 
     private final ActorRef actor;
 
@@ -31,17 +33,20 @@ public class RaftActorContextImpl implements RaftActorContext{
 
     private long lastApplied;
 
-    private final ReplicatedLog replicatedLog;
+    private ReplicatedLog replicatedLog;
 
     private final Map<String, String> peerAddresses;
 
     private final LoggingAdapter LOG;
 
+    private final ConfigParams configParams;
 
     public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
         String id,
         ElectionTerm termInformation, long commitIndex,
-        long lastApplied, ReplicatedLog replicatedLog, Map<String, String> peerAddresses, LoggingAdapter logger) {
+        long lastApplied, ReplicatedLog replicatedLog,
+        Map<String, String> peerAddresses, ConfigParams configParams,
+        LoggingAdapter logger) {
         this.actor = actor;
         this.context = context;
         this.id = id;
@@ -50,6 +55,7 @@ public class RaftActorContextImpl implements RaftActorContext{
         this.lastApplied = lastApplied;
         this.replicatedLog = replicatedLog;
         this.peerAddresses = peerAddresses;
+        this.configParams = configParams;
         this.LOG = logger;
     }
 
@@ -89,6 +95,10 @@ public class RaftActorContextImpl implements RaftActorContext{
         this.lastApplied = lastApplied;
     }
 
+    @Override public void setReplicatedLog(ReplicatedLog replicatedLog) {
+        this.replicatedLog = replicatedLog;
+    }
+
     @Override public ReplicatedLog getReplicatedLog() {
         return replicatedLog;
     }
@@ -109,13 +119,30 @@ public class RaftActorContextImpl implements RaftActorContext{
         return peerAddresses.get(peerId);
     }
 
+    @Override public ConfigParams getConfigParams() {
+        return configParams;
+    }
+
     @Override public void addToPeers(String name, String address) {
-        LOG.debug("Kamal--> addToPeer for:"+name);
         peerAddresses.put(name, address);
     }
 
     @Override public void removePeer(String name) {
-        LOG.debug("Kamal--> removePeer for:"+name);
         peerAddresses.remove(name);
     }
+
+    @Override public ActorSelection getPeerActorSelection(String peerId) {
+        String peerAddress = getPeerAddress(peerId);
+        if(peerAddress != null){
+            return actorSelection(peerAddress);
+        }
+        return null;
+    }
+
+    @Override public void setPeerAddress(String peerId, String peerAddress) {
+        LOG.info("Peer address for peer {} set to {}", peerId, peerAddress);
+        checkState(peerAddresses.containsKey(peerId), peerId + " is unknown");
+
+        peerAddresses.put(peerId, peerAddress);
+    }
 }
index 3bbaa22..f501c4d 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
 /**
  * Represents one entry in the replicated log
  */
@@ -17,7 +19,7 @@ public interface ReplicatedLogEntry {
      *
      * @return
      */
-    Object getData();
+    Payload getData();
 
     /**
      * The term stored in that entry
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java
new file mode 100644 (file)
index 0000000..fc2ec5c
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+import java.io.Serializable;
+
+public class ReplicatedLogImplEntry implements ReplicatedLogEntry,
+    Serializable {
+
+    private final long index;
+    private final long term;
+    private final Payload payload;
+
+    public ReplicatedLogImplEntry(long index, long term, Payload payload) {
+
+        this.index = index;
+        this.term = term;
+        this.payload = payload;
+    }
+
+    @Override public Payload getData() {
+        return payload;
+    }
+
+    @Override public long getTerm() {
+        return term;
+    }
+
+    @Override public long getIndex() {
+        return index;
+    }
+
+    @Override public String toString() {
+        return "Entry{" +
+            "index=" + index +
+            ", term=" + term +
+            '}';
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SerializationUtils.java
new file mode 100644 (file)
index 0000000..374e0fa
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft;
+
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+
+public class SerializationUtils {
+
+    public static Object fromSerializable(Object serializable){
+        if(serializable.getClass().equals(AppendEntries.SERIALIZABLE_CLASS)){
+            return AppendEntries.fromSerializable(serializable);
+        }
+        return serializable;
+    }
+
+}
@@ -6,9 +6,11 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.cluster.raft.internal.messages;
+package org.opendaylight.controller.cluster.raft.base.messages;
 
-public class ApplySnapshot {
+import java.io.Serializable;
+
+public class ApplySnapshot implements Serializable {
     private final Object snapshot;
 
     public ApplySnapshot(Object snapshot) {
@@ -6,12 +6,14 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.cluster.raft.internal.messages;
+package org.opendaylight.controller.cluster.raft.base.messages;
 
 import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 
-public class ApplyState {
+import java.io.Serializable;
+
+public class ApplyState implements Serializable {
     private final ActorRef clientActor;
     private final String identifier;
     private final ReplicatedLogEntry replicatedLogEntry;
@@ -6,10 +6,12 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.cluster.raft.internal.messages;
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
 
 /**
  * Message sent to commit an entry to the log
  */
-public class CommitEntry {
+public class CommitEntry implements Serializable {
 }
@@ -6,10 +6,12 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.cluster.raft.internal.messages;
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
 
 /**
  * Message sent to Persist an entry into the transaction journal
  */
-public class PersistEntry {
+public class PersistEntry implements Serializable {
 }
@@ -6,12 +6,14 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.cluster.raft.internal.messages;
+package org.opendaylight.controller.cluster.raft.base.messages;
 
 import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 
-public class Replicate {
+import java.io.Serializable;
+
+public class Replicate implements Serializable {
     private final ActorRef clientActor;
     private final String identifier;
     private final ReplicatedLogEntry replicatedLogEntry;
@@ -6,11 +6,13 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.cluster.raft.internal.messages;
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
 
 /**
  * This message is sent by a RaftActor to itself so that a subclass can process
  * it and use it to save it's state
  */
-public class SaveSnapshot {
+public class SaveSnapshot implements Serializable {
 }
@@ -6,7 +6,9 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.cluster.raft.internal.messages;
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
 
 /**
  * This messages is sent to the Leader to prompt it to send a heartbeat
@@ -14,5 +16,5 @@ package org.opendaylight.controller.cluster.raft.internal.messages;
  *
  * Typically the Leader to itself on a schedule
  */
-public class SendHeartBeat {
+public class SendHeartBeat implements Serializable {
 }
index 1d78bb0..0a553b4 100644 (file)
@@ -14,8 +14,9 @@ import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
@@ -42,27 +43,6 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected final RaftActorContext context;
 
-    /**
-     * The maximum election time variance
-     */
-    private static final int ELECTION_TIME_MAX_VARIANCE = 100;
-
-    /**
-     * The interval at which a heart beat message will be sent to the remote
-     * RaftActor
-     * <p/>
-     * Since this is set to 100 milliseconds the Election timeout should be
-     * at least 200 milliseconds
-     */
-    protected static final FiniteDuration HEART_BEAT_INTERVAL =
-        new FiniteDuration(100, TimeUnit.MILLISECONDS);
-
-    /**
-     * The interval in which a new election would get triggered if no leader is found
-     */
-    private static final long ELECTION_TIME_INTERVAL =
-        HEART_BEAT_INTERVAL.toMillis() * 2;
-
     /**
      *
      */
@@ -207,9 +187,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @return
      */
     protected FiniteDuration electionDuration() {
-        long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
-        return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
-            TimeUnit.MILLISECONDS);
+        long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance());
+        return context.getConfigParams().getElectionTimeOutInterval().$plus(
+            new FiniteDuration(variance, TimeUnit.MILLISECONDS));
     }
 
     /**
@@ -349,6 +329,10 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         context.setLastApplied(index);
     }
 
+    protected Object fromSerializableMessage(Object serializable){
+        return SerializationUtils.fromSerializable(serializable);
+    }
+
     @Override
     public RaftState handleMessage(ActorRef sender, Object message) {
         if (message instanceof AppendEntries) {
index ecd4901..c125bd3 100644 (file)
@@ -12,16 +12,14 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Set;
 
 /**
  * The behavior of a RaftActor when it is in the CandidateState
@@ -43,24 +41,20 @@ import java.util.Map;
  */
 public class Candidate extends AbstractRaftActorBehavior {
 
-    private final Map<String, ActorSelection> peerToActor = new HashMap<>();
-
     private int voteCount;
 
     private final int votesRequired;
 
+    private final Set<String> peers;
+
     public Candidate(RaftActorContext context) {
         super(context);
 
-        Collection<String> peerPaths = context.getPeerAddresses().values();
+        peers = context.getPeerAddresses().keySet();
 
-        for (String peerPath : peerPaths) {
-            peerToActor.put(peerPath,
-                context.actorSelection(peerPath));
-        }
+        context.getLogger().debug("Election:Candidate has following peers:"+ peers);
 
-        context.getLogger().debug("Election:Candidate has following peers:"+peerToActor.keySet());
-        if(peerPaths.size() > 0) {
+        if(peers.size() > 0) {
             // Votes are required from a majority of the peers including self.
             // The votesRequired field therefore stores a calculated value
             // of the number of votes required for this candidate to win an
@@ -73,7 +67,7 @@ public class Candidate extends AbstractRaftActorBehavior {
             // 0 peers = 1 votesRequired (0 + 1) / 2 + 1 = 1
             // 2 peers = 2 votesRequired (2 + 1) / 2 + 1 = 2
             // 4 peers = 3 votesRequired (4 + 1) / 2 + 1 = 3
-            int noOfPeers = peerPaths.size();
+            int noOfPeers = peers.size();
             int self = 1;
             votesRequired = (noOfPeers + self) / 2 + 1;
         } else {
@@ -87,6 +81,8 @@ public class Candidate extends AbstractRaftActorBehavior {
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
+        context.getLogger().info("Candidate: Received {}", appendEntries.toString());
+
         return state();
     }
 
@@ -115,10 +111,16 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    public RaftState handleMessage(ActorRef sender, Object message) {
+    public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+
+        Object message = fromSerializableMessage(originalMessage);
 
         if (message instanceof RaftRPC) {
+
             RaftRPC rpc = (RaftRPC) message;
+
+            context.getLogger().debug("RaftRPC message received {} my term is {}", rpc.toString(), context.getTermInformation().getCurrentTerm());
+
             // If RPC request or response contains term T > currentTerm:
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
@@ -141,6 +143,7 @@ public class Candidate extends AbstractRaftActorBehavior {
             scheduleElection(electionDuration());
             return state();
         }
+
         return super.handleMessage(sender, message);
     }
 
@@ -153,21 +156,25 @@ public class Candidate extends AbstractRaftActorBehavior {
 
         // Increment the election term and vote for self
         long currentTerm = context.getTermInformation().getCurrentTerm();
-        context.getTermInformation().updateAndPersist(currentTerm + 1, context.getId());
+        context.getTermInformation().updateAndPersist(currentTerm + 1,
+            context.getId());
 
-        context.getLogger().debug("Starting new term " + (currentTerm+1));
+        context.getLogger().debug("Starting new term " + (currentTerm + 1));
 
         // Request for a vote
         // TODO: Retry request for vote if replies do not arrive in a reasonable
         // amount of time TBD
-        for (ActorSelection peerActor : peerToActor.values()) {
-            peerActor.tell(new RequestVote(
-                    context.getTermInformation().getCurrentTerm(),
-                    context.getId(),
-                    context.getReplicatedLog().lastIndex(),
-                    context.getReplicatedLog().lastTerm()),
-                context.getActor()
-            );
+        for (String peerId : peers) {
+            ActorSelection peerActor = context.getPeerActorSelection(peerId);
+            if(peerActor != null) {
+                peerActor.tell(new RequestVote(
+                        context.getTermInformation().getCurrentTerm(),
+                        context.getId(),
+                        context.getReplicatedLog().lastIndex(),
+                        context.getReplicatedLog().lastTerm()),
+                    context.getActor()
+                );
+            }
         }
 
 
index 532201b..c8cd41d 100644 (file)
@@ -12,8 +12,8 @@ import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -40,6 +40,11 @@ public class Follower extends AbstractRaftActorBehavior {
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
+        if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
+            context.getLogger()
+                .info("Follower: Received {}", appendEntries.toString());
+        }
+
         // TODO : Refactor this method into a bunch of smaller methods
         // to make it easier to read. Before refactoring ensure tests
         // cover the code properly
@@ -121,15 +126,10 @@ public class Follower extends AbstractRaftActorBehavior {
             int addEntriesFrom = 0;
             if (context.getReplicatedLog().size() > 0) {
 
-                // Find the entry up until which the one that is not in the
-                // follower's log
-                for (int i = 0;
-                     i < appendEntries.getEntries()
-                         .size(); i++, addEntriesFrom++) {
-                    ReplicatedLogEntry matchEntry =
-                        appendEntries.getEntries().get(i);
-                    ReplicatedLogEntry newEntry = context.getReplicatedLog()
-                        .get(matchEntry.getIndex());
+                // Find the entry up until which the one that is not in the follower's log
+                for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
+                    ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
+                    ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
 
                     if (newEntry == null) {
                         //newEntry not found in the log
@@ -162,8 +162,9 @@ public class Follower extends AbstractRaftActorBehavior {
             for (int i = addEntriesFrom;
                  i < appendEntries.getEntries().size(); i++) {
 
-                context.getLogger().debug(
-                    "Append entry to log " + appendEntries.getEntries().get(i).getData()
+                context.getLogger().info(
+                    "Append entry to log " + appendEntries.getEntries().get(
+                        i).getData()
                         .toString()
                 );
                 context.getReplicatedLog()
@@ -214,7 +215,10 @@ public class Follower extends AbstractRaftActorBehavior {
         return RaftState.Follower;
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object message) {
+    @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+
+        Object message = fromSerializableMessage(originalMessage);
+
         if (message instanceof RaftRPC) {
             RaftRPC rpc = (RaftRPC) message;
             // If RPC request or response contains term T > currentTerm:
@@ -227,9 +231,10 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if (message instanceof ElectionTimeout) {
             return RaftState.Candidate;
+
         } else if (message instanceof InstallSnapshot) {
-            InstallSnapshot snapshot = (InstallSnapshot) message;
-            actor().tell(new ApplySnapshot(snapshot), actor());
+            InstallSnapshot installSnapshot = (InstallSnapshot) message;
+            actor().tell(new ApplySnapshot(installSnapshot.getData()), actor());
         }
 
         scheduleElection(electionDuration());
index fb8be8b..2a44e8b 100644 (file)
@@ -19,10 +19,10 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -36,6 +36,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -67,7 +68,7 @@ public class Leader extends AbstractRaftActorBehavior {
     private final Map<String, FollowerLogInformation> followerToLog =
         new HashMap();
 
-    private final Map<String, ActorSelection> followerToActor = new HashMap<>();
+    private final Set<String> followers;
 
     private Cancellable heartbeatSchedule = null;
     private Cancellable appendEntriesSchedule = null;
@@ -84,22 +85,21 @@ public class Leader extends AbstractRaftActorBehavior {
             context.setCommitIndex(lastIndex());
         }
 
-        for (String followerId : context.getPeerAddresses().keySet()) {
+        followers = context.getPeerAddresses().keySet();
+
+        for (String followerId : followers) {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerId,
                     new AtomicLong(lastIndex()),
                     new AtomicLong(-1));
 
-            followerToActor.put(followerId,
-                context.actorSelection(context.getPeerAddress(followerId)));
-
             followerToLog.put(followerId, followerLogInformation);
         }
 
-        context.getLogger().debug("Election:Leader has following peers:"+followerToActor.keySet());
+        context.getLogger().debug("Election:Leader has following peers:"+ followers);
 
-        if (followerToActor.size() > 0) {
-            minReplicationCount = (followerToActor.size() + 1) / 2 + 1;
+        if (followers.size() > 0) {
+            minReplicationCount = (followers.size() + 1) / 2 + 1;
         } else {
             minReplicationCount = 0;
         }
@@ -112,8 +112,8 @@ public class Leader extends AbstractRaftActorBehavior {
         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
 
         scheduleInstallSnapshotCheck(
-            new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
-                HEART_BEAT_INTERVAL.unit())
+            new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 1000,
+                context.getConfigParams().getHeartBeatInterval().unit())
         );
 
     }
@@ -121,16 +121,29 @@ public class Leader extends AbstractRaftActorBehavior {
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
+        context.getLogger().info("Leader: Received {}", appendEntries.toString());
+
         return state();
     }
 
     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply) {
 
+        if(! appendEntriesReply.isSuccess()) {
+            context.getLogger()
+                .info("Leader: Received {}", appendEntriesReply.toString());
+        }
+
         // Update the FollowerLogInformation
         String followerId = appendEntriesReply.getFollowerId();
         FollowerLogInformation followerLogInformation =
             followerToLog.get(followerId);
+
+        if(followerLogInformation == null){
+            context.getLogger().error("Unknown follower {}", followerId);
+            return state();
+        }
+
         if (appendEntriesReply.isSuccess()) {
             followerLogInformation
                 .setMatchIndex(appendEntriesReply.getLogLastIndex());
@@ -200,9 +213,11 @@ public class Leader extends AbstractRaftActorBehavior {
         return RaftState.Leader;
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object message) {
+    @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
+        Object message = fromSerializableMessage(originalMessage);
+
         if (message instanceof RaftRPC) {
             RaftRPC rpc = (RaftRPC) message;
             // If RPC request or response contains term T > currentTerm:
@@ -226,7 +241,7 @@ public class Leader extends AbstractRaftActorBehavior {
                     (InstallSnapshotReply) message);
             }
         } finally {
-            scheduleHeartBeat(HEART_BEAT_INTERVAL);
+            scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
         }
 
         return super.handleMessage(sender, message);
@@ -249,7 +264,7 @@ public class Leader extends AbstractRaftActorBehavior {
 
         context.getLogger().debug("Replicate message " + logIndex);
 
-        if (followerToActor.size() == 0) {
+        if (followers.size() == 0) {
             context.setCommitIndex(
                 replicate.getReplicatedLogEntry().getIndex());
 
@@ -275,35 +290,37 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
-        for (String followerId : followerToActor.keySet()) {
+        for (String followerId : followers) {
             ActorSelection followerActor =
-                followerToActor.get(followerId);
+                context.getPeerActorSelection(followerId);
 
-            FollowerLogInformation followerLogInformation =
-                followerToLog.get(followerId);
+            if (followerActor != null) {
+                FollowerLogInformation followerLogInformation =
+                    followerToLog.get(followerId);
 
-            long nextIndex = followerLogInformation.getNextIndex().get();
+                long nextIndex = followerLogInformation.getNextIndex().get();
 
-            List<ReplicatedLogEntry> entries = Collections.emptyList();
+                List<ReplicatedLogEntry> entries = Collections.emptyList();
 
-            if(context.getReplicatedLog().isPresent(nextIndex)){
-                // TODO: Instead of sending all entries from nextIndex
-                // only send a fixed number of entries to each follower
-                // This is to avoid the situation where there are a lot of
-                // entries to install for a fresh follower or to a follower
-                // that has fallen too far behind with the log but yet is not
-                // eligible to receive a snapshot
-                entries =
-                    context.getReplicatedLog().getFrom(nextIndex);
-            }
+                if (context.getReplicatedLog().isPresent(nextIndex)) {
+                    // TODO: Instead of sending all entries from nextIndex
+                    // only send a fixed number of entries to each follower
+                    // This is to avoid the situation where there are a lot of
+                    // entries to install for a fresh follower or to a follower
+                    // that has fallen too far behind with the log but yet is not
+                    // eligible to receive a snapshot
+                    entries =
+                        context.getReplicatedLog().getFrom(nextIndex);
+                }
 
-            followerActor.tell(
-                new AppendEntries(currentTerm(), context.getId(),
-                    prevLogIndex(nextIndex), prevLogTerm(nextIndex),
-                    entries, context.getCommitIndex()
-                ),
-                actor()
-            );
+                followerActor.tell(
+                    new AppendEntries(currentTerm(), context.getId(),
+                        prevLogIndex(nextIndex),
+                        prevLogTerm(nextIndex), entries,
+                        context.getCommitIndex()).toSerializable(),
+                    actor()
+                );
+            }
         }
     }
 
@@ -313,30 +330,33 @@ public class Leader extends AbstractRaftActorBehavior {
      * snapshots at every heartbeat.
      */
     private void installSnapshotIfNeeded(){
-        for (String followerId : followerToActor.keySet()) {
+        for (String followerId : followers) {
             ActorSelection followerActor =
-                followerToActor.get(followerId);
-
-            FollowerLogInformation followerLogInformation =
-                followerToLog.get(followerId);
-
-            long nextIndex = followerLogInformation.getNextIndex().get();
-
-            if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
-                followerActor.tell(
-                    new InstallSnapshot(currentTerm(), context.getId(),
-                        context.getReplicatedLog().getSnapshotIndex(),
-                        context.getReplicatedLog().getSnapshotTerm(),
-                        context.getReplicatedLog().getSnapshot()
-                    ),
-                    actor()
-                );
+                context.getPeerActorSelection(followerId);
+
+            if(followerActor != null) {
+                FollowerLogInformation followerLogInformation =
+                    followerToLog.get(followerId);
+
+                long nextIndex = followerLogInformation.getNextIndex().get();
+
+                if (!context.getReplicatedLog().isPresent(nextIndex) && context
+                    .getReplicatedLog().isInSnapshot(nextIndex)) {
+                    followerActor.tell(
+                        new InstallSnapshot(currentTerm(), context.getId(),
+                            context.getReplicatedLog().getSnapshotIndex(),
+                            context.getReplicatedLog().getSnapshotTerm(),
+                            context.getReplicatedLog().getSnapshot()
+                        ),
+                        actor()
+                    );
+                }
             }
         }
     }
 
     private RaftState sendHeartBeat() {
-        if (followerToActor.size() > 0) {
+        if (followers.size() > 0) {
             sendAppendEntries();
         }
         return state();
@@ -355,7 +375,7 @@ public class Leader extends AbstractRaftActorBehavior {
     }
 
     private void scheduleHeartBeat(FiniteDuration interval) {
-        if(followerToActor.keySet().size() == 0){
+        if(followers.size() == 0){
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
             return;
@@ -377,7 +397,7 @@ public class Leader extends AbstractRaftActorBehavior {
 
 
     private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
-        if(followerToActor.keySet().size() == 0){
+        if(followers.size() == 0){
             // Optimization - do not bother scheduling a heartbeat as there are
             // no followers
             return;
index 3cafda9..33d8a68 100644 (file)
@@ -16,9 +16,15 @@ public class AbstractRaftRPC implements RaftRPC {
         this.term = term;
     }
 
+    // added for testing while serialize-messages=on
+    public AbstractRaftRPC() {
+    }
+
     public long getTerm() {
         return term;
     }
 
-
+    public void setTerm(long term) {
+        this.term = term;
+    }
 }
index 9bb5029..94366ef 100644 (file)
@@ -8,15 +8,27 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
+import com.google.protobuf.GeneratedMessage;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Invoked by leader to replicate log entries (§5.3); also used as
  * heartbeat (§5.2).
  */
 public class AppendEntries extends AbstractRaftRPC {
+
+    public static final Class SERIALIZABLE_CLASS = AppendEntriesMessages.AppendEntries.class;
+
+    private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(AppendEntries.class);
+
     // So that follower can redirect clients
     private final String leaderId;
 
@@ -64,12 +76,88 @@ public class AppendEntries extends AbstractRaftRPC {
     }
 
     @Override public String toString() {
-        return "AppendEntries{" +
-            "leaderId='" + leaderId + '\'' +
-            ", prevLogIndex=" + prevLogIndex +
-            ", prevLogTerm=" + prevLogTerm +
-            ", entries=" + entries +
-            ", leaderCommit=" + leaderCommit +
-            '}';
+        final StringBuilder sb =
+            new StringBuilder("AppendEntries{");
+        sb.append("term=").append(getTerm());
+        sb.append("leaderId='").append(leaderId).append('\'');
+        sb.append(", prevLogIndex=").append(prevLogIndex);
+        sb.append(", prevLogTerm=").append(prevLogTerm);
+        sb.append(", entries=").append(entries);
+        sb.append(", leaderCommit=").append(leaderCommit);
+        sb.append('}');
+        return sb.toString();
+    }
+
+    public <T extends Object> Object toSerializable(){
+        AppendEntriesMessages.AppendEntries.Builder to = AppendEntriesMessages.AppendEntries.newBuilder();
+        to.setTerm(this.getTerm())
+            .setLeaderId(this.getLeaderId())
+            .setPrevLogTerm(this.getPrevLogTerm())
+            .setPrevLogIndex(this.getPrevLogIndex())
+            .setLeaderCommit(this.getLeaderCommit());
+
+        for (ReplicatedLogEntry logEntry : this.getEntries()) {
+
+            AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder arBuilder =
+                AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.newBuilder();
+
+            AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder arpBuilder =
+                AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder();
+
+            //get the client specific payload extensions and add them to the payload builder
+            Map<GeneratedMessage.GeneratedExtension, T> map = logEntry.getData().encode();
+            Iterator<Map.Entry<GeneratedMessage.GeneratedExtension, T>> iter = map.entrySet().iterator();
+
+            while (iter.hasNext()) {
+                Map.Entry<GeneratedMessage.GeneratedExtension, T> entry = iter.next();
+                arpBuilder.setExtension(entry.getKey(), entry.getValue());
+            }
+
+            arpBuilder.setClientPayloadClassName(logEntry.getData().getClientPayloadClassName());
+
+            arBuilder.setData(arpBuilder).setIndex(logEntry.getIndex()).setTerm(logEntry.getTerm());
+            to.addLogEntries(arBuilder);
+        }
+
+        return to.build();
+    }
+
+    public static AppendEntries fromSerializable(Object o){
+        AppendEntriesMessages.AppendEntries from = (AppendEntriesMessages.AppendEntries) o;
+
+        List<ReplicatedLogEntry> logEntryList = new ArrayList<>();
+        for (AppendEntriesMessages.AppendEntries.ReplicatedLogEntry leProtoBuff : from.getLogEntriesList()) {
+
+            Payload payload = null ;
+            try {
+                if(leProtoBuff.getData() != null && leProtoBuff.getData().getClientPayloadClassName() != null) {
+                    String clientPayloadClassName = leProtoBuff.getData().getClientPayloadClassName();
+                    payload = (Payload)Class.forName(clientPayloadClassName).newInstance();
+                    payload = payload.decode(leProtoBuff.getData());
+                    payload.setClientPayloadClassName(clientPayloadClassName);
+                } else {
+                    LOG.error("Payload is null or payload does not have client payload class name");
+                }
+
+            } catch (InstantiationException e) {
+                LOG.error("InstantiationException when instantiating "+leProtoBuff.getData().getClientPayloadClassName(), e);
+            } catch (IllegalAccessException e) {
+                LOG.error("IllegalAccessException when accessing "+leProtoBuff.getData().getClientPayloadClassName(), e);
+            } catch (ClassNotFoundException e) {
+                LOG.error("ClassNotFoundException when loading "+leProtoBuff.getData().getClientPayloadClassName(), e);
+            }
+            ReplicatedLogEntry logEntry = new ReplicatedLogImplEntry(
+                leProtoBuff.getIndex(), leProtoBuff.getTerm(), payload);
+            logEntryList.add(logEntry);
+        }
+
+        AppendEntries to = new AppendEntries(from.getTerm(),
+            from.getLeaderId(),
+            from.getPrevLogIndex(),
+            from.getPrevLogTerm(),
+            logEntryList,
+            from.getLeaderCommit());
+
+        return to;
     }
 }
index 7524d8f..b923baa 100644 (file)
@@ -11,7 +11,7 @@ package org.opendaylight.controller.cluster.raft.messages;
 /**
  * Reply for the AppendEntriesRpc message
  */
-public class AppendEntriesReply extends AbstractRaftRPC{
+public class AppendEntriesReply extends AbstractRaftRPC {
 
     // true if follower contained entry matching
     // prevLogIndex and prevLogTerm
@@ -56,4 +56,16 @@ public class AppendEntriesReply extends AbstractRaftRPC{
     public String getFollowerId() {
         return followerId;
     }
+
+    @Override public String toString() {
+        final StringBuilder sb =
+            new StringBuilder("AppendEntriesReply{");
+        sb.append("term=").append(term);
+        sb.append(", success=").append(success);
+        sb.append(", logLastIndex=").append(logLastIndex);
+        sb.append(", logLastTerm=").append(logLastTerm);
+        sb.append(", followerId='").append(followerId).append('\'');
+        sb.append('}');
+        return sb.toString();
+    }
 }
index 981da17..6ef2a06 100644 (file)
@@ -11,16 +11,16 @@ package org.opendaylight.controller.cluster.raft.messages;
 /**
  * Invoked by candidates to gather votes (§5.2).
  */
-public class RequestVote extends AbstractRaftRPC{
+public class RequestVote extends AbstractRaftRPC {
 
     // candidate requesting vote
-    private final String candidateId;
+    private String candidateId;
 
     // index of candidate’s last log entry (§5.4)
-    private final long lastLogIndex;
+    private long lastLogIndex;
 
     // term of candidate’s last log entry (§5.4)
-    private final long lastLogTerm;
+    private long lastLogTerm;
 
     public RequestVote(long term, String candidateId, long lastLogIndex,
         long lastLogTerm) {
@@ -30,6 +30,10 @@ public class RequestVote extends AbstractRaftRPC{
         this.lastLogTerm = lastLogTerm;
     }
 
+    // added for testing while serialize-messages=on
+    public RequestVote() {
+    }
+
     public long getTerm() {
         return term;
     }
@@ -45,4 +49,27 @@ public class RequestVote extends AbstractRaftRPC{
     public long getLastLogTerm() {
         return lastLogTerm;
     }
+
+    public void setCandidateId(String candidateId) {
+        this.candidateId = candidateId;
+    }
+
+    public void setLastLogIndex(long lastLogIndex) {
+        this.lastLogIndex = lastLogIndex;
+    }
+
+    public void setLastLogTerm(long lastLogTerm) {
+        this.lastLogTerm = lastLogTerm;
+    }
+
+    @Override public String toString() {
+        final StringBuilder sb =
+            new StringBuilder("RequestVote{");
+        sb.append("term='").append(getTerm()).append('\'');
+        sb.append("candidateId='").append(candidateId).append('\'');
+        sb.append(", lastLogIndex=").append(lastLogIndex);
+        sb.append(", lastLogTerm=").append(lastLogTerm);
+        sb.append('}');
+        return sb.toString();
+    }
 }
index 816120c..df80b4e 100644 (file)
@@ -8,7 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft.messages;
 
-public class RequestVoteReply extends AbstractRaftRPC{
+public class RequestVoteReply extends AbstractRaftRPC {
 
     // true means candidate received vot
     private final boolean voteGranted;
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java
new file mode 100644 (file)
index 0000000..9a251cd
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
+
+
+import com.google.protobuf.GeneratedMessage;
+import org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages;
+
+import java.util.Map;
+
+/**
+ * An instance of a Payload class is meant to be used as the Payload for
+ * AppendEntries.
+ * <p>
+ *
+ * When an actor which is derived from RaftActor attempts to persistData it
+ * must pass an instance of the Payload class. Similarly when state needs to
+ * be applied to the derived RaftActor it will be passed an instance of the
+ * Payload class.
+ * <p>
+ *
+ * To define your own payload do the following,
+ * <ol>
+ *     <li>Create your own protocol buffer message which extends the AppendEntries Payload</li>
+ *     <li>Extend this Payload class</li>
+ *     <li>Implement encode</li>
+ *     <li>Implement decode</li>
+ * </ol>
+ *
+ * Your own protocol buffer message can be create like so, <br/>
+ * <pre>
+ * {@code
+ *
+ * import "AppendEntriesMessages.proto";
+ *
+ * package org.opendaylight.controller.cluster.raft;
+ *
+ * option java_package = "org.opendaylight.controller.cluster.raft.protobuff.messages";
+ * option java_outer_classname = "MockPayloadMessages";
+ *
+ * extend AppendEntries.ReplicatedLogEntry.Payload {
+ *      optional string value = 2;
+ * }
+ * }
+ * </pre>
+ *
+ */
+public abstract class Payload {
+    private String clientPayloadClassName;
+
+    public String getClientPayloadClassName() {
+        return this.getClass().getName();
+    }
+
+    public void setClientPayloadClassName(String clientPayloadClassName) {
+        this.clientPayloadClassName = clientPayloadClassName;
+    }
+
+    /**
+     * Encode the payload data as a protocol buffer extension.
+     * <p>
+     * TODO: Add more meat in here
+     * @param <T>
+     * @return Map of <GeneratedMessage.GeneratedExtension, T>
+     */
+    public abstract <T extends Object> Map<GeneratedMessage.GeneratedExtension, T> encode();
+
+    /**
+     * Decode the protocol buffer payload into a specific Payload as defined
+     * by the class extending RaftActor
+     *
+     * @param payload The payload in protocol buffer format
+     * @return
+     */
+    public abstract Payload decode(
+        AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload);
+
+
+
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/AppendEntriesMessages.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/messages/AppendEntriesMessages.java
new file mode 100644 (file)
index 0000000..00aeaf2
--- /dev/null
@@ -0,0 +1,2420 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: AppendEntriesMessages.proto
+
+package org.opendaylight.controller.cluster.raft.protobuff.messages;
+
+public final class AppendEntriesMessages {
+  private AppendEntriesMessages() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+  }
+  public interface AppendEntriesOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // optional int64 term = 1;
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    boolean hasTerm();
+    /**
+     * <code>optional int64 term = 1;</code>
+     */
+    long getTerm();
+
+    // optional string leaderId = 2;
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    boolean hasLeaderId();
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    java.lang.String getLeaderId();
+    /**
+     * <code>optional string leaderId = 2;</code>
+     */
+    com.google.protobuf.ByteString
+        getLeaderIdBytes();
+
+    // optional int64 prevLogIndex = 3;
+    /**
+     * <code>optional int64 prevLogIndex = 3;</code>
+     */
+    boolean hasPrevLogIndex();
+    /**
+     * <code>optional int64 prevLogIndex = 3;</code>
+     */
+    long getPrevLogIndex();
+
+    // optional int64 prevLogTerm = 4;
+    /**
+     * <code>optional int64 prevLogTerm = 4;</code>
+     */
+    boolean hasPrevLogTerm();
+    /**
+     * <code>optional int64 prevLogTerm = 4;</code>
+     */
+    long getPrevLogTerm();
+
+    // repeated .org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry logEntries = 5;
+    /**
+     * <code>repeated .org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry logEntries = 5;</code>
+     */
+    java.util.List<org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry> 
+        getLogEntriesList();
+    /**
+     * <code>repeated .org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry logEntries = 5;</code>
+     */
+    org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry getLogEntries(int index);
+    /**
+     * <code>repeated .org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry logEntries = 5;</code>
+     */
+    int getLogEntriesCount();
+    /**
+     * <code>repeated .org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry logEntries = 5;</code>
+     */
+    java.util.List<? extends org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntryOrBuilder> 
+        getLogEntriesOrBuilderList();
+    /**
+     * <code>repeated .org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry logEntries = 5;</code>
+     */
+    org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntryOrBuilder getLogEntriesOrBuilder(
+        int index);
+
+    // optional int64 leaderCommit = 6;
+    /**
+     * <code>optional int64 leaderCommit = 6;</code>
+     */
+    boolean hasLeaderCommit();
+    /**
+     * <code>optional int64 leaderCommit = 6;</code>
+     */
+    long getLeaderCommit();
+  }
+  /**
+   * Protobuf type {@code org.opendaylight.controller.cluster.raft.AppendEntries}
+   */
+  public static final class AppendEntries extends
+      com.google.protobuf.GeneratedMessage
+      implements AppendEntriesOrBuilder {
+    // Use AppendEntries.newBuilder() to construct.
+    private AppendEntries(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private AppendEntries(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final AppendEntries defaultInstance;
+    public static AppendEntries getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public AppendEntries getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private AppendEntries(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              term_ = input.readInt64();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              leaderId_ = input.readBytes();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              prevLogIndex_ = input.readInt64();
+              break;
+            }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              prevLogTerm_ = input.readInt64();
+              break;
+            }
+            case 42: {
+              if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
+                logEntries_ = new java.util.ArrayList<org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry>();
+                mutable_bitField0_ |= 0x00000010;
+              }
+              logEntries_.add(input.readMessage(org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.PARSER, extensionRegistry));
+              break;
+            }
+            case 48: {
+              bitField0_ |= 0x00000010;
+              leaderCommit_ = input.readInt64();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
+          logEntries_ = java.util.Collections.unmodifiableList(logEntries_);
+        }
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.internal_static_org_opendaylight_controller_cluster_raft_AppendEntries_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.internal_static_org_opendaylight_controller_cluster_raft_AppendEntries_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.class, org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<AppendEntries> PARSER =
+        new com.google.protobuf.AbstractParser<AppendEntries>() {
+      public AppendEntries parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new AppendEntries(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<AppendEntries> getParserForType() {
+      return PARSER;
+    }
+
+    public interface ReplicatedLogEntryOrBuilder
+        extends com.google.protobuf.MessageOrBuilder {
+
+      // optional int64 term = 1;
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      boolean hasTerm();
+      /**
+       * <code>optional int64 term = 1;</code>
+       */
+      long getTerm();
+
+      // optional int64 index = 2;
+      /**
+       * <code>optional int64 index = 2;</code>
+       */
+      boolean hasIndex();
+      /**
+       * <code>optional int64 index = 2;</code>
+       */
+      long getIndex();
+
+      // optional .org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry.Payload data = 3;
+      /**
+       * <code>optional .org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry.Payload data = 3;</code>
+       */
+      boolean hasData();
+      /**
+       * <code>optional .org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry.Payload data = 3;</code>
+       */
+      org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload getData();
+      /**
+       * <code>optional .org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry.Payload data = 3;</code>
+       */
+      org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.PayloadOrBuilder getDataOrBuilder();
+    }
+    /**
+     * Protobuf type {@code org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry}
+     */
+    public static final class ReplicatedLogEntry extends
+        com.google.protobuf.GeneratedMessage
+        implements ReplicatedLogEntryOrBuilder {
+      // Use ReplicatedLogEntry.newBuilder() to construct.
+      private ReplicatedLogEntry(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+        super(builder);
+        this.unknownFields = builder.getUnknownFields();
+      }
+      private ReplicatedLogEntry(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+      private static final ReplicatedLogEntry defaultInstance;
+      public static ReplicatedLogEntry getDefaultInstance() {
+        return defaultInstance;
+      }
+
+      public ReplicatedLogEntry getDefaultInstanceForType() {
+        return defaultInstance;
+      }
+
+      private final com.google.protobuf.UnknownFieldSet unknownFields;
+      @java.lang.Override
+      public final com.google.protobuf.UnknownFieldSet
+          getUnknownFields() {
+        return this.unknownFields;
+      }
+      private ReplicatedLogEntry(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        initFields();
+        int mutable_bitField0_ = 0;
+        com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+            com.google.protobuf.UnknownFieldSet.newBuilder();
+        try {
+          boolean done = false;
+          while (!done) {
+            int tag = input.readTag();
+            switch (tag) {
+              case 0:
+                done = true;
+                break;
+              default: {
+                if (!parseUnknownField(input, unknownFields,
+                                       extensionRegistry, tag)) {
+                  done = true;
+                }
+                break;
+              }
+              case 8: {
+                bitField0_ |= 0x00000001;
+                term_ = input.readInt64();
+                break;
+              }
+              case 16: {
+                bitField0_ |= 0x00000002;
+                index_ = input.readInt64();
+                break;
+              }
+              case 26: {
+                org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder subBuilder = null;
+                if (((bitField0_ & 0x00000004) == 0x00000004)) {
+                  subBuilder = data_.toBuilder();
+                }
+                data_ = input.readMessage(org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.PARSER, extensionRegistry);
+                if (subBuilder != null) {
+                  subBuilder.mergeFrom(data_);
+                  data_ = subBuilder.buildPartial();
+                }
+                bitField0_ |= 0x00000004;
+                break;
+              }
+            }
+          }
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          throw e.setUnfinishedMessage(this);
+        } catch (java.io.IOException e) {
+          throw new com.google.protobuf.InvalidProtocolBufferException(
+              e.getMessage()).setUnfinishedMessage(this);
+        } finally {
+          this.unknownFields = unknownFields.build();
+          makeExtensionsImmutable();
+        }
+      }
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.internal_static_org_opendaylight_controller_cluster_raft_AppendEntries_ReplicatedLogEntry_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.internal_static_org_opendaylight_controller_cluster_raft_AppendEntries_ReplicatedLogEntry_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.class, org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Builder.class);
+      }
+
+      public static com.google.protobuf.Parser<ReplicatedLogEntry> PARSER =
+          new com.google.protobuf.AbstractParser<ReplicatedLogEntry>() {
+        public ReplicatedLogEntry parsePartialFrom(
+            com.google.protobuf.CodedInputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+          return new ReplicatedLogEntry(input, extensionRegistry);
+        }
+      };
+
+      @java.lang.Override
+      public com.google.protobuf.Parser<ReplicatedLogEntry> getParserForType() {
+        return PARSER;
+      }
+
+      public interface PayloadOrBuilder extends
+          com.google.protobuf.GeneratedMessage.
+              ExtendableMessageOrBuilder<Payload> {
+
+        // optional string clientPayloadClassName = 1;
+        /**
+         * <code>optional string clientPayloadClassName = 1;</code>
+         */
+        boolean hasClientPayloadClassName();
+        /**
+         * <code>optional string clientPayloadClassName = 1;</code>
+         */
+        java.lang.String getClientPayloadClassName();
+        /**
+         * <code>optional string clientPayloadClassName = 1;</code>
+         */
+        com.google.protobuf.ByteString
+            getClientPayloadClassNameBytes();
+      }
+      /**
+       * Protobuf type {@code org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry.Payload}
+       */
+      public static final class Payload extends
+          com.google.protobuf.GeneratedMessage.ExtendableMessage<
+            Payload> implements PayloadOrBuilder {
+        // Use Payload.newBuilder() to construct.
+        private Payload(com.google.protobuf.GeneratedMessage.ExtendableBuilder<org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload, ?> builder) {
+          super(builder);
+          this.unknownFields = builder.getUnknownFields();
+        }
+        private Payload(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+        private static final Payload defaultInstance;
+        public static Payload getDefaultInstance() {
+          return defaultInstance;
+        }
+
+        public Payload getDefaultInstanceForType() {
+          return defaultInstance;
+        }
+
+        private final com.google.protobuf.UnknownFieldSet unknownFields;
+        @java.lang.Override
+        public final com.google.protobuf.UnknownFieldSet
+            getUnknownFields() {
+          return this.unknownFields;
+        }
+        private Payload(
+            com.google.protobuf.CodedInputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+          initFields();
+          int mutable_bitField0_ = 0;
+          com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+              com.google.protobuf.UnknownFieldSet.newBuilder();
+          try {
+            boolean done = false;
+            while (!done) {
+              int tag = input.readTag();
+              switch (tag) {
+                case 0:
+                  done = true;
+                  break;
+                default: {
+                  if (!parseUnknownField(input, unknownFields,
+                                         extensionRegistry, tag)) {
+                    done = true;
+                  }
+                  break;
+                }
+                case 10: {
+                  bitField0_ |= 0x00000001;
+                  clientPayloadClassName_ = input.readBytes();
+                  break;
+                }
+              }
+            }
+          } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+            throw e.setUnfinishedMessage(this);
+          } catch (java.io.IOException e) {
+            throw new com.google.protobuf.InvalidProtocolBufferException(
+                e.getMessage()).setUnfinishedMessage(this);
+          } finally {
+            this.unknownFields = unknownFields.build();
+            makeExtensionsImmutable();
+          }
+        }
+        public static final com.google.protobuf.Descriptors.Descriptor
+            getDescriptor() {
+          return org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.internal_static_org_opendaylight_controller_cluster_raft_AppendEntries_ReplicatedLogEntry_Payload_descriptor;
+        }
+
+        protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+            internalGetFieldAccessorTable() {
+          return org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.internal_static_org_opendaylight_controller_cluster_raft_AppendEntries_ReplicatedLogEntry_Payload_fieldAccessorTable
+              .ensureFieldAccessorsInitialized(
+                  org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.class, org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder.class);
+        }
+
+        public static com.google.protobuf.Parser<Payload> PARSER =
+            new com.google.protobuf.AbstractParser<Payload>() {
+          public Payload parsePartialFrom(
+              com.google.protobuf.CodedInputStream input,
+              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+              throws com.google.protobuf.InvalidProtocolBufferException {
+            return new Payload(input, extensionRegistry);
+          }
+        };
+
+        @java.lang.Override
+        public com.google.protobuf.Parser<Payload> getParserForType() {
+          return PARSER;
+        }
+
+        private int bitField0_;
+        // optional string clientPayloadClassName = 1;
+        public static final int CLIENTPAYLOADCLASSNAME_FIELD_NUMBER = 1;
+        private java.lang.Object clientPayloadClassName_;
+        /**
+         * <code>optional string clientPayloadClassName = 1;</code>
+         */
+        public boolean hasClientPayloadClassName() {
+          return ((bitField0_ & 0x00000001) == 0x00000001);
+        }
+        /**
+         * <code>optional string clientPayloadClassName = 1;</code>
+         */
+        public java.lang.String getClientPayloadClassName() {
+          java.lang.Object ref = clientPayloadClassName_;
+          if (ref instanceof java.lang.String) {
+            return (java.lang.String) ref;
+          } else {
+            com.google.protobuf.ByteString bs = 
+                (com.google.protobuf.ByteString) ref;
+            java.lang.String s = bs.toStringUtf8();
+            if (bs.isValidUtf8()) {
+              clientPayloadClassName_ = s;
+            }
+            return s;
+          }
+        }
+        /**
+         * <code>optional string clientPayloadClassName = 1;</code>
+         */
+        public com.google.protobuf.ByteString
+            getClientPayloadClassNameBytes() {
+          java.lang.Object ref = clientPayloadClassName_;
+          if (ref instanceof java.lang.String) {
+            com.google.protobuf.ByteString b = 
+                com.google.protobuf.ByteString.copyFromUtf8(
+                    (java.lang.String) ref);
+            clientPayloadClassName_ = b;
+            return b;
+          } else {
+            return (com.google.protobuf.ByteString) ref;
+          }
+        }
+
+        private void initFields() {
+          clientPayloadClassName_ = "";
+        }
+        private byte memoizedIsInitialized = -1;
+        public final boolean isInitialized() {
+          byte isInitialized = memoizedIsInitialized;
+          if (isInitialized != -1) return isInitialized == 1;
+
+          if (!extensionsAreInitialized()) {
+            memoizedIsInitialized = 0;
+            return false;
+          }
+          memoizedIsInitialized = 1;
+          return true;
+        }
+
+        public void writeTo(com.google.protobuf.CodedOutputStream output)
+                            throws java.io.IOException {
+          getSerializedSize();
+          com.google.protobuf.GeneratedMessage
+            .ExtendableMessage<org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload>.ExtensionWriter extensionWriter =
+              newExtensionWriter();
+          if (((bitField0_ & 0x00000001) == 0x00000001)) {
+            output.writeBytes(1, getClientPayloadClassNameBytes());
+          }
+          extensionWriter.writeUntil(101, output);
+          getUnknownFields().writeTo(output);
+        }
+
+        private int memoizedSerializedSize = -1;
+        public int getSerializedSize() {
+          int size = memoizedSerializedSize;
+          if (size != -1) return size;
+
+          size = 0;
+          if (((bitField0_ & 0x00000001) == 0x00000001)) {
+            size += com.google.protobuf.CodedOutputStream
+              .computeBytesSize(1, getClientPayloadClassNameBytes());
+          }
+          size += extensionsSerializedSize();
+          size += getUnknownFields().getSerializedSize();
+          memoizedSerializedSize = size;
+          return size;
+        }
+
+        private static final long serialVersionUID = 0L;
+        @java.lang.Override
+        protected java.lang.Object writeReplace()
+            throws java.io.ObjectStreamException {
+          return super.writeReplace();
+        }
+
+        public static org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload parseFrom(
+            com.google.protobuf.ByteString data)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+          return PARSER.parseFrom(data);
+        }
+        public static org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload parseFrom(
+            com.google.protobuf.ByteString data,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+          return PARSER.parseFrom(data, extensionRegistry);
+        }
+        public static org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload parseFrom(byte[] data)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+          return PARSER.parseFrom(data);
+        }
+        public static org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload parseFrom(
+            byte[] data,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws com.google.protobuf.InvalidProtocolBufferException {
+          return PARSER.parseFrom(data, extensionRegistry);
+        }
+        public static org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload parseFrom(java.io.InputStream input)
+            throws java.io.IOException {
+          return PARSER.parseFrom(input);
+        }
+        public static org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload parseFrom(
+            java.io.InputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws java.io.IOException {
+          return PARSER.parseFrom(input, extensionRegistry);
+        }
+        public static org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload parseDelimitedFrom(java.io.InputStream input)
+            throws java.io.IOException {
+          return PARSER.parseDelimitedFrom(input);
+        }
+        public static org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload parseDelimitedFrom(
+            java.io.InputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws java.io.IOException {
+          return PARSER.parseDelimitedFrom(input, extensionRegistry);
+        }
+        public static org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload parseFrom(
+            com.google.protobuf.CodedInputStream input)
+            throws java.io.IOException {
+          return PARSER.parseFrom(input);
+        }
+        public static org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload parseFrom(
+            com.google.protobuf.CodedInputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws java.io.IOException {
+          return PARSER.parseFrom(input, extensionRegistry);
+        }
+
+        public static Builder newBuilder() { return Builder.create(); }
+        public Builder newBuilderForType() { return newBuilder(); }
+        public static Builder newBuilder(org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload prototype) {
+          return newBuilder().mergeFrom(prototype);
+        }
+        public Builder toBuilder() { return newBuilder(this); }
+
+        @java.lang.Override
+        protected Builder newBuilderForType(
+            com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+          Builder builder = new Builder(parent);
+          return builder;
+        }
+        /**
+         * Protobuf type {@code org.opendaylight.controller.cluster.raft.AppendEntries.ReplicatedLogEntry.Payload}
+         */
+        public static final class Builder extends
+            com.google.protobuf.GeneratedMessage.ExtendableBuilder<
+              org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload, Builder> implements org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.PayloadOrBuilder {
+          public static final com.google.protobuf.Descriptors.Descriptor
+              getDescriptor() {
+            return org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.internal_static_org_opendaylight_controller_cluster_raft_AppendEntries_ReplicatedLogEntry_Payload_descriptor;
+          }
+
+          protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+              internalGetFieldAccessorTable() {
+            return org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.internal_static_org_opendaylight_controller_cluster_raft_AppendEntries_ReplicatedLogEntry_Payload_fieldAccessorTable
+                .ensureFieldAccessorsInitialized(
+                    org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.class, org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.Builder.class);
+          }
+
+          // Construct using org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.newBuilder()
+          private Builder() {
+            maybeForceBuilderInitialization();
+          }
+
+          private Builder(
+              com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+            super(parent);
+            maybeForceBuilderInitialization();
+          }
+          private void maybeForceBuilderInitialization() {
+            if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+            }
+          }
+          private static Builder create() {
+            return new Builder();
+          }
+
+          public Builder clear() {
+            super.clear();
+            clientPayloadClassName_ = "";
+            bitField0_ = (bitField0_ & ~0x00000001);
+            return this;
+          }
+
+          public Builder clone() {
+            return create().mergeFrom(buildPartial());
+          }
+
+          public com.google.protobuf.Descriptors.Descriptor
+              getDescriptorForType() {
+            return org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.internal_static_org_opendaylight_controller_cluster_raft_AppendEntries_ReplicatedLogEntry_Payload_descriptor;
+          }
+
+          public org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload getDefaultInstanceForType() {
+            return org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.getDefaultInstance();
+          }
+
+          public org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload build() {
+            org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload result = buildPartial();
+            if (!result.isInitialized()) {
+              throw newUninitializedMessageException(result);
+            }
+            return result;
+          }
+
+          public org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload buildPartial() {
+            org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload result = new org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload(this);
+            int from_bitField0_ = bitField0_;
+            int to_bitField0_ = 0;
+            if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+              to_bitField0_ |= 0x00000001;
+            }
+            result.clientPayloadClassName_ = clientPayloadClassName_;
+            result.bitField0_ = to_bitField0_;
+            onBuilt();
+            return result;
+          }
+
+          public Builder mergeFrom(com.google.protobuf.Message other) {
+            if (other instanceof org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload) {
+              return mergeFrom((org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload)other);
+            } else {
+              super.mergeFrom(other);
+              return this;
+            }
+          }
+
+          public Builder mergeFrom(org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload other) {
+            if (other == org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload.getDefaultInstance()) return this;
+            if (other.hasClientPayloadClassName()) {
+              bitField0_ |= 0x00000001;
+              clientPayloadClassName_ = other.clientPayloadClassName_;
+              onChanged();
+            }
+            this.mergeExtensionFields(other);
+            this.mergeUnknownFields(other.getUnknownFields());
+            return this;
+          }
+
+          public final boolean isInitialized() {
+            if (!extensionsAreInitialized()) {
+              
+              return false;
+            }
+            return true;
+          }
+
+          public Builder mergeFrom(
+              com.google.protobuf.CodedInputStream input,
+              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+              throws java.io.IOException {
+            org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload parsedMessage = null;
+            try {
+              parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+            } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+              parsedMessage = (org.opendaylight.controller.cluster.raft.protobuff.messages.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload) e.getUnfinishedMessage();
+              throw e;
+            } finally {
+              if (parsedMessage != null) {
+                mergeFrom(parsedMessage);
+              }
+            }
+            return this;
+          }
+          private int bitField0_;
+
+          // optional string clientPayloadClassName = 1;
+          private java.lang.Object clientPayloadClassName_ = "";
+          /**
+           * <code>optional string clientPayloadClassName = 1;</code>
+           */
+          public boolean hasClientPayloadClassName() {
+            return ((bitField0_ & 0x00000001) == 0x00000001);
+          }
+          /**
+           * <code>optional string clientPayloadClassName = 1;</code>
+           */
+          public java.lang.String getClientPayloadClassName() {
+            java.lang.Object ref = clientPayloadClassName_;
+            if (!(ref instanceof java.lang.String)) {
+              java.lang.String s = ((com.google.protobuf.ByteString) ref)
+                  .toStringUtf8();
+              clientPayloadClassName_ = s;
+              return s;
+            } else {
+              return (java.lang.String) ref;
+            }
+          }
+          /**
+           * <code>optional string clientPayloadClassName = 1;</code>
+           */
+          public com.google.protobuf.ByteString
+              getClientPayloadClassNameBytes() {
+            java.lang.Object ref = clientPayloadClassName_;
+            if (ref instanceof String) {
+              com.google.protobuf.ByteString b = 
+                  com.google.protobuf.Byt