• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5672512448

pending completion
5672512448

push

github

web-flow
chore: Change `make_from!` in `from_arrow` to func to improve readability (#1792)

31 of 31 new or added lines in 4 files covered. (100.0%)

45630 of 59777 relevant lines covered (76.33%)

38810.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-log-js/src/lib.rs
1
use std::sync::Arc;
×
2

3
use dozer_log::{
4
    reader::{LogReader as RustLogReader, LogReaderBuilder, LogReaderOptions},
5
    tokio::{runtime::Runtime as TokioRuntime, sync::Mutex},
6
};
7
use neon::prelude::*;
8

9
/// Name of the JS object property under which the boxed Rust state
/// (`Runtime` or `LogReader`) is stored and later looked up.
const EXTERNAL_PROPERTY_NAME: &str = "__external__";
10

11
#[derive(Debug, Clone)]
×
12
struct Runtime {
13
    runtime: Arc<TokioRuntime>,
14
    channel: Channel,
15
}
16

17
impl Finalize for Runtime {}
18

19
fn new_runtime(mut cx: FunctionContext) -> JsResult<JsObject> {
×
20
    // Create the object that will be returned.
×
21
    let runtime_object = JsObject::new(&mut cx);
×
22

23
    // Create the runtime and store it in the object.
24
    let runtime = match TokioRuntime::new() {
×
25
        Ok(runtime) => runtime,
×
26
        Err(error) => return cx.throw_error(error.to_string()),
×
27
    };
28
    let channel = Channel::new(&mut cx);
×
29
    let managed_runtime = cx.boxed(Runtime {
×
30
        runtime: Arc::new(runtime),
×
31
        channel,
×
32
    });
×
33
    runtime_object.set(&mut cx, EXTERNAL_PROPERTY_NAME, managed_runtime)?;
×
34

35
    // Create the `create_reader` function.
36
    let create_reader = JsFunction::new(&mut cx, runtime_create_reader)?;
×
37
    runtime_object.set(&mut cx, "create_reader", create_reader)?;
×
38
    Ok(runtime_object)
×
39
}
×
40

41
struct LogReader {
42
    runtime: Runtime,
43
    reader: Arc<Mutex<RustLogReader>>,
44
}
45

46
impl Finalize for LogReader {}
47

48
fn runtime_create_reader(mut cx: FunctionContext) -> JsResult<JsPromise> {
×
49
    // Extract runtime from `this`.
×
50
    let this = cx.this();
×
51
    let runtime_object = this.downcast_or_throw::<JsObject, _>(&mut cx)?;
×
52
    let runtime = runtime_object.get::<JsBox<Runtime>, _, _>(&mut cx, EXTERNAL_PROPERTY_NAME)?;
×
53

54
    // Extract `server_addr` from the first argument.
55
    let server_addr = cx.argument::<JsString>(0)?.value(&mut cx);
×
56

57
    // Extract `endpoint_name` from the second argument.
58
    let endpoint_name = cx.argument::<JsString>(1)?.value(&mut cx);
×
59

×
60
    // Create the reader.
×
61
    let (deferred, promise) = cx.promise();
×
62
    let runtime_for_reader = (**runtime).clone();
×
63
    let channel = runtime.channel.clone();
×
64
    runtime.runtime.spawn(async move {
×
65
        // Create the builder.
66
        let reader_builder =
×
67
            LogReaderBuilder::new(server_addr, LogReaderOptions::new(endpoint_name)).await;
×
68
        match reader_builder {
×
69
            Ok(reader) => {
×
70
                // Create the reader and resolve the promise.
×
71
                let reader = reader.build(0, None);
×
72
                deferred.settle_with(&channel, move |mut cx| {
×
73
                    new_reader(&mut cx, runtime_for_reader, reader)
×
74
                })
×
75
            }
76
            // Resolve the promise on error.
77
            Err(e) => deferred.settle_with(&channel, move |mut cx: TaskContext<'_>| {
×
78
                cx.throw_error::<_, Handle<JsObject>>(e.to_string())
×
79
            }),
×
80
        }
81
    });
×
82
    Ok(promise)
×
83
}
×
84

85
fn new_reader<'a, C: Context<'a>>(
×
86
    cx: &mut C,
×
87
    runtime: Runtime,
×
88
    reader: RustLogReader,
×
89
) -> JsResult<'a, JsObject> {
×
90
    // Create the object that will be returned.
×
91
    let reader_object = JsObject::new(cx);
×
92

×
93
    // Store the reader in the object.
×
94
    let managed_reader = cx.boxed(LogReader {
×
95
        runtime,
×
96
        reader: Arc::new(Mutex::new(reader)),
×
97
    });
×
98
    reader_object.set(cx, EXTERNAL_PROPERTY_NAME, managed_reader)?;
×
99

100
    // Create the `next_op` function.
101
    let next_op = JsFunction::new(cx, reader_next_op)?;
×
102
    reader_object.set(cx, "next_op", next_op)?;
×
103

104
    Ok(reader_object)
×
105
}
×
106

107
fn reader_next_op(mut cx: FunctionContext) -> JsResult<JsPromise> {
×
108
    // Extract reader from `this`.
×
109
    let this = cx.this();
×
110
    let reader_object = this.downcast_or_throw::<JsObject, _>(&mut cx)?;
×
111
    let reader = reader_object.get::<JsBox<LogReader>, _, _>(&mut cx, EXTERNAL_PROPERTY_NAME)?;
×
112

113
    // Create the promise.
114
    let (deferred, promise) = cx.promise();
×
115
    let runtime = &reader.runtime;
×
116
    let channel = runtime.channel.clone();
×
117
    let reader = reader.reader.clone();
×
118
    runtime.runtime.spawn(async move {
×
119
        // Read the next operation.
120
        let mut reader = reader.lock().await;
×
121
        let schema = reader.schema.schema.clone();
×
122
        let op = reader.next_op().await;
×
123

124
        // Resolve the promise.
125
        deferred.settle_with(&channel, move |mut cx| {
×
126
            let op = match op {
×
127
                Ok(op) => op,
×
128
                Err(error) => return cx.throw_error(error.to_string()),
×
129
            }
130
            .0;
131
            mapper::map_executor_operation(op, &schema, &mut cx)
×
132
        });
×
133
    });
×
134
    Ok(promise)
×
135
}
×
136

137
#[neon::main]
×
138
fn main(mut cx: ModuleContext) -> NeonResult<()> {
139
    cx.export_function("Runtime", new_runtime)?;
140
    Ok(())
141
}
142

143
// Provides `map_executor_operation`, which `reader_next_op` calls to turn a
// read operation and its schema into the JS value the promise settles with.
mod mapper;
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc