• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

randombit / botan / 5079590438

25 May 2023 12:28PM UTC coverage: 92.228% (+0.5%) from 91.723%
5079590438

Pull #3502

github

Pull Request #3502: Apply clang-format to the codebase

75589 of 81959 relevant lines covered (92.23%)

12139530.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.73
/src/lib/filters/threaded_fork.cpp
1
/*
2
* Threaded Fork
3
* (C) 2013 Joel Low
4
*     2013 Jack Lloyd
5
*
6
* Botan is released under the Simplified BSD License (see license.txt)
7
*/
8

9
#include <botan/filters.h>
10

11
#if defined(BOTAN_HAS_THREAD_UTILS)
12

13
   #include <botan/internal/barrier.h>
14
   #include <botan/internal/semaphore.h>
15
   #include <functional>
16

17
namespace Botan {
18

19
struct Threaded_Fork_Data {
2✔
20
      /*
21
   * Semaphore for indicating that there is work to be done (or to
22
   * quit)
23
   */
24
      Semaphore m_input_ready_semaphore;
25

26
      /*
27
   * Synchronises all threads to complete processing data in lock-step.
28
   */
29
      Barrier m_input_complete_barrier;
30

31
      /*
32
   * The work that needs to be done. This should be only when the threads
33
   * are NOT running (i.e. before notifying the work condition, after
34
   * the input_complete_barrier has reset.)
35
   */
36
      const uint8_t* m_input = nullptr;
37

38
      /*
39
   * The length of the work that needs to be done.
40
   */
41
      size_t m_input_length = 0;
42
};
43

44
/*
45
* Threaded_Fork constructor
46
*/
47
Threaded_Fork::Threaded_Fork(Filter* f1, Filter* f2, Filter* f3, Filter* f4) :
1✔
48
      Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
1✔
49
   Filter* filters[4] = {f1, f2, f3, f4};
1✔
50
   set_next(filters, 4);
1✔
51
}
1✔
52

53
/*
54
* Threaded_Fork constructor
55
*/
56
Threaded_Fork::Threaded_Fork(Filter* filters[], size_t count) :
1✔
57
      Fork(nullptr, static_cast<size_t>(0)), m_thread_data(new Threaded_Fork_Data) {
1✔
58
   set_next(filters, count);
1✔
59
}
1✔
60

61
Threaded_Fork::~Threaded_Fork() {
4✔
62
   m_thread_data->m_input = nullptr;
2✔
63
   m_thread_data->m_input_length = 0;
2✔
64

65
   m_thread_data->m_input_ready_semaphore.release(m_threads.size());
2✔
66

67
   for(auto& thread : m_threads)
9✔
68
      thread->join();
7✔
69
}
6✔
70

71
std::string Threaded_Fork::name() const { return "Threaded Fork"; }
×
72

73
void Threaded_Fork::set_next(Filter* f[], size_t n) {
2✔
74
   Fork::set_next(f, n);
2✔
75
   n = m_next.size();
2✔
76

77
   if(n < m_threads.size())
2✔
78
      m_threads.resize(n);
×
79
   else {
80
      m_threads.reserve(n);
2✔
81
      for(size_t i = m_threads.size(); i != n; ++i) {
9✔
82
         m_threads.push_back(std::make_shared<std::thread>(std::bind(&Threaded_Fork::thread_entry, this, m_next[i])));
7✔
83
      }
84
   }
85
}
2✔
86

87
void Threaded_Fork::send(const uint8_t input[], size_t length) {
920✔
88
   if(!m_write_queue.empty())
920✔
89
      thread_delegate_work(m_write_queue.data(), m_write_queue.size());
×
90
   thread_delegate_work(input, length);
920✔
91

92
   bool nothing_attached = true;
920✔
93
   for(size_t j = 0; j != total_ports(); ++j)
5,517✔
94
      if(m_next[j])
4,597✔
95
         nothing_attached = false;
4,597✔
96

97
   if(nothing_attached)
920✔
98
      m_write_queue += std::make_pair(input, length);
×
99
   else
100
      m_write_queue.clear();
920✔
101
}
920✔
102

103
void Threaded_Fork::thread_delegate_work(const uint8_t input[], size_t length) {
920✔
104
   //Set the data to do.
105
   m_thread_data->m_input = input;
920✔
106
   m_thread_data->m_input_length = length;
920✔
107

108
   //Let the workers start processing.
109
   m_thread_data->m_input_complete_barrier.wait(total_ports() + 1);
920✔
110
   m_thread_data->m_input_ready_semaphore.release(total_ports());
920✔
111

112
   //Wait for all the filters to finish processing.
113
   m_thread_data->m_input_complete_barrier.sync();
920✔
114

115
   //Reset the thread data
116
   m_thread_data->m_input = nullptr;
920✔
117
   m_thread_data->m_input_length = 0;
920✔
118
}
920✔
119

120
void Threaded_Fork::thread_entry(Filter* filter) {
7✔
121
   while(true) {
4,597✔
122
      m_thread_data->m_input_ready_semaphore.acquire();
4,604✔
123

124
      if(!m_thread_data->m_input)
4,604✔
125
         break;
126

127
      filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
4,597✔
128
      m_thread_data->m_input_complete_barrier.sync();
4,597✔
129
   }
130
}
7✔
131

132
}
133

134
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc