• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4280059073

pending completion
4280059073

push

github

GitHub
Bump version (#1069)

27464 of 37850 relevant lines covered (72.56%)

52364.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.11
/dozer-sql/src/pipeline/expression/python_udf.rs
1
use crate::pipeline::errors::PipelineError;
2
use crate::pipeline::errors::PipelineError::UnsupportedSqlError;
3
use crate::pipeline::errors::UnsupportedSqlError::GenericError;
4
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor};
5
use dozer_types::ordered_float::OrderedFloat;
6
use dozer_types::pyo3::types::PyTuple;
7
use dozer_types::pyo3::Python;
8
use dozer_types::types::{Field, FieldType, Record, Schema};
9
use std::env;
10
use std::path::PathBuf;
11

12
const MODULE_NAME: &str = "python_udf";
13

14
pub fn evaluate_py_udf(
10✔
15
    schema: &Schema,
10✔
16
    name: &str,
10✔
17
    args: &[Expression],
10✔
18
    return_type: &FieldType,
10✔
19
    record: &Record,
10✔
20
) -> Result<Field, PipelineError> {
10✔
21
    let values = args
10✔
22
        .iter()
10✔
23
        .take(args.len() - 1)
10✔
24
        .map(|arg| arg.evaluate(record, schema))
15✔
25
        .collect::<Result<Vec<_>, PipelineError>>()?;
10✔
26

27
    // Get the path of the Python interpreter in your virtual environment
28
    let env_path = env::var("VIRTUAL_ENV").map_err(|_| {
10✔
29
        PipelineError::InvalidFunction("Missing 'VIRTUAL_ENV' environment var".to_string())
×
30
    })?;
10✔
31
    let py_path = format!("{env_path}/bin/python");
10✔
32
    // Set the `PYTHON_SYS_EXECUTABLE` environment variable
10✔
33
    env::set_var("PYTHON_SYS_EXECUTABLE", py_path);
10✔
34

10✔
35
    Python::with_gil(|py| -> Result<Field, PipelineError> {
10✔
36
        // Get the directory containing the module
10✔
37
        let module_dir = PathBuf::from(env_path);
10✔
38
        // Import the `sys` module and append the module directory to the system path
39
        let sys = py.import("sys")?;
10✔
40
        let path = sys.getattr("path")?;
10✔
41
        path.call_method1("append", (module_dir.to_string_lossy(),))?;
10✔
42

43
        let module = py.import(MODULE_NAME)?;
10✔
44
        let function = module.getattr(name)?;
10✔
45

46
        let args = PyTuple::new(py, values);
10✔
47
        let res = function.call1(args)?;
10✔
48

×
49
        Ok(match return_type {
10✔
50
            FieldType::UInt => Field::UInt(res.extract::<u64>()?),
×
51
            FieldType::Int => Field::Int(res.extract::<i64>()?),
×
52
            FieldType::Float => Field::Float(OrderedFloat::from(res.extract::<f64>()?)),
10✔
53
            FieldType::Boolean => Field::Boolean(res.extract::<bool>()?),
×
54
            FieldType::String => Field::String(res.extract::<String>()?),
×
55
            FieldType::Text => Field::Text(res.extract::<String>()?),
×
56
            FieldType::Binary => Field::Binary(res.extract::<Vec<u8>>()?),
×
57
            FieldType::Decimal
58
            | FieldType::Date
59
            | FieldType::Timestamp
60
            | FieldType::Point
61
            | FieldType::Bson => {
×
62
                return Err(UnsupportedSqlError(GenericError(
×
63
                    "Unsupported return type for python udf".to_string(),
×
64
                )))
×
65
            }
66
        })
×
67
    })
10✔
68
}
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc