• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

eswdd / nodetsdb-gce / 3838799297

pending completion
3838799297

Pull #34

github

GitHub
Merge 6e606993d into 15b38e233
Pull Request #34: Bump tunnel-agent and coveralls

185 of 284 branches covered (65.14%)

Branch coverage included in aggregate %.

537 of 612 relevant lines covered (87.75%)

8.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.58
/index.js
1
var express = require('express');
1✔
2
var api = require('nodetsdb-api');
1✔
3
var Datastore = require('@google-cloud/datastore');
1✔
4
// Datastore client handle used by every function below; declared here without a value.
// NOTE(review): presumably assigned during backend setup outside this section - confirm.
var datastore;

// Runtime settings read throughout this file (namespace, verbose, *_uid_bytes).
var config = {};

// Object that collects the functions marked "on backend api" below.
var backend = {};
1✔
11

12
/**
 * Left-pads a string by repeatedly prepending `char` until the result is at
 * least `length` characters long.
 *
 * @param {string} input - value to pad
 * @param {string} char - text prepended on every step; a multi-character value
 *                        may overshoot `length`
 * @param {number} length - minimum length of the result
 * @returns {string} the padded value, or `input` untouched when it is already
 *                   long enough or when `char` is empty
 */
var lpad = function (input, char, length) {
    // An empty pad string can never make the input longer; return early
    // instead of spinning forever in the loop below.
    if (String(char) === "") {
        return input;
    }
    while (input.length < length) {
        input = char + input;
    }
    return input;
};
18

19
/**
 * Translates a UID type ("metric", "tagk" or "tagv") into the name of the
 * datastore kind used for that type and hands it to `callback`.
 *
 * @param {string} type - UID type to translate
 * @param {function} callback - invoked as callback(kind) on success, or as
 *                              callback(null, message) for an unknown type
 */
var uidKind = function(type, callback) {
    var resolved;
    if (type === "metric") {
        resolved = "metric_uid";
    }
    else if (type === "tagk") {
        resolved = "tagk_uid";
    }
    else if (type === "tagv") {
        resolved = "tagv_uid";
    }
    else {
        callback(null, 'Unsupported type: '+type);
        return;
    }
    callback(resolved);
};
31

32
/**
 * Loads a single UID entity of the given type by its datastore key name.
 *
 * @param {string} type - UID type, resolved to a kind through uidKind
 * @param {string} identifier - key name within that kind (callers in this file
 *                              pass "name:..." or "id:..." values)
 * @param {function} callback - callback(entity, null) with whatever the lookup
 *                              returned, or callback(null, err) on failure
 */
var uidMetaFrom = function (type, identifier, callback) {
    uidKind(type, function(kind, kindErr) {
        if (kindErr) {
            callback(null, kindErr);
            return;
        }

        var key = datastore.key({namespace:config.namespace,path:[kind, identifier]});
        datastore.get(key, function(getErr, entity) {
            if (getErr) {
                callback(null, getErr);
                return;
            }
            callback(entity, null);
        });
    });
};
50

51

52

53
/**
 * Loads several UID entities of one type in a single datastore lookup and
 * returns them indexed by their `uid` property.
 *
 * @param {string} type - UID type, resolved to a kind through uidKind
 * @param {string[]} identifiers - key names to load within that kind
 * @param {function} callback - callback(map, null) when every identifier was
 *        found; callback(map, "At least one lookup failed") when some were
 *        missing (map holds the ones that were found); callback(null, err) on
 *        a lookup error; callback([]) when `identifiers` is empty
 */
var multiUidMetaFrom = function (type, identifiers, callback) {
    uidKind(type, function(kind, err) {
        if (err) {
            callback(null, err);
            return;
        }

        if (identifiers.length === 0) {
            callback([]);
            return;
        }

        console.log("Loading "+kind+" identifiers: "+JSON.stringify(identifiers));
        var dsIdentifiers = identifiers.map(function(identifier){return datastore.key({namespace:config.namespace,path:[kind, identifier]});});
        datastore.get(dsIdentifiers, function(err, entities) {
            if (err) {
                callback(null, err);
                return;
            }

            // Do not index the result by position: a missing entity may show up
            // as an undefined slot or simply be absent from the array, so skip
            // empty slots before touching `.uid` and detect misses by count.
            var ret = {};
            var results = entities || [];
            for (var i=0; i<results.length; i++) {
                var uidmeta = results[i];
                if (uidmeta === undefined || uidmeta === null) {
                    continue;
                }
                ret[uidmeta.uid] = uidmeta;
            }

            // Compare against the number of distinct identifiers so repeated
            // identifiers are not mistaken for failed lookups.
            var distinctIdentifiers = {};
            for (var j=0; j<identifiers.length; j++) {
                distinctIdentifiers[identifiers[j]] = true;
            }
            var foundANull = Object.keys(ret).length < Object.keys(distinctIdentifiers).length;

            callback(ret, foundANull ? "At least one lookup failed" : null);
        });
    });
};
86

87
/**
 * Loads the UID entity stored under a name.
 *
 * @param {string} type - UID type passed through to uidMetaFrom
 * @param {string} name - name to look up; stored under the "name:" key prefix
 * @param {function} callback - forwarded unchanged to uidMetaFrom
 */
var uidMetaFromName = function(type, name, callback) {
    var identifier = "name:"+name;
    uidMetaFrom(type, identifier, callback);
};
90

91
// on backend api
92
/**
 * Loads the UID entity stored under a UID string.
 *
 * @param {string} type - UID type passed through to uidMetaFrom
 * @param {string} uid - UID to look up; stored under the "id:" key prefix
 * @param {function} callback - forwarded unchanged to uidMetaFrom
 */
backend.uidMetaFromUid = function(type, uid, callback) {
    var identifier = "id:"+uid;
    uidMetaFrom(type, identifier, callback);
};
95

96
/**
 * Loads several UID entities by UID string in one lookup.
 *
 * @param {string} type - UID type passed through to multiUidMetaFrom
 * @param {string[]} uids - UIDs to look up; each is stored under the "id:" key prefix
 * @param {function} callback - forwarded unchanged to multiUidMetaFrom
 */
var multiUidMetaFromUid = function(type, uids, callback) {
    var identifiers = uids.map(function(uid) {
        return "id:"+uid;
    });
    multiUidMetaFrom(type, identifiers, callback);
};
99

100
/**
 * Takes the next number from the per-kind UID sequence inside the supplied
 * transaction and queues the incremented counter for saving.
 *
 * The counter is only queued with txn.save here; this function does not commit.
 *
 * @param {object} txn - transaction used for the read and the queued save
 * @param {string} kind - kind whose sequence is advanced (key "kind:<kind>")
 * @param {function} callback - callback(uid) with the allocated number, or
 *                              callback(null, message) when the read fails
 */
var nextUid = function(txn, kind, callback) {
    var sequenceKey = datastore.key({namespace:config.namespace,path:["uid_sequence", "kind:"+kind]});

    txn.get(sequenceKey, function(err, stored) {
        if (err) {
            callback(null, "Error loading data entity: " + err);
            return;
        }

        // No stored counter yet means this is the first allocation for the
        // kind, so the sequence starts at 1.
        var sequence = (stored === undefined) ? { nextUid: 1 } : stored;

        var allocated = sequence.nextUid;
        sequence.nextUid++;

        txn.save({
            key: sequenceKey,
            data: sequence
        });

        callback(allocated);
    });
};
136

137
/**
 * Returns the UID already assigned to `name`, or allocates a new one and
 * stores it under both its "name:" and "id:" keys in a single transaction.
 *
 * New UIDs are the hex form of the next sequence number, left-padded with "0"
 * to config[type + "_uid_bytes"] * 2 characters.
 *
 * @param {string} type - UID type ("metric", "tagk" or "tagv")
 * @param {string} name - name that needs a UID
 * @param {function} callback - callback(uidString) on success, or
 *                              callback(null, message) on any failure
 */
var assignUidIfNecessary = function(type, name, callback) {
    uidKind(type, function(kind, err) {
        if (err) {
            callback(null, err);
            return;
        }

        var txn = datastore.transaction();

        // Every path that leaves without committing goes through here so the
        // transaction is released rather than left open. A rollback failure is
        // ignored because the outcome reported to the caller is already decided.
        var abandon = function(report) {
            txn.rollback(function () {
                report();
            });
        };

        txn.run(function (err) {

            if (err) {
                // The transaction never started, so there is nothing to roll back.
                callback(null, 'Error creating transaction: '+ err);
                return;
            }

            var byNameKey = datastore.key({namespace:config.namespace,path:[kind, "name:"+name]});

            txn.get(byNameKey, function(err, entity) {
                if (err) {
                    abandon(function () {
                        callback(null, "Error loading data entity: "+ err);
                    });
                    return;
                }
                else if (entity !== undefined) {
                    // Name already has a UID: nothing to write, so release the
                    // transaction and report the existing value.
                    abandon(function () {
                        callback(entity.uid);
                    });
                    return;
                }

                nextUid(txn, kind, function(uid, err) {
                    if (err) {
                        abandon(function () {
                            callback(null, 'failed to assign uid: '+ err);
                        });
                        return;
                    }

                    var uidString = lpad(uid.toString(16), '0', config[type+"_uid_bytes"]*2);
                    var byIdKey = datastore.key({namespace:config.namespace,path:[kind, "id:"+uidString]});

                    // The same payload is stored twice so the mapping can be
                    // read by name and by UID.
                    var byNameEntity = {
                        key: byNameKey,
                        data: {
                            uid: uidString,
                            name: name
                        }
                    };
                    var byIdEntity = {
                        key: byIdKey,
                        data: {
                            uid: uidString,
                            name: name
                        }
                    };

                    txn.save(byNameEntity);
                    txn.save(byIdEntity);

                    txn.commit(function (err) {
                        if (err) {
                            callback(null, 'failed to assign uid: '+ err);
                        }
                        else {
                            callback(uidString);
                        }
                    });
                });
            });
        });
    });
};
205

206
/**
 * Lists the names stored in a UID kind, optionally restricted to those
 * starting with `prefix`, in key order.
 *
 * Only "name:" keys are scanned: the lower bound is always "name:<prefix>",
 * which with an empty prefix serves to exclude the "id:" keys.
 *
 * @param {string} entity - datastore kind to query (e.g. "metric_uid")
 * @param {string} prefix - name prefix; any falsy value means "no prefix"
 * @param {number} max - result limit; a falsy value means no limit is applied
 * @param {function} callback - callback(names) on success, callback(null, err) on failure
 */
var suggest = function(entity, prefix, max, callback) {
    var namePrefix = prefix ? prefix : "";

    var query = datastore.createQuery(config.namespace, entity);
    query = query.filter("__key__", ">=", datastore.key({namespace:config.namespace,path:[entity, "name:"+namePrefix]}));
    if (namePrefix !== "") {
        // todo: fudgeroo!! would be better to increment the last character by one
        query = query.filter("__key__", "<=", datastore.key({namespace:config.namespace,path:[entity, "name:"+namePrefix+"zzzzzzzzzzzzzzzzzz"]}));
    }
    query = query.order("__key__");

    if (max) {
        query = query.limit(max);
    }

    datastore.runQuery(query, function(err, entities) {
        if (err) {
            callback(null, err);
            return;
        }

        var names = [];
        for (var i = 0; i < entities.length; i++) {
            names.push(entities[i].name);
        }
        callback(names);
    });
};
247

248
// on backend api
249
/**
 * Suggests metric names by running suggest against the "metric_uid" kind.
 *
 * @param {string} prefix - name prefix, forwarded to suggest
 * @param {number} max - result limit, forwarded to suggest
 * @param {function} callback - forwarded unchanged to suggest
 */
backend.suggestMetrics = function(prefix, max, callback) {
    var kind = "metric_uid";
    suggest(kind, prefix, max, callback);
};
252

253
// on backend api
254
/**
 * Suggests tag key names by running suggest against the "tagk_uid" kind.
 *
 * @param {string} prefix - name prefix, forwarded to suggest
 * @param {number} max - result limit, forwarded to suggest
 * @param {function} callback - forwarded unchanged to suggest
 */
backend.suggestTagKeys = function(prefix, max, callback) {
    var kind = "tagk_uid";
    suggest(kind, prefix, max, callback);
};
257

258
// on backend api
259
/**
 * Suggests tag value names by running suggest against the "tagv_uid" kind.
 *
 * @param {string} prefix - name prefix, forwarded to suggest
 * @param {number} max - result limit, forwarded to suggest
 * @param {function} callback - forwarded unchanged to suggest
 */
backend.suggestTagValues = function(prefix, max, callback) {
    var kind = "tagv_uid";
    suggest(kind, prefix, max, callback);
};
262

263
/**
 * Resolves (assigning where needed) the UID of a metric and of every tag key
 * and tag value, one after another, then reports them in row-key form.
 *
 * @param {object} txn - accepted for the caller's convenience; not used here
 * @param {string} metric - metric name
 * @param {object} incomingTags - own properties are treated as tagk -> tagv names
 * @param {function} callback - callback(metricUid, tagUidString, tagUidArray, null)
 *        on success, where tagUidString concatenates tagk/tagv UID pairs ordered
 *        by tagk UID and tagUidArray holds the same UIDs as a flat list;
 *        callback(null, null, null, err) on the first failure
 */
