/*
 * Copyright (c) 2017 Ericsson Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.applications.bulk.o.matic;
10 import com.google.common.util.concurrent.FutureCallback;
11 import com.google.common.util.concurrent.MoreExecutors;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicInteger;
14 import java.util.concurrent.atomic.AtomicLong;
15 import org.opendaylight.mdsal.binding.api.DataBroker;
16 import org.opendaylight.mdsal.binding.api.WriteTransaction;
17 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
21 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 public class TableWriter implements FlowCounterMBean {
26 private static final Logger LOG = LoggerFactory.getLogger(TableWriter.class);
28 private final AtomicInteger writeOpStatus = new AtomicInteger(FlowCounter.OperationStatus.INIT.status());
29 private final AtomicLong taskCompletionTime = new AtomicLong(BulkOMaticUtils.DEFAULT_COMPLETION_TIME);
30 private final AtomicInteger successfulWrites = new AtomicInteger();
31 private final AtomicInteger failedWrites = new AtomicInteger();
32 private final DataBroker dataBroker;
33 private final ExecutorService tablePusher;
35 public TableWriter(final DataBroker dataBroker, final ExecutorService tablePusher) {
36 this.dataBroker = dataBroker;
37 this.tablePusher = tablePusher;
40 public void addTables(final int dpnCount, final short startTableId, final short endTableId) {
41 LOG.info("Starting to add tables: {} to {} on each of {}", startTableId, endTableId, dpnCount);
42 TableHandlerTask task = new TableHandlerTask(dpnCount, startTableId, endTableId, true);
43 tablePusher.execute(task);
46 public void deleteTables(int dpnCount, short startTableId, short endTableId) {
47 LOG.info("Starting to delete tables: {} to {} on each of {}", startTableId, endTableId, dpnCount);
48 TableHandlerTask task = new TableHandlerTask(dpnCount, startTableId, endTableId, false);
49 tablePusher.execute(task);
53 public int getWriteOpStatus() {
54 return writeOpStatus.get();
58 public long getTaskCompletionTime() {
59 return taskCompletionTime.get();
63 public long getTableCount() {
64 return successfulWrites.get();
67 private class TableHandlerTask implements Runnable {
69 private final short startTableId;
70 private final short endTableId;
71 private final int dpnCount;
72 private final boolean isAdd;
74 TableHandlerTask(int dpnCount, short startTableId, short endTableId, boolean isAdd) {
75 this.dpnCount = dpnCount;
76 this.startTableId = startTableId;
77 this.endTableId = endTableId;
83 writeOpStatus.set(FlowCounter.OperationStatus.IN_PROGRESS.status());
84 int totalTables = dpnCount * (endTableId - startTableId + 1);
86 for (int dpn = 1; dpn <= dpnCount; dpn++) {
87 String dpId = BulkOMaticUtils.DEVICE_TYPE_PREFIX + String.valueOf(dpn);
88 for (short tableId = startTableId; tableId <= endTableId; tableId++) {
89 WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
90 Table table = new TableBuilder().withKey(new TableKey(tableId)).setId(tableId).build();
91 InstanceIdentifier<Table> tableIId = BulkOMaticUtils.getTableId(tableId, dpId);
94 wtx.put(LogicalDatastoreType.CONFIGURATION, tableIId, table, true);
96 wtx.delete(LogicalDatastoreType.CONFIGURATION, tableIId);
99 wtx.commit().addCallback(new FutureCallback<Object>() {
101 public void onSuccess(Object voidParameter) {
102 if (successfulWrites.incrementAndGet() == totalTables) {
103 if (failedWrites.get() > 0) {
104 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
106 writeOpStatus.set(FlowCounter.OperationStatus.SUCCESS.status());
112 public void onFailure(Throwable throwable) {
113 LOG.error("Table addition Failed.", throwable);
114 if (failedWrites.incrementAndGet() == totalTables) {
115 writeOpStatus.set(FlowCounter.OperationStatus.FAILURE.status());
118 }, MoreExecutors.directExecutor());