• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jacquesg / p5-Neovim-Ext / 453

pending completion
453

push

azure-pipelines

Jacques Germishuys
Changes: update

780 of 953 relevant lines covered (81.85%)

676.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.4
/lib/Neovim/Ext/MsgPack/RPC/EventLoop.pm
1
package Neovim::Ext::MsgPack::RPC::EventLoop;
2

3
use strict;
275✔
4
use warnings;
275✔
5
use base qw/Class::Accessor/;
275✔
6
use Scalar::Util qw/weaken/;
275✔
7
use IPC::Open3 qw/open3/;
275✔
8
use IO::Handle;
275✔
9
use IO::Async::Loop;
275✔
10
use IO::Async::Signal;
275✔
11
use IO::Async::Stream;
275✔
12
use IO::Socket::INET;
275✔
13
use IO::Socket::UNIX;
275✔
14
use Time::HiRes qw/usleep/;
275✔
15
use Socket qw/SOCK_STREAM/;
275✔
16
# Generate accessors for the object's fields:
#   loop            - the IO::Async::Loop created in new()
#   stream          - the IO::Async::Stream built by _create_stream()
#   data_cb         - callback handed to run(), called with data read from the stream
#   _transport_type - transport name passed to new()
#   _can_close      - set to 1 by the tcp, socket and child transports; checked by close()
#   _pid            - value returned by open3() for the child transport
#   signals         - array ref of the IO::Async::Signal objects added by _setup_signals()
__PACKAGE__->mk_accessors (qw/loop stream data_cb _transport_type _can_close _pid signals/);
17

18

19
# Constructor. The first argument selects the transport ('stdio', 'tcp',
# 'socket' or 'child'); every remaining argument is passed on unchanged to
# the matching connect_* method. Dies for any other transport name.
sub new
{
        my ($this, $transport_type, @connect_args) = @_;

        my $class = ref ($this) || $this;

        my $fields =
        {
                loop => IO::Async::Loop->new,
                signals => [],
                _transport_type => $transport_type,
        };
        my $obj = bless $fields, $class;

        # Transport name => method that establishes the connection
        my %connect_with =
        (
                stdio  => 'connect_stdio',
                tcp    => 'connect_tcp',
                socket => 'connect_socket',
                child  => 'connect_child',
        );

        my $connect = $connect_with{$transport_type};
        if (!$connect)
        {
                die "Unsupported transport type: $transport_type\n";
        }

        $obj->$connect (@connect_args);

        return $obj;
}
56

57

58

59
# Destructor. If a child process was recorded in _pid (see connect_child),
# block until it has exited and reap it.
sub DESTROY
{
        my ($this) = @_;

        # waitpid() overwrites $? and may set $!. A destructor can run at
        # any point (inside an eval, while an error is being reported, at
        # program exit), so shield the global status variables from it.
        local ($., $@, $!, $^E, $?);

        my $pid = $this->_pid;
        if ($pid)
        {
                waitpid ($pid, 0);
        }
}
68

69

70

71
# Use this process's own STDIN/STDOUT as the transport. Both handles are put
# into binary mode and made non-blocking before being wrapped in a stream
# that reads from STDIN and writes to STDOUT.
sub connect_stdio
{
        my $this = shift;

        my @std_handles = (\*STDIN, \*STDOUT);

        binmode $_ foreach (@std_handles);
        $_->blocking (0) foreach (@std_handles);

        return $this->_create_stream (read_handle => \*STDIN, write_handle => \*STDOUT);
}
83

84

85

86
# Open a TCP connection to $address:$port and wrap it in a stream.
#
# $retries is the number of further attempts made after a failed connect
# (default 0); $retryInterval is the pause between attempts in milliseconds
# (default 100). Dies with "Couldn't connect to ..." as soon as an attempt
# fails with no retries left.
sub connect_tcp
{
        my ($this, $address, $port, $retries, $retryInterval) = @_;

        $retries //= 0;
        $retryInterval //= 100;

        my $socket;
        while (1)
        {
                $socket = IO::Socket::INET->new
                (
                        PeerAddr => $address,
                        PeerPort => $port,
                        Proto    => 'tcp',
                        Type     => SOCK_STREAM
                );
                last if ($socket);

                # Report before sleeping so $! still belongs to the failed attempt
                if ($retries == 0)
                {
                        die "Couldn't connect to $address:$port: $!\n";
                }

                --$retries;

                # usleep() takes microseconds, the interval is in milliseconds
                usleep ($retryInterval*1000);
        }

        $socket->blocking (0);

        $this->_create_stream (handle => $socket);
        $this->_can_close (1);
}
119

120

121

122
# Connect to the UNIX domain socket at $path and wrap it in a stream.
#
# $retries is the number of further attempts made after a failed connect
# (default 0); $retryInterval is the pause between attempts in milliseconds
# (default 100). Dies with "Couldn't connect to ..." as soon as an attempt
# fails with no retries left.
sub connect_socket
{
        my ($this, $path, $retries, $retryInterval) = @_;

        $retries //= 0;
        $retryInterval //= 100;

        my $socket;
        while (1)
        {
                $socket = IO::Socket::UNIX->new
                (
                        Type => SOCK_STREAM,
                        Peer => $path,
                );
                last if ($socket);

                # Report before sleeping so $! still belongs to the failed attempt
                if ($retries == 0)
                {
                        die "Couldn't connect to $path: $!\n";
                }

                --$retries;

                # usleep() takes microseconds, the interval is in milliseconds
                usleep ($retryInterval*1000);
        }

        $socket->blocking (0);

        $this->_create_stream (handle => $socket);
        $this->_can_close (1);
}
153

154

155

156
# Spawn the command in @$argv with open3() and use the child's stdin/stdout
# as the transport. The value returned by open3() is stored in _pid, which
# DESTROY later waits on. Dies on Windows.
sub connect_child
{
        my ($this, $argv) = @_;

        if ($^O eq 'MSWin32')
        {
                die "Not supported!";
        }

        # Use handles private to this instance. Package-global bareword
        # handles would be re-opened by the next connect_child() call in the
        # same process, pulling them out from under an existing stream.
        # open3() autovivifies the stdin/stdout handles, but the stderr
        # handle has to be a real glob or stderr is merged into stdout.
        require Symbol;
        my ($child_in, $child_out);
        my $child_err = Symbol::gensym ();

        $this->_pid (open3 ($child_in, $child_out, $child_err, @$argv));

        # Nothing reads the child's stderr, but the handle must stay open
        # for as long as this object lives so the pipe is not closed on the
        # child.
        $this->{_child_err} = $child_err;

        $child_in->blocking (0);
        $child_out->blocking (0);

        $this->_create_stream (read_handle => $child_out, write_handle => $child_in);
        $this->_can_close (1);
}
173

174

175

176
# Build the IO::Async::Stream for this transport, remember it in the
# 'stream' accessor and add it to the loop. %options carries the handle
# arguments (handle, or read_handle/write_handle) chosen by the caller; the
# fixed settings and callbacks below are listed after them.
sub _create_stream
{
        my ($this, %options) = @_;

        my $stream = IO::Async::Stream->new
        (
                %options,
                read_all => 1,
                autoflush => 1,
                close_on_read_eof => 1,
                on_read => $this->_on_read(),
                on_read_error => $this->_on_read_error(),
                on_read_eof => $this->_on_read_eof(),
        );

        $this->stream ($stream);

        return $this->loop->add ($stream);
}
194

195

196

197
# Build the stream's on_read callback. The callback takes everything that is
# currently buffered, empties the buffer, passes the data to data_cb and
# returns 0.
sub _on_read
{
        my ($this) = @_;

        # The callback is stored on the stream, which this object holds, so
        # only keep a weak reference back to the object.
        weaken (my $weak_this = $this);

        return sub
        {
                my (undef, $buffer) = @_;

                # Take the whole buffer in one go
                my $chunk = $$buffer;
                $$buffer = '';

                $weak_this->data_cb->($chunk);

                return 0;
        };
}
217

218

219

220
# Build the stream's on_read_error callback: stop the event loop, then die
# with the reported error.
sub _on_read_error
{
        my ($this) = @_;

        # Weak reference, so the callback does not keep this object alive
        weaken (my $weak_this = $this);

        return sub
        {
                my $error = $_[1];

                $weak_this->stop();
                die "handle read error: $error\n";
        };
}
234

235

236

237
# Build the stream's on_read_eof callback: stop the event loop, then die
# with "handle read eof".
sub _on_read_eof
{
        my ($this) = @_;

        # Weak reference, so the callback does not keep this object alive
        weaken (my $weak_this = $this);

        my $callback = sub
        {
                $weak_this->stop();
                die "handle read eof\n";
        };

        return $callback;
}
250

251

252

253
# Build a signal handler. The handler receives the signal name and stops the
# event loop, except for INT on the stdio transport, which is ignored.
sub _on_signal
{
        my ($this) = @_;

        # Weak reference, so the handler does not keep this object alive
        weaken (my $weak_this = $this);

        return sub
        {
                my $name = shift;

                # On stdio we are probably running as a child process of
                # nvim, and ctrl+C must not kill us.
                return if ($weak_this->_transport_type eq 'stdio' && $name eq 'INT');

                $weak_this->stop();
        };
}
274

275

276

277
# Write $data to the stream.
sub send
{
        my ($this, $data) = @_;

        my $stream = $this->stream;
        return $stream->write ($data);
}
282

283

284

285
# Return a new future obtained from the underlying loop's new_future().
sub create_future
{
        my $this = shift;

        my $loop = $this->loop;
        return $loop->new_future;
}
290

291

292

293
# Hand $future to the underlying loop's await() and return whatever list
# that call produced.
sub await
{
        my $this = shift;
        my ($future) = @_;

        # The loop is always called in list context; the collected list is
        # then returned to the caller.
        my @outcome = $this->loop->await ($future);

        return @outcome;
}
300

301

302

303
# Add one IO::Async::Signal watcher per name in @signals to the loop and
# record it in 'signals' so _teardown_signals() can remove it again.
# Does nothing on Windows.
sub _setup_signals
{
        my ($this, @signals) = @_;

        return if ($^O eq 'MSWin32');

        foreach my $name (@signals)
        {
                my $handler = $this->_on_signal();

                # The handler wants the signal name, so wrap it in a
                # closure that supplies it.
                my $watcher = IO::Async::Signal->new
                (
                        name => $name,
                        on_receipt => sub
                        {
                                $handler->($name);
                        }
                );

                $this->loop->add ($watcher);
                push @{$this->signals}, $watcher;
        }
}
326

327

328

329
# Remove every watcher recorded in 'signals' from the loop, leaving the list
# empty.
sub _teardown_signals
{
        my ($this) = @_;

        my $registered = $this->signals;

        while (my $watcher = shift @$registered)
        {
                my $loop = $this->loop;
                $loop->remove ($watcher);
        }
}
338

339

340

341
# Run the event loop until it is stopped.
#
# $data_cb is stored in data_cb, where the stream's read callback picks it
# up. $setup_cb, if given, is queued on the loop with later() before the
# loop is started. INT and TERM watchers are installed for the duration of
# the run and are removed again even when the loop is left by an exception
# (the read-error and EOF callbacks die on purpose); the exception is then
# re-thrown unchanged.
sub run
{
        my ($this, $data_cb, $setup_cb) = @_;

        $this->loop->later ($setup_cb) if ($setup_cb);

        $this->data_cb ($data_cb);

        $this->_setup_signals ('INT', 'TERM');

        my ($completed, $error);
        {
                # Keep the caller's $@ untouched when the loop ends normally
                local $@;
                $completed = eval { $this->loop->run; 1 };
                $error = $@;
        }

        # Always detach the watchers, otherwise they stay registered on the
        # loop after a failed run.
        $this->_teardown_signals();

        die $error if (!$completed);

        return;
}
353

354

355

356
# Ask the underlying loop to stop.
sub stop
{
        my $this = shift;

        my $loop = $this->loop;
        return $loop->stop;
}
361

362

363

364
# Close the stream right away, but only when _can_close is set (the tcp,
# socket and child transports set it; stdio does not).
sub close
{
        my $this = shift;

        if ($this->_can_close)
        {
                $this->stream->close_now;
        }
}
370

371
=head1 NAME
372

373
Neovim::Ext::MsgPack::RPC::EventLoop - Neovim::Ext::MsgPack::RPC::EventLoop class
374

375
=head1 SYNOPSIS
376

377
        use Neovim::Ext;
378

379
=head1 METHODS
380

381
=head2 connect_stdio( )
382

383
Connect using stdin/stdout.
384

385
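=head2 connect_tcp( $address, $port, $retries, $retryInterval )
386

387
Connect to TCP/IP C<$address>:C<$port>. A failed connection is retried up to C<$retries> more times (default 0), waiting C<$retryInterval> milliseconds (default 100) between attempts.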
388

389
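=head2 connect_socket( $path, $retries, $retryInterval )
390

391
Connect to the socket at C<$path>. A failed connection is retried up to C<$retries> more times (default 0), waiting C<$retryInterval> milliseconds (default 100) between attempts.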
392

393
=head2 connect_child( \@argv )
394

395
Connect to a new Nvim instance. Uses C<\@argv> as the argument vector to
396
spawn an embedded Nvim. This isn't supported on Windows.
397

398
=head2 create_future( )
399

400
Create a future.
401

402
=head2 await( $future )
403

404
Wait for C<$future> to complete.
405

406
=head2 close( )
407

408
Close the underlying stream. This does nothing for the stdio transport.
409

410
=head2 send( $data )
411

412
Queue C<$data> for sending to Nvim.
413

414
=head2 run( $data_cb, $setup_cb )
415

416
Run the event loop, calling C<$data_cb> with the data received on each read. If C<$setup_cb> is given, it is queued to run on the loop.
417

418
=head2 stop( )
419

420
Stop the event loop.
421

422
=cut
423

424
1;
425

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc