private final Map<InstanceIdentifier<Node>, TransactionHistory> controllerTxHistory = new ConcurrentHashMap<>();
private final Map<InstanceIdentifier<Node>, TransactionHistory> deviceUpdateHistory = new ConcurrentHashMap<>();
private final OvsdbConnection ovsdbConnectionService;
+ private final Map<OvsdbClient, OvsdbClient> alreadyProcessedClients = new ConcurrentHashMap<>();
public HwvtepConnectionManager(final DataBroker db, final TransactionInvoker txInvoker,
final EntityOwnershipService entityOwnershipService, final OvsdbConnection ovsdbConnectionService) {
@Override
public void connected(final OvsdbClient externalClient) {
+ if (alreadyProcessedClients.containsKey(externalClient)) {
+ LOG.info("Hwvtep Library already connected {} from {}:{} to {}:{} to this, hence skipping the processing",
+ externalClient.getConnectionInfo().getType(),
+ externalClient.getConnectionInfo().getRemoteAddress(),
+ externalClient.getConnectionInfo().getRemotePort(),
+ externalClient.getConnectionInfo().getLocalAddress(),
+ externalClient.getConnectionInfo().getLocalPort());
+ return;
+ }
+ alreadyProcessedClients.put(externalClient, externalClient);
HwvtepConnectionInstance hwClient = null;
try {
List<String> databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS);
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public void disconnected(final OvsdbClient client) {
+ alreadyProcessedClients.remove(client);
HwvtepConnectionInstance hwvtepConnectionInstance = null;
try {
LOG.info("Library disconnected {} from {}:{} to {}:{}. Cleaning up the operational data store",
public interface HwvtepSouthboundConstants {
TopologyId HWVTEP_TOPOLOGY_ID = new TopologyId(new Uri("hwvtep:1"));
Integer DEFAULT_OVSDB_PORT = 6640;
+ long PORT_OPEN_MAX_DELAY_IN_MINS = 5;
String IID_OTHER_CONFIG_KEY = "opendaylight-iid";
String UUID = "uuid";
ImmutableBiMap<Class<? extends EncapsulationTypeBase>,String> ENCAPS_TYPE_MAP
LOG.trace("Registering listener for path {}", treeId);
operTopologyRegistration = dataBroker.registerDataTreeChangeListener(treeId, this);
+ Scheduler.getScheduledExecutorService().schedule(() -> {
+ if (!registered.get()) {
+ openOvsdbPort();
+ LOG.error("Timed out to get eos notification opening the port now");
+ }
+ }, HwvtepSouthboundConstants.PORT_OPEN_MAX_DELAY_IN_MINS, TimeUnit.MINUTES);
}
private void registerConfigListenerPostUpgrade() {
@Override
public void onDataTreeChanged(final Collection<DataTreeModification<Topology>> collection) {
- if (!registered.getAndSet(true)) {
- LOG.info("Starting the ovsdb port");
- ovsdbConnection.registerConnectionListener(cm);
- ovsdbConnection.startOvsdbManager();
- }
+ openOvsdbPort();
if (operTopologyRegistration != null) {
operTopologyRegistration.close();
}
}
+ private void openOvsdbPort() {
+ if (!registered.getAndSet(true)) {
+ LOG.info("Starting the ovsdb port");
+ ovsdbConnection.registerConnectionListener(cm);
+ ovsdbConnection.startOvsdbManager();
+ }
+ }
+
public void setShardStatusCheckRetryCount(int retryCount) {
this.shardStatusCheckRetryCount = retryCount;
}
@Override
public void registerConnectionListener(final OvsdbConnectionListener listener) {
LOG.info("registerConnectionListener: registering {}", listener.getClass().getSimpleName());
- CONNECTION_LISTENERS.add(listener);
- notifyAlreadyExistingConnectionsToListener(listener);
+ if (CONNECTION_LISTENERS.add(listener)) {
+ LOG.info("registerConnectionListener: registered {} notifying exisitng connections",
+ listener.getClass().getSimpleName());
+ //notify only the first time if called multiple times
+ notifyAlreadyExistingConnectionsToListener(listener);
+ }
}
private void notifyAlreadyExistingConnectionsToListener(final OvsdbConnectionListener listener) {
private final DataBroker db;
private final TransactionInvoker txInvoker;
+ private final Map<OvsdbClient, OvsdbClient> alreadyProcessedClients = new ConcurrentHashMap<>();
private final Map<ConnectionInfo,InstanceIdentifier<Node>> instanceIdentifiers =
new ConcurrentHashMap<>();
private final Map<InstanceIdentifier<Node>, OvsdbConnectionInstance> nodeIdVsConnectionInstance =
@Override
public void connected(final OvsdbClient externalClient) {
+ if (alreadyProcessedClients.containsKey(externalClient)) {
+ LOG.info("OvsdbConnectionManager Library already connected {} from {}:{} to {}:{} "
+ + "to this, hence skipping the processing",
+ externalClient.getConnectionInfo().getType(),
+ externalClient.getConnectionInfo().getRemoteAddress(),
+ externalClient.getConnectionInfo().getRemotePort(),
+ externalClient.getConnectionInfo().getLocalAddress(),
+ externalClient.getConnectionInfo().getLocalPort());
+ return;
+ }
+ alreadyProcessedClients.put(externalClient, externalClient);
+
LOG.info("Library connected {} from {}:{} to {}:{}",
externalClient.getConnectionInfo().getType(),
externalClient.getConnectionInfo().getRemoteAddress(),
@Override
public void disconnected(final OvsdbClient client) {
+ alreadyProcessedClients.remove(client);
LOG.info("Library disconnected {} from {}:{} to {}:{}. Cleaning up the operational data store",
client.getConnectionInfo().getType(),
client.getConnectionInfo().getRemoteAddress(),
field(OvsdbConnectionManager.class, "entityOwnershipService").set(ovsdbConnManager, entityOwnershipService);
field(OvsdbConnectionManager.class, "reconciliationManager").set(ovsdbConnManager, reconciliationManager);
field(OvsdbConnectionManager.class, "ovsdbConnection").set(ovsdbConnManager, ovsdbConnection);
+ field(OvsdbConnectionManager.class, "alreadyProcessedClients").set(ovsdbConnManager, new HashMap());
entityConnectionMap = new ConcurrentHashMap<>();
OvsdbConnectionInfo info = mock(OvsdbConnectionInfo.class);