Migrate TyperUtils.getTableSchema() users
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / HwvtepConnectionInstance.java
1 /*
2  * Copyright (c) 2015, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.hwvtepsouthbound;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.ArrayList;
17 import java.util.HashMap;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.eos.binding.api.Entity;
30 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipCandidateRegistration;
31 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.TransactCommand;
32 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.TransactInvoker;
33 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.TransactInvokerImpl;
34 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvoker;
35 import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
36 import org.opendaylight.ovsdb.lib.LockStolenCallback;
37 import org.opendaylight.ovsdb.lib.MonitorCallBack;
38 import org.opendaylight.ovsdb.lib.MonitorHandle;
39 import org.opendaylight.ovsdb.lib.OvsdbClient;
40 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
41 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
42 import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
43 import org.opendaylight.ovsdb.lib.message.MonitorSelect;
44 import org.opendaylight.ovsdb.lib.message.TableUpdates;
45 import org.opendaylight.ovsdb.lib.notation.Row;
46 import org.opendaylight.ovsdb.lib.operations.Operation;
47 import org.opendaylight.ovsdb.lib.operations.OperationResult;
48 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
49 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
50 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
51 import org.opendaylight.ovsdb.lib.schema.TableSchema;
52 import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
53 import org.opendaylight.ovsdb.lib.schema.typed.TypedDatabaseSchema;
54 import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionHistory;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.ConnectionInfo;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 public class HwvtepConnectionInstance {
65     private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionInstance.class);
66     private ConnectionInfo connectionInfo;
67     private final OvsdbClient client;
68     private final HwvtepTableReader hwvtepTableReader;
69     private InstanceIdentifier<Node> instanceIdentifier;
70     private final TransactionInvoker txInvoker;
71     private Map<TypedDatabaseSchema, TransactInvoker> transactInvokers;
72     private MonitorCallBack callback;
73     private volatile boolean hasDeviceOwnership = false;
74     private Entity connectedEntity;
75     private EntityOwnershipCandidateRegistration deviceOwnershipCandidateRegistration;
76     private HwvtepGlobalAugmentation initialCreatedData = null;
77     private final HwvtepDeviceInfo deviceInfo;
78     private final DataBroker dataBroker;
79     private final HwvtepConnectionManager hwvtepConnectionManager;
80     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
81     @VisibleForTesting
82     final SettableFuture<Boolean> reconciliationFt = SettableFuture.create();
83     @VisibleForTesting
84     final AtomicBoolean firstUpdateTriggered = new AtomicBoolean(false);
85     private TransactionHistory controllerTxHistory;
86     private TransactionHistory deviceUpdateHistory;
87
88     HwvtepConnectionInstance(final HwvtepConnectionManager hwvtepConnectionManager, final ConnectionInfo key,
89             final OvsdbClient client, final InstanceIdentifier<Node> iid, final TransactionInvoker txInvoker,
90             final DataBroker dataBroker) {
91         this.hwvtepConnectionManager = hwvtepConnectionManager;
92         this.connectionInfo = key;
93         this.client = client;
94         this.instanceIdentifier = iid;
95         this.txInvoker = txInvoker;
96         this.deviceInfo = new HwvtepDeviceInfo(this);
97         this.dataBroker = dataBroker;
98         this.hwvtepTableReader = new HwvtepTableReader(this);
99     }
100
101     public void transact(final TransactCommand command) {
102         String nodeId = getNodeId().getValue();
103         boolean firstUpdate = firstUpdateTriggered.compareAndSet(false, true);
104         if (reconciliationFt.isDone()) {
105             transact(command, false);
106         } else {
107             LOG.info("Job waiting for reconciliation {}", nodeId);
108             Futures.addCallback(reconciliationFt, new FutureCallback<Boolean>() {
109                 @Override
110                 public void onSuccess(final Boolean notUsed) {
111                     LOG.info("Running the job waiting for reconciliation {}", nodeId);
112                     transact(command, false);
113                 }
114
115                 @Override
116                 public void onFailure(final Throwable throwable) {
117                     LOG.info("Running the job waiting for reconciliation {}", nodeId);
118                     transact(command, false);
119                 }
120             }, MoreExecutors.directExecutor());
121
122             if (firstUpdate) {
123                 LOG.info("Scheduling the reconciliation timeout task {}", nodeId);
124                 scheduledExecutorService.schedule(() -> reconciliationFt.set(Boolean.TRUE),
125                         HwvtepSouthboundConstants.CONFIG_NODE_UPDATE_MAX_DELAY_MS, TimeUnit.MILLISECONDS);
126             }
127         }
128     }
129
130     public synchronized void transact(final TransactCommand command, final boolean reconcile) {
131         try {
132             for (TransactInvoker transactInvoker : transactInvokers.values()) {
133                 transactInvoker.invoke(command);
134             }
135         } finally {
136             if (reconcile) {
137                 reconciliationFt.set(Boolean.TRUE);
138             }
139         }
140     }
141
142     public ListenableFuture<List<OperationResult>> transact(final DatabaseSchema dbSchema,
143             final List<Operation> operations) {
144         return client.transact(dbSchema, operations);
145     }
146
147     public void registerCallbacks() {
148         if (this.callback == null) {
149             if (this.initialCreatedData != null) {
150                 this.updateConnectionAttributes();
151             }
152
153             try {
154                 String database = HwvtepSchemaConstants.HARDWARE_VTEP;
155                 DatabaseSchema dbSchema = getSchema(database).get();
156                 if (dbSchema != null) {
157                     LOG.info("Monitoring database: {}", database);
158                     callback = new HwvtepMonitorCallback(this, txInvoker);
159                     monitorAllTables(database, dbSchema);
160                 } else {
161                     LOG.info("No database {} found on {}", database, connectionInfo);
162                 }
163             } catch (InterruptedException | ExecutionException e) {
164                 LOG.warn("Exception attempting to registerCallbacks {}: ", connectionInfo, e);
165             }
166         }
167     }
168
169     public void createTransactInvokers() {
170         if (transactInvokers == null) {
171             try {
172                 transactInvokers = new HashMap<>();
173                 TypedDatabaseSchema dbSchema = getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
174                 if (dbSchema != null) {
175                     transactInvokers.put(dbSchema, new TransactInvokerImpl(this, dbSchema));
176                 }
177             } catch (InterruptedException | ExecutionException e) {
178                 LOG.warn("Exception attempting to createTransactionInvokers {}", connectionInfo, e);
179             }
180         }
181     }
182
183     private void monitorAllTables(final String database, final DatabaseSchema dbSchema) {
184         Set<String> tables = dbSchema.getTables();
185         if (tables != null) {
186             List<MonitorRequest> monitorRequests = new ArrayList<>();
187             for (String tableName : tables) {
188                 if (!HwvtepSouthboundConstants.SKIP_HWVTEP_TABLE.containsKey(tableName)) {
189                     LOG.info("HwvtepSouthbound monitoring Hwvtep schema table {}", tableName);
190                     GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class);
191                     final Set<String> columns = new HashSet<>(tableSchema.getColumns());
192                     List<String> skipColumns = HwvtepSouthboundConstants.SKIP_COLUMN_FROM_HWVTEP_TABLE.get(tableName);
193                     skipColumns = skipColumns == null ? new ArrayList<>() : new ArrayList<>(skipColumns);
194                     skipColumns.add(HwvtepSouthboundConstants.VERSION_COLUMN);
195
196                     LOG.info("HwvtepSouthbound NOT monitoring columns {} in table {}", skipColumns, tableName);
197                     columns.removeAll(skipColumns);
198
199                     monitorRequests.add(new MonitorRequestBuilder<>(tableSchema)
200                             .addColumns(columns)
201                             .with(new MonitorSelect(true, true, true, true)).build());
202                 }
203             }
204             this.callback.update(monitor(dbSchema, monitorRequests, callback), dbSchema);
205         } else {
206             LOG.warn("No tables for schema {} for database {} for key {}",dbSchema,database,connectionInfo);
207         }
208     }
209
210     private void updateConnectionAttributes() {
211         LOG.debug("Update attributes of ovsdb node ip: {} port: {}",
212                     this.initialCreatedData.getConnectionInfo().getRemoteIp(),
213                     this.initialCreatedData.getConnectionInfo().getRemotePort());
214         /*
215          * TODO: Do we have anything to update?
216          * Hwvtep doesn't have other_config or external_ids like
217          * Open_vSwitch. What else will be needed?
218          */
219     }
220
221     public DataBroker getDataBroker() {
222         return dataBroker;
223     }
224
225     public ListenableFuture<List<String>> getDatabases() {
226         return client.getDatabases();
227     }
228
229     public ListenableFuture<TypedDatabaseSchema> getSchema(final String database) {
230         return client.getSchema(database);
231     }
232
233     public TransactionBuilder transactBuilder(final DatabaseSchema dbSchema) {
234         return client.transactBuilder(dbSchema);
235     }
236
237     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema schema,
238                     final List<MonitorRequest> monitorRequests, final MonitorCallBack monitorCallBack) {
239         return client.monitor(schema, monitorRequests, monitorCallBack);
240     }
241
242     public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema schema,
243             final List<MonitorRequest> monitorRequests, final MonitorHandle monitorHandle,
244             final MonitorCallBack monitorCallBack) {
245         return null;
246     }
247
248     public void cancelMonitor(final MonitorHandle handler) {
249         client.cancelMonitor(handler);
250     }
251
252     public void lock(final String lockId, final LockAquisitionCallback lockedCallBack,
253             final LockStolenCallback stolenCallback) {
254         client.lock(lockId, lockedCallBack, stolenCallback);
255     }
256
257     public ListenableFuture<Boolean> steal(final String lockId) {
258         return client.steal(lockId);
259     }
260
261     public ListenableFuture<Boolean> unLock(final String lockId) {
262         return client.unLock(lockId);
263     }
264
265     public OvsdbConnectionInfo getConnectionInfo() {
266         return client.getConnectionInfo();
267     }
268
269     public boolean isActive() {
270         return client.isActive();
271     }
272
273     public void disconnect() {
274         client.disconnect();
275     }
276
277     public DatabaseSchema getDatabaseSchema(final String dbName) {
278         return client.getDatabaseSchema(dbName);
279     }
280
281     public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final Class<T> klazz) {
282         return client.createTypedRowWrapper(klazz);
283     }
284
285     public <T extends TypedBaseTable<?>> T createTypedRowWrapper(final DatabaseSchema dbSchema, final Class<T> klazz) {
286         return client.createTypedRowWrapper(dbSchema, klazz);
287     }
288
289     public <T extends TypedBaseTable<?>> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
290         return client.getTypedRowWrapper(klazz, row);
291     }
292
293     public ConnectionInfo getMDConnectionInfo() {
294         return connectionInfo;
295     }
296
297     public void setMDConnectionInfo(final ConnectionInfo key) {
298         this.connectionInfo = key;
299     }
300
301     public InstanceIdentifier<Node> getInstanceIdentifier() {
302         return instanceIdentifier;
303     }
304
305     public NodeKey getNodeKey() {
306         //TODO: What is the alternative here?
307         return getInstanceIdentifier().firstKeyOf(Node.class);
308     }
309
310     public NodeId getNodeId() {
311         return getNodeKey().getNodeId();
312     }
313
314     public void setInstanceIdentifier(final InstanceIdentifier<Node> iid) {
315         this.instanceIdentifier = iid;
316         hwvtepConnectionManager.putConnectionInstance(instanceIdentifier, this);
317     }
318
319     public Entity getConnectedEntity() {
320         return this.connectedEntity;
321     }
322
323     public void setConnectedEntity(final Entity entity) {
324         this.connectedEntity = entity;
325     }
326
327     public Boolean hasOvsdbClient(final OvsdbClient otherClient) {
328         return client.equals(otherClient);
329     }
330
331     public Boolean getHasDeviceOwnership() {
332         return hasDeviceOwnership;
333     }
334
335     public void setHasDeviceOwnership(final Boolean hasDeviceOwnership) {
336         if (hasDeviceOwnership != null) {
337             this.hasDeviceOwnership = hasDeviceOwnership;
338         }
339     }
340
341     public void setDeviceOwnershipCandidateRegistration(
342             @NonNull final EntityOwnershipCandidateRegistration registration) {
343         this.deviceOwnershipCandidateRegistration = registration;
344     }
345
346     public void closeDeviceOwnershipCandidateRegistration() {
347         if (deviceOwnershipCandidateRegistration != null) {
348             this.deviceOwnershipCandidateRegistration.close();
349             setHasDeviceOwnership(Boolean.FALSE);
350         }
351     }
352
353     public void setHwvtepGlobalAugmentation(final HwvtepGlobalAugmentation hwvtepGlobalData) {
354         this.initialCreatedData = hwvtepGlobalData;
355     }
356
357     public HwvtepGlobalAugmentation getHwvtepGlobalAugmentation() {
358         return this.initialCreatedData;
359     }
360
361     public HwvtepDeviceInfo getDeviceInfo() {
362         return this.deviceInfo;
363     }
364
365     public OvsdbClient getOvsdbClient() {
366         return client;
367     }
368
369     public HwvtepTableReader getHwvtepTableReader() {
370         return hwvtepTableReader;
371     }
372
373     public void refreshOperNode() throws ExecutionException, InterruptedException {
374         TableUpdates tableUpdates = hwvtepTableReader.readAllTables();
375         callback.update(tableUpdates, getDatabaseSchema(HwvtepSchemaConstants.HARDWARE_VTEP));
376     }
377
378     public MonitorCallBack getCallback() {
379         return callback;
380     }
381
382     public void setCallback(final MonitorCallBack callback) {
383         this.callback = callback;
384     }
385
386     public TransactionHistory getControllerTxHistory() {
387         return controllerTxHistory;
388     }
389
390     public void setControllerTxHistory(final TransactionHistory controllerTxLog) {
391         deviceInfo.setControllerTxHistory(controllerTxLog);
392         this.controllerTxHistory = controllerTxLog;
393     }
394
395     public TransactionHistory getDeviceUpdateHistory() {
396         return deviceUpdateHistory;
397     }
398
399     public void setDeviceUpdateHistory(final TransactionHistory deviceUpdateLog) {
400         deviceInfo.setDeviceUpdateHistory(deviceUpdateLog);
401         this.deviceUpdateHistory = deviceUpdateLog;
402     }
403 }