• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #124

27 Sep 2023 04:02PM UTC coverage: 0.0%. Remained the same
#124

push

lorenzo-gomez-windhover
-Add staret, stop to CSVExporter

2 of 2 new or added lines in 1 file covered. (100.0%)

0 of 6432 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/archive/CSVExporter.java
1
package com.windhoverlabs.yamcs.archive;
2

3
import com.google.protobuf.ByteString;
4
import com.google.protobuf.Timestamp;
5
import java.io.File;
6
import java.io.FileNotFoundException;
7
import java.io.FileOutputStream;
8
import java.io.IOException;
9
import java.io.OutputStreamWriter;
10
import java.io.UncheckedIOException;
11
import java.io.Writer;
12
import java.nio.charset.StandardCharsets;
13
import java.nio.file.Path;
14
import java.nio.file.Paths;
15
import java.text.SimpleDateFormat;
16
import java.time.Instant;
17
import java.util.ArrayList;
18
import java.util.Date;
19
import java.util.List;
20
import java.util.Map;
21
import java.util.Map.Entry;
22
import org.yamcs.AbstractYamcsService;
23
import org.yamcs.InitException;
24
import org.yamcs.YConfiguration;
25
import org.yamcs.YamcsServer;
26
import org.yamcs.api.HttpBody;
27
import org.yamcs.api.Observer;
28
import org.yamcs.archive.ReplayOptions;
29
import org.yamcs.client.YamcsClient;
30
import org.yamcs.client.archive.ArchiveClient;
31
import org.yamcs.http.BadRequestException;
32
import org.yamcs.http.MediaType;
33
import org.yamcs.http.api.ManagementApi;
34
import org.yamcs.http.api.ParameterReplayListener;
35
import org.yamcs.http.api.ReplayFactory;
36
import org.yamcs.mdb.XtceDbFactory;
37
import org.yamcs.parameter.ParameterValueWithId;
38
import org.yamcs.protobuf.Archive.ExportParameterValuesRequest;
39
import org.yamcs.protobuf.Yamcs.NamedObjectId;
40
import org.yamcs.protobuf.Yamcs.ParameterReplayRequest;
41
import org.yamcs.security.ObjectPrivilegeType;
42
import org.yamcs.security.User;
43
import org.yamcs.utils.ParameterFormatter;
44
import org.yamcs.utils.TimeEncoding;
45
import org.yamcs.xtce.Parameter;
46
import org.yamcs.xtce.XtceDb;
47
import org.yamcs.yarch.FileSystemBucket;
48
import org.yamcs.yarch.YarchDatabase;
49
import org.yamcs.yarch.YarchDatabaseInstance;
50

51
public class CSVExporter extends AbstractYamcsService implements Runnable {
×
52

53
  private static class CsvParameterStreamer extends ParameterReplayListener {
54

55
    Observer<HttpBody> observer;
56
    List<NamedObjectId> ids;
57
    boolean addRaw;
58
    boolean addMonitoring;
59
    int recordCount = 0;
×
60
    char columnDelimiter = '\t';
×
61

62
    CsvParameterStreamer(
63
        Observer<HttpBody> observer,
64
        String filename,
65
        List<NamedObjectId> ids,
66
        boolean addRaw,
67
        boolean addMonitoring) {
×
68
      this.observer = observer;
×
69
      this.ids = ids;
×
70
      this.addRaw = addRaw;
×
71
      this.addMonitoring = addMonitoring;
×
72

73
      HttpBody metadata =
74
          HttpBody.newBuilder()
×
75
              .setContentType(MediaType.CSV.toString())
×
76
              .setFilename(filename)
×
77
              .build();
×
78

79
      observer.next(metadata);
×
80
    }
×
81

82
    @Override
83
    protected void onParameterData(List<ParameterValueWithId> params) {
84

85
      ByteString.Output data = ByteString.newOutput();
×
86
      try (Writer writer = new OutputStreamWriter(data, StandardCharsets.UTF_8);
×
87
          ParameterFormatter formatter = new ParameterFormatter(writer, ids, columnDelimiter)) {
×
88
        formatter.setWriteHeader(recordCount == 0);
×
89
        formatter.setPrintRaw(addRaw);
×
90
        formatter.setPrintMonitoring(addMonitoring);
×
91
        formatter.writeParameters(params);
×
92
      } catch (IOException e) {
×
93
        throw new UncheckedIOException(e);
×
94
      }
×
95

96
      HttpBody body = HttpBody.newBuilder().setData(data.toByteString()).build();
×
97
      observer.next(body);
×
98
      recordCount++;
×
99
    }
×
100

101
    @Override
102
    public void replayFailed(Throwable t) {
103
      observer.completeExceptionally(t);
×
104
    }
×
105

106
    @Override
107
    public void replayFinished() {
108
      observer.complete();
×
109
    }
×
110
  }
111

112
  private YamcsClient yamcsClient;
113
  private ArchiveClient archiveClient;
114
  private boolean realtime;
115
  private Instant start;
116
  private Instant stop;
117
  private String bucketName;
118
  private FileSystemBucket bucket;
119
  private Thread thread;
120
  private FileOutputStream fis;
121
  private Path bucketPath;
122
  private static final String CSV_NAME_POST_FIX = ".csv";
123
  private Map<String, List<String>> paramsMap = null;
×
124

125
  @Override
126
  public void init(String yamcsInstance, String serviceName, YConfiguration config)
127
      throws InitException {
128

129
    //    realtime =
130
    //        this.config.getBoolean(
131
    //            "realtime", false); // might be useful for "always" writing to a CSV file...
132
    //    start = Instant.parse("2023-09-23T23:00:00.000Z");
133
    start = Instant.parse(config.getString("start"));
×
134
    //    stop = Instant.parse("2023-09-24T00:10:00.000Z");
135
    stop = Instant.parse(config.getString("stop"));
×
136

137
    /* Read in our configuration parameters. */
138
    bucketName = config.getString("bucket");
×
139
    paramsMap = config.getMap("params");
×
140

141
    //    System.out.println("params:" + paramsMap);
142

143
    /* Iterate through the bucket names passed to us by the configuration file.  We're going to add the buckets
144
     * to our internal list so we can process them later. */
145
    YarchDatabaseInstance yarch = YarchDatabase.getInstance(YamcsServer.GLOBAL_INSTANCE);
×
146

147
    try {
148
      bucket = (FileSystemBucket) yarch.getBucket(bucketName);
×
149
    } catch (IOException e) {
×
150
      e.printStackTrace();
×
151
    }
×
152
  }
×
153

154
  @Override
155
  protected void doStart() {
156

157
    thread = new Thread(this);
×
158
    thread.start();
×
159
    notifyStarted();
×
160
  }
×
161

162
  @Override
163
  protected void doStop() {
164
    // TODO Auto-generated method stub
165

166
  }
×
167

168
  private void export(
169
      User user, ExportParameterValuesRequest request, Observer<HttpBody> observer) {
170
    String instance = ManagementApi.verifyInstance(request.getInstance());
×
171
    //
172
    ReplayOptions repl = ReplayOptions.getAfapReplay();
×
173
    //
174
    List<NamedObjectId> ids = new ArrayList<>();
×
175
    XtceDb mdb = XtceDbFactory.getInstance(instance);
×
176
    String namespace = null;
×
177

178
    if (request.hasStart()) {
×
179
      repl.setRangeStart(TimeEncoding.fromProtobufTimestamp(request.getStart()));
×
180
    }
181
    if (request.hasStop()) {
×
182
      repl.setRangeStop(TimeEncoding.fromProtobufTimestamp(request.getStop()));
×
183
    }
184
    //    TODO:Get names from conofig/api request
185
    for (String id : request.getParametersList()) {
×
186
      //              ParameterWithId paramWithId = MdbApi.verifyParameterWithId(ctx, mdb, id);
187
      //      NamedObjectId.newBuilder().build();
188
      ids.add(NamedObjectId.newBuilder().setName(id).build());
×
189
    }
×
190
    if (request.hasNamespace()) {
×
191
      namespace = request.getNamespace();
×
192
    }
193

194
    if (ids.isEmpty()) {
×
195
      for (Parameter p : mdb.getParameters()) {
×
196
        if (!user.hasObjectPrivilege(ObjectPrivilegeType.ReadParameter, p.getQualifiedName())) {
×
197
          continue;
×
198
        }
199
        if (namespace != null) {
×
200
          String alias = p.getAlias(namespace);
×
201
          if (alias != null) {
×
202
            ids.add(NamedObjectId.newBuilder().setNamespace(namespace).setName(alias).build());
×
203
          }
204
        } else {
×
205
          ids.add(NamedObjectId.newBuilder().setName(p.getQualifiedName()).build());
×
206
        }
207
      }
×
208
    }
209
    repl.setParameterRequest(ParameterReplayRequest.newBuilder().addAllNameFilter(ids).build());
×
210

211
    String filename;
212
    String dateString = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss").format(new Date());
×
213
    if (ids.size() == 1) {
×
214
      NamedObjectId id = ids.get(0);
×
215
      String parameterName = id.hasNamespace() ? id.getName() : id.getName().substring(1);
×
216
      filename = parameterName.replace('/', '_') + "_export_" + dateString + ".csv";
×
217
    } else {
×
218
      filename = "parameter_export_" + dateString + ".csv";
×
219
    }
220

221
    boolean addRaw = false;
×
222
    boolean addMonitoring = false;
×
223
    for (String extra : request.getExtraList()) {
×
224
      if (extra.equals("raw")) {
×
225
        addRaw = true;
×
226
      } else if (extra.equals("monitoring")) {
×
227
        addMonitoring = true;
×
228
      } else {
229
        throw new BadRequestException("Unexpected option for parameter 'extra': " + extra);
×
230
      }
231
    }
×
232
    CsvParameterStreamer l =
×
233
        new CsvParameterStreamer(observer, filename, ids, addRaw, addMonitoring);
234
    if (request.hasDelimiter()) {
×
235
      switch (request.getDelimiter()) {
×
236
        case "TAB":
237
          l.columnDelimiter = '\t';
×
238
          break;
×
239
        case "SEMICOLON":
240
          l.columnDelimiter = ';';
×
241
          break;
×
242
        case "COMMA":
243
          l.columnDelimiter = ',';
×
244
          break;
×
245
        default:
246
          throw new BadRequestException("Unexpected column delimiter");
×
247
      }
248
    }
249
    //    observer.setCancelHandler(l::requestReplayAbortion);
250
    ReplayFactory.replay(instance, user, repl, l);
×
251
  }
×
252

253
  @Override
254
  public void run() {
255
    exportPV();
×
256
  }
×
257

258
  private void exportPV() {
259

260
    for (Entry<String, List<String>> params : paramsMap.entrySet()) {
×
261
      ArrayList<String> parameters = new ArrayList<String>();
×
262
      //      String[] nameTokens = param.split("/");
263

264
      fis = openFile(getNewFilePath(params.getKey()).toAbsolutePath().toString());
×
265
      for (String param : params.getValue()) {
×
266
        //        String p = "/cfs/ppd/apps/to/TO_HK_TLM_MID.ChannelInfo[0].MessagesSent";
267
        parameters.add(param);
×
268
      }
×
269
      this.export(
×
270
          YamcsServer.getServer().getSecurityStore().getDirectory().getUser("admin"),
×
271
          ExportParameterValuesRequest.newBuilder()
×
272
              .addAllParameters(parameters)
×
273
              .setStart(
×
274
                  Timestamp.newBuilder()
×
275
                      .setNanos(start.getNano())
×
276
                      .setSeconds(start.getEpochSecond())
×
277
                      .build())
×
278
              .setStop(
×
279
                  Timestamp.newBuilder()
×
280
                      .setNanos(stop.getNano())
×
281
                      .setSeconds(stop.getEpochSecond())
×
282
                      .build())
×
283
              .setInstance("fsw")
×
284
              .build(),
×
285
          new Observer<HttpBody>() {
×
286

287
            @Override
288
            public void next(HttpBody message) {
289
              //              // TODO Auto-generated method stub
290
              writeToCSV(message.getData().toByteArray());
×
291
            }
×
292

293
            @Override
294
            public void completeExceptionally(Throwable t) {
295
              // TODO Auto-generated method stub
296

297
            }
×
298

299
            @Override
300
            public void complete() {
301
              // TODO Auto-generated method stub
302
              closeFile(fis);
×
303
            }
×
304
          });
305
    }
×
306
  }
×
307

308
  private void writeToCSV(byte[] data) {
309

310
    writeToFile(data, fis);
×
311
  }
×
312

313
  private void writeToFile(byte[] data, FileOutputStream fis) {
314
    try {
315
      fis.write(data, 0, data.length);
×
316
    } catch (IOException e) {
×
317
      // TODO Auto-generated catch block
318
      e.printStackTrace();
×
319
    }
×
320
  }
×
321

322
  private void closeFile(FileOutputStream fis) {
323
    try {
324
      fis.close();
×
325
    } catch (IOException e) {
×
326
      // TODO Auto-generated catch block
327
      e.printStackTrace();
×
328
    }
×
329
  }
×
330

331
  private FileOutputStream openFile(String path) {
332
    File inputFile = new File(path);
×
333
    FileOutputStream fis = null;
×
334
    try {
335
      fis = new FileOutputStream(inputFile);
×
336
    } catch (FileNotFoundException e) {
×
337
      // TODO Auto-generated catch block
338
      e.printStackTrace();
×
339
    }
×
340
    return fis;
×
341
  }
342

343
  private Path getNewFilePath(String name) {
344
    bucketPath = Paths.get(bucket.getBucketRoot().toString()).toAbsolutePath();
×
345
    Path filePath = bucketPath.resolve(name + "_" + CSV_NAME_POST_FIX);
×
346

347
    return filePath;
×
348
  }
349
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc