2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.ovsdb.southbound;
10 import static org.opendaylight.ovsdb.lib.operations.Operations.op;
12 import java.net.InetAddress;
13 import java.net.UnknownHostException;
14 import java.util.ArrayList;
15 import java.util.List;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
20 import javax.annotation.Nonnull;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
26 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
27 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
29 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
30 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
31 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
34 import org.opendaylight.ovsdb.lib.OvsdbClient;
35 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
36 import org.opendaylight.ovsdb.lib.impl.OvsdbConnectionService;
37 import org.opendaylight.ovsdb.lib.operations.Operation;
38 import org.opendaylight.ovsdb.lib.operations.OperationResult;
39 import org.opendaylight.ovsdb.lib.operations.Select;
40 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
41 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
42 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
43 import org.opendaylight.ovsdb.schema.openvswitch.OpenVSwitch;
44 import org.opendaylight.ovsdb.southbound.transactions.md.OvsdbNodeRemoveCommand;
45 import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAttributes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ConnectionInfo;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
56 import com.google.common.base.Optional;
57 import com.google.common.base.Preconditions;
58 import com.google.common.util.concurrent.CheckedFuture;
60 public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoCloseable {
61 private Map<ConnectionInfo, OvsdbConnectionInstance> clients =
62 new ConcurrentHashMap<ConnectionInfo,OvsdbConnectionInstance>();
63 private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionManager.class);
64 private static final String ENTITY_TYPE = "ovsdb";
66 private DataBroker db;
67 private TransactionInvoker txInvoker;
68 private Map<ConnectionInfo,InstanceIdentifier<Node>> instanceIdentifiers =
69 new ConcurrentHashMap<ConnectionInfo,InstanceIdentifier<Node>>();
70 private Map<Entity, OvsdbConnectionInstance> entityConnectionMap =
71 new ConcurrentHashMap<>();
72 private EntityOwnershipService entityOwnershipService;
73 private OvsdbDeviceEntityOwnershipListener ovsdbDeviceEntityOwnershipListener;
75 public OvsdbConnectionManager(DataBroker db,TransactionInvoker txInvoker,
76 EntityOwnershipService entityOwnershipService) {
78 this.txInvoker = txInvoker;
79 this.entityOwnershipService = entityOwnershipService;
80 this.ovsdbDeviceEntityOwnershipListener = new OvsdbDeviceEntityOwnershipListener(this, entityOwnershipService);
84 public void connected(@Nonnull final OvsdbClient externalClient) {
86 OvsdbConnectionInstance client = connectedButCallBacksNotRegistered(externalClient);
88 // Register Cluster Ownership for ConnectionInfo
89 registerEntityForOwnership(client);
92 public OvsdbConnectionInstance connectedButCallBacksNotRegistered(final OvsdbClient externalClient) {
93 LOG.info("OVSDB Connection from {}:{}",externalClient.getConnectionInfo().getRemoteAddress(),
94 externalClient.getConnectionInfo().getRemotePort());
95 ConnectionInfo key = SouthboundMapper.createConnectionInfo(externalClient);
96 OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(key);
98 // Check if existing ovsdbConnectionInstance for the OvsdbClient present.
99 // In such cases, we will see if the ovsdbConnectionInstance has same externalClient.
100 if (ovsdbConnectionInstance != null) {
101 if (ovsdbConnectionInstance.hasOvsdbClient(externalClient)) {
102 LOG.warn("OVSDB Connection Instance {} already exists for client {}", key, externalClient);
103 return ovsdbConnectionInstance;
105 LOG.warn("OVSDB Connection Instance {} being replaced with client {}", key, externalClient);
106 ovsdbConnectionInstance.disconnect();
108 // Unregister Cluster Ownership for ConnectionInfo
109 // Because the ovsdbConnectionInstance is about to be completely replaced!
110 unregisterEntityForOwnership(ovsdbConnectionInstance);
112 removeConnectionInstance(key);
115 ovsdbConnectionInstance = new OvsdbConnectionInstance(key, externalClient, txInvoker,
116 getInstanceIdentifier(key));
117 ovsdbConnectionInstance.createTransactInvokers();
118 return ovsdbConnectionInstance;
122 public void disconnected(OvsdbClient client) {
123 LOG.info("OVSDB Disconnected from {}:{}. Cleaning up the operational data store"
124 ,client.getConnectionInfo().getRemoteAddress(),
125 client.getConnectionInfo().getRemotePort());
126 ConnectionInfo key = SouthboundMapper.createConnectionInfo(client);
127 OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(key);
128 if (ovsdbConnectionInstance != null) {
129 txInvoker.invoke(new OvsdbNodeRemoveCommand(ovsdbConnectionInstance, null, null));
130 removeConnectionInstance(key);
132 // Unregister Cluster Onwership for ConnectionInfo
133 unregisterEntityForOwnership(ovsdbConnectionInstance);
135 LOG.warn("OVSDB disconnected event did not find connection instance for {}", key);
137 LOG.trace("OvsdbConnectionManager: disconnected exit");
140 public OvsdbClient connect(InstanceIdentifier<Node> iid,
141 OvsdbNodeAugmentation ovsdbNode) throws UnknownHostException {
142 // TODO handle case where we already have a connection
143 // TODO use transaction chains to handle ordering issues between disconnected
144 // TODO and connected when writing to the operational store
145 InetAddress ip = SouthboundMapper.createInetAddress(ovsdbNode.getConnectionInfo().getRemoteIp());
146 OvsdbClient client = OvsdbConnectionService.getService().connect(ip,
147 ovsdbNode.getConnectionInfo().getRemotePort().getValue());
148 // For connections from the controller to the ovs instance, the library doesn't call
149 // this method for us
150 if (client != null) {
151 putInstanceIdentifier(ovsdbNode.getConnectionInfo(), iid.firstIdentifierOf(Node.class));
152 OvsdbConnectionInstance ovsdbConnectionInstance = connectedButCallBacksNotRegistered(client);
154 // Register Cluster Ownership for ConnectionInfo
155 registerEntityForOwnership(ovsdbConnectionInstance);
157 LOG.warn("Failed to connect to Ovsdb Node {}", ovsdbNode.getConnectionInfo());
162 public void disconnect(OvsdbNodeAugmentation ovsdbNode) throws UnknownHostException {
163 OvsdbConnectionInstance client = getConnectionInstance(ovsdbNode.getConnectionInfo());
164 if (client != null) {
167 // Unregister Cluster Onwership for ConnectionInfo
168 unregisterEntityForOwnership(client);
170 removeInstanceIdentifier(ovsdbNode.getConnectionInfo());
174 /* public void init(ConnectionInfo key) {
175 OvsdbConnectionInstance client = getConnectionInstance(key);
177 // TODO (FF): make sure that this cluster instance is the 'entity owner' fo the given OvsdbConnectionInstance ?
179 if (client != null) {
181 * Note: registerCallbacks() is idemPotent... so if you call it repeatedly all is safe,
182 * it only registersCallbacks on the *first* call.
184 client.registerCallbacks();
189 public void close() throws Exception {
190 if (ovsdbDeviceEntityOwnershipListener != null) {
191 ovsdbDeviceEntityOwnershipListener.close();
194 for (OvsdbClient client: clients.values()) {
199 private void putConnectionInstance(ConnectionInfo key,OvsdbConnectionInstance instance) {
200 ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
201 clients.put(connectionInfo, instance);
204 private void removeConnectionInstance(ConnectionInfo key) {
205 ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
206 clients.remove(connectionInfo);
209 private void putInstanceIdentifier(ConnectionInfo key,InstanceIdentifier<Node> iid) {
210 ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
211 instanceIdentifiers.put(connectionInfo, iid);
214 private void removeInstanceIdentifier(ConnectionInfo key) {
215 ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
216 instanceIdentifiers.remove(connectionInfo);
219 public OvsdbConnectionInstance getConnectionInstance(ConnectionInfo key) {
220 ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
221 return clients.get(connectionInfo);
224 public InstanceIdentifier<Node> getInstanceIdentifier(ConnectionInfo key) {
225 ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
226 InstanceIdentifier<Node> iid = instanceIdentifiers.get(connectionInfo);
230 public OvsdbConnectionInstance getConnectionInstance(OvsdbBridgeAttributes mn) {
231 Optional<OvsdbNodeAugmentation> optional = SouthboundUtil.getManagingNode(db, mn);
232 if (optional.isPresent()) {
233 return getConnectionInstance(optional.get().getConnectionInfo());
239 public OvsdbConnectionInstance getConnectionInstance(Node node) {
240 Preconditions.checkNotNull(node);
241 OvsdbNodeAugmentation ovsdbNode = node.getAugmentation(OvsdbNodeAugmentation.class);
242 OvsdbBridgeAugmentation ovsdbManagedNode = node.getAugmentation(OvsdbBridgeAugmentation.class);
243 if (ovsdbNode != null) {
244 return getConnectionInstance(ovsdbNode.getConnectionInfo());
245 } else if (ovsdbManagedNode != null) {
246 return getConnectionInstance(ovsdbManagedNode);
248 LOG.warn("This is not a node that gives any hint how to find its OVSDB Manager: {}",node);
253 public OvsdbConnectionInstance getConnectionInstance(InstanceIdentifier<Node> nodePath) {
255 ReadOnlyTransaction transaction = db.newReadOnlyTransaction();
256 CheckedFuture<Optional<Node>, ReadFailedException> nodeFuture = transaction.read(
257 LogicalDatastoreType.OPERATIONAL, nodePath);
259 Optional<Node> optional = nodeFuture.get();
260 if (optional != null && optional.isPresent() && optional.get() instanceof Node) {
261 return this.getConnectionInstance(optional.get());
263 LOG.warn("Found non-topological node {} on path {}",optional);
266 } catch (Exception e) {
267 LOG.warn("Failed to get Ovsdb Node {}",nodePath, e);
272 public OvsdbClient getClient(ConnectionInfo connectionInfo) {
273 return getConnectionInstance(connectionInfo);
276 public OvsdbClient getClient(OvsdbBridgeAttributes mn) {
277 return getConnectionInstance(mn);
280 public OvsdbClient getClient(Node node) {
281 return getConnectionInstance(node);
284 public Boolean getHasDeviceOwnership(ConnectionInfo connectionInfo) {
285 OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(connectionInfo);
286 if (ovsdbConnectionInstance == null) {
287 return Boolean.FALSE;
289 return ovsdbConnectionInstance.getHasDeviceOwnership();
292 public void setHasDeviceOwnership(ConnectionInfo connectionInfo, Boolean hasDeviceOwnership) {
293 OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(connectionInfo);
294 if (ovsdbConnectionInstance != null) {
295 ovsdbConnectionInstance.setHasDeviceOwnership(hasDeviceOwnership);
299 private void handleOwnershipChanged(EntityOwnershipChange ownershipChange) {
300 OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstanceFromEntity(ownershipChange.getEntity());
301 LOG.info("handleOwnershipChanged: {} event received for device {}",
302 ownershipChange, ovsdbConnectionInstance != null ? ovsdbConnectionInstance.getConnectionInfo()
303 : "THAT'S NOT REGISTERED BY THIS SOUTHBOUND PLUGIN INSTANCE");
305 if (ovsdbConnectionInstance == null) {
306 if (ownershipChange.isOwner()) {
307 LOG.warn("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
309 // EntityOwnershipService sends notification to all the nodes, irrespective of whether
310 // that instance registered for the device ownership or not. It is to make sure that
311 // If all the controller instance that was connected to the device are down, so the
312 // running instance can clear up the operational data store even though it was not
313 // connected to the device.
314 LOG.debug("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
317 // If entity has no owner, clean up the operational data store (it's possible because owner controller
318 // might went down abruptly and didn't get a chance to clean up the operational data store.
319 if (!ownershipChange.hasOwner()) {
320 LOG.debug("{} has no onwer, cleaning up the operational data store", ownershipChange.getEntity());
321 // Below code might look weird but it's required. We want to give first opportunity to the
322 // previous owner of the device to clean up the operational data store if there is no owner now.
323 // That way we will avoid lot of nasty md-sal exceptions because of concurrent delete.
324 if (ownershipChange.wasOwner()) {
325 cleanEntityOperationalData(ownershipChange.getEntity());
327 // If first cleanEntityOperationalData() was called, this call will be no-op.
328 cleanEntityOperationalData(ownershipChange.getEntity());
332 //Connection detail need to be cached, irrespective of ownership result.
333 putConnectionInstance(ovsdbConnectionInstance.getMDConnectionInfo(),ovsdbConnectionInstance);
335 if (ownershipChange.isOwner() == ovsdbConnectionInstance.getHasDeviceOwnership()) {
336 LOG.debug("handleOwnershipChanged: no change in ownership for {}. Ownership status is : {}",
337 ovsdbConnectionInstance.getConnectionInfo(), ovsdbConnectionInstance.getHasDeviceOwnership());
341 ovsdbConnectionInstance.setHasDeviceOwnership(ownershipChange.isOwner());
342 // You were not an owner, but now you are
343 if (ownershipChange.isOwner()) {
344 LOG.info("handleOwnershipChanged: *this* southbound plugin instance is owner of device {}",
345 ovsdbConnectionInstance.getConnectionInfo());
347 //*this* instance of southbound plugin is owner of the device,
348 //so register for monitor callbacks
349 ovsdbConnectionInstance.registerCallbacks();
352 //You were owner of the device, but now you are not. With the current ownership
353 //grant mechanism, this scenario should not occur. Because this scenario will occur
354 //when this controller went down or switch flap the connection, but in both the case
355 //it will go through the re-registration process. We need to implement this condition
356 //when clustering service implement a ownership grant strategy which can revoke the
357 //device ownership for load balancing the devices across the instances.
358 //Once this condition occur, we should unregister the callback.
359 LOG.error("handleOwnershipChanged: *this* southbound plugin instance is no longer the owner of device {}",
360 ovsdbConnectionInstance.getNodeId().getValue());
364 private void cleanEntityOperationalData(Entity entity) {
366 InstanceIdentifier<Node> nodeIid = (InstanceIdentifier<Node>) SouthboundUtil
367 .getInstanceIdentifierCodec().bindingDeserializer(entity.getId());
369 final ReadWriteTransaction transaction = db.newReadWriteTransaction();
370 Optional<Node> node = SouthboundUtil.readNode(transaction, nodeIid);
371 if (node.isPresent()) {
372 SouthboundUtil.deleteNode(transaction, nodeIid);
376 private OpenVSwitch getOpenVswitchTableEntry(OvsdbConnectionInstance connectionInstance) {
377 DatabaseSchema dbSchema = null;
378 OpenVSwitch openVSwitchRow = null;
380 dbSchema = connectionInstance.getSchema(OvsdbSchemaContants.databaseName).get();
381 } catch (InterruptedException | ExecutionException e) {
382 LOG.warn("Not able to fetch schema for database {} from device {}",
383 OvsdbSchemaContants.databaseName,connectionInstance.getConnectionInfo(),e);
385 if (dbSchema != null) {
386 GenericTableSchema openVSwitchSchema = TyperUtils.getTableSchema(dbSchema, OpenVSwitch.class);
388 List<String> openVSwitchTableColumn = new ArrayList<String>();
389 openVSwitchTableColumn.addAll(openVSwitchSchema.getColumns());
390 Select<GenericTableSchema> selectOperation = op.select(openVSwitchSchema);
391 selectOperation.setColumns(openVSwitchTableColumn);;
393 ArrayList<Operation> operations = new ArrayList<Operation>();
394 operations.add(selectOperation);
395 operations.add(op.comment("Fetching Open_VSwitch table rows"));
396 List<OperationResult> results = null;
398 results = connectionInstance.transact(dbSchema, operations).get();
399 if (results != null ) {
400 OperationResult selectResult = results.get(0);
401 openVSwitchRow = TyperUtils.getTypedRowWrapper(
402 dbSchema,OpenVSwitch.class,selectResult.getRows().get(0));
405 } catch (InterruptedException | ExecutionException e) {
406 LOG.warn("Not able to fetch OpenVswitch table row from device {}",
407 connectionInstance.getConnectionInfo(),e);
410 return openVSwitchRow;
412 private Entity getEntityFromConnectionInstance(@Nonnull OvsdbConnectionInstance ovsdbConnectionInstance) {
413 YangInstanceIdentifier entityId = null;
414 InstanceIdentifier<Node> iid = ovsdbConnectionInstance.getInstanceIdentifier();;
416 /* Switch initiated connection won't have iid, till it gets OpenVSwitch
417 * table update but update callback is always registered after ownership
418 * is granted. So we are explicitly fetch the row here to get the iid.
420 OpenVSwitch openvswitchRow = getOpenVswitchTableEntry(ovsdbConnectionInstance);
421 iid = SouthboundMapper.getInstanceIdentifier(openvswitchRow);
422 LOG.info("InstanceIdentifier {} generated for device "
423 + "connection {}",iid,ovsdbConnectionInstance.getConnectionInfo());
426 entityId = SouthboundUtil.getInstanceIdentifierCodec().getYangInstanceIdentifier(iid);
427 Entity deviceEntity = new Entity(ENTITY_TYPE, entityId);
428 LOG.debug("Entity {} created for device connection {}",
429 deviceEntity, ovsdbConnectionInstance.getConnectionInfo());
433 private OvsdbConnectionInstance getConnectionInstanceFromEntity(Entity entity) {
434 return entityConnectionMap.get(entity);
437 private void registerEntityForOwnership(OvsdbConnectionInstance ovsdbConnectionInstance) {
439 Entity candidateEntity = getEntityFromConnectionInstance(ovsdbConnectionInstance);
440 entityConnectionMap.put(candidateEntity, ovsdbConnectionInstance);
441 ovsdbConnectionInstance.setConnectedEntity(candidateEntity);
443 EntityOwnershipCandidateRegistration registration =
444 entityOwnershipService.registerCandidate(candidateEntity);
445 ovsdbConnectionInstance.setDeviceOwnershipCandidateRegistration(registration);
446 LOG.info("OVSDB entity {} is registred for ownership.", candidateEntity);
447 } catch (CandidateAlreadyRegisteredException e) {
448 LOG.warn("OVSDB entity {} was already registered for {} ownership", candidateEntity, e);
453 private void unregisterEntityForOwnership(OvsdbConnectionInstance ovsdbConnectionInstance) {
454 ovsdbConnectionInstance.closeDeviceOwnershipCandidateRegistration();
455 entityConnectionMap.remove(ovsdbConnectionInstance.getConnectedEntity());
458 private class OvsdbDeviceEntityOwnershipListener implements EntityOwnershipListener {
459 private OvsdbConnectionManager cm;
460 private EntityOwnershipListenerRegistration listenerRegistration;
462 OvsdbDeviceEntityOwnershipListener(OvsdbConnectionManager cm, EntityOwnershipService entityOwnershipService) {
464 listenerRegistration = entityOwnershipService.registerListener(ENTITY_TYPE, this);
466 public void close() {
467 listenerRegistration.close();
470 public void ownershipChanged(EntityOwnershipChange ownershipChange) {
471 cm.handleOwnershipChanged(ownershipChange);