• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

randombit / botan / 5230455705

10 Jun 2023 02:30PM UTC coverage: 91.715% (-0.03%) from 91.746%
5230455705

push

github

randombit
Merge GH #3584 Change clang-format AllowShortFunctionsOnASingleLine config from All to Inline

77182 of 84154 relevant lines covered (91.72%)

11975295.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.07
/src/lib/filters/threaded_fork.cpp
1
/*
2
* Threaded Fork
3
* (C) 2013 Joel Low
4
*     2013 Jack Lloyd
5
*
6
* Botan is released under the Simplified BSD License (see license.txt)
7
*/
8

9
#include <botan/filters.h>
10

11
#if defined(BOTAN_HAS_THREAD_UTILS)
12

13
   #include <botan/internal/barrier.h>
14
   #include <botan/internal/semaphore.h>
15
   #include <functional>
16

17
namespace Botan {
18

19
struct Threaded_Fork_Data {
2✔
20
      /*
21
   * Semaphore for indicating that there is work to be done (or to
22
   * quit)
23
   */
24
      Semaphore m_input_ready_semaphore;
25

26
      /*
27
   * Synchronises all threads to complete processing data in lock-step.
28
   */
29
      Barrier m_input_complete_barrier;
30

31
      /*
32
   * The work that needs to be done. This should be only when the threads
33
   * are NOT running (i.e. before notifying the work condition, after
34
   * the input_complete_barrier has reset.)
35
   */
36
      const uint8_t* m_input = nullptr;
37

38
      /*
39
   * The length of the work that needs to be done.
40
   */
41
      size_t m_input_length = 0;
42
};
43

44
/*
45
* Threaded_Fork constructor
46
*/
47
Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) :
1✔
48
      Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
1✔
49
   Filter* filters[4] = {f1, f2, f3, f4};
1✔
50
   set_next(filters, 4);
1✔
51
}
1✔
52

53
/*
54
* Threaded_Fork constructor
55
*/
56
Threaded_Fork::Threaded_Fork(Filter* filters[], size_t count) :
1✔
57
      Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
1✔
58
   set_next(filters, count);
1✔
59
}
1✔
60

61
/*
* Stop every worker thread and wait for it to exit.
*/
Threaded_Fork::~Threaded_Fork() {
   auto& shared = *m_thread_data;

   // A null input pointer is what thread_entry interprets as "quit".
   shared.m_input_length = 0;
   shared.m_input = nullptr;

   // Wake each worker exactly once so that it observes the quit request.
   shared.m_input_ready_semaphore.release(m_threads.size());

   for(const auto& worker : m_threads) {
      worker->join();
   }
}
6✔
71

72
/*
* Return the human readable name of this filter.
*/
std::string Threaded_Fork::name() const {
   return std::string("Threaded Fork");
}
75

76
/*
* Attach the given filters and make sure there is one worker thread per
* attached filter.
*
* f: array of filters, forwarded to Fork::set_next
* n: number of entries in f
*/
void Threaded_Fork::set_next(Filter* f[], size_t n) {
   Fork::set_next(f, n);

   // Use the number of filters actually stored rather than the argument.
   n = m_next.size();

   if(n < m_threads.size()) {
      // Fewer workers are needed than are running. Simply shrinking the
      // vector would destroy std::thread objects that are still joinable,
      // which calls std::terminate. A single worker cannot be targeted
      // through the shared semaphore either, so ask all of them to quit
      // (null input, one permit each, as the destructor does), join them,
      // and start afresh below.
      m_thread_data->m_input = nullptr;
      m_thread_data->m_input_length = 0;
      m_thread_data->m_input_ready_semaphore.release(m_threads.size());

      for(auto& thread : m_threads) {
         thread->join();
      }
      m_threads.clear();
   }

   // Start one worker for every filter that does not have one yet.
   m_threads.reserve(n);
   for(size_t i = m_threads.size(); i != n; ++i) {
      m_threads.push_back(std::make_shared<std::thread>([this, filter = m_next[i]] { thread_entry(filter); }));
   }
}
2✔
89

90
void Threaded_Fork::send(const uint8_t input[], size_t length) {
920✔
91
   if(!m_write_queue.empty()) {
920✔
92
      thread_delegate_work(m_write_queue.data(), m_write_queue.size());
×
93
   }
94
   thread_delegate_work(input, length);
920✔
95

96
   bool nothing_attached = true;
920✔
97
   for(size_t j = 0; j != total_ports(); ++j) {
5,517✔
98
      if(m_next[j]) {
4,597✔
99
         nothing_attached = false;
4,597✔
100
      }
101
   }
102

103
   if(nothing_attached) {
920✔
104
      m_write_queue += std::make_pair(input, length);
×
105
   } else {
106
      m_write_queue.clear();
920✔
107
   }
108
}
920✔
109

110
void Threaded_Fork::thread_delegate_work(const uint8_t input[], size_t length) {
920✔
111
   //Set the data to do.
112
   m_thread_data->m_input = input;
920✔
113
   m_thread_data->m_input_length = length;
920✔
114

115
   //Let the workers start processing.
116
   m_thread_data->m_input_complete_barrier.wait(total_ports() + 1);
920✔
117
   m_thread_data->m_input_ready_semaphore.release(total_ports());
920✔
118

119
   //Wait for all the filters to finish processing.
120
   m_thread_data->m_input_complete_barrier.sync();
920✔
121

122
   //Reset the thread data
123
   m_thread_data->m_input = nullptr;
920✔
124
   m_thread_data->m_input_length = 0;
920✔
125
}
920✔
126

127
/*
* Body of a worker thread: repeatedly wait for work, write it to the
* given filter, and report completion through the barrier.
*
* filter: the filter this worker writes to
*/
void Threaded_Fork::thread_entry(Filter* filter) {
   auto& shared = *m_thread_data;

   for(;;) {
      // Sleep until the owner announces work (or asks us to quit).
      shared.m_input_ready_semaphore.acquire();

      // Being woken without any input is the signal to exit.
      const uint8_t* const work = shared.m_input;
      if(work == nullptr) {
         return;
      }

      filter->write(work, shared.m_input_length);

      // Tell the owner that this worker has finished with the buffer.
      shared.m_input_complete_barrier.sync();
   }
}
7✔
139

140
}  // namespace Botan
141

142
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc