• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #157

25 Nov 2024 11:22PM UTC coverage: 0.0%. Remained the same
#157

push

web-flow
Merge 096a65ddf into e64813ef2

0 of 18 new or added lines in 13 files covered. (0.0%)

1 existing line in 1 file now uncovered.

0 of 6640 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/StreamTmFrameLink.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import com.google.common.util.concurrent.RateLimiter;
4
import java.util.concurrent.TimeUnit;
5
import java.util.concurrent.atomic.AtomicBoolean;
6
import java.util.concurrent.atomic.AtomicLong;
7
import org.yamcs.ConfigurationException;
8
import org.yamcs.YConfiguration;
9
import org.yamcs.tctm.ccsds.AbstractTmFrameLink;
10
import org.yamcs.utils.DataRateMeter;
11
import org.yamcs.yarch.ColumnDefinition;
12
import org.yamcs.yarch.DataType;
13
import org.yamcs.yarch.Stream;
14
import org.yamcs.yarch.StreamSubscriber;
15
import org.yamcs.yarch.Tuple;
16
import org.yamcs.yarch.TupleDefinition;
17
import org.yamcs.yarch.YarchDatabase;
18
import org.yamcs.yarch.YarchDatabaseInstance;
19

20
/**
21
 * Receives telemetry frames via a YAMCS stream. One stream Tuple = one TM frame.
22
 *
23
 * @author Mathew Benson
24
 */
25
public class StreamTmFrameLink extends AbstractTmFrameLink implements StreamSubscriber {
×
26
  private volatile int invalidDatagramCount = 0;
×
27

28
  String packetPreprocessorClassName;
29
  Object packetPreprocessorArgs;
30
  Thread thread;
31
  RateLimiter rateLimiter;
32
  protected Stream stream;
33
  protected AtomicLong packetCount = new AtomicLong(0);
×
34
  DataRateMeter packetRateMeter = new DataRateMeter();
×
35
  DataRateMeter dataRateMeter = new DataRateMeter();
×
36
  protected String linkName;
37
  protected AtomicBoolean disabled = new AtomicBoolean(false);
×
38

39
  static TupleDefinition gftdef;
40

41
  static final String RECTIME_CNAME = "rectime";
42
  static final String DATA_CNAME = "data";
43

44
  static {
45
    gftdef = new TupleDefinition();
×
46
    gftdef.addColumn(new ColumnDefinition(RECTIME_CNAME, DataType.TIMESTAMP));
×
47
    gftdef.addColumn(new ColumnDefinition(DATA_CNAME, DataType.BINARY));
×
48
  }
×
49

50
  /**
51
   * Creates a new Stream TM Frame Data Link
52
   *
53
   * @throws ConfigurationException if port is not defined in the configuration
54
   */
55
  public void init(String instance, String name, YConfiguration config)
56
      throws ConfigurationException {
57
    super.init(instance, name, config);
×
58

59
    String streamName = config.getString("in_stream");
×
60
    this.linkName = name;
×
61

62
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(instance);
×
63
    this.stream = getStream(ydb, streamName);
×
64

65
    this.stream.addSubscriber(this);
×
66

67
    if (config.containsKey("frameMaxRate")) {
×
68
      rateLimiter = RateLimiter.create(config.getDouble("frameMaxRate"), 1, TimeUnit.SECONDS);
×
69
    }
70
  }
×
71

72
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
73
    Stream stream = ydb.getStream(streamName);
×
74
    if (stream == null) {
×
75
      try {
76
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
77
        // ydb.execute("create stream " + streamName);
78
      } catch (Exception e) {
×
79
        throw new ConfigurationException(e);
×
80
      }
×
81
      stream = ydb.getStream(streamName);
×
82
    }
83
    return stream;
×
84
  }
85

86
  @Override
87
  public YConfiguration getConfig() {
88
    return config;
×
89
  }
90

91
  @Override
92
  public String getName() {
93
    return linkName;
×
94
  }
95

96
  @Override
97
  public void resetCounters() {
98
    packetCount.set(0);
×
99
  }
×
100

101
  @Override
102
  public void doStart() {
103
    notifyStarted();
×
104
  }
×
105

106
  @Override
107
  public void doStop() {
108
    notifyStopped();
×
109
  }
×
110

111
  /**
112
   * called when a new packet is received to update the statistics
113
   *
114
   * @param packetSize
115
   */
116
  protected void updateStats(int packetSize) {
117
    packetCount.getAndIncrement();
×
118
    packetRateMeter.mark(1);
×
119
    dataRateMeter.mark(packetSize);
×
120
  }
×
121

122
  /** returns statistics with the number of datagram received and the number of invalid datagrams */
123
  @Override
124
  public String getDetailedStatus() {
125
    if (isDisabled()) {
×
126
      return "DISABLED";
×
127
    } else {
128
      return String.format(
×
129
          "OK %nValid datagrams received: %d%nInvalid datagrams received: %d",
NEW
130
          validFrameCount.get(), invalidFrameCount.get());
×
131
    }
132
  }
133

134
  @Override
135
  protected void doDisable() {}
×
136

137
  @Override
138
  protected void doEnable() {}
×
139

140
  @Override
141
  public long getDataInCount() {
142
    return 0;
×
143
  }
144

145
  @Override
146
  public long getDataOutCount() {
147
    return packetCount.get();
×
148
  }
149

150
  @Override
151
  public Status getLinkStatus() {
152
    if (isDisabled()) {
×
153
      return Status.DISABLED;
×
154
    }
155
    if (state() == State.FAILED) {
×
156
      return Status.FAILED;
×
157
    }
158

159
    return connectionStatus();
×
160
  }
161

162
  @Override
163
  public boolean isDisabled() {
164
    return disabled.get();
×
165
  }
166

167
  protected Status connectionStatus() {
168
    return Status.OK;
×
169
  }
170

171
  @Override
172
  public void onTuple(Stream arg0, Tuple tuple) {
173
    if (isRunningAndEnabled()) {
×
174

175
      byte[] packet;
176
      packet = tuple.getColumn(DATA_CNAME);
×
177

178
      handleFrame(timeService.getHresMissionTime(), packet, 0, packet.length);
×
179
    }
180
  }
×
181
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc