public HwvtepTableReader getHwvtepTableReader() {
return hwvtepTableReader;
}
+
+ public void refreshOperNode() throws ExecutionException, InterruptedException {
+ TableUpdates tableUpdates = hwvtepTableReader.readAllTables();
+ callback.update(tableUpdates, getDatabaseSchema(HwvtepSchemaConstants.HARDWARE_VTEP));
+ }
}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
private final ReconciliationManager reconciliationManager;
private final Map<InstanceIdentifier<Node>, HwvtepConnectionInstance> nodeIidVsConnectionInstance =
new ConcurrentHashMap<>();
+ private HwvtepOperGlobalListener hwvtepOperGlobalListener;
public HwvtepConnectionManager(DataBroker db, TransactionInvoker txInvoker,
EntityOwnershipService entityOwnershipService) {
this.entityOwnershipService = entityOwnershipService;
this.hwvtepDeviceEntityOwnershipListener = new HwvtepDeviceEntityOwnershipListener(this,entityOwnershipService);
this.reconciliationManager = new ReconciliationManager(db);
+ this.hwvtepOperGlobalListener = new HwvtepOperGlobalListener(db, this);
}
@Override
// not be used as a candidate in Entity election (given that this instance is
// about to disconnect as well), if current owner get disconnected from
// HWVTEP device.
- unregisterEntityForOwnership(hwvtepConnectionInstance);
-
- //TODO: remove all the hwvtep nodes
- txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null));
+ if (hwvtepConnectionInstance.getHasDeviceOwnership()) {
+ unregisterEntityForOwnership(hwvtepConnectionInstance);
+ txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null));
+ } else {
+ unregisterEntityForOwnership(hwvtepConnectionInstance);
+ //Do not delete if client disconnected from follower HwvtepGlobalRemoveCommand
+ }
removeConnectionInstance(key);
private void registerEntityForOwnership(HwvtepConnectionInstance hwvtepConnectionInstance) {
Entity candidateEntity = getEntityFromConnectionInstance(hwvtepConnectionInstance);
+ if (entityConnectionMap.get(candidateEntity) != null) {
+ disconnected(entityConnectionMap.get(candidateEntity).getOvsdbClient());
+ putConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier(), hwvtepConnectionInstance);
+ }
entityConnectionMap.put(candidateEntity, hwvtepConnectionInstance);
hwvtepConnectionInstance.setConnectedEntity(candidateEntity);
//If entity already has owner, it won't get notification from EntityOwnershipService
//so cache the connection instances.
- Optional<EntityOwnershipState> ownershipStateOpt =
- entityOwnershipService.getOwnershipState(candidateEntity);
- if (ownershipStateOpt.isPresent()) {
- EntityOwnershipState ownershipState = ownershipStateOpt.get();
- if (ownershipState.hasOwner() && !ownershipState.isOwner()) {
- if (getConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo()) != null) {
- LOG.info("OVSDB entity {} is already owned by other southbound plugin "
- + "instance, so *this* instance is NOT an OWNER of the device",
- hwvtepConnectionInstance.getConnectionInfo());
- putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(),hwvtepConnectionInstance);
- }
- }
- }
+ handleOwnershipState(candidateEntity, hwvtepConnectionInstance);
} catch (CandidateAlreadyRegisteredException e) {
LOG.warn("OVSDB entity {} was already registered for ownership", candidateEntity, e);
}
}
+ private void handleOwnershipState(Entity candidateEntity, HwvtepConnectionInstance hwvtepConnectionInstance) {
+ //If entity already has owner, it won't get notification from EntityOwnershipService
+ //so cache the connection instances.
+ Optional<EntityOwnershipState> ownershipStateOpt =
+ entityOwnershipService.getOwnershipState(candidateEntity);
+ if (ownershipStateOpt.isPresent()) {
+ EntityOwnershipState ownershipState = ownershipStateOpt.get();
+ putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
+ if (ownershipState.hasOwner()) {
+ hwvtepConnectionInstance.setHasDeviceOwnership(ownershipState.isOwner());
+ if (!ownershipState.isOwner()) {
+ LOG.info("HWVTEP entity {} is already owned by other southbound plugin "
+ + "instance, so *this* instance is NOT an OWNER of the device",
+ hwvtepConnectionInstance.getConnectionInfo());
+ } else {
+ afterTakingOwnership(hwvtepConnectionInstance);
+ }
+ }
+ }
+ }
+
+ private void afterTakingOwnership(HwvtepConnectionInstance hwvtepConnectionInstance) {
+ txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null));
+ putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
+ hwvtepConnectionInstance.setHasDeviceOwnership(true);
+ hwvtepConnectionInstance.registerCallbacks();
+ }
+
private Global getHwvtepGlobalTableEntry(HwvtepConnectionInstance connectionInstance) {
DatabaseSchema dbSchema = null;
Global globalRow = null;
@Override
public void onSuccess(@Nullable Optional<Node> node) {
if (node.isPresent()) {
+ HwvtepGlobalAugmentation augmentation = node.get()
+ .getAugmentation(HwvtepGlobalAugmentation.class);
+ if (augmentation == null || augmentation.getConnectionInfo() == null) {
+ return;
+ }
LOG.info("Disconnected/Failed connection {} was controller initiated, attempting " +
"reconnection", hwvtepNode.getConnectionInfo());
reconciliationManager.enqueue(task);
//*this* instance of southbound plugin is owner of the device,
//so register for monitor callbacks
- hwvtepConnectionInstance.registerCallbacks();
+ afterTakingOwnership(hwvtepConnectionInstance);
} else {
//You were owner of the device, but now you are not. With the current ownership
--- /dev/null
+/*
+ * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.ovsdb.hwvtepsouthbound;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HwvtepOperGlobalListener implements ClusteredDataTreeChangeListener<Node>, AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HwvtepOperGlobalListener.class);
+
+ private Timer timer = new Timer();
+ private ListenerRegistration<HwvtepOperGlobalListener> registration;
+ private HwvtepConnectionManager hcm;
+ private DataBroker db;
+ private Map<YangInstanceIdentifier, Node> connectedNodes = new ConcurrentHashMap<>();
+
+ HwvtepOperGlobalListener(DataBroker db, HwvtepConnectionManager hcm) {
+ LOG.info("Registering HwvtepOperGlobalListener");
+ this.db = db;
+ this.hcm = hcm;
+ registerListener(db);
+ }
+
+ private void registerListener(final DataBroker db) {
+ final DataTreeIdentifier<Node> treeId =
+ new DataTreeIdentifier<Node>(LogicalDatastoreType.OPERATIONAL, getWildcardPath());
+ try {
+ registration = db.registerDataTreeChangeListener(treeId, HwvtepOperGlobalListener.this);
+ } catch (final Exception e) {
+ LOG.error("HwvtepDataChangeListener registration failed", e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if(registration != null) {
+ registration.close();
+ }
+ }
+
+ @Override
+ public void onDataTreeChanged(Collection<DataTreeModification<Node>> changes) {
+ changes.forEach( (change) -> {
+ InstanceIdentifier<Node> key = change.getRootPath().getRootIdentifier();
+ DataObjectModification<Node> mod = change.getRootNode();
+ InstanceIdentifier<Node> nodeIid = change.getRootPath().getRootIdentifier();
+ YangInstanceIdentifier entityId =
+ HwvtepSouthboundUtil.getInstanceIdentifierCodec().getYangInstanceIdentifier(nodeIid);
+ Node node = getCreated(mod);
+ if (node != null) {
+ connectedNodes.put(entityId, node);
+ }
+ node = getRemoved(mod);
+ if (node != null) {
+ connectedNodes.remove(entityId);
+ HwvtepConnectionInstance connectionInstance = hcm.getConnectionInstanceFromNodeIid(nodeIid);
+ if (Objects.equals(connectionInstance.getConnectionInfo().getRemotePort(),
+ HwvtepSouthboundUtil.getRemotePort(node))) {
+ //Oops some one deleted the node held by me This should never happen
+ try {
+ connectionInstance.refreshOperNode();
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("Failed to refresh operational nodes ", e);
+ }
+ }
+
+ }
+ });
+ }
+
+ private Node getCreated(DataObjectModification<Node> mod) {
+ if((mod.getModificationType() == ModificationType.WRITE)
+ && (mod.getDataBefore() == null)){
+ return mod.getDataAfter();
+ }
+ return null;
+ }
+
+ private Node getRemoved(DataObjectModification<Node> mod) {
+ if(mod.getModificationType() == ModificationType.DELETE){
+ return mod.getDataBefore();
+ }
+ return null;
+ }
+
+ public Map<YangInstanceIdentifier, Node> getConnectedNodes() {
+ return Collections.unmodifiableMap(connectedNodes);
+ }
+
+ private InstanceIdentifier<Node> getWildcardPath() {
+ InstanceIdentifier<Node> path = InstanceIdentifier
+ .create(NetworkTopology.class)
+ .child(Topology.class, new TopologyKey(HwvtepSouthboundConstants.HWVTEP_TOPOLOGY_ID))
+ .child(Node.class);
+ return path;
+ }
+}
}
return physicalNodeIid.firstIdentifierOf(Topology.class).child(Node.class , new NodeKey(new NodeId(nodeId)));
}
+
+ public static Integer getRemotePort(Node node) {
+ HwvtepGlobalAugmentation augmentation = node.getAugmentation(HwvtepGlobalAugmentation.class);
+ if (augmentation != null && augmentation.getConnectionInfo() != null) {
+ return augmentation.getConnectionInfo().getRemotePort().getValue();
+ }
+ return 0;
+ }
}
import com.google.common.collect.Lists;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.opendaylight.ovsdb.lib.message.TableUpdate;
+import org.opendaylight.ovsdb.lib.message.TableUpdates;
import org.opendaylight.ovsdb.lib.notation.Condition;
import org.opendaylight.ovsdb.lib.notation.Row;
import org.opendaylight.ovsdb.lib.notation.UUID;
import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
+import org.opendaylight.ovsdb.schema.hardwarevtep.ACL;
+import org.opendaylight.ovsdb.schema.hardwarevtep.ACLEntry;
+import org.opendaylight.ovsdb.schema.hardwarevtep.ArpSourcesLocal;
+import org.opendaylight.ovsdb.schema.hardwarevtep.ArpSourcesRemote;
+import org.opendaylight.ovsdb.schema.hardwarevtep.Global;
+import org.opendaylight.ovsdb.schema.hardwarevtep.LogicalRouter;
import org.opendaylight.ovsdb.schema.hardwarevtep.LogicalSwitch;
+import org.opendaylight.ovsdb.schema.hardwarevtep.Manager;
+import org.opendaylight.ovsdb.schema.hardwarevtep.McastMacsLocal;
import org.opendaylight.ovsdb.schema.hardwarevtep.McastMacsRemote;
import org.opendaylight.ovsdb.schema.hardwarevtep.PhysicalLocator;
+import org.opendaylight.ovsdb.schema.hardwarevtep.PhysicalLocatorSet;
+import org.opendaylight.ovsdb.schema.hardwarevtep.PhysicalPort;
+import org.opendaylight.ovsdb.schema.hardwarevtep.PhysicalSwitch;
+import org.opendaylight.ovsdb.schema.hardwarevtep.Tunnel;
+import org.opendaylight.ovsdb.schema.hardwarevtep.UcastMacsLocal;
import org.opendaylight.ovsdb.schema.hardwarevtep.UcastMacsRemote;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.LogicalSwitches;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.RemoteMcastMacs;
private static final Logger LOG = LoggerFactory.getLogger(HwvtepTableReader.class);
+ private final Class alltables[] = new Class[] {
+ ACLEntry.class,
+ ACL.class,
+ ArpSourcesLocal.class,
+ Global.class,
+ ArpSourcesRemote.class,
+ LogicalRouter.class,
+ Manager.class,
+ LogicalSwitch.class,
+ McastMacsLocal.class,
+ PhysicalLocator.class,
+ McastMacsRemote.class,
+ PhysicalPort.class,
+ Tunnel.class,
+ PhysicalLocatorSet.class,
+ PhysicalSwitch.class,
+ UcastMacsLocal.class,
+ UcastMacsRemote.class
+ };
+
private final Map<Class, Function<InstanceIdentifier, List<Condition>>> whereClauseGetterMap = new HashMap();
private final Map<Class, Class> tableMap = new HashMap();
private final Map<Class, TypedBaseTable> tables = new HashMap<>();
}
return Collections.emptyList();
}
+
+ public TableUpdates readAllTables() throws ExecutionException, InterruptedException {
+ Map<String, TableUpdate> tableUpdates = new HashMap<>();
+ DatabaseSchema dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
+
+ List<Operation> operations = Arrays.asList(alltables).stream()
+ .map(tableClass -> TyperUtils.getTableSchema(dbSchema, tableClass))
+ .map(tableSchema -> buildSelectOperationFor(tableSchema))
+ .collect(Collectors.toList());
+
+ List<OperationResult> results = connectionInstance.transact(dbSchema, operations).get();
+ if (results != null && !results.isEmpty()) {
+ results.stream()
+ .filter(result -> result.getRows() != null)
+ .flatMap(result -> result.getRows().stream())
+ .forEach(row -> {
+ tableUpdates.compute(row.getTableSchema().getName(), (tableName, tableUpdate) -> {
+ if (tableUpdate == null) {
+ tableUpdate = new TableUpdate();
+ }
+ tableUpdate.addRow(getRowUuid(row), null, row);
+ return tableUpdate;
+ });
+ });
+ }
+ return new TableUpdates(tableUpdates);
+ }
+
+ private Select<GenericTableSchema> buildSelectOperationFor(GenericTableSchema tableSchema) {
+ Select<GenericTableSchema> selectOpearation = op.select(tableSchema);
+ selectOpearation.setColumns(new ArrayList<>(tableSchema.getColumns()));
+ return selectOpearation;
+ }
+
+ private UUID getRowUuid(Row<GenericTableSchema> row) {
+ return row.getColumns().stream()
+ .filter(column -> column.getSchema().getName().equals("_uuid"))
+ .map(column -> (UUID) column.getData())
+ .findFirst().orElse(new UUID("test"));
+ }
}
\ No newline at end of file
if (dbVersion.compareTo(minVersion) >= 0) {
if (lSwitch.getReplicationModeColumn().getData() != null && !lSwitch.getReplicationModeColumn().getData().isEmpty()) {
lsBuilder.setReplicationMode(lSwitch.getReplicationModeColumn().getData().iterator().next());
+ LOG.debug("setReplicationMode to: {}",
+ lSwitch.getReplicationModeColumn().getData().iterator().next());
}
- LOG.debug("setReplicationMode to: {}", lSwitch.getReplicationModeColumn().getData().iterator().next());
}
}
HwvtepNodeName hwvtepName = new HwvtepNodeName(lSwitch.getName());