• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

xapi-project / xen-api / 13457957190

21 Feb 2025 01:35PM CUT coverage: 78.516%. Remained the same
13457957190

Pull #6312

github

Vincent-lau
CA-407033: Call `receive_finalize2` synchronously

`Remote.receive_finalize2` is called at the end of SXM to clean things
up and compose the base and leaf images together. The compose operation
should only be called while the VDI is deactivated. Currently a thread
is created to call `receive_finalize2`, which could cause problems
where the VM itself gets started while the `receive_finalize2`/`VDI.compose`
is still in progress. This is not a safe operation to do.

The fix here is to simply remove the thread and make the whole operation
sequential.

Signed-off-by: Vincent Liu <shuntian.liu2@cloud.com>
Pull Request #6312: CA-407033: Call `receive_finalize2` synchronously

3512 of 4473 relevant lines covered (78.52%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.97
/python3/packages/observer.py
1
# Copyright (C) Cloud Software Group.
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU Lesser General Public License as published
5
# by the Free Software Foundation; version 2.1 only. with the special
6
# exception on linking described in file LICENSE.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU Lesser General Public License for more details.
12
#
13

14
"""
15
Calls the passed script with its original arguments, instrumenting it to make a
16
trace of all function calls if at least one *observer.conf file exists in the
17
OBSERVER_CONFIG_DIR directory.
18

19
If there are no *observer.conf files or something fails, this script runs the
20
passed script without any instrumentation.
21
"""
22

23
import time
1✔
24

25
def to_otel_timestamp(ts):
    """Convert a time in seconds to an integer number of nanoseconds."""
    return int(ts * 1_000_000_000)


# Wall-clock start (in nanoseconds) and the matching monotonic reading,
# both sampled as early as possible in the module.
observer_ts_start = to_otel_timestamp(time.time())
observer_mono_start = time.monotonic()


def current_otel_time():
    """
    Return the current time in nanoseconds.

    The value is the wall-clock start of the observer plus the monotonic
    time elapsed since then, so it never goes backwards.
    """
    elapsed = time.monotonic() - observer_mono_start
    return observer_ts_start + to_otel_timestamp(elapsed)
1✔
33

34
import configparser
1✔
35
import functools
1✔
36
import inspect
1✔
37
import logging
1✔
38
import os
1✔
39
import runpy
1✔
40
import sys
1✔
41
import traceback
1✔
42
import types
1✔
43
from datetime import datetime, timezone
1✔
44
from logging.handlers import SysLogHandler
1✔
45
from typing import List, Sequence
1✔
46

47
# The OpenTelemetry library may generate exceptions we aren't expecting: This code
48
# must not fail or it will cause the pass-through script to fail when at worst
49
# this script should be a noop. As such, we sometimes need to catch broad exceptions:
50
# pylint: disable=broad-exception-caught, too-many-locals, too-many-statements
51
# wrapt.decorator adds the extra parameters so we shouldn't provide them:
52
# pylint: disable=no-value-for-parameter
53
# We only want to import OpenTelemetry libraries when instrumentation is enabled
54
# pylint: disable=import-outside-toplevel
55

56
# Debug logging is switched on by setting XAPI_TEST to a non-empty value.
DEBUG_ENABLED = os.getenv("XAPI_TEST")
# Modules instrumented when the configuration does not name any.
DEFAULT_MODULES = "LVHDSR,XenAPI,SR,SRCommand,util"
FORMAT = "observer.py: %(message)s"

# All log output goes to the local5 syslog facility through /dev/log.
handler = SysLogHandler(facility="local5", address="/dev/log")
logging.basicConfig(format=FORMAT, handlers=[handler])
syslog = logging.getLogger(__name__)
syslog.setLevel(logging.DEBUG if DEBUG_ENABLED else logging.INFO)
debug = syslog.debug
1✔
67

68

69
def _get_configs_list(config_dir):
1✔
70
    try:
1✔
71
        # There can be many observer config files in the configuration directory
72
        return [
1✔
73
            f"{config_dir}/{f}"
74
            for f in os.listdir(config_dir)
75
            if os.path.isfile(os.path.join(config_dir, f))
76
            and f.endswith("observer.conf")
77
        ]
78
    except FileNotFoundError as err:
1✔
79
        debug("configs exception: %s", err)
1✔
80
        return []
1✔
81

82

83
def read_config(config_path, header):
    """
    Read a config file and return a dictionary of key-value pairs.

    The file is expected to contain bare "key=value" lines without a
    section header, so the given header is prepended as the section name
    before parsing. Single quotes surrounding a value are stripped.

    Args:
        config_path: Path of the configuration file to read.
        header: Section name to parse the file contents under.

    Returns:
        The parsed options as a dict, or an empty dict if the file content
        is not a valid configuration.

    Raises:
        OSError: If the file cannot be opened (e.g. FileNotFoundError).
    """

    parser = configparser.ConfigParser(interpolation=None)
    with open(config_path, encoding="utf-8") as config_file:
        content = config_file.read()

    try:
        parser.read_string(f"[{header}]\n{content}")
    except configparser.Error as e:
        # configparser.Error is the base of ParsingError and also of the
        # duplicate option/section errors raised by the (strict) parser:
        # all of them mean that the file is not usable.
        debug("read_config(): invalid config file %s: %s", config_path, e)
        return {}

    config = {k: v.strip("'") for k, v in dict(parser[header]).items()}
    debug("%s: %s", config_path, config)
    return config
1✔
97

98

99
def _span_noop(wrapped=None, span_name_prefix=""):
1✔
100
    """Noop decorator. Overridden by _init_tracing() if there are configs."""
101
    if wrapped is None:
1✔
102
        return functools.partial(_span_noop, span_name_prefix=span_name_prefix)
1✔
103

104
    return wrapped
1✔
105

106

107
def _patch_module_noop(_):
1✔
108
    """Noop patch_module. Overridden by _init_tracing() if there are configs."""
109

110

111
def _init_tracing(configs: List[str], config_dir: str):
    """
    Initialise tracing with the given configuration files.

    If configs is empty, or the OpenTelemetry dependencies cannot be
    imported, return the noop span and patch_module functions.
    If configs are passed:
    - Import the OpenTelemetry packages
    - Read the list of modules to instrument from <config_dir>/all.conf
    - Create a tracer for each configuration file
    - Register post-import hooks instrumenting the configured modules
    - Record spans covering the setup of observer.py itself
    - Return the span and patch_module functions for tracing the program

    Args:
        configs: Paths of the observer configuration files.
        config_dir: Directory that may contain the shared all.conf file.

    Returns:
        A (span, patch_module) tuple: the decorator used to trace callables
        and the function registering a module for auto-instrumentation.
    """
    if not configs:
        return _span_noop, _patch_module_noop

    try:
        from warnings import simplefilter

        # On 3.10-3.12, the import of wrapt might trigger warnings, filter them:
        simplefilter(action="ignore", category=DeprecationWarning)

        import_ts_start = current_otel_time()

        import wrapt  # type: ignore[import-untyped]
        from opentelemetry import context, trace
        from opentelemetry.baggage.propagation import W3CBaggagePropagator
        from opentelemetry.exporter.zipkin.json import ZipkinExporter
        from opentelemetry.sdk.resources import Resource
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExportResult
        from opentelemetry.trace.propagation.tracecontext import (
            TraceContextTextMapPropagator,
        )

        import_ts_end = current_otel_time()
    except ImportError as err:
        syslog.error("missing opentelemetry dependencies: %s", err)
        return _span_noop, _patch_module_noop

    # all.conf is optional: without it, the default module list is traced
    try:
        config_dict = read_config(f"{config_dir}/all.conf", header="default")
    except FileNotFoundError:
        config_dict = {}
    module_names = config_dict.get("module_names", DEFAULT_MODULES).split(",")
    debug("module_names: %s", module_names)

    # pylint: disable=too-few-public-methods
    class FileZipkinExporter(ZipkinExporter):
        """Class to export spans to a file in Zipkin format."""

        def __init__(self, *args, **kwargs):
            # Remove the keyword arguments that ZipkinExporter does not know
            self.bugtool_filename_callback = kwargs.pop("filename_callback")
            self.bugtool_filename = self.bugtool_filename_callback()
            self.trace_log_dir = kwargs.pop("trace_log_dir")
            debug("bugtool filename=%s", self.bugtool_filename)
            self.bytes_written = 0
            super().__init__(*args, **kwargs)

        def export(self, spans: Sequence[trace.Span]) -> SpanExportResult:
            """Export the given spans to the file endpoint."""

            data = self.encoder.serialize(spans, self.local_node)
            datastr = str(data)
            debug("data.type=%s,data.len=%s", type(data), len(datastr))
            debug("data=%s", datastr)
            os.makedirs(name=self.trace_log_dir, exist_ok=True)

            with open(self.bugtool_filename, "a", encoding="utf-8") as bugtool_file:
                bugtool_file.write(f"{datastr}\n")  # ndjson
            self.bytes_written += len(data)

            # Create new file if it gets > 1MB
            if self.bytes_written > 1024 * 1024:
                self.bugtool_filename = self.bugtool_filename_callback()
                self.bytes_written = 0

            return SpanExportResult.SUCCESS

    def create_tracer_from_config(path):
        """Create a tracer from a config file."""

        otelvars = "opentelemetry-python.readthedocs.io/en/latest/sdk/environment_variables.html"
        config = read_config(path, header=otelvars)
        config_otel_resource_attrs = config.get("otel_resource_attributes", "")

        if config_otel_resource_attrs:
            # OTEL requires some attributes e.g. service.name
            # to be in the environment variable
            os.environ["OTEL_RESOURCE_ATTRIBUTES"] = config_otel_resource_attrs

        trace_log_dir = config.get("xs_exporter_bugtool_endpoint", "")

        zipkin_endpoints = config.get("xs_exporter_zipkin_endpoints")
        otel_exporter_zipkin_endpoints = (
            zipkin_endpoints.split(",") if zipkin_endpoints else []
        )
        # Split each "key=value" item on the first "=" only: a value may
        # itself contain "=", which must not break building the dict.
        otel_resource_attrs = dict(
            item.split("=", 1)
            for item in config_otel_resource_attrs.split(",")
            if "=" in item
        )

        service_name = config.get(
            "otel_service_name", otel_resource_attrs.get("service.name", "unknown")
        )
        host_uuid = otel_resource_attrs.get("xs.host.uuid", "unknown")
        # Remove . to prevent users changing directories in the bugtool_filenamer
        tracestate = os.getenv("TRACESTATE", "unknown").strip("'").replace(".", "")

        # rfc3339
        def bugtool_filenamer():
            """Return an rfc3339-compliant ndjson file name."""
            now = datetime.now(timezone.utc).isoformat()
            return (
                f"{trace_log_dir}/{service_name}-{host_uuid}-{tracestate}-{now}.ndjson"
            )

        # Continue the trace of the caller when TRACEPARENT is passed to us
        traceparent = os.getenv("TRACEPARENT", None)
        propagator = TraceContextTextMapPropagator()
        context_with_traceparent = propagator.extract({"traceparent": traceparent})

        context.attach(context_with_traceparent)

        # Create a tracer provider with the given resource attributes
        provider = TracerProvider(
            resource=Resource.create(
                W3CBaggagePropagator().extract({}, otel_resource_attrs)
            )
        )

        # Add a span processor for each endpoint defined in the config
        if trace_log_dir:
            processor_file_zipkin = BatchSpanProcessor(
                FileZipkinExporter(
                    filename_callback=bugtool_filenamer,
                    trace_log_dir=trace_log_dir,
                )
            )
            provider.add_span_processor(processor_file_zipkin)
        for zipkin_endpoint in otel_exporter_zipkin_endpoints:
            processor_zipkin = BatchSpanProcessor(
                ZipkinExporter(endpoint=zipkin_endpoint)
            )
            provider.add_span_processor(processor_zipkin)

        trace.set_tracer_provider(provider)
        return trace.get_tracer(__name__)

    tracers = list(map(create_tracer_from_config, configs))
    debug("tracers=%s", tracers)

    def span_of_tracers(wrapped=None, span_name_prefix=""):
        """
        Public decorator that creates a trace around a function.

        If there are no tracers, the function is called without any tracing.
        If there are tracers, the function is called with a span around it,
        created using the first tracer.

        If the passed object is a module, the classes and module-level
        functions of the module are instrumented as well.

        Args:
            wrapped: The function to be traced.
            span_name_prefix: The prefix to be added to the span name.

        Returns:
            The decorated function or a partial function if wrapped is None.

        If wrapped is None, the decorator is being used with parameters and
        a partial function is returned instead of the decorated function so
        that the function is decorated properly on the second pass.
        """
        if wrapped is None:  # handle decorators with parameters
            return functools.partial(span_of_tracers, span_name_prefix=span_name_prefix)

        @wrapt.decorator
        def instrument_function(wrapped, _, args, kwargs):
            """Decorator that creates a trace around a function."""
            if not tracers:
                return wrapped(*args, **kwargs)

            module_name = wrapped.__module__ if hasattr(wrapped, "__module__") else ""
            qual_name = wrapped.__qualname__ if hasattr(wrapped, "__qualname__") else ""

            if not module_name and not qual_name:
                span_name = str(wrapped)
            else:
                prefix = f"{span_name_prefix}:" if span_name_prefix else ""
                span_name = f"{prefix}{module_name}:{qual_name}"

            tracer = tracers[0]
            with tracer.start_as_current_span(span_name) as aspan:
                if inspect.isclass(wrapped):
                    # class or classmethod
                    aspan.set_attribute("xs.span.args.str", str(args))
                    aspan.set_attribute("xs.span.kwargs.str", str(kwargs))
                elif isinstance(wrapped, wrapt.PartialCallableObjectProxy):
                    pass
                elif isinstance(wrapped, (types.FunctionType, types.MethodType)):
                    # function, staticmethod or instancemethod
                    bound_args = inspect.signature(wrapped).bind(*args, **kwargs)
                    bound_args.apply_defaults()
                    for k, v in bound_args.arguments.items():
                        aspan.set_attribute(f"xs.span.arg.{k}", str(v))

                # must be inside "aspan" to produce nested trace
                result = wrapped(*args, **kwargs)
            return result

        def autoinstrument_class(aclass):
            """Auto-instrument a class."""

            t = tracers[0]
            class_name = f"{aclass.__module__}:{aclass.__qualname__}"

            with t.start_as_current_span(f"auto_instrumentation.add_class: {class_name}"):
                for method_name, method in aclass.__dict__.items():
                    if not callable(getattr(aclass, method_name)):
                        continue

                    with t.start_as_current_span(
                        f"class.instrument:{class_name}.{method_name}"
                    ):
                        # Avoid RecursionError:
                        # 'maximum recursion depth exceeded in comparison'
                        # in the XenAPI module (triggered by XMLRPC calls in it):
                        if method_name in ["__getattr__", "__call__", "__init__"]:
                            continue
                        try:
                            setattr(aclass, method_name, instrument_function(method))
                        except Exception:
                            debug(
                                "setattr.instrument_function: Exception %s",
                                traceback.format_exc(),
                            )

        def autoinstrument_module(amodule):
            """Autoinstrument the classes and functions in a module."""

            with tracers[0].start_as_current_span(f"auto_instrumentation.add_module: {amodule}"):
                # Instrument the methods of the classes in the module
                for _, aclass in inspect.getmembers(amodule, inspect.isclass):
                    try:
                        autoinstrument_class(aclass)
                    except Exception:
                        debug("instrument_function: Exception %s", traceback.format_exc())

                # Instrument the module-level functions of the module
                for fname, afunction in inspect.getmembers(amodule, inspect.isfunction):
                    setattr(amodule, fname, instrument_function(afunction))

        if inspect.ismodule(wrapped):
            autoinstrument_module(wrapped)

        return instrument_function(wrapped)

    def _patch_module(module_name):
        """Register a post-import hook that instruments the named module."""
        wrapt.importer.discover_post_import_hooks(module_name)
        wrapt.importer.when_imported(module_name)(
            lambda hook: span_of_tracers(wrapped=hook)
        )

    for m in module_names:
        _patch_module(m)

    # Create spans to track observer.py's setup duration
    t = tracers[0]
    with t.start_as_current_span("observer.py:init_tracing", start_time=observer_ts_start):
        import_span = t.start_span("observer.py:imports", start_time=import_ts_start)
        import_span.end(end_time=import_ts_end)

    return span_of_tracers, _patch_module
1✔
385

386

387
# The directory searched for *observer.conf files; defaults to the current
# directory. Single quotes around the value are dropped.
observer_config_dir = os.environ.get("OBSERVER_CONFIG_DIR", ".").strip("'")
observer_configs = _get_configs_list(observer_config_dir)
debug("configs = %s", observer_configs)

try:
    # With configs, span and patch_module become the tracing implementations
    # returned by _init_tracing(). Without configs, _init_tracing() returns
    # the no-op functions, and if it raises, the handler below selects them.
    span, patch_module = _init_tracing(observer_configs, observer_config_dir)

    # Explicitly set "OTEL_SDK_DISABLED" to "false" once the setup succeeded.
    # In our case, different from the standard, we want the tracing disabled
    # by default, so if the env variable is not set the noop implementation
    # is used.
    os.environ["OTEL_SDK_DISABLED"] = "false"
except Exception as exc:
    syslog.error("Exception while setting up tracing, running script untraced: %s", exc)
    span, patch_module = _span_noop, _patch_module_noop
×
405

406

407
def main():
    """
    Run the passed python script using the runpy module, passing the given arguments.

    The program will be automatically instrumented when the corresponding module
    in the program is imported.

    Returns:
        0 if the script ran to completion, 31 if no script was passed,
        2 if the script was not found, 139 if it raised an exception, or
        the code the script passed to sys.exit() (0 if that code was None).
    """

    # When sys.argv has only argv[0], but no command to call, exit with an error message
    if len(sys.argv) < 2:
        print(__file__ + ": usage: command argument list", file=sys.stderr)
        # NOTE: errno.EINVAL is 22 on Linux, not 31; the value 31 is kept
        # because callers may depend on this exit code.
        return 31

    # Shift the arguments by one so that the program to run is first in sys.argv
    sys.argv = sys.argv[1:]
    argv0 = sys.argv[0]

    @span(span_name_prefix=argv0)
    def run(file):
        """Run the given python file calling its __main__ function."""

        # Defensive error handling should hopefully only be needed in exceptional cases
        # but in case things go wrong, this may be a starting point for logging it:
        try:
            runpy.run_path(file, run_name="__main__")
            return 0
        except FileNotFoundError as e:
            print(
                f"{__file__} {' '.join(sys.argv)}:\nScript not found: {e.filename}",
                file=sys.stderr,
            )
            return 2
        except Exception as e:
            print(f"{__file__} {' '.join(sys.argv)}:", file=sys.stderr)  # the command
            print("Exception in the traced script:", file=sys.stderr)
            print(e, file=sys.stderr)  # Print the exception message
            print(traceback.format_exc(), file=sys.stderr)  # Print the traceback
            return 139  # This is what the default SIGSEGV handler on Linux returns
        except SystemExit as e:  # catch SystemExit so we can close gracefully
            _exit_code = e.code if e.code is not None else 0
            # sys.exit() also accepts non-integer objects (e.g. a message
            # string), so the code must not be formatted as an integer:
            debug("Script exited with code %s", _exit_code)
            # Print the traceback if _exit_code is non-zero
            if _exit_code:
                print(traceback.format_exc(), file=sys.stderr)
            return _exit_code

    return run(argv0)
1✔
454

455

456
if __name__ == "__main__":
    # sys.exit() (which raises SystemExit) is only called for a non-zero
    # return code: returning normally on success allows
    # test_observer_as_script() to get the globals of observer.py.

    exit_code = main()  # pylint: disable=invalid-name
    # Close the log handlers before exiting: this reduces the unclosed
    # socket warnings shown with PYTHONDEVMODE=yes
    logging.shutdown()
    if exit_code:
        sys.exit(exit_code)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc