• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stacks-network / stacks-core / 26250451051-1

21 May 2026 08:11PM UTC coverage: 85.585% (-0.1%) from 85.712%
26250451051-1

Pull #7215

github

ec9d4c
web-flow
Merge 9487bf852 into af1280aac
Pull Request #7215: Chore: fix flake in non_blocking_minority_configured_to_favour_...

188844 of 220651 relevant lines covered (85.58%)

18975267.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.16
/stacks-node/src/event_dispatcher.rs
1
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
2
// Copyright (C) 2020-2026 Stacks Open Internet Foundation
3
//
4
// This program is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8
//
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13
//
14
// You should have received a copy of the GNU General Public License
15
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
use std::collections::hash_map::Entry;
18
use std::collections::{HashMap, HashSet};
19
use std::path::PathBuf;
20
#[cfg(test)]
21
use std::sync::mpsc::channel;
22
#[cfg(test)]
23
use std::sync::LazyLock;
24
use std::sync::{Arc, Mutex};
25
use std::time::{Duration, SystemTime};
26

27
use clarity::vm::costs::ExecutionCost;
28
use clarity::vm::events::{FTEventType, NFTEventType, STXEventType};
29
use clarity::vm::types::{AssetIdentifier, QualifiedContractIdentifier};
30
#[cfg(any(test, feature = "testing"))]
31
use lazy_static::lazy_static;
32
use serde_json::json;
33
use stacks::burnchains::{PoxConstants, Txid};
34
use stacks::chainstate::burn::ConsensusHash;
35
use stacks::chainstate::coordinator::{BlockEventDispatcher, PoxTransactionReward};
36
use stacks::chainstate::nakamoto::NakamotoBlock;
37
use stacks::chainstate::stacks::address::PoxAddress;
38
use stacks::chainstate::stacks::boot::RewardSetData;
39
use stacks::chainstate::stacks::db::accounts::MinerReward;
40
use stacks::chainstate::stacks::db::unconfirmed::ProcessedUnconfirmedState;
41
use stacks::chainstate::stacks::db::{MinerRewardInfo, StacksHeaderInfo};
42
use stacks::chainstate::stacks::events::{
43
    StackerDBChunksEvent, StacksBlockEventData, StacksTransactionEvent, StacksTransactionReceipt,
44
};
45
use stacks::chainstate::stacks::miner::TransactionEvent;
46
use stacks::chainstate::stacks::{StacksBlock, StacksMicroblock, StacksTransaction};
47
use stacks::config::{EventKeyType, EventObserverConfig};
48
use stacks::core::mempool::{MemPoolDropReason, MemPoolEventDispatcher, ProposalCallbackReceiver};
49
use stacks::libstackerdb::StackerDBChunkData;
50
use stacks::net::api::postblock_proposal::{
51
    BlockValidateOk, BlockValidateReject, BlockValidateResponse,
52
};
53
use stacks::net::atlas::{Attachment, AttachmentInstance};
54
use stacks::net::stackerdb::StackerDBEventDispatcher;
55
#[cfg(any(test, feature = "testing"))]
56
use stacks::util::tests::TestFlag;
57
use stacks_common::bitvec::BitVec;
58
use stacks_common::types::chainstate::{BlockHeaderHash, BurnchainHeaderHash, StacksBlockId};
59
use url::Url;
60

61
mod db;
62
mod payloads;
63
mod stacker_db;
64
mod worker;
65

66
use db::EventDispatcherDbConnection;
67
use payloads::*;
68
pub use payloads::{
69
    MinedBlockEvent, MinedMicroblockEvent, MinedNakamotoBlockEvent, NakamotoSignerEntryPayload,
70
    RewardSetEventPayload, TransactionEventPayload,
71
};
72
pub use stacker_db::StackerDBChannel;
73

74
use crate::event_dispatcher::db::PendingPayload;
75
use crate::event_dispatcher::worker::{EventDispatcherResult, EventDispatcherWorker};
76

77
#[cfg(test)]
78
mod tests;
79

80
#[cfg(any(test, feature = "testing"))]
81
lazy_static! {
82
    /// Do not announce a signed/mined block to the network when set to true.
83
    pub static ref TEST_SKIP_BLOCK_ANNOUNCEMENT: TestFlag<bool> = TestFlag::default();
84
}
85

86
#[derive(Debug, thiserror::Error)]
87
enum EventDispatcherError {
88
    #[error("Serialization error: {0}")]
89
    SerializationError(#[from] serde_json::Error),
90
    #[error("HTTP error: {0}")]
91
    HttpError(#[from] std::io::Error),
92
    #[error("Database error: {0}")]
93
    DbError(#[from] stacks::util_lib::db::Error),
94
    #[error("Channel receive error: {0}")]
95
    RecvError(#[from] std::sync::mpsc::RecvError),
96
    #[error("Channel send error: {0}")]
97
    SendError(String), // not capturing the underlying because it's a generic type
98
}
99

100
impl<T> From<std::sync::mpsc::SendError<T>> for EventDispatcherError {
101
    fn from(value: std::sync::mpsc::SendError<T>) -> Self {
1✔
102
        EventDispatcherError::SendError(format!("{value}"))
1✔
103
    }
1✔
104
}
105

106
#[derive(Debug, Clone)]
107
struct EventObserver {
108
    /// URL to which events will be sent
109
    endpoint: String,
110
    /// Timeout for sending events to this observer
111
    timeout: Duration,
112
    /// If true, the stacks-node will not retry if event delivery fails for any reason.
113
    /// WARNING: This should not be set on observers that require successful delivery of all events.
114
    disable_retries: bool,
115
}
116

117
/// Update `serve()` in `neon_integrations.rs` with any new paths that need to be tested
118
pub const PATH_MICROBLOCK_SUBMIT: &str = "new_microblocks";
119
pub const PATH_MEMPOOL_TX_SUBMIT: &str = "new_mempool_tx";
120
pub const PATH_MEMPOOL_TX_DROP: &str = "drop_mempool_tx";
121
pub const PATH_MINED_BLOCK: &str = "mined_block";
122
pub const PATH_MINED_MICROBLOCK: &str = "mined_microblock";
123
pub const PATH_MINED_NAKAMOTO_BLOCK: &str = "mined_nakamoto_block";
124
pub const PATH_STACKERDB_CHUNKS: &str = "stackerdb_chunks";
125
pub const PATH_BURN_BLOCK_SUBMIT: &str = "new_burn_block";
126
pub const PATH_BLOCK_PROCESSED: &str = "new_block";
127
pub const PATH_ATTACHMENT_PROCESSED: &str = "attachments/new";
128
pub const PATH_PROPOSAL_RESPONSE: &str = "proposal_response";
129

130
#[cfg(test)]
131
static TEST_EVENT_OBSERVER_SKIP_RETRY: LazyLock<TestFlag<bool>> = LazyLock::new(TestFlag::default);
132

133
impl EventObserver {
134
    fn new(endpoint: String, timeout: Duration, disable_retries: bool) -> Self {
983✔
135
        EventObserver {
983✔
136
            endpoint,
983✔
137
            timeout,
983✔
138
            disable_retries,
983✔
139
        }
983✔
140
    }
983✔
141
}
142

143
struct EventRequestData {
144
    pub url: String,
145
    pub payload_bytes: Arc<[u8]>,
146
    pub timeout: Duration,
147
}
148

149
/// Events received from block-processing.
150
/// Stacks events are structured as JSON, and are grouped by topic.  An event observer can
151
/// subscribe to one or more specific event streams, or the "any" stream to receive all of them.
152
#[derive(Clone)]
153
pub struct EventDispatcher {
154
    /// List of configured event observers to which events will be posted.
155
    /// The fields below this contain indexes into this list.
156
    registered_observers: Vec<EventObserver>,
157
    /// Smart contract-specific events, keyed by (contract-id, event-name). Values are indexes into `registered_observers`.
158
    contract_events_observers_lookup: HashMap<(QualifiedContractIdentifier, String), HashSet<u16>>,
159
    /// Asset event observers, keyed by fully-qualified asset identifier. Values are indexes into
160
    /// `registered_observers.
161
    assets_observers_lookup: HashMap<AssetIdentifier, HashSet<u16>>,
162
    /// Index into `registered_observers` that will receive burn block events
163
    burn_block_observers_lookup: HashSet<u16>,
164
    /// Index into `registered_observers` that will receive mempool events
165
    mempool_observers_lookup: HashSet<u16>,
166
    /// Index into `registered_observers` that will receive microblock events
167
    microblock_observers_lookup: HashSet<u16>,
168
    /// Index into `registered_observers` that will receive STX events
169
    stx_observers_lookup: HashSet<u16>,
170
    /// Index into `registered_observers` that will receive all events
171
    any_event_observers_lookup: HashSet<u16>,
172
    /// Index into `registered_observers` that will receive block miner events (Stacks 2.5 and
173
    /// lower)
174
    miner_observers_lookup: HashSet<u16>,
175
    /// Index into `registered_observers` that will receive microblock miner events (Stacks 2.5 and
176
    /// lower)
177
    mined_microblocks_observers_lookup: HashSet<u16>,
178
    /// Index into `registered_observers` that will receive StackerDB events
179
    stackerdb_observers_lookup: HashSet<u16>,
180
    /// Index into `registered_observers` that will receive block proposal events (Nakamoto and
181
    /// later)
182
    block_proposal_observers_lookup: HashSet<u16>,
183
    /// Channel for sending StackerDB events to the miner coordinator
184
    pub stackerdb_channel: Arc<Mutex<StackerDBChannel>>,
185
    /// Path to the database where pending payloads are stored.
186
    db_path: PathBuf,
187
    /// The worker thread that performs the actual HTTP requests so that they don't block
188
    /// the main operation of the node.
189
    worker: EventDispatcherWorker,
190
}
191

192
/// This struct is used specifically for receiving proposal responses.
193
/// It's constructed separately to play nicely with threading.
194
struct ProposalCallbackHandler {
195
    observers: Vec<EventObserver>,
196
    dispatcher: EventDispatcher,
197
}
198

199
impl ProposalCallbackReceiver for ProposalCallbackHandler {
200
    fn notify_proposal_result(&self, result: Result<BlockValidateOk, BlockValidateReject>) {
3,807✔
201
        let response = match serde_json::to_value(BlockValidateResponse::from(result)) {
3,807✔
202
            Ok(x) => x,
3,807✔
203
            Err(e) => {
×
204
                error!(
×
205
                    "Failed to serialize block proposal validation response, will not notify over event observer";
206
                    "error" => ?e
207
                );
208
                return;
×
209
            }
210
        };
211

212
        for observer in self.observers.iter() {
20,125✔
213
            self.dispatcher
20,125✔
214
                .dispatch_to_observer(observer, &response, PATH_PROPOSAL_RESPONSE)
20,125✔
215
                .unwrap()
20,125✔
216
                .wait_until_complete();
20,125✔
217
        }
20,125✔
218
    }
3,807✔
219
}
220

221
impl MemPoolEventDispatcher for EventDispatcher {
222
    fn mempool_txs_dropped(
324,970✔
223
        &self,
324,970✔
224
        txids: Vec<Txid>,
324,970✔
225
        new_txid: Option<Txid>,
324,970✔
226
        reason: MemPoolDropReason,
324,970✔
227
    ) {
324,970✔
228
        if !txids.is_empty() {
324,970✔
229
            self.process_dropped_mempool_txs(txids, new_txid, reason)
9✔
230
        }
324,961✔
231
    }
324,970✔
232

233
    fn mined_block_event(
25,789✔
234
        &self,
25,789✔
235
        target_burn_height: u64,
25,789✔
236
        block: &StacksBlock,
25,789✔
237
        block_size_bytes: u64,
25,789✔
238
        consumed: &ExecutionCost,
25,789✔
239
        confirmed_microblock_cost: &ExecutionCost,
25,789✔
240
        tx_events: Vec<TransactionEvent>,
25,789✔
241
    ) {
25,789✔
242
        self.process_mined_block_event(
25,789✔
243
            target_burn_height,
25,789✔
244
            block,
25,789✔
245
            block_size_bytes,
25,789✔
246
            consumed,
25,789✔
247
            confirmed_microblock_cost,
25,789✔
248
            tx_events,
25,789✔
249
        )
250
    }
25,789✔
251

252
    fn mined_microblock_event(
4✔
253
        &self,
4✔
254
        microblock: &StacksMicroblock,
4✔
255
        tx_events: Vec<TransactionEvent>,
4✔
256
        anchor_block_consensus_hash: ConsensusHash,
4✔
257
        anchor_block: BlockHeaderHash,
4✔
258
    ) {
4✔
259
        self.process_mined_microblock_event(
4✔
260
            microblock,
4✔
261
            tx_events,
4✔
262
            anchor_block_consensus_hash,
4✔
263
            anchor_block,
4✔
264
        );
265
    }
4✔
266

267
    fn mined_nakamoto_block_event(
×
268
        &self,
×
269
        target_burn_height: u64,
×
270
        block: &NakamotoBlock,
×
271
        block_size_bytes: u64,
×
272
        consumed: &ExecutionCost,
×
273
        tx_events: Vec<TransactionEvent>,
×
274
    ) {
×
275
        self.process_mined_nakamoto_block_event(
×
276
            target_burn_height,
×
277
            block,
×
278
            block_size_bytes,
×
279
            consumed,
×
280
            tx_events,
×
281
        )
282
    }
×
283

284
    fn get_proposal_callback_receiver(&self) -> Option<Box<dyn ProposalCallbackReceiver>> {
3,807✔
285
        let callback_receivers: Vec<_> = self
3,807✔
286
            .block_proposal_observers_lookup
3,807✔
287
            .iter()
3,807✔
288
            .filter_map(|observer_ix|
3,807✔
289
                match self.registered_observers.get(usize::from(*observer_ix)) {
20,125✔
290
                    Some(x) => Some(x.clone()),
20,125✔
291
                    None => {
292
                        warn!(
×
293
                            "Event observer index not found in registered observers. Ignoring that index.";
294
                            "index" => observer_ix,
×
295
                            "observers_len" => self.registered_observers.len()
×
296
                        );
297
                        None
×
298
                    }
299
                }
20,125✔
300
            )
301
            .collect();
3,807✔
302
        if callback_receivers.is_empty() {
3,807✔
303
            return None;
×
304
        }
3,807✔
305
        let handler = ProposalCallbackHandler {
3,807✔
306
            observers: callback_receivers,
3,807✔
307
            dispatcher: self.clone(),
3,807✔
308
        };
3,807✔
309
        Some(Box::new(handler))
3,807✔
310
    }
3,807✔
311
}
312

313
impl StackerDBEventDispatcher for EventDispatcher {
314
    /// Relay new StackerDB chunks
315
    fn new_stackerdb_chunks(
57,755✔
316
        &self,
57,755✔
317
        contract_id: QualifiedContractIdentifier,
57,755✔
318
        chunks: Vec<StackerDBChunkData>,
57,755✔
319
    ) {
57,755✔
320
        self.process_new_stackerdb_chunks(contract_id, chunks);
57,755✔
321
    }
57,755✔
322
}
323

324
impl BlockEventDispatcher for EventDispatcher {
325
    fn announce_block(
11,225✔
326
        &self,
11,225✔
327
        block: &StacksBlockEventData,
11,225✔
328
        metadata: &StacksHeaderInfo,
11,225✔
329
        receipts: &[StacksTransactionReceipt],
11,225✔
330
        parent: &StacksBlockId,
11,225✔
331
        winner_txid: &Txid,
11,225✔
332
        mature_rewards: &[MinerReward],
11,225✔
333
        mature_rewards_info: Option<&MinerRewardInfo>,
11,225✔
334
        parent_burn_block_hash: &BurnchainHeaderHash,
11,225✔
335
        parent_burn_block_height: u32,
11,225✔
336
        parent_burn_block_timestamp: u64,
11,225✔
337
        anchored_consumed: &ExecutionCost,
11,225✔
338
        mblock_confirmed_consumed: &ExecutionCost,
11,225✔
339
        pox_constants: &PoxConstants,
11,225✔
340
        reward_set_data: &Option<RewardSetData>,
11,225✔
341
        signer_bitvec: &Option<BitVec<4000>>,
11,225✔
342
        block_timestamp: Option<u64>,
11,225✔
343
        coinbase_height: u64,
11,225✔
344
    ) {
11,225✔
345
        self.process_chain_tip(
11,225✔
346
            block,
11,225✔
347
            metadata,
11,225✔
348
            receipts,
11,225✔
349
            parent,
11,225✔
350
            winner_txid,
11,225✔
351
            mature_rewards,
11,225✔
352
            mature_rewards_info,
11,225✔
353
            parent_burn_block_hash,
11,225✔
354
            parent_burn_block_height,
11,225✔
355
            parent_burn_block_timestamp,
11,225✔
356
            anchored_consumed,
11,225✔
357
            mblock_confirmed_consumed,
11,225✔
358
            pox_constants,
11,225✔
359
            reward_set_data,
11,225✔
360
            signer_bitvec,
11,225✔
361
            block_timestamp,
11,225✔
362
            coinbase_height,
11,225✔
363
        );
364
    }
11,225✔
365

366
    fn announce_burn_block(
64,716✔
367
        &self,
64,716✔
368
        burn_block: &BurnchainHeaderHash,
64,716✔
369
        burn_block_height: u64,
64,716✔
370
        rewards: Vec<(PoxAddress, u64)>,
64,716✔
371
        burns: u64,
64,716✔
372
        pox_transactions: Vec<PoxTransactionReward>,
64,716✔
373
        recipient_info: Vec<PoxAddress>,
64,716✔
374
        consensus_hash: &ConsensusHash,
64,716✔
375
        parent_burn_block_hash: &BurnchainHeaderHash,
64,716✔
376
    ) {
64,716✔
377
        self.process_burn_block(
64,716✔
378
            burn_block,
64,716✔
379
            burn_block_height,
64,716✔
380
            rewards,
64,716✔
381
            burns,
64,716✔
382
            pox_transactions,
64,716✔
383
            recipient_info,
64,716✔
384
            consensus_hash,
64,716✔
385
            parent_burn_block_hash,
64,716✔
386
        )
387
    }
64,716✔
388
}
389

390
impl EventDispatcher {
391
    /// The default behavior is to create a non-blocking dispatcher with a
392
    /// queue size of 1,000. Note however that the default *node* configuration
393
    /// is to always block (i.e. an effective queue size of 0).
394
    ///
395
    /// See the `event_dispatcher_blocking` and `event_dispatcher_queue_size`
396
    /// config values.
397
    pub fn new(working_dir: PathBuf) -> EventDispatcher {
16✔
398
        Self::new_with_custom_queue_size(working_dir, 1_000)
16✔
399
    }
16✔
400
    /// The queue size specifies how many events may be in-flight without
401
    /// blocking the calling thread when sending additional events. A value
402
    /// of 0 means they always block.
403
    pub fn new_with_custom_queue_size(working_dir: PathBuf, queue_size: usize) -> EventDispatcher {
317✔
404
        let mut db_path = working_dir;
317✔
405
        db_path.push("event_observers.sqlite");
317✔
406
        EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize database");
317✔
407

408
        let worker = EventDispatcherWorker::new(db_path.clone(), queue_size)
317✔
409
            .expect("Failed to start worker thread");
317✔
410

411
        EventDispatcher {
317✔
412
            stackerdb_channel: Arc::new(Mutex::new(StackerDBChannel::new())),
317✔
413
            registered_observers: vec![],
317✔
414
            contract_events_observers_lookup: HashMap::new(),
317✔
415
            assets_observers_lookup: HashMap::new(),
317✔
416
            stx_observers_lookup: HashSet::new(),
317✔
417
            any_event_observers_lookup: HashSet::new(),
317✔
418
            burn_block_observers_lookup: HashSet::new(),
317✔
419
            mempool_observers_lookup: HashSet::new(),
317✔
420
            microblock_observers_lookup: HashSet::new(),
317✔
421
            miner_observers_lookup: HashSet::new(),
317✔
422
            mined_microblocks_observers_lookup: HashSet::new(),
317✔
423
            stackerdb_observers_lookup: HashSet::new(),
317✔
424
            block_proposal_observers_lookup: HashSet::new(),
317✔
425
            db_path,
317✔
426
            worker,
317✔
427
        }
317✔
428
    }
317✔
429

430
    /// Sends a noop task to the worker and waits until its completion is acknowledged.
431
    /// This has the effect that all payloads that have been submitted before this point
432
    /// are also done, which is a useful thing to wait for in some tests where you want
433
    /// to assert on certain event deliveries.
434
    #[cfg(test)]
435
    pub fn catch_up(&self) {
×
436
        self.worker.noop().unwrap().wait_until_complete();
×
437
    }
×
438

439
    pub fn process_burn_block(
64,716✔
440
        &self,
64,716✔
441
        burn_block: &BurnchainHeaderHash,
64,716✔
442
        burn_block_height: u64,
64,716✔
443
        rewards: Vec<(PoxAddress, u64)>,
64,716✔
444
        burns: u64,
64,716✔
445
        pox_transactions: Vec<PoxTransactionReward>,
64,716✔
446
        recipient_info: Vec<PoxAddress>,
64,716✔
447
        consensus_hash: &ConsensusHash,
64,716✔
448
        parent_burn_block_hash: &BurnchainHeaderHash,
64,716✔
449
    ) {
64,716✔
450
        // lazily assemble payload only if we have observers
451
        let interested_observers = self.filter_observers(&self.burn_block_observers_lookup, true);
64,716✔
452
        if interested_observers.is_empty() {
64,716✔
453
            return;
8,106✔
454
        }
56,610✔
455

456
        let payload = make_new_burn_block_payload(
56,610✔
457
            burn_block,
56,610✔
458
            burn_block_height,
56,610✔
459
            rewards,
56,610✔
460
            burns,
56,610✔
461
            pox_transactions,
56,610✔
462
            recipient_info,
56,610✔
463
            consensus_hash,
56,610✔
464
            parent_burn_block_hash,
56,610✔
465
        );
466

467
        for observer in interested_observers.iter() {
222,973✔
468
            self.send_new_burn_block(&observer, &payload);
222,973✔
469
        }
222,973✔
470
    }
64,716✔
471

472
    /// Iterates through tx receipts, and then the events corresponding to each receipt to
473
    /// generate a dispatch matrix & event vector.
474
    ///
475
    /// # Returns
476
    /// - dispatch_matrix: a vector where each index corresponds to the hashset of event indexes
477
    ///     that each respective event observer is subscribed to
478
    /// - events: a vector of all events from all the tx receipts
479
    #[allow(clippy::type_complexity)]
480
    fn create_dispatch_matrix_and_event_vector<'a>(
11,229✔
481
        &self,
11,229✔
482
        receipts: &'a [StacksTransactionReceipt],
11,229✔
483
    ) -> (Vec<HashSet<usize>>, Vec<(Txid, &'a StacksTransactionEvent)>) {
11,229✔
484
        let mut dispatch_matrix: Vec<HashSet<usize>> = self
11,229✔
485
            .registered_observers
11,229✔
486
            .iter()
11,229✔
487
            .map(|_| HashSet::new())
39,971✔
488
            .collect();
11,229✔
489
        let mut events: Vec<(Txid, &StacksTransactionEvent)> = vec![];
11,229✔
490
        let mut i: usize = 0;
11,229✔
491

492
        for receipt in receipts {
93,040✔
493
            let tx_hash = receipt.transaction.txid();
93,040✔
494
            if receipt.post_condition_aborted {
93,040✔
495
                debug!("Transaction {tx_hash} aborted by post-condition, skipping events");
4✔
496
                continue;
4✔
497
            }
93,036✔
498
            for event in receipt.events.iter() {
94,380✔
499
                match event {
81,662✔
500
                    StacksTransactionEvent::SmartContractEvent(event_data) => {
1,616✔
501
                        if let Some(observer_indexes) =
×
502
                            self.contract_events_observers_lookup.get(&event_data.key)
1,616✔
503
                        {
504
                            for o_i in observer_indexes {
×
505
                                dispatch_matrix[*o_i as usize].insert(i);
×
506
                            }
×
507
                        }
1,616✔
508
                    }
509
                    StacksTransactionEvent::STXEvent(STXEventType::STXTransferEvent(_))
510
                    | StacksTransactionEvent::STXEvent(STXEventType::STXMintEvent(_))
511
                    | StacksTransactionEvent::STXEvent(STXEventType::STXBurnEvent(_))
512
                    | StacksTransactionEvent::STXEvent(STXEventType::STXLockEvent(_)) => {
513
                        for o_i in &self.stx_observers_lookup {
81,662✔
514
                            dispatch_matrix[*o_i as usize].insert(i);
×
515
                        }
×
516
                    }
517
                    StacksTransactionEvent::NFTEvent(NFTEventType::NFTTransferEvent(
518
                        event_data,
2✔
519
                    )) => {
2✔
520
                        self.update_dispatch_matrix_if_observer_subscribed(
2✔
521
                            &event_data.asset_identifier,
2✔
522
                            i,
2✔
523
                            &mut dispatch_matrix,
2✔
524
                        );
2✔
525
                    }
2✔
526
                    StacksTransactionEvent::NFTEvent(NFTEventType::NFTMintEvent(event_data)) => {
3✔
527
                        self.update_dispatch_matrix_if_observer_subscribed(
3✔
528
                            &event_data.asset_identifier,
3✔
529
                            i,
3✔
530
                            &mut dispatch_matrix,
3✔
531
                        );
3✔
532
                    }
3✔
533
                    StacksTransactionEvent::NFTEvent(NFTEventType::NFTBurnEvent(event_data)) => {
×
534
                        self.update_dispatch_matrix_if_observer_subscribed(
×
535
                            &event_data.asset_identifier,
×
536
                            i,
×
537
                            &mut dispatch_matrix,
×
538
                        );
×
539
                    }
×
540
                    StacksTransactionEvent::FTEvent(FTEventType::FTTransferEvent(event_data)) => {
×
541
                        self.update_dispatch_matrix_if_observer_subscribed(
×
542
                            &event_data.asset_identifier,
×
543
                            i,
×
544
                            &mut dispatch_matrix,
×
545
                        );
×
546
                    }
×
547
                    StacksTransactionEvent::FTEvent(FTEventType::FTMintEvent(event_data)) => {
1✔
548
                        self.update_dispatch_matrix_if_observer_subscribed(
1✔
549
                            &event_data.asset_identifier,
1✔
550
                            i,
1✔
551
                            &mut dispatch_matrix,
1✔
552
                        );
1✔
553
                    }
1✔
554
                    StacksTransactionEvent::FTEvent(FTEventType::FTBurnEvent(event_data)) => {
×
555
                        self.update_dispatch_matrix_if_observer_subscribed(
×
556
                            &event_data.asset_identifier,
×
557
                            i,
×
558
                            &mut dispatch_matrix,
×
559
                        );
×
560
                    }
×
561
                }
562
                events.push((tx_hash.clone(), event));
83,284✔
563
                for o_i in &self.any_event_observers_lookup {
83,284✔
564
                    dispatch_matrix[*o_i as usize].insert(i);
3,007✔
565
                }
3,007✔
566
                i += 1;
83,284✔
567
            }
568
        }
569

570
        (dispatch_matrix, events)
11,229✔
571
    }
11,229✔
572

573
    #[allow(clippy::too_many_arguments)]
574
    pub fn process_chain_tip(
11,225✔
575
        &self,
11,225✔
576
        block: &StacksBlockEventData,
11,225✔
577
        metadata: &StacksHeaderInfo,
11,225✔
578
        receipts: &[StacksTransactionReceipt],
11,225✔
579
        parent_index_hash: &StacksBlockId,
11,225✔
580
        winner_txid: &Txid,
11,225✔
581
        mature_rewards: &[MinerReward],
11,225✔
582
        mature_rewards_info: Option<&MinerRewardInfo>,
11,225✔
583
        parent_burn_block_hash: &BurnchainHeaderHash,
11,225✔
584
        parent_burn_block_height: u32,
11,225✔
585
        parent_burn_block_timestamp: u64,
11,225✔
586
        anchored_consumed: &ExecutionCost,
11,225✔
587
        mblock_confirmed_consumed: &ExecutionCost,
11,225✔
588
        pox_constants: &PoxConstants,
11,225✔
589
        reward_set_data: &Option<RewardSetData>,
11,225✔
590
        signer_bitvec: &Option<BitVec<4000>>,
11,225✔
591
        block_timestamp: Option<u64>,
11,225✔
592
        coinbase_height: u64,
11,225✔
593
    ) {
11,225✔
594
        let (dispatch_matrix, events) = self.create_dispatch_matrix_and_event_vector(receipts);
11,225✔
595

596
        if !dispatch_matrix.is_empty() {
11,225✔
597
            let mature_rewards_vec = if let Some(rewards_info) = mature_rewards_info {
10,272✔
598
                mature_rewards
7,921✔
599
                    .iter()
7,921✔
600
                    .map(|reward| {
15,842✔
601
                        json!({
15,842✔
602
                            "recipient": reward.recipient.to_string(),
15,842✔
603
                            "miner_address": reward.address.to_string(),
15,842✔
604
                            "coinbase_amount": reward.coinbase.to_string(),
15,842✔
605
                            "tx_fees_anchored": reward.tx_fees_anchored.to_string(),
15,842✔
606
                            "tx_fees_streamed_confirmed": reward.tx_fees_streamed_confirmed.to_string(),
15,842✔
607
                            "tx_fees_streamed_produced": reward.tx_fees_streamed_produced.to_string(),
15,842✔
608
                            "from_stacks_block_hash": format!("0x{}", rewards_info.from_stacks_block_hash),
15,842✔
609
                            "from_index_consensus_hash": format!("0x{}", StacksBlockId::new(&rewards_info.from_block_consensus_hash,
15,842✔
610
                                                                                            &rewards_info.from_stacks_block_hash)),
15,842✔
611
                        })
612
                    })
15,842✔
613
                    .collect()
7,921✔
614
            } else {
615
                vec![]
2,351✔
616
            };
617

618
            let mature_rewards = serde_json::Value::Array(mature_rewards_vec);
10,272✔
619

620
            #[cfg(any(test, feature = "testing"))]
621
            if test_skip_block_announcement(block) {
10,272✔
622
                return;
×
623
            }
10,272✔
624

625
            for (observer_id, filtered_events_ids) in dispatch_matrix.iter().enumerate() {
39,208✔
626
                let filtered_events: Vec<_> = filtered_events_ids
39,208✔
627
                    .iter()
39,208✔
628
                    .map(|event_id| (*event_id, &events[*event_id]))
39,870✔
629
                    .collect();
39,208✔
630

631
                let payload = make_new_block_processed_payload(
39,208✔
632
                    filtered_events,
39,208✔
633
                    block,
39,208✔
634
                    metadata,
39,208✔
635
                    receipts,
39,208✔
636
                    parent_index_hash,
39,208✔
637
                    &winner_txid,
39,208✔
638
                    &mature_rewards,
39,208✔
639
                    parent_burn_block_hash,
39,208✔
640
                    parent_burn_block_height,
39,208✔
641
                    parent_burn_block_timestamp,
39,208✔
642
                    anchored_consumed,
39,208✔
643
                    mblock_confirmed_consumed,
39,208✔
644
                    pox_constants,
39,208✔
645
                    reward_set_data,
39,208✔
646
                    signer_bitvec,
39,208✔
647
                    block_timestamp,
39,208✔
648
                    coinbase_height,
39,208✔
649
                );
650

651
                // Send payload
652
                self.dispatch_to_observer_or_log_error(
39,208✔
653
                    &self.registered_observers[observer_id],
39,208✔
654
                    &payload,
39,208✔
655
                    PATH_BLOCK_PROCESSED,
39,208✔
656
                );
657
            }
658
        }
953✔
659
    }
11,225✔
660

661
    /// Creates a list of observers that are interested in the new microblocks event,
662
    /// creates a mapping from observers to the event ids that are relevant to each, and then
663
    /// sends the event to each interested observer.
664
    pub fn process_new_microblocks(
5✔
665
        &self,
5✔
666
        parent_index_block_hash: &StacksBlockId,
5✔
667
        processed_unconfirmed_state: &ProcessedUnconfirmedState,
5✔
668
    ) {
5✔
669
        // lazily assemble payload only if we have observers
670
        let interested_observers: Vec<_> = self
5✔
671
            .registered_observers
5✔
672
            .iter()
5✔
673
            .enumerate()
5✔
674
            .filter(|(obs_id, _observer)| {
5✔
675
                self.microblock_observers_lookup
2✔
676
                    .contains(&(u16::try_from(*obs_id).expect("FATAL: more than 2^16 observers")))
2✔
677
                    || self.any_event_observers_lookup.contains(
2✔
678
                        &(u16::try_from(*obs_id).expect("FATAL: more than 2^16 observers")),
2✔
679
                    )
680
            })
2✔
681
            .collect();
5✔
682
        if interested_observers.is_empty() {
5✔
683
            return;
3✔
684
        }
2✔
685
        let flattened_receipts: Vec<_> = processed_unconfirmed_state
2✔
686
            .receipts
2✔
687
            .iter()
2✔
688
            .flat_map(|(_, _, r)| r.clone())
2✔
689
            .collect();
2✔
690
        let (dispatch_matrix, events) =
2✔
691
            self.create_dispatch_matrix_and_event_vector(&flattened_receipts);
2✔
692

693
        // Serialize receipts
694
        let mut tx_index;
695
        let mut serialized_txs = Vec::new();
2✔
696

697
        for (_, _, receipts) in processed_unconfirmed_state.receipts.iter() {
2✔
698
            tx_index = 0;
2✔
699
            for receipt in receipts.iter() {
3✔
700
                let payload = make_new_block_txs_payload(receipt, tx_index);
3✔
701
                serialized_txs.push(payload);
3✔
702
                tx_index += 1;
3✔
703
            }
3✔
704
        }
705

706
        for (obs_id, observer) in interested_observers.iter() {
2✔
707
            let filtered_events_ids = &dispatch_matrix[*obs_id];
2✔
708
            let filtered_events: Vec<_> = filtered_events_ids
2✔
709
                .iter()
2✔
710
                .map(|event_id| (*event_id, &events[*event_id]))
2✔
711
                .collect();
2✔
712

713
            self.send_new_microblocks(
2✔
714
                observer,
2✔
715
                &parent_index_block_hash,
2✔
716
                &filtered_events,
2✔
717
                &serialized_txs,
2✔
718
                &processed_unconfirmed_state.burn_block_hash,
2✔
719
                processed_unconfirmed_state.burn_block_height,
2✔
720
                processed_unconfirmed_state.burn_block_timestamp,
2✔
721
            );
722
        }
723
    }
5✔
724

725
    fn filter_observers(&self, lookup: &HashSet<u16>, include_any: bool) -> Vec<&EventObserver> {
154,556✔
726
        self.registered_observers
154,556✔
727
            .iter()
154,556✔
728
            .enumerate()
154,556✔
729
            .filter_map(|(obs_id, observer)| {
560,380✔
730
                let lookup_ix = u16::try_from(obs_id).expect("FATAL: more than 2^16 observers");
544,483✔
731
                if lookup.contains(&lookup_ix)
544,483✔
732
                    || (include_any && self.any_event_observers_lookup.contains(&lookup_ix))
86,759✔
733
                {
734
                    Some(observer)
473,956✔
735
                } else {
736
                    None
70,527✔
737
                }
738
            })
544,483✔
739
            .collect()
154,556✔
740
    }
154,556✔
741

742
    pub fn process_new_mempool_txs(&self, txs: Vec<StacksTransaction>) {
3,879✔
743
        // lazily assemble payload only if we have observers
744
        let interested_observers = self.filter_observers(&self.mempool_observers_lookup, true);
3,879✔
745

746
        if interested_observers.is_empty() {
3,879✔
747
            return;
2,077✔
748
        }
1,802✔
749

750
        let payload = make_new_mempool_txs_payload(txs);
1,802✔
751

752
        for observer in interested_observers.iter() {
1,802✔
753
            self.send_new_mempool_txs(observer, &payload);
1,802✔
754
        }
1,802✔
755
    }
3,879✔
756

757
    pub fn process_mined_block_event(
25,789✔
758
        &self,
25,789✔
759
        target_burn_height: u64,
25,789✔
760
        block: &StacksBlock,
25,789✔
761
        block_size_bytes: u64,
25,789✔
762
        consumed: &ExecutionCost,
25,789✔
763
        confirmed_microblock_cost: &ExecutionCost,
25,789✔
764
        tx_events: Vec<TransactionEvent>,
25,789✔
765
    ) {
25,789✔
766
        let interested_observers = self.filter_observers(&self.miner_observers_lookup, false);
25,789✔
767

768
        if interested_observers.is_empty() {
25,789✔
769
            return;
16,376✔
770
        }
9,413✔
771

772
        let payload = serde_json::to_value(MinedBlockEvent {
9,413✔
773
            target_burn_height,
9,413✔
774
            block_hash: block.block_hash().to_string(),
9,413✔
775
            stacks_height: block.header.total_work.work,
9,413✔
776
            block_size: block_size_bytes,
9,413✔
777
            anchored_cost: consumed.clone(),
9,413✔
778
            confirmed_microblocks_cost: confirmed_microblock_cost.clone(),
9,413✔
779
            tx_events,
9,413✔
780
        })
9,413✔
781
        .unwrap();
9,413✔
782

783
        for observer in interested_observers.iter() {
9,413✔
784
            self.send_mined_block(observer, &payload);
9,413✔
785
        }
9,413✔
786
    }
25,789✔
787

788
    pub fn process_mined_microblock_event(
4✔
789
        &self,
4✔
790
        microblock: &StacksMicroblock,
4✔
791
        tx_events: Vec<TransactionEvent>,
4✔
792
        anchor_block_consensus_hash: ConsensusHash,
4✔
793
        anchor_block: BlockHeaderHash,
4✔
794
    ) {
4✔
795
        let interested_observers =
4✔
796
            self.filter_observers(&self.mined_microblocks_observers_lookup, false);
4✔
797
        if interested_observers.is_empty() {
4✔
798
            return;
3✔
799
        }
1✔
800

801
        let payload = serde_json::to_value(MinedMicroblockEvent {
1✔
802
            block_hash: microblock.block_hash().to_string(),
1✔
803
            sequence: microblock.header.sequence,
1✔
804
            tx_events,
1✔
805
            anchor_block_consensus_hash,
1✔
806
            anchor_block,
1✔
807
        })
1✔
808
        .unwrap();
1✔
809

810
        for observer in interested_observers.iter() {
1✔
811
            self.send_mined_microblock(observer, &payload);
1✔
812
        }
1✔
813
    }
4✔
814

815
    pub fn process_mined_nakamoto_block_event(
2,404✔
816
        &self,
2,404✔
817
        target_burn_height: u64,
2,404✔
818
        block: &NakamotoBlock,
2,404✔
819
        block_size_bytes: u64,
2,404✔
820
        consumed: &ExecutionCost,
2,404✔
821
        tx_events: Vec<TransactionEvent>,
2,404✔
822
    ) {
2,404✔
823
        let interested_observers = self.filter_observers(&self.miner_observers_lookup, false);
2,404✔
824
        if interested_observers.is_empty() {
2,404✔
825
            return;
1,272✔
826
        }
1,132✔
827

828
        let signer_bitvec = serde_json::to_value(block.header.pox_treatment.clone())
1,132✔
829
            .unwrap_or_default()
1,132✔
830
            .as_str()
1,132✔
831
            .unwrap_or_default()
1,132✔
832
            .to_string();
1,132✔
833

834
        let payload = serde_json::to_value(MinedNakamotoBlockEvent {
1,132✔
835
            target_burn_height,
1,132✔
836
            parent_block_id: block.header.parent_block_id.to_string(),
1,132✔
837
            block_hash: block.header.block_hash().to_string(),
1,132✔
838
            block_id: block.header.block_id().to_string(),
1,132✔
839
            stacks_height: block.header.chain_length,
1,132✔
840
            block_size: block_size_bytes,
1,132✔
841
            cost: consumed.clone(),
1,132✔
842
            tx_events,
1,132✔
843
            miner_signature: block.header.miner_signature.clone(),
1,132✔
844
            miner_signature_hash: block.header.miner_signature_hash(),
1,132✔
845
            signer_signature_hash: block.header.signer_signature_hash(),
1,132✔
846
            signer_signature: block.header.signer_signature.clone(),
1,132✔
847
            signer_bitvec,
1,132✔
848
        })
1,132✔
849
        .unwrap();
1,132✔
850

851
        for observer in interested_observers.iter() {
1,132✔
852
            self.send_mined_nakamoto_block(observer, &payload);
1,132✔
853
        }
1,132✔
854
    }
2,404✔
855

856
    /// Forward newly-accepted StackerDB chunk metadata to downstream `stackerdb` observers.
857
    /// Infallible.
858
    pub fn process_new_stackerdb_chunks(
57,755✔
859
        &self,
57,755✔
860
        contract_id: QualifiedContractIdentifier,
57,755✔
861
        modified_slots: Vec<StackerDBChunkData>,
57,755✔
862
    ) {
57,755✔
863
        debug!(
57,755✔
864
            "event_dispatcher: New StackerDB chunk events for {contract_id}: {modified_slots:?}"
865
        );
866

867
        let interested_observers = self.filter_observers(&self.stackerdb_observers_lookup, false);
57,755✔
868

869
        let stackerdb_channel = self
57,755✔
870
            .stackerdb_channel
57,755✔
871
            .lock()
57,755✔
872
            .expect("FATAL: failed to lock StackerDB channel mutex");
57,755✔
873
        let interested_receiver = stackerdb_channel.is_active(&contract_id);
57,755✔
874
        if interested_observers.is_empty() && interested_receiver.is_none() {
57,755✔
875
            return;
3,314✔
876
        }
54,441✔
877

878
        let event = StackerDBChunksEvent {
54,441✔
879
            contract_id,
54,441✔
880
            modified_slots,
54,441✔
881
        };
54,441✔
882
        let payload = serde_json::to_value(&event)
54,441✔
883
            .expect("FATAL: failed to serialize StackerDBChunksEvent to JSON");
54,441✔
884

885
        if let Some(channel) = interested_receiver {
54,441✔
886
            if let Err(send_err) = channel.send(event) {
8,755✔
887
                warn!(
×
888
                    "Failed to send StackerDB event to signer coordinator channel. Miner thread may have exited.";
889
                    "err" => ?send_err
890
                );
891
            }
8,755✔
892
        }
45,686✔
893

894
        for observer in interested_observers.iter() {
238,621✔
895
            self.send_stackerdb_chunks(observer, &payload);
238,620✔
896
        }
238,620✔
897
    }
57,755✔
898

899
    pub fn process_dropped_mempool_txs(
9✔
900
        &self,
9✔
901
        txs: Vec<Txid>,
9✔
902
        new_txid: Option<Txid>,
9✔
903
        reason: MemPoolDropReason,
9✔
904
    ) {
9✔
905
        // lazily assemble payload only if we have observers
906
        let interested_observers = self.filter_observers(&self.mempool_observers_lookup, true);
9✔
907

908
        if interested_observers.is_empty() {
9✔
909
            return;
2✔
910
        }
7✔
911

912
        let dropped_txids: Vec<_> = txs
7✔
913
            .into_iter()
7✔
914
            .map(|tx| serde_json::Value::String(format!("0x{tx}")))
7✔
915
            .collect();
7✔
916

917
        let payload = match new_txid {
7✔
918
            Some(id) => {
1✔
919
                json!({
1✔
920
                    "dropped_txids": serde_json::Value::Array(dropped_txids),
1✔
921
                    "reason": reason.to_string(),
1✔
922
                    "new_txid": format!("0x{}", &id),
1✔
923
                })
924
            }
925
            None => {
926
                json!({
6✔
927
                    "dropped_txids": serde_json::Value::Array(dropped_txids),
6✔
928
                    "reason": reason.to_string(),
6✔
929
                    "new_txid": null,
6✔
930
                })
931
            }
932
        };
933

934
        for observer in interested_observers.iter() {
7✔
935
            self.send_dropped_mempool_txs(observer, &payload);
7✔
936
        }
7✔
937
    }
9✔
938

939
    pub fn process_new_attachments(&self, attachments: &[(AttachmentInstance, Attachment)]) {
×
940
        let interested_observers: Vec<_> = self.registered_observers.iter().enumerate().collect();
×
941
        if interested_observers.is_empty() {
×
942
            return;
×
943
        }
×
944

945
        let mut serialized_attachments = vec![];
×
946
        for attachment in attachments.iter() {
×
947
            let payload = make_new_attachment_payload(attachment);
×
948
            serialized_attachments.push(payload);
×
949
        }
×
950

951
        for (_, observer) in interested_observers.iter() {
×
952
            self.send_new_attachments(observer, &json!(serialized_attachments));
×
953
        }
×
954
    }
×
955

956
    fn update_dispatch_matrix_if_observer_subscribed(
6✔
957
        &self,
6✔
958
        asset_identifier: &AssetIdentifier,
6✔
959
        event_index: usize,
6✔
960
        dispatch_matrix: &mut [HashSet<usize>],
6✔
961
    ) {
6✔
962
        if let Some(observer_indexes) = self.assets_observers_lookup.get(asset_identifier) {
6✔
963
            for o_i in observer_indexes {
×
964
                dispatch_matrix[*o_i as usize].insert(event_index);
×
965
            }
×
966
        }
6✔
967
    }
6✔
968

969
    pub fn register_observer(&mut self, conf: &EventObserverConfig) {
973✔
970
        self.register_observer_private(conf);
973✔
971
    }
973✔
972

973
    fn register_observer_private(&mut self, conf: &EventObserverConfig) -> EventObserver {
974✔
974
        info!("Registering event observer at: {}", conf.endpoint);
974✔
975
        let event_observer = EventObserver::new(
974✔
976
            conf.endpoint.clone(),
974✔
977
            Duration::from_millis(conf.timeout_ms),
974✔
978
            conf.disable_retries,
974✔
979
        );
980

981
        if conf.disable_retries {
974✔
982
            warn!(
2✔
983
                "Observer {} is configured in \"disable_retries\" mode: events are not guaranteed to be delivered",
984
                conf.endpoint
985
            );
986
        }
972✔
987

988
        let observer_index = self.registered_observers.len() as u16;
974✔
989

990
        for event_key_type in conf.events_keys.iter() {
3,031✔
991
            match event_key_type {
3,031✔
992
                EventKeyType::SmartContractEvent(event_key) => {
×
993
                    match self
×
994
                        .contract_events_observers_lookup
×
995
                        .entry(event_key.clone())
×
996
                    {
997
                        Entry::Occupied(observer_indexes) => {
×
998
                            observer_indexes.into_mut().insert(observer_index);
×
999
                        }
×
1000
                        Entry::Vacant(v) => {
×
1001
                            let mut observer_indexes = HashSet::new();
×
1002
                            observer_indexes.insert(observer_index);
×
1003
                            v.insert(observer_indexes);
×
1004
                        }
×
1005
                    };
1006
                }
1007
                EventKeyType::BurnchainBlocks => {
884✔
1008
                    self.burn_block_observers_lookup.insert(observer_index);
884✔
1009
                }
884✔
1010
                EventKeyType::MemPoolTransactions => {
141✔
1011
                    self.mempool_observers_lookup.insert(observer_index);
141✔
1012
                }
141✔
1013
                EventKeyType::Microblocks => {
×
1014
                    self.microblock_observers_lookup.insert(observer_index);
×
1015
                }
×
1016
                EventKeyType::STXEvent => {
×
1017
                    self.stx_observers_lookup.insert(observer_index);
×
1018
                }
×
1019
                EventKeyType::AssetEvent(event_key) => {
×
1020
                    match self.assets_observers_lookup.entry(event_key.clone()) {
×
1021
                        Entry::Occupied(observer_indexes) => {
×
1022
                            observer_indexes.into_mut().insert(observer_index);
×
1023
                        }
×
1024
                        Entry::Vacant(v) => {
×
1025
                            let mut observer_indexes = HashSet::new();
×
1026
                            observer_indexes.insert(observer_index);
×
1027
                            v.insert(observer_indexes);
×
1028
                        }
×
1029
                    };
1030
                }
1031
                EventKeyType::AnyEvent => {
74✔
1032
                    self.any_event_observers_lookup.insert(observer_index);
74✔
1033
                }
74✔
1034
                EventKeyType::MinedBlocks => {
158✔
1035
                    self.miner_observers_lookup.insert(observer_index);
158✔
1036
                }
158✔
1037
                EventKeyType::MinedMicroblocks => {
1✔
1038
                    self.mined_microblocks_observers_lookup
1✔
1039
                        .insert(observer_index);
1✔
1040
                }
1✔
1041
                EventKeyType::StackerDBChunks => {
884✔
1042
                    self.stackerdb_observers_lookup.insert(observer_index);
884✔
1043
                }
884✔
1044
                EventKeyType::BlockProposal => {
889✔
1045
                    self.block_proposal_observers_lookup.insert(observer_index);
889✔
1046
                }
889✔
1047
            }
1048
        }
1049

1050
        self.registered_observers.push(event_observer.clone());
974✔
1051

1052
        event_observer
974✔
1053
    }
974✔
1054

1055
    /// Process any pending payloads in the database. This is meant to be called at startup, in order to
1056
    /// handle anything that was enqueued but not sent before shutdown. This method blocks until all
1057
    /// requests are made (or, if the observer is no longer registered, removed from the DB).
1058
    pub fn process_pending_payloads(&self) {
3✔
1059
        let conn =
3✔
1060
            EventDispatcherDbConnection::new(&self.db_path).expect("Failed to initialize database");
3✔
1061
        let pending_payloads = match conn.get_pending_payloads() {
3✔
1062
            Ok(payloads) => payloads,
3✔
1063
            Err(e) => {
×
1064
                error!(
×
1065
                    "Event observer: failed to retrieve pending payloads from database";
1066
                    "error" => ?e
1067
                );
1068
                return;
×
1069
            }
1070
        };
1071

1072
        info!(
3✔
1073
            "Event dispatcher: processing {} pending payloads",
1074
            pending_payloads.len()
3✔
1075
        );
1076

1077
        for PendingPayload {
1078
            id, request_data, ..
3✔
1079
        } in pending_payloads
3✔
1080
        {
1081
            info!(
3✔
1082
                "Event dispatcher: processing pending payload: {}",
1083
                request_data.url
1084
            );
1085
            let full_url = Url::parse(request_data.url.as_str()).unwrap_or_else(|_| {
3✔
1086
                panic!(
×
1087
                    "Event dispatcher: unable to parse {} as a URL",
1088
                    request_data.url
1089
                )
1090
            });
1091
            // find the right observer
1092
            let observer = self.registered_observers.iter().find(|observer| {
3✔
1093
                let endpoint_url = Url::parse(format!("http://{}", &observer.endpoint).as_str())
3✔
1094
                    .unwrap_or_else(|_| {
3✔
1095
                        panic!(
×
1096
                            "Event dispatcher: unable to parse {} as a URL",
1097
                            observer.endpoint
1098
                        )
1099
                    });
1100
                full_url.origin() == endpoint_url.origin()
3✔
1101
            });
3✔
1102

1103
            let Some(observer) = observer else {
3✔
1104
                // This observer is no longer registered, skip and delete
1105
                info!(
1✔
1106
                    "Event dispatcher: observer {} no longer registered, skipping",
1107
                    request_data.url
1108
                );
1109
                if let Err(e) = conn.delete_payload(id) {
1✔
1110
                    error!(
×
1111
                        "Event observer: failed to delete pending payload from database";
1112
                        "error" => ?e
1113
                    );
1114
                }
1✔
1115
                continue;
1✔
1116
            };
1117

1118
            // If the timeout configuration for this observer is different from what it was
1119
            // originally, the updated config wins.
1120
            self.worker
2✔
1121
                .initiate_send(id, observer.disable_retries, Some(observer.timeout))
2✔
1122
                .expect("failed to dispatch pending event payload to worker thread")
2✔
1123
                .wait_until_complete();
2✔
1124
        }
1125
    }
3✔
1126

1127
    /// A successful result from this method only indicates that that payload was successfully
1128
    /// enqueued, not that the HTTP request was actually made. If you need to wait until that's
1129
    /// the case, call `wait_until_complete()` on the `EventDispatcherResult`.
1130
    fn dispatch_to_observer(
533,296✔
1131
        &self,
533,296✔
1132
        event_observer: &EventObserver,
533,296✔
1133
        payload: &serde_json::Value,
533,296✔
1134
        path: &str,
533,296✔
1135
    ) -> Result<EventDispatcherResult, EventDispatcherError> {
533,296✔
1136
        let full_url = Self::get_full_url(event_observer, path);
533,296✔
1137
        let bytes = match Self::get_payload_bytes(payload) {
533,296✔
1138
            Ok(bytes) => bytes,
533,296✔
1139
            Err(err) => {
×
1140
                error!(
×
1141
                    "Event dispatcher: failed to serialize payload"; "path" => path, "error" => ?err
×
1142
                );
1143
                return Err(err);
×
1144
            }
1145
        };
1146

1147
        let data = EventRequestData {
533,296✔
1148
            payload_bytes: bytes,
533,296✔
1149
            url: full_url,
533,296✔
1150
            timeout: event_observer.timeout,
533,296✔
1151
        };
533,296✔
1152

1153
        let id = self.save_to_db(&data);
533,296✔
1154

1155
        self.worker
533,296✔
1156
            .initiate_send(id, event_observer.disable_retries, None)
533,296✔
1157
    }
533,296✔
1158

1159
    /// This fire-and-forget version of `dispatch_to_observer` logs any error from enqueueing the
1160
    /// request, and does not give you a way to wait for blocking until it's sent. If you need
1161
    /// more control, use `dispatch_to_observer()` directly and handle the result yourself.
1162
    ///
1163
    /// This method exists because we generally don't want the event dispatcher to interrupt the node's
1164
    /// processing.
1165
    fn dispatch_to_observer_or_log_error(
513,164✔
1166
        &self,
513,164✔
1167
        event_observer: &EventObserver,
513,164✔
1168
        payload: &serde_json::Value,
513,164✔
1169
        path: &str,
513,164✔
1170
    ) {
513,164✔
1171
        if let Err(err) = self.dispatch_to_observer(event_observer, payload, path) {
513,164✔
1172
            error!("Event dispatcher: Failed to enqueue payload for sending to observer: {err:?}");
3✔
1173
        }
513,161✔
1174
    }
513,164✔
1175

1176
    fn get_payload_bytes(payload: &serde_json::Value) -> Result<Arc<[u8]>, EventDispatcherError> {
533,296✔
1177
        let payload_bytes = serde_json::to_vec(payload)?;
533,296✔
1178
        Ok(Arc::<[u8]>::from(payload_bytes))
533,296✔
1179
    }
533,296✔
1180

1181
    fn get_full_url(event_observer: &EventObserver, path: &str) -> String {
533,296✔
1182
        let url_str = if path.starts_with('/') {
533,296✔
1183
            format!("{}{path}", &event_observer.endpoint)
10✔
1184
        } else {
1185
            format!("{}/{path}", &event_observer.endpoint)
533,286✔
1186
        };
1187
        format!("http://{url_str}")
533,296✔
1188
    }
533,296✔
1189

1190
    fn save_to_db(&self, data: &EventRequestData) -> i64 {
533,296✔
1191
        // Because the DB is initialized in the call to process_pending_payloads() during startup,
1192
        // it is *probably* ok to skip initialization here. That said, at the time of writing this is the
1193
        // only call to new_without_init(), and we might want to revisit the question whether it's
1194
        // really worth it.
1195
        let conn = EventDispatcherDbConnection::new_without_init(&self.db_path)
533,296✔
1196
            .expect("Failed to open database for event observer");
533,296✔
1197

1198
        conn.insert_payload_with_retry(data, SystemTime::now())
533,296✔
1199
    }
533,296✔
1200

1201
    fn send_new_attachments(&self, event_observer: &EventObserver, payload: &serde_json::Value) {
×
1202
        self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_ATTACHMENT_PROCESSED);
×
1203
    }
×
1204

1205
    fn send_new_mempool_txs(&self, event_observer: &EventObserver, payload: &serde_json::Value) {
1,802✔
1206
        self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MEMPOOL_TX_SUBMIT);
1,802✔
1207
    }
1,802✔
1208

1209
    /// Serializes new microblocks data into a JSON payload and sends it off to the correct path
1210
    fn send_new_microblocks(
2✔
1211
        &self,
2✔
1212
        event_observer: &EventObserver,
2✔
1213
        parent_index_block_hash: &StacksBlockId,
2✔
1214
        filtered_events: &[(usize, &(Txid, &StacksTransactionEvent))],
2✔
1215
        serialized_txs: &[TransactionEventPayload],
2✔
1216
        burn_block_hash: &BurnchainHeaderHash,
2✔
1217
        burn_block_height: u32,
2✔
1218
        burn_block_timestamp: u64,
2✔
1219
    ) {
2✔
1220
        // Serialize events to JSON
1221
        let serialized_events: Vec<serde_json::Value> = filtered_events
2✔
1222
            .iter()
2✔
1223
            .map(|(event_index, (txid, event))| {
2✔
1224
                // Since we no longer send events for post condition aborted transactions,
1225
                // all events we serialize here are committed events, so we can set the `committed` field to `true`.
1226
                event.json_serialize(*event_index, txid, true).unwrap()
1✔
1227
            })
1✔
1228
            .collect();
2✔
1229

1230
        let payload = json!({
2✔
1231
            "parent_index_block_hash": format!("0x{parent_index_block_hash}"),
2✔
1232
            "events": serialized_events,
2✔
1233
            "transactions": serialized_txs,
2✔
1234
            "burn_block_hash": format!("0x{burn_block_hash}"),
2✔
1235
            "burn_block_height": burn_block_height,
2✔
1236
            "burn_block_timestamp": burn_block_timestamp,
2✔
1237
        });
1238

1239
        self.dispatch_to_observer_or_log_error(event_observer, &payload, PATH_MICROBLOCK_SUBMIT);
2✔
1240
    }
2✔
1241

1242
    fn send_dropped_mempool_txs(
7✔
1243
        &self,
7✔
1244
        event_observer: &EventObserver,
7✔
1245
        payload: &serde_json::Value,
7✔
1246
    ) {
7✔
1247
        self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MEMPOOL_TX_DROP);
7✔
1248
    }
7✔
1249

1250
    fn send_mined_block(&self, event_observer: &EventObserver, payload: &serde_json::Value) {
9,413✔
1251
        self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MINED_BLOCK);
9,413✔
1252
    }
9,413✔
1253

1254
    fn send_mined_microblock(&self, event_observer: &EventObserver, payload: &serde_json::Value) {
1✔
1255
        self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MINED_MICROBLOCK);
1✔
1256
    }
1✔
1257

1258
    fn send_mined_nakamoto_block(
1,132✔
1259
        &self,
1,132✔
1260
        event_observer: &EventObserver,
1,132✔
1261
        payload: &serde_json::Value,
1,132✔
1262
    ) {
1,132✔
1263
        self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MINED_NAKAMOTO_BLOCK);
1,132✔
1264
    }
1,132✔
1265

1266
    fn send_stackerdb_chunks(&self, event_observer: &EventObserver, payload: &serde_json::Value) {
238,623✔
1267
        self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_STACKERDB_CHUNKS);
238,623✔
1268
    }
238,623✔
1269

1270
    fn send_new_burn_block(&self, event_observer: &EventObserver, payload: &serde_json::Value) {
222,973✔
1271
        self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_BURN_BLOCK_SUBMIT);
222,973✔
1272
    }
222,973✔
1273
}
1274

1275
#[cfg(any(test, feature = "testing"))]
1276
fn test_skip_block_announcement(block: &StacksBlockEventData) -> bool {
10,272✔
1277
    if TEST_SKIP_BLOCK_ANNOUNCEMENT.get() {
10,272✔
1278
        warn!(
×
1279
            "Skipping new block announcement due to testing directive";
1280
            "block_hash" => %block.block_hash
1281
        );
1282
        return true;
×
1283
    }
10,272✔
1284
    false
10,272✔
1285
}
10,272✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc