/* * Copyright (c) 2015, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.ovsdb.hwvtepsouthbound; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.common.api.clustering.Entity; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration; import org.opendaylight.ovsdb.hwvtepsouthbound.transact.TransactCommand; import org.opendaylight.ovsdb.hwvtepsouthbound.transact.TransactInvoker; import org.opendaylight.ovsdb.hwvtepsouthbound.transact.TransactInvokerImpl; import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvoker; import org.opendaylight.ovsdb.lib.LockAquisitionCallback; import org.opendaylight.ovsdb.lib.LockStolenCallback; import org.opendaylight.ovsdb.lib.MonitorCallBack; import org.opendaylight.ovsdb.lib.MonitorHandle; import org.opendaylight.ovsdb.lib.OvsdbClient; import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo; import org.opendaylight.ovsdb.lib.message.MonitorRequest; import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder; import org.opendaylight.ovsdb.lib.message.MonitorSelect; import org.opendaylight.ovsdb.lib.message.TableUpdates; import org.opendaylight.ovsdb.lib.notation.Row; import org.opendaylight.ovsdb.lib.operations.Operation; import org.opendaylight.ovsdb.lib.operations.OperationResult; import org.opendaylight.ovsdb.lib.operations.TransactionBuilder; import org.opendaylight.ovsdb.lib.schema.DatabaseSchema; import org.opendaylight.ovsdb.lib.schema.GenericTableSchema; import org.opendaylight.ovsdb.lib.schema.TableSchema; import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable; import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionHistory; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.ConnectionInfo; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HwvtepConnectionInstance { private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionInstance.class); private ConnectionInfo connectionInfo; private final OvsdbClient client; private final HwvtepTableReader hwvtepTableReader; private InstanceIdentifier instanceIdentifier; private final TransactionInvoker txInvoker; private Map transactInvokers; private MonitorCallBack callback; private volatile boolean hasDeviceOwnership = false; private Entity connectedEntity; private EntityOwnershipCandidateRegistration deviceOwnershipCandidateRegistration; private HwvtepGlobalAugmentation initialCreatedData = null; private final HwvtepDeviceInfo deviceInfo; private final DataBroker dataBroker; private final HwvtepConnectionManager hwvtepConnectionManager; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final SettableFuture reconciliationFt = SettableFuture.create(); private final AtomicBoolean firstUpdateTriggered = new AtomicBoolean(false); private TransactionHistory controllerTxHistory; private TransactionHistory deviceUpdateHistory; HwvtepConnectionInstance(HwvtepConnectionManager hwvtepConnectionManager, ConnectionInfo key, OvsdbClient client, InstanceIdentifier iid, TransactionInvoker txInvoker, DataBroker dataBroker) { this.hwvtepConnectionManager = hwvtepConnectionManager; this.connectionInfo = key; this.client = client; this.instanceIdentifier = iid; this.txInvoker = txInvoker; this.deviceInfo = new HwvtepDeviceInfo(this); this.dataBroker = dataBroker; this.hwvtepTableReader = new HwvtepTableReader(this); } public void transact(final TransactCommand command) { String nodeId = getNodeId().getValue(); boolean firstUpdate = firstUpdateTriggered.compareAndSet(false, true); if (reconciliationFt.isDone()) { transact(command, false); } else { LOG.info("Job waiting for reconciliation {}", nodeId); Futures.addCallback(reconciliationFt, new FutureCallback() { @Override public void onSuccess(Boolean notUsed) { LOG.info("Running the job waiting for reconciliation {}", nodeId); transact(command, false); } @Override public void onFailure(Throwable throwable) { LOG.info("Running the job waiting for reconciliation {}", nodeId); transact(command, false); } }, MoreExecutors.directExecutor()); if (firstUpdate) { LOG.info("Scheduling the reconciliation timeout task {}", nodeId); scheduledExecutorService.schedule(() -> reconciliationFt.set(Boolean.TRUE), HwvtepSouthboundConstants.CONFIG_NODE_UPDATE_MAX_DELAY_MS, TimeUnit.MILLISECONDS); } } } public synchronized void transact(TransactCommand command, boolean reconcile) { try { for (TransactInvoker transactInvoker : transactInvokers.values()) { transactInvoker.invoke(command); } } finally { if (reconcile) { reconciliationFt.set(Boolean.TRUE); } } } public ListenableFuture> transact(DatabaseSchema dbSchema, List operations) { return client.transact(dbSchema, operations); } public void registerCallbacks() { if (this.callback == null) { if (this.initialCreatedData != null) { this.updateConnectionAttributes(); } try { String database = HwvtepSchemaConstants.HARDWARE_VTEP; DatabaseSchema dbSchema = getSchema(database).get(); if (dbSchema != null) { LOG.info("Monitoring database: {}", database); callback = new HwvtepMonitorCallback(this, txInvoker); monitorAllTables(database, dbSchema); } else { LOG.info("No database {} found on {}", database, connectionInfo); } } catch (InterruptedException | ExecutionException e) { LOG.warn("Exception attempting to registerCallbacks {}: ", connectionInfo, e); } } } public void createTransactInvokers() { if (transactInvokers == null) { try { transactInvokers = new HashMap<>(); DatabaseSchema dbSchema = getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get(); if (dbSchema != null) { transactInvokers.put(dbSchema, new TransactInvokerImpl(this,dbSchema)); } } catch (InterruptedException | ExecutionException e) { LOG.warn("Exception attempting to createTransactionInvokers {}", connectionInfo, e); } } } private void monitorAllTables(String database, DatabaseSchema dbSchema) { Set tables = dbSchema.getTables(); if (tables != null) { List monitorRequests = new ArrayList<>(); for (String tableName : tables) { if (!HwvtepSouthboundConstants.SKIP_HWVTEP_TABLE.containsKey(tableName)) { LOG.info("HwvtepSouthbound monitoring Hwvtep schema table {}", tableName); GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class); final Set columns = new HashSet<>(tableSchema.getColumns()); List skipColumns = HwvtepSouthboundConstants.SKIP_COLUMN_FROM_HWVTEP_TABLE.get(tableName); skipColumns = skipColumns == null ? new ArrayList<>() : new ArrayList<>(skipColumns); skipColumns.add(HwvtepSouthboundConstants.VERSION_COLUMN); LOG.info("HwvtepSouthbound NOT monitoring columns {} in table {}", skipColumns, tableName); columns.removeAll(skipColumns); monitorRequests.add(new MonitorRequestBuilder<>(tableSchema) .addColumns(columns) .with(new MonitorSelect(true, true, true, true)).build()); } } this.callback.update(monitor(dbSchema, monitorRequests, callback), dbSchema); } else { LOG.warn("No tables for schema {} for database {} for key {}",dbSchema,database,connectionInfo); } } private void updateConnectionAttributes() { LOG.debug("Update attributes of ovsdb node ip: {} port: {}", this.initialCreatedData.getConnectionInfo().getRemoteIp(), this.initialCreatedData.getConnectionInfo().getRemotePort()); /* * TODO: Do we have anything to update? * Hwvtep doesn't have other_config or external_ids like * Open_vSwitch. What else will be needed? */ } public DataBroker getDataBroker() { return dataBroker; } public ListenableFuture> getDatabases() { return client.getDatabases(); } public ListenableFuture getSchema(String database) { return client.getSchema(database); } public TransactionBuilder transactBuilder(DatabaseSchema dbSchema) { return client.transactBuilder(dbSchema); } public > TableUpdates monitor(DatabaseSchema schema, List monitorRequests, MonitorCallBack monitorCallBack) { return client.monitor(schema, monitorRequests, monitorCallBack); } public > TableUpdates monitor(DatabaseSchema schema, List monitorRequests, MonitorHandle monitorHandle, MonitorCallBack monitorCallBack) { return null; } public void cancelMonitor(MonitorHandle handler) { client.cancelMonitor(handler); } public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) { client.lock(lockId, lockedCallBack, stolenCallback); } public ListenableFuture steal(String lockId) { return client.steal(lockId); } public ListenableFuture unLock(String lockId) { return client.unLock(lockId); } public OvsdbConnectionInfo getConnectionInfo() { return client.getConnectionInfo(); } public boolean isActive() { return client.isActive(); } public void disconnect() { client.disconnect(); } public DatabaseSchema getDatabaseSchema(String dbName) { return client.getDatabaseSchema(dbName); } public > T createTypedRowWrapper(Class klazz) { return client.createTypedRowWrapper(klazz); } public > T createTypedRowWrapper(DatabaseSchema dbSchema, Class klazz) { return client.createTypedRowWrapper(dbSchema, klazz); } public > T getTypedRowWrapper(Class klazz, Row row) { return client.getTypedRowWrapper(klazz, row); } public ConnectionInfo getMDConnectionInfo() { return connectionInfo; } public void setMDConnectionInfo(ConnectionInfo key) { this.connectionInfo = key; } public InstanceIdentifier getInstanceIdentifier() { return instanceIdentifier; } public NodeKey getNodeKey() { //TODO: What is the alternative here? return getInstanceIdentifier().firstKeyOf(Node.class); } public NodeId getNodeId() { return getNodeKey().getNodeId(); } public void setInstanceIdentifier(InstanceIdentifier iid) { this.instanceIdentifier = iid; hwvtepConnectionManager.putConnectionInstance(instanceIdentifier, this); } public Entity getConnectedEntity() { return this.connectedEntity; } public void setConnectedEntity(Entity entity) { this.connectedEntity = entity; } public Boolean hasOvsdbClient(OvsdbClient otherClient) { return client.equals(otherClient); } public Boolean getHasDeviceOwnership() { return hasDeviceOwnership; } public void setHasDeviceOwnership(Boolean hasDeviceOwnership) { if (hasDeviceOwnership != null) { this.hasDeviceOwnership = hasDeviceOwnership; } } public void setDeviceOwnershipCandidateRegistration(@Nonnull EntityOwnershipCandidateRegistration registration) { this.deviceOwnershipCandidateRegistration = registration; } public void closeDeviceOwnershipCandidateRegistration() { if (deviceOwnershipCandidateRegistration != null) { this.deviceOwnershipCandidateRegistration.close(); setHasDeviceOwnership(Boolean.FALSE); } } public void setHwvtepGlobalAugmentation(HwvtepGlobalAugmentation hwvtepGlobalData) { this.initialCreatedData = hwvtepGlobalData; } public HwvtepGlobalAugmentation getHwvtepGlobalAugmentation() { return this.initialCreatedData; } public HwvtepDeviceInfo getDeviceInfo() { return this.deviceInfo; } public OvsdbClient getOvsdbClient() { return client; } public HwvtepTableReader getHwvtepTableReader() { return hwvtepTableReader; } public void refreshOperNode() throws ExecutionException, InterruptedException { TableUpdates tableUpdates = hwvtepTableReader.readAllTables(); callback.update(tableUpdates, getDatabaseSchema(HwvtepSchemaConstants.HARDWARE_VTEP)); } public MonitorCallBack getCallback() { return callback; } public void setCallback(MonitorCallBack callback) { this.callback = callback; } public TransactionHistory getControllerTxHistory() { return controllerTxHistory; } public void setControllerTxHistory(TransactionHistory controllerTxLog) { deviceInfo.setControllerTxHistory(controllerTxLog); this.controllerTxHistory = controllerTxLog; } public TransactionHistory getDeviceUpdateHistory() { return deviceUpdateHistory; } public void setDeviceUpdateHistory(TransactionHistory deviceUpdateLog) { deviceInfo.setDeviceUpdateHistory(deviceUpdateLog); this.deviceUpdateHistory = deviceUpdateLog; } }