var withMetricAndTagUids = function(txn, metric, incomingTags, callback) {
    if (config.verbose) {
        console.log("withMetricAndTagUids(txn, \"" + metric + "\", " + JSON.stringify(incomingTags) + ", callback)");
    }

    assignUidIfNecessary("metric", metric, function(metricUid, metricErr) {
        if (config.verbose) {
            console.log("metricUidCallback: " + JSON.stringify(metricUid));
        }
        if (metricErr) {
            callback(null, null, null, metricErr);
            return;
        }

        var tagNames = [];
        for (var tagName in incomingTags) {
            if (incomingTags.hasOwnProperty(tagName)) {
                tagNames.push(tagName);
            }
        }

        var resolvedTagkUids = [];
        var tagvUidByTagkUid = {};
        var failed = false;

        // Reports the first failure and stops any further tag processing.
        var fail = function(err) {
            failed = true;
            callback(null, null, null, err);
        };

        // Builds the sorted UID string/array once every tag has been resolved.
        var finish = function() {
            resolvedTagkUids.sort();
            var tagUidString = "";
            var tagUidArray = [];
            resolvedTagkUids.forEach(function(tagkUid) {
                var tagvUid = tagvUidByTagkUid[tagkUid];
                tagUidString += tagkUid + tagvUid;
                tagUidArray.push(tagkUid);
                tagUidArray.push(tagvUid);
            });

            callback(metricUid, tagUidString, tagUidArray, null);
        };

        // Tags are handled strictly in sequence: key first, then its value,
        // then the next tag.
        var resolveTag = function(index) {
            if (failed) {
                return;
            }
            if (index >= tagNames.length) {
                finish();
                return;
            }

            var tagk = tagNames[index];
            var tagv = incomingTags[tagk];

            assignUidIfNecessary("tagk", tagk, function(tagkUid, tagkErr) {
                if (tagkErr) {
                    fail(tagkErr);
                    return;
                }

                assignUidIfNecessary("tagv", tagv, function(tagvUid, tagvErr) {
                    if (tagvErr) {
                        fail(tagvErr);
                        return;
                    }

                    resolvedTagkUids.push(tagkUid);
                    tagvUidByTagkUid[tagkUid] = tagvUid;
                    resolveTag(index + 1);
                });
            });
        };

        resolveTag(0);
    });
};
338

339
/**
 * Builds the key name of a data/annotation row from its three parts.
 *
 * @param {string} metricUid - metric UID placed at the front of the key
 * @param {number} hour - time bucket number; written in hex and left-padded
 *                        with spaces (not zeros) to at least 4 characters
 * @param {string} tagUidString - concatenated tag UIDs appended at the end
 * @returns {string} metricUid + padded bucket + tagUidString
 */
var dataRowKey = function(metricUid, hour, tagUidString) {
    var bucketHex = hour.toString(16);
    var paddedBucket = lpad(bucketHex, ' ', 4);
    return metricUid + paddedBucket + tagUidString;
};
343

344
// on backend api
345
/**
 * Stores data points one at a time, each in its own transaction: the row for
 * the point's time series and time bucket is read, the value is added under
 * the point's millisecond offset within the bucket, and the row is written back.
 *
 * Timestamps above 10000000000 are treated as milliseconds, anything else as
 * seconds. The bucket (named "hour" in the code) spans 86400 seconds.
 *
 * @param {Array} points - objects with `timestamp`, `metric`, `tags` and `value`
 * @param {function} storePointsCallback - called once with an array holding,
 *        per point index, `undefined` on success or the error for that point;
 *        called with [] when `points` is empty
 */
backend.storePoints = function(points, storePointsCallback) {
    // start simple, process in series, single transaction for each point, read entity for timeseries/hour, update entity, write down
    var errors = new Array(points.length);
    if (points.length === 0) {
        storePointsCallback([]);
        // Nothing to store: stop here instead of falling through to storePoint(0).
        return;
    }
    var storePoint = function(pointIndex) {
        var point = points[pointIndex];

        if (config.verbose) {
            console.log("Processing point write " + pointIndex + ": " + JSON.stringify(point));
        }
        if (pointIndex >= points.length) {
            throw new Error("wat?");
        }

        // Records the outcome for this point, then either reports the full
        // result (last point) or moves on to the next one.
        var finishPoint = function(errorMessage) {
            errors[pointIndex] = errorMessage;
            if (pointIndex >= points.length - 1) {
                storePointsCallback(errors);
            }
            else {
                storePoint(pointIndex + 1);
            }
        };

        var txn = datastore.transaction();

        // Releases the open transaction before recording a failure. A rollback
        // error is ignored; the original failure is what gets reported.
        var abandon = function(errorMessage) {
            txn.rollback(function () {
                finishPoint(errorMessage);
            });
        };

        txn.run(function (err) {
            if (err) {
                // The transaction never started, so there is nothing to roll back.
                finishPoint(err);
                return;
            }

            var timestamp = point.timestamp;
            var ms = timestamp > 10000000000;
            var hour = Math.floor(ms ? timestamp / 86400000 : timestamp / 86400);
            // force offset to ms
            var offsetFromHour = ms ? timestamp % 86400000 : (timestamp % 86400) * 1000;

            var processCommitResult = function(err) {
                finishPoint(err ? "Error saving entity: " + err : undefined);
            };

            var uidCallback = function(metricUid, tagUidString, tagUidArray, err) {
                if (config.verbose) {
                    console.log("uidCallback("+metricUid+","+tagUidString+","+JSON.stringify(tagUidArray)+","+err+")");
                }
                if (err) {
                    // Record the failure against this point and keep going;
                    // otherwise the caller would never be called back.
                    abandon(err);
                    return;
                }

                var rowId = dataRowKey(metricUid, hour, tagUidString);
                var rowKey = datastore.key({namespace:config.namespace,path:["data", rowId]});

                txn.get(rowKey, function (err, entity) {
                    if (err) {
                        abandon("Error loading data entity: " + err);
                        return;
                    }

                    if (config.verbose) {
                        console.log("New data row? "+(entity === undefined));
                    }

                    // A new row starts with just its tag UIDs; an existing row
                    // is reused as the data payload and extended in place.
                    var data = (entity === undefined) ? { tags: tagUidArray } : entity;
                    data[offsetFromHour.toString()] = Number(point.value);

                    txn.save({
                        key: rowKey,
                        data: data
                    });

                    try {
                        if (config.verbose) {
                            console.log("committing write on "+rowKey);
                        }
                        txn.commit(processCommitResult);
                    }
                    catch (err) {
                        processCommitResult(err);
                    }
                });
            };
            withMetricAndTagUids(txn, point.metric, point.tags, uidCallback);
        });
    };
    storePoint(0);
};
458

459
// on backend api
460
/**
 * Finds the distinct time series stored in the "data" kind, optionally limited
 * to one metric, and returns them with UIDs resolved back to names.
 *
 * Row keys are scanned for the metric (or for all metrics when none is given),
 * reduced to one entry per tsuid, mapped from UIDs to names via reverseMapUids,
 * and then filtered by the query's tags.
 *
 * Current limitations visible in the code:
 *  - `useMeta` is ignored.
 *  - Only tags with key "*" are filtered on (a row must carry every such value
 *    under some key). Tags with a concrete key are collected but not applied yet.
 *  - `limit` is applied to the row scan, before de-duplication by tsuid.
 *
 * @param {object} query - `metric` (optional name) and `tags` (array of {key, value})
 * @param {number} limit - row limit for the datastore query; falsy means none
 * @param {boolean} useMeta - unused
 * @param {function} callback - callback(rows, null) or callback([]) on success,
 *        each row being {metric, tags, tsuid}; callback(null, err) on failure
 */
backend.searchLookupImpl = function(query, limit, useMeta, callback) {
    // so we need to look through all the row keys for the given metric (or all row keys if no metric given)
    // for each one we find the tags and then filter against the list of tags in the query
    var tagks = {};
    var disconnectedTagvs = {};
    for (var i=0; i<query.tags.length; i++) {
        var queryTag = query.tags[i];
        if (queryTag.key === "*") {
            disconnectedTagvs[queryTag.value] = queryTag.value;
            continue;
        }

        if (tagks[queryTag.key] == null) {
            tagks[queryTag.key] = [];
        }
        if (queryTag.value === "*") {
            // A wildcard value supersedes any specific values for the key.
            tagks[queryTag.key] = ["*"];
        }
        else if (tagks[queryTag.key].length === 0 || tagks[queryTag.key][0] !== "*") {
            tagks[queryTag.key].push(queryTag.value);
        }
    }

    // Adds uid to list the first time it is seen, using set for membership.
    var addDistinct = function(set, list, uid) {
        if (!(uid in set)) {
            set[uid] = uid;
            list.push(uid);
        }
    };

    // True when the row carries every wildcard-key value from the query.
    var matchesDisconnectedTagvs = function(row) {
        for (var v in disconnectedTagvs) {
            if (disconnectedTagvs.hasOwnProperty(v)) {
                var foundAMatch = false;
                for (var rk in row.tags) {
                    if (row.tags.hasOwnProperty(rk) && row.tags[rk] === v) {
                        foundAMatch = true;
                    }
                }
                if (!foundAMatch) {
                    return false;
                }
            }
        }
        return true;
    };

    var findTimeSeries = function(metricUid, callback) {
        var startRowKey = metricUid != null ? dataRowKey(metricUid.uid, 0, "") : dataRowKey(lpad("", "0", config.metric_uid_bytes*2), 0, "");
        var endRowKey = metricUid != null ? dataRowKey(metricUid.uid, Math.pow(256, config.metric_uid_bytes)-1, "") : dataRowKey(lpad("", "z", config.metric_uid_bytes*2), 0, "");

        var dsQuery = datastore
            .createQuery(config.namespace, "data")
            .filter("__key__", ">=", datastore.key({namespace:config.namespace,path:["data", startRowKey]}))
            .filter("__key__", "<=", datastore.key({namespace:config.namespace,path:["data", endRowKey]}));
        if (limit) {
            dsQuery = dsQuery.limit(limit);
        }

        var processRawRows = function(rawRows) {
            // now we need to find all the metricUids, all the tagk uids and all the tagv uids, convert them all to names and then remap everything
            var metric_uid_set = {};
            var metric_uids = [];
            var tagk_uid_set = {};
            var tagk_uids = [];
            var tagv_uid_set = {};
            var tagv_uids = [];

            for (var r=0; r<rawRows.length; r++) {
                addDistinct(metric_uid_set, metric_uids, rawRows[r].metric);

                // tags is a flat [tagk, tagv, tagk, tagv, ...] list of UIDs
                for (var t=0; t<rawRows[r].tags.length; t+=2) {
                    addDistinct(tagk_uid_set, tagk_uids, rawRows[r].tags[t]);
                    addDistinct(tagv_uid_set, tagv_uids, rawRows[r].tags[t+1]);
                }
            }

            reverseMapUids(metric_uids, tagk_uids, tagv_uids, function(metricUidMetaByUidMap, tagkUidMetaByUidMap, tagvUidMetaByUidMap, err) {
                if (err) {
                    callback(null, err);
                    // Stop here: the maps are not usable and the caller has
                    // already been notified.
                    return;
                }

                var filteredRows = [];
                for (var n=0; n<rawRows.length; n++) {
                    var row = rawRows[n];
                    row.metric = metricUidMetaByUidMap[row.metric].name;

                    var newTags = {};
                    for (var p=0; p<row.tags.length; p+=2) {
                        var tagk_uid = row.tags[p];
                        var tagv_uid = row.tags[p+1];
                        newTags[tagkUidMetaByUidMap[tagk_uid].name] = tagvUidMetaByUidMap[tagv_uid].name;
                    }
                    row.tags = newTags;

                    // now the row has its bits resolved, filter based on the tagv-only entries in the query
                    // todo: also filter on the keyed entries collected in tagks
                    if (matchesDisconnectedTagvs(row)) {
                        filteredRows.push(row);
                    }
                }

                callback(filteredRows, null);
            });
        };

        datastore
            .runQuery(dsQuery, function(err, entities) {
                if (err) {
                    callback(null, err);
                    return;
                }
                if (entities.length === 0) {
                    callback([]);
                    return;
                }

                var seenTsuids = {};
                var rowsForDistinctTsuids = [];

                for (var r=0; r<entities.length; r++) {
                    var row = entities[r];

                    var key = row[Datastore.KEY].name;
                    if (config.verbose) {
                        console.log("ROW KEY: "+key+" (Namespace: "+row[Datastore.KEY].namespace+")");
                    }

                    var decomposedKey = decomposeRowKey(key);

                    if (!seenTsuids.hasOwnProperty(decomposedKey.tsuid)) {
                        // {metric:'some.metric',tags:{host:"host1"}, tsuid: "000001000001000001"}
                        rowsForDistinctTsuids.push({
                            metric: decomposedKey.metricUid,
                            tags: row.tags,
                            tsuid: decomposedKey.tsuid
                        });
                        seenTsuids[decomposedKey.tsuid] = decomposedKey.tsuid;
                    }
                }
                processRawRows(rowsForDistinctTsuids);
            });
    };

    if (query.metric) {
        uidMetaFrom("metric", "name:"+query.metric, function(uid, err) {
            if (err) {
                callback(null, err);
            }
            else if (uid === undefined || uid === null) {
                // The metric name has no UID, so no series can exist for it.
                // Without this guard the lookup would scan every metric.
                callback([]);
            }
            else {
                findTimeSeries(uid, callback);
            }
        });
    }
    else {
        findTimeSeries(null, callback);
    }
};
648

