• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tremor-rs / tremor-runtime / 2058612730

pending completion
2058612730

Pull #1077

github

GitHub
Merge 08ea76008 into 85f4dfb1e
Pull Request #1077: Connectors Implementation

14162 of 14162 new or added lines in 167 files covered. (100.0%)

24031 of 28363 relevant lines covered (84.73%)

4.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.0
/src/connectors/impls/kafka.rs
1
// Copyright 2022, The Tremor Team
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
pub(crate) mod consumer;
15
pub(crate) mod producer;
16

17
use crate::errors::{Error, Kind as ErrorKind, Result};
18
use core::future::Future;
19
use futures::future;
20
use rdkafka::{error::KafkaError, util::AsyncRuntime};
21
use rdkafka_sys::RDKafkaErrorCode;
22
use std::time::{Duration, Instant};
23

24
const KAFKA_CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
25

26
pub struct SmolRuntime;
27

28
impl AsyncRuntime for SmolRuntime {
29
    type Delay = future::Map<smol::Timer, fn(Instant)>;
30

31
    fn spawn<T>(task: T)
1✔
32
    where
33
        T: Future<Output = ()> + Send + 'static,
34
    {
35
        // This needs to be smol::spawn we can't use async_std::task::spawn
36
        smol::spawn(task).detach();
1✔
37
    }
38

39
    fn delay_for(duration: Duration) -> Self::Delay {
1✔
40
        // This needs to be smol::Timer we can't use async_io::Timer
41
        futures::FutureExt::map(smol::Timer::after(duration), |_| ())
1✔
42
    }
43
}
44

45
/// verify broker host:port pairs in kafka connector configs
46
fn verify_brokers(id: &str, brokers: &[String]) -> Result<(String, Option<u16>)> {
1✔
47
    let mut first_broker: Option<(String, Option<u16>)> = None;
1✔
48
    for broker in brokers {
3✔
49
        match broker.split(':').collect::<Vec<_>>().as_slice() {
1✔
50
            [host] => {
1✔
51
                first_broker.get_or_insert_with(|| ((*host).to_string(), None));
×
52
            }
53
            [host, port] => {
2✔
54
                let port: u16 = port.parse().map_err(|_| {
1✔
55
                    Error::from(ErrorKind::InvalidConfiguration(
×
56
                        id.to_string(),
×
57
                        format!("Invalid broker: {}:{}", host, port),
×
58
                    ))
59
                })?;
60
                first_broker.get_or_insert_with(|| ((*host).to_string(), Some(port)));
4✔
61
            }
62
            b => {
×
63
                return Err(ErrorKind::InvalidConfiguration(
×
64
                    id.to_string(),
×
65
                    format!("Invalid broker: {}", b.join(":")),
×
66
                )
67
                .into())
68
            }
69
        }
70
    }
71
    first_broker.ok_or_else(|| {
1✔
72
        ErrorKind::InvalidConfiguration(id.to_string(), "Missing brokers.".to_string()).into()
×
73
    })
74
}
75

76
/// Returns `true` if the error denotes a failed connect attempt
77
/// for both consumer and producer
78
fn is_failed_connect_error(err: &KafkaError) -> bool {
1✔
79
    matches!(
2✔
80
        err,
2✔
81
        KafkaError::ClientConfig(_, _, _, _)
82
            | KafkaError::ClientCreation(_)
83
            // TODO: what else?
84
            | KafkaError::Global(RDKafkaErrorCode::UnknownTopicOrPartition | RDKafkaErrorCode::UnknownTopic | RDKafkaErrorCode::AllBrokersDown)
85
    )
86
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2024 Coveralls, Inc