649
// on backend api
650
/**
 * Stores annotations one at a time, each in its own transaction: the "ann" row
 * for the annotation's time series and time bucket is read, the annotation is
 * added under its millisecond offset within the bucket, and the row is written back.
 *
 * The annotation's `startTime` is treated as milliseconds when above
 * 10000000000, otherwise as seconds. When `tsuid` is present it is split into
 * the metric UID and tagk/tagv UID pairs using the configured UID widths;
 * without it the row is keyed under an all-zero metric UID with no tags.
 *
 * @param {Array} annotations - objects with `startTime` and optional `tsuid`
 * @param {function} storeAnnotationsCallback - called once with an array
 *        holding, per annotation index, `undefined` on success or the error for
 *        that annotation; called with [] when `annotations` is empty
 */
backend.storeAnnotations = function(annotations, storeAnnotationsCallback) {
    // start simple, process in series, single transaction for each, read entity for timeseries/hour, update entity, write down
    var errors = new Array(annotations.length);
    if (annotations.length === 0) {
        storeAnnotationsCallback([]);
        // Nothing to store: stop here instead of falling through to storeAnnotation(0).
        return;
    }
    var storeAnnotation = function(annotationIndex) {
        var annotation = annotations[annotationIndex];

        if (config.verbose) {
            console.log("Processing annotation " + annotationIndex + ": " + JSON.stringify(annotation));
        }
        if (annotationIndex >= annotations.length) {
            throw new Error("wat?");
        }

        // Records the outcome for this annotation, then either reports the
        // full result (last one) or moves on to the next.
        var finishAnnotation = function(errorMessage) {
            errors[annotationIndex] = errorMessage;
            if (annotationIndex >= annotations.length - 1) {
                storeAnnotationsCallback(errors);
            }
            else {
                storeAnnotation(annotationIndex + 1);
            }
        };

        var txn = datastore.transaction();
        txn.run(function (err) {
            if (err) {
                // The transaction never started, so there is nothing to roll back.
                finishAnnotation(err);
                return;
            }

            var timestamp = annotation.startTime;
            var ms = timestamp > 10000000000;
            var hour = Math.floor(ms ? timestamp / 86400000 : timestamp / 86400);
            // force offset to ms
            var offsetFromHour = ms ? timestamp % 86400000 : (timestamp % 86400) * 1000;

            var tagk_string_len = config.tagk_uid_bytes*2;
            var tagv_string_len = config.tagv_uid_bytes*2;
            var tsuid = annotation.tsuid;
            var metricUid = lpad("", "0", config.metric_uid_bytes*2);
            var tagUidString = "";
            var tagUidArray = [];
            if (tsuid) {
                metricUid = tsuid.substring(0, config.metric_uid_bytes*2);
                tagUidString = tsuid.substring(metricUid.length);
                for (var i=0; i<tagUidString.length; i+=(tagk_string_len+tagv_string_len)) {
                    var kString = tagUidString.substring(i,i+tagk_string_len);
                    var vString = tagUidString.substring(i+tagk_string_len, i+tagk_string_len+tagv_string_len);
                    tagUidArray.push(kString);
                    tagUidArray.push(vString);
                }
            }

            var rowId = dataRowKey(metricUid, hour, tagUidString);
            var rowKey = datastore.key({namespace:config.namespace,path:["ann", rowId]});

            var processCommitResult = function(err) {
                finishAnnotation(err ? "Error saving entity: " + err : undefined);
            };

            txn.get(rowKey, function (err, entity) {
                if (err) {
                    // Release the open transaction before recording the failure;
                    // a rollback error is ignored in favour of the read error.
                    txn.rollback(function () {
                        finishAnnotation("Error loading data entity: " + err);
                    });
                    return;
                }

                if (config.verbose) {
                    console.log("New data row? "+(entity === undefined));
                }

                // A new row starts with just its tag UIDs; an existing row is
                // reused as the data payload and extended in place.
                var data = (entity === undefined) ? { tags: tagUidArray } : entity;
                data[offsetFromHour.toString()] = annotation;

                txn.save({
                    key: rowKey,
                    data: data
                });

                try {
                    txn.commit(processCommitResult);
                }
                catch (err) {
                    processCommitResult(err);
                }
            });
        });
    };
    storeAnnotation(0);
};
763

764
// on backend api
765
/**
 * Placeholder for annotation deletion: nothing is deleted.
 *
 * @param {object} annotation - ignored
 * @param {function} callback - always invoked as callback(null)
 */
backend.deleteAnnotation = function(annotation, callback) {
    // todo: not supported yet
    var nothing = null;
    callback(nothing);
};
768

769
// on backend api
770
/**
 * Backend API hook: loads the annotations stored for a set of time series
 * whose start time falls inside [startTime, endTime].
 *
 * One "ann" entity key is built per (time series, bucket) pair, where a bucket
 * is floor(epochMillis / 86400000); all keys are fetched with one
 * datastore.get call. Every own property of a returned entity other than
 * "tags" that holds an object with a `startTime` is treated as an annotation.
 *
 * @param {Date} startTime inclusive lower bound
 * @param {Date} endTime inclusive upper bound
 * @param {*} downsampleSeconds unused by this backend
 * @param {Array} participatingTimeSeries items providing `metric_uid` and `tsuid` strings
 * @param {Function} callback function(annotations, err); exactly one of the two is non-null
 */
backend.performAnnotationsQueries = function(startTime, endTime, downsampleSeconds, participatingTimeSeries, callback) {
    var BUCKET_MS = 86400000;
    var hasOwn = Object.prototype.hasOwnProperty;

    var startMs = startTime.getTime();
    var endMs = endTime.getTime();

    // first we can map the time series to a set of row keys for possible annotations
    var firstBucket = Math.floor(startMs / BUCKET_MS);
    var lastBucket = Math.floor(endMs / BUCKET_MS);

    var allKeys = [];
    for (var t=0; t<participatingTimeSeries.length; t++) {
        // the tag part of the tsuid is whatever follows the metric uid
        var metricUid = participatingTimeSeries[t].metric_uid;
        var tagUidString = participatingTimeSeries[t].tsuid.substring(metricUid.length);
        for (var b=firstBucket; b<=lastBucket; b++) {
            var keyId = dataRowKey(metricUid, b, tagUidString);
            allKeys.push(datastore.key({namespace:config.namespace,path:["ann", keyId]}));
        }
    }

    // Nothing to look up (no series, or endTime before startTime): answer
    // directly instead of handing the datastore client an empty key list.
    if (allKeys.length === 0) {
        callback([], null);
        return;
    }

    datastore.get(allKeys, function(err, entities) {
        if (err) {
            callback(null, err);
            return;
        }

        var ret = [];
        for (var i=0; i<entities.length; i++) {
            var entity = entities[i];
            if (config.verbose) {
                var key = entity[Datastore.KEY].name;
                console.log("ANN ROW: "+key+" = "+JSON.stringify(entity));
            }

            for (var offset in entity) {
                if (!hasOwn.call(entity, offset) || offset === "tags") {
                    continue;
                }
                var annotation = entity[offset];
                // skip null / primitive values rather than throwing on them
                if (annotation === null || typeof annotation !== "object" || !hasOwn.call(annotation, "startTime")) {
                    continue;
                }

                var annStartTime = annotation.startTime;
                // values above 10000000000 are compared as milliseconds, others as seconds
                var ms = annStartTime > 10000000000;
                var desiredStartTime = ms ? startMs : startMs / 1000;
                var desiredEndTime = ms ? endMs : endMs / 1000;

                if (annStartTime >= desiredStartTime && annStartTime <= desiredEndTime) {
                    ret.push(annotation);
                }
            }
        }
        callback(ret, null);
    });
};
825

826
/*
827
data is Array of {
828
 *                     tsuid:String,
829
 *                     description:String,
830
 *                     notes:String,
831
 *                     custom:Map,
832
 *                     startTime:Date,
833
 *                     endTime:Date
834
 *                   }
835
 */
836

837
// on backend api
838
/**
 * Backend API hook: loads global annotations between startTime and endTime.
 *
 * Delegates to backend.performAnnotationsQueries with a single placeholder
 * series whose metric uid is all "0" characters (config.metric_uid_bytes*2
 * of them) and whose tsuid is empty; downsampleSeconds is passed as null.
 *
 * @param {Date} startTime forwarded unchanged
 * @param {Date} endTime forwarded unchanged
 * @param {Function} callback forwarded unchanged
 */
backend.performGlobalAnnotationsQuery = function(startTime, endTime, callback) {
    var zeroMetricUid = lpad("", "0", config.metric_uid_bytes*2);
    var globalSeries = {
        metric_uid: zeroMetricUid,
        tsuid: ""
    };
    backend.performAnnotationsQueries(startTime, endTime, null, [globalSeries], callback);
};
848

849
/**
 * Splits a row key string into its parts.
 *
 * The key is read as: metric uid (config.metric_uid_bytes*2 characters),
 * then 4 characters parsed as a hexadecimal number, then the remaining
 * tag uid string.
 *
 * @param {String} rowKey the key name to split
 * @returns {{metricUid: String, tagUidString: String, tsuid: String, hour: Number}}
 *          `tsuid` is the metric uid followed directly by the tag uid string
 */
var decomposeRowKey = function(rowKey) {
    var HOUR_FIELD_LENGTH = 4;
    var hourStart = config.metric_uid_bytes*2;
    var tagStart = hourStart + HOUR_FIELD_LENGTH;

    var metricPart = rowKey.substring(0, hourStart);
    var tagPart = rowKey.substring(tagStart);
    var hourPart = rowKey.substring(hourStart, tagStart);

    return {
        metricUid: metricPart,
        tagUidString: tagPart,
        tsuid: metricPart + tagPart,
        hour: parseInt(hourPart, 16)
    };
};
866

867
// callback = function(metricUidMetaByUidMap, tagkUidMetaByUidMap, tagvUidMetaByUidMap, err)
868
/**
 * Resolves metric, tagk and tagv uids to their metas, one lookup after the
 * other (metric, then tagk, then tagv) via multiUidMetaFromUid.
 *
 * The metric lookup is skipped when metric_uids is empty; an empty object is
 * then reported as the metric metas. The first lookup error stops the chain.
 *
 * @param {Array} metric_uids metric uids to resolve
 * @param {Array} tagk_uids tag key uids to resolve
 * @param {Array} tagv_uids tag value uids to resolve
 * @param {Function} callback function(metricMetas, tagkMetas, tagvMetas, err);
 *        on failure the three results are null and err is set
 */
var reverseMapUids = function(metric_uids, tagk_uids, tagv_uids, callback) {
    var fail = function(err) {
        callback(null, null, null, err);
    };

    var logWhenVerbose = function(prefix, metas) {
        if (config.verbose) {
            console.log(prefix + JSON.stringify(metas));
        }
    };

    // tagk lookup followed by tagv lookup, then the final callback
    var resolveTags = function(metricMetas) {
        multiUidMetaFromUid("tagk", tagk_uids, function(tagkMetas, tagkErr) {
            if (tagkErr) {
                fail(tagkErr);
                return;
            }
            logWhenVerbose("tagkMetas = ", tagkMetas);

            multiUidMetaFromUid("tagv", tagv_uids, function(tagvMetas, tagvErr) {
                if (tagvErr) {
                    fail(tagvErr);
                    return;
                }
                logWhenVerbose("tagvMetas = ", tagvMetas);

                callback(metricMetas, tagkMetas, tagvMetas, null);
            });
        });
    };

    if (metric_uids.length > 0) {
        multiUidMetaFromUid("metric", metric_uids, function(metricMetas, metricErr) {
            if (metricErr) {
                fail(metricErr);
                return;
            }
            logWhenVerbose("metricMetas = ", metricMetas);

            resolveTags(metricMetas);
        });
    }
    else {
        resolveTags({});
    }
};
913

914
// on backend api
915
/**
 * Backend API hook: loads the raw data points of one metric between
 * startTime and endTime (both inclusive) and groups them per time series.
 *
 * Steps:
 *  1. resolve the metric name to its uid meta (uidMetaFromName);
 *  2. page through all "data" entities whose key lies between the row key
 *     prefixes for the first bucket and the bucket after the last one;
 *  3. turn every entity into [timestamp, value] pairs, dropping the points
 *     outside the requested range in the first and last bucket;
 *  4. resolve the tag uids to names (reverseMapUids) and merge the rows
 *     into one result entry per distinct tag uid string.
 *
 * A bucket is floor(epochMillis / 86400000); the "hour" identifiers below
 * refer to that bucket index.
 *
 * @param {Date} startTime inclusive lower bound
 * @param {Date} endTime inclusive upper bound
 * @param {*} downsample only used for verbose logging here
 * @param {String} metric metric name
 * @param {*} filters only used for verbose logging here
 * @param {Function} callback function(series, err). `series` is an array of
 *        {metric, metric_uid, tags, tsuid, dps}, where dps is an array of
 *        [timestampMillis, value] in ascending time order; it is empty when
 *        no rows matched. On failure series is null and err is set.
 */
backend.performBackendQueries = function(startTime, endTime, downsample, metric, filters, callback) {
    var ROW_SPAN_MS = 86400000;
    var hasOwn = Object.prototype.hasOwnProperty;

    if (config.verbose) {
        console.log("performBackendQueries(" + startTime + "," + endTime + ", " + downsample + ", " + metric + ", " + filters + ")");
    }

    uidMetaFromName("metric", metric, function(metricUid, err) {
        if (err) {
            callback(null, err);
            return;
        }
        // metric not known
        if (metricUid === undefined) {
            callback(null, "Metric "+metric+" not known");
            return;
        }
        // now run a query from metricUid+startHour inclusive to metricUid+(endHour+1) exclusive
        var startHour = Math.floor(startTime.getTime() / ROW_SPAN_MS);
        var startOffset = startTime.getTime() % ROW_SPAN_MS;
        var endHour = Math.floor(endTime.getTime() / ROW_SPAN_MS);
        var endOffset = endTime.getTime() % ROW_SPAN_MS;

        // Returns the predicate deciding which offsets of bucket h are in range:
        // only the first and last bucket need trimming.
        var timeFilterFunction = function(h) {
            var isFirst = h === startHour;
            var isLast = h === endHour;
            if (!isFirst && !isLast) {
                return function() {return true;};
            }
            return function(offset) {
                return (!isFirst || offset >= startOffset) && (!isLast || offset <= endOffset);
            };
        };

        var startKey = datastore.key({namespace:config.namespace,path:["data", dataRowKey(metricUid.uid, startHour, "")]});
        var endKey = datastore.key({namespace:config.namespace,path:["data", dataRowKey(metricUid.uid, endHour+1, "")]});
        if (config.verbose) {
            console.log("Finding data rows where " + JSON.stringify(startKey) + " <= __key__ < " + JSON.stringify(endKey));
        }

        // Resolves tag names and merges the collected rows into one entry per series.
        var processRawRows = function(rows) {
            var allTagks = {};
            var allTagvs = {};
            for (var r=0; r<rows.length; r++) {
                for (var t=0; t<rows[r].tags.length; t+=2) {
                    allTagks[rows[r].tags[t]] = true;
                    allTagvs[rows[r].tags[t+1]] = true;
                }
            }

            var tagk_uids = Object.keys(allTagks);
            if (config.verbose) {
                console.log("tagk_uids = "+JSON.stringify(tagk_uids));
            }

            var tagv_uids = Object.keys(allTagvs);
            if (config.verbose) {
                console.log("tagv_uids = "+JSON.stringify(tagv_uids));
            }

            reverseMapUids([], tagk_uids, tagv_uids, function(metricMetas, tagkMetas, tagvMetas, err) {
                if (err) {
                    callback(null, err);
                    return;
                }

                // Row keys sort by metric, then bucket, then tag uids, so the rows
                // of one series are not adjacent once a query spans several
                // buckets. Group by tag uid string instead of relying on adjacency;
                // `ret` keeps the series in first-seen order.
                var ret = [];
                var seriesByTagUids = {};

                for (var r=0; r<rows.length; r++) {
                    var row = rows[r];
                    var series = hasOwn.call(seriesByTagUids, row.tag_uids) ? seriesByTagUids[row.tag_uids] : undefined;

                    if (series === undefined) {
                        var tags = {};
                        for (var t=0; t<row.tags.length; t+=2) {
                            var k_uid = row.tags[t];
                            var v_uid = row.tags[t+1];
                            var tagkMeta = tagkMetas[k_uid];
                            var tagvMeta = tagvMetas[v_uid];
                            // report unresolved uids to the caller instead of throwing here
                            if (!tagkMeta) {
                                callback(null, "Couldn't find a tagk meta for uid: "+k_uid);
                                return;
                            }
                            if (!tagvMeta) {
                                callback(null, "Couldn't find a tagv meta for uid: "+v_uid);
                                return;
                            }
                            tags[tagkMeta.name] = {
                                tagk: tagkMeta.name,
                                tagk_uid: k_uid,
                                tagv: tagvMeta.name,
                                tagv_uid: v_uid
                            };
                        }
                        series = {
                            metric: metric,
                            metric_uid: metricUid.uid,
                            tags: tags,
                            tsuid: metricUid.uid+row.tag_uids,
                            dps: []
                        };
                        seriesByTagUids[row.tag_uids] = series;
                        ret.push(series);
                    }

                    // rows of one series arrive in ascending bucket order
                    series.dps = series.dps.concat(row.dps);
                }

                if (config.verbose) {
                    console.log("Calling back to API with ret = "+JSON.stringify(ret));
                }
                callback(ret, null);
            });
        };

        // Converts one datastore entity into {tags, tag_uids, dps}.
        var toQueryRow = function(row) {
            var key = row[Datastore.KEY].name;
            if (config.verbose) {
                console.log("ROW KEY: "+key+" (Namespace: "+row[Datastore.KEY].namespace+")");
            }

            var decomposedKey = decomposeRowKey(key);
            var hour = decomposedKey.hour;
            var timeFilter = timeFilterFunction(hour);

            var dps = [];
            for (var k in row) {
                if (hasOwn.call(row, k) && k !== "tags") {
                    var offset = Number(k);
                    // ignore properties that are not numeric offsets
                    if (!isNaN(offset) && timeFilter(offset)) {
                        dps.push([(hour * ROW_SPAN_MS) + offset, row[k]]);
                    }
                }
            }
            // numeric comparator: the default sort compares as strings
            dps.sort(function(a, b) { return a[0] - b[0]; });

            return {tags:row.tags, tag_uids:decomposedKey.tagUidString, dps:dps};
        };

        // Fetches one page of rows, then either the next page or hands over to processRawRows.
        var runQuery = function(cursor, rows) {
            if (config.verbose) {
                console.log("Running query with start cursor: " + cursor);
            }
            var query = datastore
                .createQuery(config.namespace, "data")
                .filter("__key__", ">=", startKey)
                .filter("__key__", "<", endKey)
                .order("__key__");

            if (cursor) {
                query = query.start(cursor);
            }

            datastore
                .runQuery(query, function(err, entities, info) {
                    if (config.verbose) {
                        console.log("query result info: "+JSON.stringify(info));
                    }
                    if (err) {
                        callback(null, err);
                        return;
                    }

                    if (config.verbose) {
                        console.log("Found "+entities.length+" raw rows");
                    }
                    for (var r=0; r<entities.length; r++) {
                        if (config.verbose) {
                            console.log("ROW: "+JSON.stringify(entities[r]));
                        }
                        var toPush = toQueryRow(entities[r]);
                        if (config.verbose) {
                            console.log("Pushing row: "+JSON.stringify(toPush));
                        }
                        rows.push(toPush);
                    }

                    if (info.moreResults !== Datastore.NO_MORE_RESULTS && info.endCursor !== cursor) {
                        if (config.verbose) {
                            console.log("old cursor: "+cursor+", new cursor: "+info.endCursor);
                        }
                        runQuery(info.endCursor, rows);
                    }
                    else {
                        if (config.verbose) {
                            console.log("out of rows, what now?");
                        }
                        processRawRows(rows);
                    }
                });
        };
        runQuery(null, []);
    });
};
1151

1152
/**
 * Copies the own enumerable properties of `from` onto `to`, in place.
 *
 *  - a key missing from `to` is added with the value from `from`;
 *  - a number, string or boolean replaces the existing value;
 *  - null / undefined values for an existing key are ignored;
 *  - any other value is merged recursively into the existing value when that
 *    is an object (or function), and otherwise replaces it. This avoids
 *    recursing into a null or primitive target such as a `null` default.
 *
 * @param {Object} from overrides to apply; not modified
 * @param {Object} to object receiving the overrides; modified in place
 */
var applyOverrides = function(from, to) {
    var hasOwn = Object.prototype.hasOwnProperty;

    for (var k in from) {
        if (!hasOwn.call(from, k)) {
            continue;
        }

        var value = from[k];
        if (!hasOwn.call(to, k)) {
            to[k] = value;
            continue;
        }

        var valueType = typeof value;
        if (valueType === 'number' || valueType === 'string' || valueType === 'boolean') {
            to[k] = value;
            continue;
        }

        if (config.verbose) {
            console.log("unhandled: "+valueType);
        }
        if (value === null || value === undefined) {
            // nothing to merge; keep the existing value
            continue;
        }

        var existing = to[k];
        var existingType = typeof existing;
        if (existing !== null && (existingType === 'object' || existingType === 'function')) {
            applyOverrides(value, existing);
        }
        else {
            to[k] = value;
        }
    }
};
1175

1176
/**
 * Configures the backend and installs the nodetsdb API onto an app.
 *
 * @param {Object} app the application object handed to api.install
 * @param {Object} incomingConfig configuration overrides, forwarded to setupBackend
 */
var installApiWithBackend = function(app, incomingConfig) {
    // setupBackend runs first: it replaces the module-level `config` read below
    var configuredBackend = setupBackend(incomingConfig);

    api.backend(configuredBackend);
    api.install(app, config);
};
1182

1183
/**
 * Builds the effective configuration and creates the Datastore client.
 *
 * The defaults below are overridden with incomingConfig via applyOverrides;
 * the result replaces the module-level `config`, and a new Datastore built
 * from its projectId and dataStoreKeyFile replaces the module-level
 * `datastore`.
 *
 * @param {Object} [incomingConfig] configuration overrides; any falsy value
 *        is treated as "no overrides"
 * @returns {Object} the module-level backend object
 */
var setupBackend = function(incomingConfig) {
    var overrides = incomingConfig ? incomingConfig : {};

    var effective = {
        verbose: false,
        logRequests: true,
        version: "2.2.0",
        metric_uid_bytes: 3,
        tagk_uid_bytes: 3,
        tagv_uid_bytes: 3,
        projectId: 'opentsdb-cloud',
        dataStoreKeyFile: null,
        namespace: null
    };
    applyOverrides(overrides, effective);

    // only publish the configuration once the overrides have been applied
    config = effective;

    var datastoreOptions = {
        projectId: config.projectId,
        keyFile: config.dataStoreKeyFile
    };
    datastore = new Datastore(datastoreOptions);

    return backend;
};
1211

1212
module.exports = {
1✔
1213
    install: installApiWithBackend,
1214
    backend: setupBackend
1215
};
1216

1217
/**
 * Creates an express app, installs the API with this backend on it and
 * starts listening on config.port (read after the configuration is applied).
 *
 * @param {Object} conf configuration overrides, forwarded to installApiWithBackend
 * @returns {Object} the server returned by app.listen
 */
var runServer = function(conf) {
    var app = express();
    installApiWithBackend(app, conf);

    var server = app.listen(config.port, function onListening() {
        console.log('OpenTSDB/GCE running at http://%s:%s', server.address().address, server.address().port);
    });
    return server;
};
1229

1230
// command line running
1231
/* istanbul ignore if */
1232
if (require.main === module) {
    // Command line entry point: parse the options, then start the server.
    //   -p [port] : port to bind to (default 4242)
    //   -v        : verbose logging
    //   -? --help : print usage and exit without starting the server
    var conf = {
        port: 4242
    };
    // cleared by --help or an unusable -p value so that no server is started
    var startServer = true;

    var args = process.argv.slice(2);
    for (var i=0; i<args.length; i++) {
        switch (args[i]) {
            case '-p':
                var portArg = args[++i];
                // a missing value (undefined) fails the digit test as well
                if (!/^\d+$/.test(portArg) || Number(portArg) > 65535) {
                    console.error("Invalid or missing port: "+portArg);
                    process.exitCode = 1;
                    startServer = false;
                }
                else {
                    conf.port = Number(portArg);
                }
                break;
            case '-v':
                conf.verbose = true;
                break;
            case '-?':
            case '--help':
                console.log("Usage: node index.js [options]");
                console.log(" -p [port] : Specify the port to bind to");
                console.log(" -v        : Verbose logging");
                console.log(" -? --help : Show this help page");
                startServer = false;
                break;
            default:
                console.error("Unrecognised option: "+args[i]);
        }
    }

    if (startServer) {
        runServer(conf);
    }

}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc