• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 16468558849

23 Jul 2025 10:48AM UTC coverage: 90.735% (-0.01%) from 90.749%
16468558849

push

github

web-flow
feat: added `return_embedding` attr in `in_memory/document_store` (#9622)

* feat: added return_embedding to init

* feat: added return_embedding in to_dict

* feat: added  return_embedding to filter_documents

* feat: added return_embedding to  bm25_retrieval

* refactor: embedding_retrieval to use return_embedding attribute rather than parameter passed

* docs: added releasenote

* fix: pop from doc_fields instead of changing return_documents attr to none

* fix: made return_embedding an optional field and removed deprecation warning

* fix: give return_embedding a higher priority than self.return_embedding

* feat: changed default behaviour of return_embedding to True

* chore: update tests after InMemory Document store update

* Update releasenotes/notes/update-in-memory-document-store-17f555695caf9d52.yaml

Co-authored-by: Sebastian Husch Lee <10526848+sjrl@users.noreply.github.com>

* chore: update docs

* chore: enhanced clarity and readability of expression

* test: return_embedding is set to false during initialization

* test: overriding  return_embedding inside

* fix: changed the use of self.filter_documents to actual implementation inside `embedding_retrieval`

Signed-off-by: rafaeljohn9 <rafaeljohb@gmail.com>

---------

Signed-off-by: rafaeljohn9 <rafaeljohb@gmail.com>
Co-authored-by: Sebastian Husch Lee <10526848+sjrl@users.noreply.github.com>

12525 of 13804 relevant lines covered (90.73%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.44
haystack/document_stores/in_memory/document_store.py
1
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
2
#
3
# SPDX-License-Identifier: Apache-2.0
4

5
import asyncio
1✔
6
import json
1✔
7
import math
1✔
8
import re
1✔
9
import uuid
1✔
10
from collections import Counter
1✔
11
from concurrent.futures import ThreadPoolExecutor
1✔
12
from dataclasses import dataclass
1✔
13
from pathlib import Path
1✔
14
from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple
1✔
15

16
import numpy as np
1✔
17

18
from haystack import default_from_dict, default_to_dict, logging
1✔
19
from haystack.dataclasses import Document
1✔
20
from haystack.document_stores.errors import DocumentStoreError, DuplicateDocumentError
1✔
21
from haystack.document_stores.types import DuplicatePolicy
1✔
22
from haystack.utils import expit
1✔
23
from haystack.utils.filters import document_matches_filter
1✔
24

25
logger = logging.getLogger(__name__)
1✔
26

27
# document scores are essentially unbounded and will be scaled to values between 0 and 1 if scale_score is set to
28
# True (default). Scaling uses the expit function (inverse of the logit function) after applying a scaling factor
29
# (e.g., BM25_SCALING_FACTOR for the bm25_retrieval method).
30
# Larger scaling factor decreases scaled scores. For example, an input of 10 is scaled to 0.99 with
31
# BM25_SCALING_FACTOR=2 but to 0.78 with BM25_SCALING_FACTOR=8 (default). The defaults were chosen empirically.
32
# Increase the default if most unscaled scores are larger than expected (>30) and otherwise would incorrectly all be
33
# mapped to scores ~1.
34
BM25_SCALING_FACTOR = 8
1✔
35
DOT_PRODUCT_SCALING_FACTOR = 100
1✔
36

37

38
@dataclass
1✔
39
class BM25DocumentStats:
1✔
40
    """
41
    A dataclass for managing document statistics for BM25 retrieval.
42

43
    :param freq_token: A Counter of token frequencies in the document.
44
    :param doc_len: Number of tokens in the document.
45
    """
46

47
    freq_token: Dict[str, int]
1✔
48
    doc_len: int
1✔
49

50

51
# Global storage for all InMemoryDocumentStore instances, indexed by the index name.
52
_STORAGES: Dict[str, Dict[str, Document]] = {}
1✔
53
_BM25_STATS_STORAGES: Dict[str, Dict[str, BM25DocumentStats]] = {}
1✔
54
_AVERAGE_DOC_LEN_STORAGES: Dict[str, float] = {}
1✔
55
_FREQ_VOCAB_FOR_IDF_STORAGES: Dict[str, Counter] = {}
1✔
56

57

58
class InMemoryDocumentStore:
1✔
59
    """
60
    Stores data in-memory. It's ephemeral unless explicitly persisted with `save_to_disk` and reloaded with `load_from_disk`.
61
    """
62

63
    def __init__(  # pylint: disable=too-many-positional-arguments
1✔
64
        self,
65
        bm25_tokenization_regex: str = r"(?u)\b\w\w+\b",
66
        bm25_algorithm: Literal["BM25Okapi", "BM25L", "BM25Plus"] = "BM25L",
67
        bm25_parameters: Optional[Dict] = None,
68
        embedding_similarity_function: Literal["dot_product", "cosine"] = "dot_product",
69
        index: Optional[str] = None,
70
        async_executor: Optional[ThreadPoolExecutor] = None,
71
        return_embedding: bool = True,
72
    ):
73
        """
74
        Initializes the DocumentStore.
75

76
        :param bm25_tokenization_regex: The regular expression used to tokenize the text for BM25 retrieval.
77
        :param bm25_algorithm: The BM25 algorithm to use. One of "BM25Okapi", "BM25L", or "BM25Plus".
78
        :param bm25_parameters: Parameters for BM25 implementation in a dictionary format.
79
            For example: `{'k1':1.5, 'b':0.75, 'epsilon':0.25}`
80
            You can learn more about these parameters by visiting https://github.com/dorianbrown/rank_bm25.
81
        :param embedding_similarity_function: The similarity function used to compare Documents embeddings.
82
            One of "dot_product" (default) or "cosine". To choose the most appropriate function, look for information
83
            about your embedding model.
84
        :param index: A specific index to store the documents. If not specified, a random UUID is used.
85
            Using the same index allows you to store documents across multiple InMemoryDocumentStore instances.
86
        :param async_executor:
87
            Optional ThreadPoolExecutor to use for async calls. If not provided, a single-threaded
88
            executor will be initialized and used.
89
        :param return_embedding: Whether to return the embedding of the retrieved Documents. Default is True.
90
        """
91
        self.bm25_tokenization_regex = bm25_tokenization_regex
1✔
92
        self.tokenizer = re.compile(bm25_tokenization_regex).findall
1✔
93

94
        if index is None:
1✔
95
            index = str(uuid.uuid4())
1✔
96

97
        self.index = index
1✔
98
        if self.index not in _STORAGES:
1✔
99
            _STORAGES[self.index] = {}
1✔
100

101
        self.bm25_algorithm = bm25_algorithm
1✔
102
        self.bm25_algorithm_inst = self._dispatch_bm25()
1✔
103
        self.bm25_parameters = bm25_parameters or {}
1✔
104
        self.embedding_similarity_function = embedding_similarity_function
1✔
105

106
        # Per-document statistics
107
        if self.index not in _BM25_STATS_STORAGES:
1✔
108
            _BM25_STATS_STORAGES[self.index] = {}
1✔
109

110
        if self.index not in _AVERAGE_DOC_LEN_STORAGES:
1✔
111
            _AVERAGE_DOC_LEN_STORAGES[self.index] = 0.0
1✔
112

113
        if self.index not in _FREQ_VOCAB_FOR_IDF_STORAGES:
1✔
114
            _FREQ_VOCAB_FOR_IDF_STORAGES[self.index] = Counter()
1✔
115

116
        # Keep track of whether we own the executor: if we created it, we must also clean it up.
117
        self._owns_executor = async_executor is None
1✔
118
        self.executor = (
1✔
119
            ThreadPoolExecutor(thread_name_prefix=f"async-inmemory-docstore-executor-{id(self)}", max_workers=1)
120
            if async_executor is None
121
            else async_executor
122
        )
123
        self.return_embedding = return_embedding
1✔
124

125
    def __del__(self):
1✔
126
        """
127
        Cleanup when the instance is being destroyed.
128
        """
129
        if hasattr(self, "_owns_executor") and self._owns_executor and hasattr(self, "executor"):
1✔
130
            self.executor.shutdown(wait=True)
1✔
131

132
    def shutdown(self):
1✔
133
        """
134
        Explicitly shutdown the executor if we own it.
135
        """
136
        if self._owns_executor:
1✔
137
            self.executor.shutdown(wait=True)
1✔
138

139
    @property
1✔
140
    def storage(self) -> Dict[str, Document]:
1✔
141
        """
142
        Utility property that returns the storage used by this instance of InMemoryDocumentStore.
143
        """
144
        return _STORAGES.get(self.index, {})
1✔
145

146
    @property
1✔
147
    def _bm25_attr(self) -> Dict[str, BM25DocumentStats]:
1✔
148
        return _BM25_STATS_STORAGES.get(self.index, {})
1✔
149

150
    @property
1✔
151
    def _avg_doc_len(self) -> float:
1✔
152
        return _AVERAGE_DOC_LEN_STORAGES.get(self.index, 0.0)
1✔
153

154
    @_avg_doc_len.setter
1✔
155
    def _avg_doc_len(self, value: float) -> None:
1✔
156
        _AVERAGE_DOC_LEN_STORAGES[self.index] = value
1✔
157

158
    @property
1✔
159
    def _freq_vocab_for_idf(self) -> Counter:
1✔
160
        return _FREQ_VOCAB_FOR_IDF_STORAGES.get(self.index, Counter())
1✔
161

162
    def _dispatch_bm25(self):
1✔
163
        """
164
        Select the correct BM25 algorithm based on user specification.
165

166
        :returns:
167
            The BM25 algorithm method.
168
        """
169
        table = {"BM25Okapi": self._score_bm25okapi, "BM25L": self._score_bm25l, "BM25Plus": self._score_bm25plus}
1✔
170

171
        if self.bm25_algorithm not in table:
1✔
172
            raise ValueError(f"BM25 algorithm '{self.bm25_algorithm}' is not supported.")
1✔
173
        return table[self.bm25_algorithm]
1✔
174

175
    def _tokenize_bm25(self, text: str) -> List[str]:
1✔
176
        """
177
        Tokenize text using the BM25 tokenization regex.
178

179
        Here we explicitly create a tokenization method to encapsulate
180
        all pre-processing logic used to create BM25 tokens, such as
181
        lowercasing. This helps track the exact tokenization process
182
        used for BM25 scoring at any given time.
183

184
        :param text:
185
            The text to tokenize.
186
        :returns:
187
            A list of tokens.
188
        """
189
        text = text.lower()
1✔
190
        return self.tokenizer(text)
1✔
191

192
    def _score_bm25l(self, query: str, documents: List[Document]) -> List[Tuple[Document, float]]:
        """
        Calculate BM25L scores for the given query and filtered documents.

        :param query:
            The query string.
        :param documents:
            The list of documents to score, should be produced by
            the filter_documents method; may be an empty list.
        :returns:
            A list of tuples, each containing a Document and its BM25L score.
        :raises KeyError:
            If one of the documents has no BM25 statistics stored for its id.
        """
        k = self.bm25_parameters.get("k1", 1.5)
        b = self.bm25_parameters.get("b", 0.75)
        delta = self.bm25_parameters.get("delta", 0.5)

        # Read the index-wide statistics once; they do not change while scoring.
        avg_doc_len = self._avg_doc_len
        freq_vocab = self._freq_vocab_for_idf
        stats_by_id = self._bm25_attr
        n_corpus = len(stats_by_id)

        def _compute_idf(tokens: List[str]) -> Dict[str, float]:
            """Per-token IDF computation for all tokens."""
            idf = {}
            for tok in tokens:
                n = freq_vocab.get(tok, 0)
                # Tokens that occur in no document contribute nothing to the score.
                idf[tok] = math.log((n_corpus + 1.0) / (n + 0.5)) * int(n != 0)
            return idf

        def _compute_tf(token: str, freq: Dict[str, int], doc_len: int) -> float:
            """Per-token BM25L computation."""
            freq_term = freq.get(token, 0.0)
            # The average length can be 0 (e.g. no stored document produced any token).
            # Treat every document as average-length then instead of dividing by zero.
            len_ratio = doc_len / avg_doc_len if avg_doc_len else 1.0
            ctd = freq_term / (1 - b + b * len_ratio)
            return (1.0 + k) * (ctd + delta) / (k + ctd + delta)

        idf = _compute_idf(self._tokenize_bm25(query))
        # Resolve the statistics up front so that a missing id fails before any scoring is done.
        doc_stats_list = [stats_by_id[doc.id] for doc in documents]

        ret = []
        for doc, doc_stats in zip(documents, doc_stats_list):
            score = 0.0
            for tok, tok_idf in idf.items():
                score += tok_idf * _compute_tf(tok, doc_stats.freq_token, doc_stats.doc_len)
            ret.append((doc, score))

        return ret
238

239
    def _score_bm25okapi(self, query: str, documents: List[Document]) -> List[Tuple[Document, float]]:
        """
        Calculate BM25Okapi scores for the given query and filtered documents.

        :param query:
            The query string.
        :param documents:
            The list of documents to score, should be produced by
            the filter_documents method; may be an empty list.
        :returns:
            A list of tuples, each containing a Document and its BM25Okapi score.
        :raises KeyError:
            If one of the documents has no BM25 statistics stored for its id.
        """
        k = self.bm25_parameters.get("k1", 1.5)
        b = self.bm25_parameters.get("b", 0.75)
        epsilon = self.bm25_parameters.get("epsilon", 0.25)

        # Read the index-wide statistics once; they do not change while scoring.
        avg_doc_len = self._avg_doc_len
        freq_vocab = self._freq_vocab_for_idf
        stats_by_id = self._bm25_attr
        n_corpus = len(stats_by_id)

        def _compute_idf(tokens: List[str]) -> Dict[str, float]:
            """Per-token IDF computation for all tokens."""
            sum_idf = 0.0
            neg_idf_tokens = []

            # Although this is a global statistic, we compute it here
            # to make the computation more self-contained. And the
            # complexity is O(vocab_size), which is acceptable.
            idf = {}
            for tok, n in freq_vocab.items():
                idf[tok] = math.log((n_corpus - n + 0.5) / (n + 0.5))
                sum_idf += idf[tok]
                if idf[tok] < 0:
                    neg_idf_tokens.append(tok)

            # Negative IDFs are replaced by a fraction (epsilon) of the average IDF.
            # An empty vocabulary has no average, so fall back to 0 instead of dividing by zero.
            eps = epsilon * sum_idf / len(freq_vocab) if freq_vocab else 0.0
            for tok in neg_idf_tokens:
                idf[tok] = eps
            return {tok: idf.get(tok, 0.0) for tok in tokens}

        def _compute_tf(token: str, freq: Dict[str, int], doc_len: int) -> float:
            """Per-token BM25Okapi computation."""
            freq_term = freq.get(token, 0.0)
            # The average length can be 0 (e.g. no stored document produced any token).
            # Treat every document as average-length then instead of dividing by zero.
            len_ratio = doc_len / avg_doc_len if avg_doc_len else 1.0
            freq_norm = freq_term + k * (1 - b + b * len_ratio)
            return freq_term * (1.0 + k) / freq_norm

        idf = _compute_idf(self._tokenize_bm25(query))
        # Resolve the statistics up front so that a missing id fails before any scoring is done.
        doc_stats_list = [stats_by_id[doc.id] for doc in documents]

        ret = []
        for doc, doc_stats in zip(documents, doc_stats_list):
            score = 0.0
            for tok, tok_idf in idf.items():
                score += tok_idf * _compute_tf(tok, doc_stats.freq_token, doc_stats.doc_len)
            ret.append((doc, score))

        return ret
296

297
    def _score_bm25plus(self, query: str, documents: List[Document]) -> List[Tuple[Document, float]]:
        """
        Calculate BM25+ scores for the given query and filtered documents.

        This implementation follows the BM25 Wikipedia page, which adds 1
        (a smoothing term) inside the logarithm when computing IDF.

        :param query:
            The query string.
        :param documents:
            The list of documents to score, should be produced by
            the filter_documents method; may be an empty list.
        :returns:
            A list of tuples, each containing a Document and its BM25+ score.
        :raises KeyError:
            If one of the documents has no BM25 statistics stored for its id.
        """
        k = self.bm25_parameters.get("k1", 1.5)
        b = self.bm25_parameters.get("b", 0.75)
        delta = self.bm25_parameters.get("delta", 1.0)

        # Read the index-wide statistics once; they do not change while scoring.
        avg_doc_len = self._avg_doc_len
        freq_vocab = self._freq_vocab_for_idf
        stats_by_id = self._bm25_attr
        n_corpus = len(stats_by_id)

        def _compute_idf(tokens: List[str]) -> Dict[str, float]:
            """Per-token IDF computation."""
            idf = {}
            for tok in tokens:
                n = freq_vocab.get(tok, 0)
                # Tokens that occur in no document contribute nothing to the score.
                idf[tok] = math.log(1 + (n_corpus - n + 0.5) / (n + 0.5)) * int(n != 0)
            return idf

        def _compute_tf(token: str, freq: Dict[str, int], doc_len: float) -> float:
            """Per-token normalized term frequency."""
            freq_term = freq.get(token, 0.0)
            # The average length can be 0 (e.g. no stored document produced any token).
            # Treat every document as average-length then instead of dividing by zero.
            len_ratio = doc_len / avg_doc_len if avg_doc_len else 1.0
            freq_damp = k * (1 - b + b * len_ratio)
            return freq_term * (1.0 + k) / (freq_term + freq_damp) + delta

        idf = _compute_idf(self._tokenize_bm25(query))
        # Resolve the statistics up front so that a missing id fails before any scoring is done.
        doc_stats_list = [stats_by_id[doc.id] for doc in documents]

        ret = []
        for doc, doc_stats in zip(documents, doc_stats_list):
            score = 0.0
            for tok, tok_idf in idf.items():
                score += tok_idf * _compute_tf(tok, doc_stats.freq_token, doc_stats.doc_len)
            ret.append((doc, score))

        return ret
346

347
    def to_dict(self) -> Dict[str, Any]:
1✔
348
        """
349
        Serializes the component to a dictionary.
350

351
        :returns:
352
            Dictionary with serialized data.
353
        """
354
        return default_to_dict(
1✔
355
            self,
356
            bm25_tokenization_regex=self.bm25_tokenization_regex,
357
            bm25_algorithm=self.bm25_algorithm,
358
            bm25_parameters=self.bm25_parameters,
359
            embedding_similarity_function=self.embedding_similarity_function,
360
            index=self.index,
361
            return_embedding=self.return_embedding,
362
        )
363

364
    @classmethod
1✔
365
    def from_dict(cls, data: Dict[str, Any]) -> "InMemoryDocumentStore":
1✔
366
        """
367
        Deserializes the component from a dictionary.
368

369
        :param data:
370
            The dictionary to deserialize from.
371
        :returns:
372
            The deserialized component.
373
        """
374
        return default_from_dict(cls, data)
1✔
375

376
    def save_to_disk(self, path: str) -> None:
        """
        Write the database and its data to disk as a JSON file.

        The file contains the serialized store configuration (see `to_dict`) plus a
        "documents" list holding every stored Document in non-flattened dictionary form.

        :param path: The path to the JSON file.
        """
        data: Dict[str, Any] = self.to_dict()
        data["documents"] = [doc.to_dict(flatten=False) for doc in self.storage.values()]
        # Pin the encoding so the written file does not depend on the platform's locale default.
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f)
386

387
    @classmethod
1✔
388
    def load_from_disk(cls, path: str) -> "InMemoryDocumentStore":
        """
        Load the database and its data from disk as a JSON file.

        :param path: The path to the JSON file.
        :returns: The loaded InMemoryDocumentStore.
        :raises FileNotFoundError: If `path` does not exist.
        :raises Exception: If the file exists but cannot be read or parsed as JSON.
        """
        if not Path(path).exists():
            raise FileNotFoundError(f"File {path} not found.")

        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception as e:
            # Chain the original error so the root cause stays visible in the traceback.
            raise Exception(f"Error loading InMemoryDocumentStore from disk. error: {e}") from e

        # Everything except "documents" is the init configuration of the store.
        documents = data.pop("documents")
        cls_object = default_from_dict(cls, data)
        # OVERWRITE so that reloading into an index that already holds these ids does not fail.
        cls_object.write_documents(
            documents=[Document(**doc) for doc in documents], policy=DuplicatePolicy.OVERWRITE
        )
        return cls_object
411

412
    def count_documents(self) -> int:
1✔
413
        """
414
        Returns the number of documents present in the DocumentStore.
415
        """
416
        return len(self.storage.keys())
1✔
417

418
    def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
        """
        Returns the documents that match the filters provided.

        For a detailed specification of the filters, refer to the DocumentStore.filter_documents() protocol
        documentation.

        When the store was created with `return_embedding=False`, the returned Documents have their
        `embedding` set to None. The Documents kept in the store are left untouched.

        :param filters: The filters to apply to the document list.
        :returns: A list of Documents that match the given filters.
        :raises ValueError: If `filters` contains neither an "operator" nor a "conditions" key.
        """
        import copy  # local import: only needed on the return_embedding=False path

        if filters:
            if "operator" not in filters and "conditions" not in filters:
                raise ValueError(
                    "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
                )
            docs = [doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)]
        else:
            docs = list(self.storage.values())

        if self.return_embedding:
            return docs

        # Blank the embedding on shallow copies only. Assigning `None` on the stored objects would
        # permanently erase the embeddings from the store and break later embedding retrieval.
        stripped_docs = []
        for doc in docs:
            if doc.embedding is not None:
                doc = copy.copy(doc)
                doc.embedding = None
            stripped_docs.append(doc)
        return stripped_docs
442

443
    def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
        """
        Refer to the DocumentStore.write_documents() protocol documentation.

        If `policy` is set to `DuplicatePolicy.NONE` defaults to `DuplicatePolicy.FAIL`.

        :param documents: The Documents to write.
        :param policy: How to handle a Document whose id is already present in the store.
        :returns: The number of Documents written (skipped duplicates are not counted).
        :raises ValueError: If `documents` is not an iterable of Document objects.
        :raises DuplicateDocumentError: If a Document id already exists and the policy is FAIL.
        """
        if (
            not isinstance(documents, Iterable)
            or isinstance(documents, str)
            or any(not isinstance(doc, Document) for doc in documents)
        ):
            raise ValueError("Please provide a list of Documents.")

        if policy == DuplicatePolicy.NONE:
            policy = DuplicatePolicy.FAIL

        written_documents = len(documents)
        for document in documents:
            already_stored = document.id in self.storage
            if already_stored and policy != DuplicatePolicy.OVERWRITE:
                if policy == DuplicatePolicy.FAIL:
                    raise DuplicateDocumentError(f"ID '{document.id}' already exists.")
                if policy == DuplicatePolicy.SKIP:
                    logger.warning("ID '{document_id}' already exists", document_id=document.id)
                    written_documents -= 1
                    continue

            # Since the statistics are updated in an incremental manner,
            # we need to explicitly remove the existing document to revert
            # the statistics before updating them with the new document.
            if already_stored:
                self.delete_documents([document.id])

            tokens = []
            if document.content is not None:
                tokens = self._tokenize_bm25(document.content)

            self.storage[document.id] = document

            self._bm25_attr[document.id] = BM25DocumentStats(Counter(tokens), len(tokens))
            self._freq_vocab_for_idf.update(set(tokens))

            # Incremental mean of the document lengths. `self._bm25_attr` already contains the new
            # document at this point, so the previous average was taken over `n_docs - 1` documents.
            n_docs = len(self._bm25_attr)
            self._avg_doc_len = (len(tokens) + self._avg_doc_len * (n_docs - 1)) / n_docs
        return written_documents
485

486
    def delete_documents(self, document_ids: List[str]) -> None:
1✔
487
        """
488
        Deletes all documents with matching document_ids from the DocumentStore.
489

490
        :param document_ids: The object_ids to delete.
491
        """
492
        for doc_id in document_ids:
1✔
493
            if doc_id not in self.storage.keys():
1✔
494
                continue
1✔
495
            del self.storage[doc_id]
1✔
496

497
            # Update statistics accordingly
498
            doc_stats = self._bm25_attr.pop(doc_id)
1✔
499
            freq = doc_stats.freq_token
1✔
500
            doc_len = doc_stats.doc_len
1✔
501

502
            self._freq_vocab_for_idf.subtract(Counter(freq.keys()))
1✔
503
            try:
1✔
504
                self._avg_doc_len = (self._avg_doc_len * (len(self._bm25_attr) + 1) - doc_len) / len(self._bm25_attr)
1✔
505
            except ZeroDivisionError:
1✔
506
                self._avg_doc_len = 0
1✔
507

508
    def bm25_retrieval(
1✔
509
        self, query: str, filters: Optional[Dict[str, Any]] = None, top_k: int = 10, scale_score: bool = False
510
    ) -> List[Document]:
511
        """
512
        Retrieves documents that are most relevant to the query using BM25 algorithm.
513

514
        :param query: The query string.
515
        :param filters: A dictionary with filters to narrow down the search space.
516
        :param top_k: The number of top documents to retrieve. Default is 10.
517
        :param scale_score: Whether to scale the scores of the retrieved documents. Default is False.
518
        :returns: A list of the top_k documents most relevant to the query.
519
        """
520
        if not query:
1✔
521
            raise ValueError("Query should be a non-empty string")
1✔
522

523
        content_type_filter = {"field": "content", "operator": "!=", "value": None}
1✔
524
        if filters:
1✔
525
            if "operator" not in filters:
×
526
                raise ValueError(
×
527
                    "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
528
                )
529
            filters = {"operator": "AND", "conditions": [content_type_filter, filters]}
×
530
        else:
531
            filters = content_type_filter
1✔
532

533
        all_documents = self.filter_documents(filters=filters)
1✔
534
        if len(all_documents) == 0:
1✔
535
            logger.info("No documents found for BM25 retrieval. Returning empty list.")
1✔
536
            return []
1✔
537

538
        results = sorted(self.bm25_algorithm_inst(query, all_documents), key=lambda x: x[1], reverse=True)[:top_k]
1✔
539

540
        # BM25Okapi can return meaningful negative values, so they should not be filtered out when scale_score is False.
541
        # It's the only algorithm supported by rank_bm25 at the time of writing (2024) that can return negative scores.
542
        # see https://github.com/deepset-ai/haystack/pull/6889 for more context.
543
        negatives_are_valid = self.bm25_algorithm == "BM25Okapi" and not scale_score
1✔
544

545
        # Create documents with the BM25 score to return them
546
        return_documents = []
1✔
547
        for doc, score in results:
1✔
548
            if scale_score:
1✔
549
                score = expit(score / BM25_SCALING_FACTOR)
1✔
550

551
            if not negatives_are_valid and score <= 0.0:
1✔
552
                continue
1✔
553

554
            doc_fields = doc.to_dict()
1✔
555
            doc_fields["score"] = score
1✔
556

557
            if not self.return_embedding and "embedding" in doc_fields:
1✔
558
                doc_fields.pop("embedding")
1✔
559

560
            return_document = Document.from_dict(doc_fields)
1✔
561

562
            return_documents.append(return_document)
1✔
563

564
        return return_documents
1✔
565

566
    def embedding_retrieval(  # pylint: disable=too-many-positional-arguments
1✔
567
        self,
568
        query_embedding: List[float],
569
        filters: Optional[Dict[str, Any]] = None,
570
        top_k: int = 10,
571
        scale_score: bool = False,
572
        return_embedding: Optional[bool] = False,
573
    ) -> List[Document]:
574
        """
575
        Retrieves documents that are most similar to the query embedding using a vector similarity metric.
576

577
        :param query_embedding: Embedding of the query.
578
        :param filters: A dictionary with filters to narrow down the search space.
579
        :param top_k: The number of top documents to retrieve. Default is 10.
580
        :param scale_score: Whether to scale the scores of the retrieved Documents. Default is False.
581
        :param return_embedding: Whether to return the embedding of the retrieved Documents.
582
            If set to `None`, the value of the `return_embedding` parameter set at component
583
            initialization will be used. Default is False.
584
        :returns: A list of the top_k documents most relevant to the query.
585
        """
586
        if len(query_embedding) == 0 or not isinstance(query_embedding[0], float):
1✔
587
            raise ValueError("query_embedding should be a non-empty list of floats.")
1✔
588

589
        if filters:
1✔
590
            if "operator" not in filters and "conditions" not in filters:
×
591
                raise ValueError(
×
592
                    "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
593
                )
594
            all_documents = [
×
595
                doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)
596
            ]
597
        else:
598
            all_documents = list(self.storage.values())
1✔
599

600
        documents_with_embeddings = [doc for doc in all_documents if doc.embedding is not None]
1✔
601
        if len(documents_with_embeddings) == 0:
1✔
602
            logger.warning(
1✔
603
                "No Documents found with embeddings. Returning empty list. "
604
                "To generate embeddings, use a DocumentEmbedder."
605
            )
606
            return []
1✔
607
        elif len(documents_with_embeddings) < len(all_documents):
1✔
608
            logger.info(
1✔
609
                "Skipping some Documents that don't have an embedding. To generate embeddings, use a DocumentEmbedder."
610
            )
611

612
        scores = self._compute_query_embedding_similarity_scores(
1✔
613
            embedding=query_embedding, documents=documents_with_embeddings, scale_score=scale_score
614
        )
615

616
        resolved_return_embedding = self.return_embedding if return_embedding is None else return_embedding
1✔
617

618
        # create Documents with the similarity score for the top k results
619
        top_documents = []
1✔
620
        for doc, score in sorted(zip(documents_with_embeddings, scores), key=lambda x: x[1], reverse=True)[:top_k]:
1✔
621
            doc_fields = doc.to_dict()
1✔
622
            doc_fields["score"] = score
1✔
623
            if resolved_return_embedding is False:
1✔
624
                doc_fields["embedding"] = None
1✔
625
            top_documents.append(Document.from_dict(doc_fields))
1✔
626

627
        return top_documents
1✔
628

629
    def _compute_query_embedding_similarity_scores(
1✔
630
        self, embedding: List[float], documents: List[Document], scale_score: bool = False
631
    ) -> List[float]:
632
        """
633
        Computes the similarity scores between the query embedding and the embeddings of the documents.
634

635
        :param embedding: Embedding of the query.
636
        :param documents: A list of Documents.
637
        :param scale_score: Whether to scale the scores of the Documents. Default is False.
638
        :returns: A list of scores.
639
        """
640

641
        query_embedding = np.array(embedding)
1✔
642
        if query_embedding.ndim == 1:
1✔
643
            query_embedding = np.expand_dims(a=query_embedding, axis=0)
1✔
644

645
        try:
1✔
646
            document_embeddings = np.array([doc.embedding for doc in documents])
1✔
647
        except ValueError as e:
1✔
648
            if "inhomogeneous shape" in str(e):
1✔
649
                raise DocumentStoreError(
1✔
650
                    "The embedding size of all Documents should be the same. "
651
                    "Please make sure that the Documents have been embedded with the same model."
652
                ) from e
653
            raise e
×
654
        if document_embeddings.ndim == 1:
1✔
655
            document_embeddings = np.expand_dims(a=document_embeddings, axis=0)
×
656

657
        if self.embedding_similarity_function == "cosine":
1✔
658
            # cosine similarity is a normed dot product
659
            query_embedding /= np.linalg.norm(x=query_embedding, axis=1, keepdims=True)
1✔
660
            document_embeddings /= np.linalg.norm(x=document_embeddings, axis=1, keepdims=True)
1✔
661

662
        try:
1✔
663
            scores = np.dot(a=query_embedding, b=document_embeddings.T)[0].tolist()
1✔
664
        except ValueError as e:
1✔
665
            if "shapes" in str(e) and "not aligned" in str(e):
1✔
666
                raise DocumentStoreError(
1✔
667
                    "The embedding size of the query should be the same as the embedding size of the Documents. "
668
                    "Please make sure that the query has been embedded with the same model as the Documents."
669
                ) from e
670
            raise e
×
671

672
        if scale_score:
1✔
673
            if self.embedding_similarity_function == "dot_product":
1✔
674
                scores = [expit(float(score / DOT_PRODUCT_SCALING_FACTOR)) for score in scores]
1✔
675
            elif self.embedding_similarity_function == "cosine":
×
676
                scores = [(score + 1) / 2 for score in scores]
×
677

678
        return scores
1✔
679

680
    async def count_documents_async(self) -> int:
        """
        Counts the documents currently held in the DocumentStore.

        :returns: The number of keys in the store's storage.
        """
        stored_ids = self.storage.keys()
        return len(stored_ids)
1✔
685

686
    async def filter_documents_async(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
        """
        Asynchronous counterpart of `filter_documents()`.

        The synchronous `filter_documents()` call is submitted to `self.executor` through the event loop and
        its result is awaited. For a detailed specification of the filters, refer to the
        DocumentStore.filter_documents() protocol documentation.

        :param filters: The filters to apply to the document list.
        :returns: A list of Documents that match the given filters.
        """
        loop = asyncio.get_event_loop()

        def _run_filter():
            return self.filter_documents(filters=filters)

        matching_documents = await loop.run_in_executor(self.executor, _run_filter)
        return matching_documents
699

700
    async def write_documents_async(
        self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
    ) -> int:
        """
        Asynchronous counterpart of `write_documents()`.

        Refer to the DocumentStore.write_documents() protocol documentation.
        The synchronous `write_documents()` call is submitted to `self.executor` through the event loop.

        If `policy` is set to `DuplicatePolicy.NONE` defaults to `DuplicatePolicy.FAIL`.

        :param documents: The Documents forwarded to `write_documents()`.
        :param policy: The duplicate policy forwarded to `write_documents()`.
        :returns: The value returned by `write_documents()`.
        """
        loop = asyncio.get_event_loop()

        def _run_write():
            return self.write_documents(documents=documents, policy=policy)

        written = await loop.run_in_executor(self.executor, _run_write)
        return written
711

712
    async def delete_documents_async(self, document_ids: List[str]) -> None:
        """
        Asynchronous counterpart of `delete_documents()`.

        Deletes all documents with matching document_ids from the DocumentStore by submitting the synchronous
        `delete_documents()` call to `self.executor` and awaiting its completion.

        :param document_ids: The object_ids to delete.
        """
        loop = asyncio.get_event_loop()

        def _run_delete():
            return self.delete_documents(document_ids=document_ids)

        # the result of the synchronous call is deliberately discarded: this coroutine returns None
        await loop.run_in_executor(self.executor, _run_delete)
721

722
    async def bm25_retrieval_async(
        self, query: str, filters: Optional[Dict[str, Any]] = None, top_k: int = 10, scale_score: bool = False
    ) -> List[Document]:
        """
        Asynchronous counterpart of `bm25_retrieval()`.

        Retrieves documents that are most relevant to the query using BM25 algorithm. The synchronous
        `bm25_retrieval()` call is submitted to `self.executor` through the event loop and its result awaited.

        :param query: The query string.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
        """
        loop = asyncio.get_event_loop()

        def _run_bm25():
            return self.bm25_retrieval(query=query, filters=filters, top_k=top_k, scale_score=scale_score)

        ranked_documents = await loop.run_in_executor(self.executor, _run_bm25)
        return ranked_documents
738

739
    async def embedding_retrieval_async(  # pylint: disable=too-many-positional-arguments
        self,
        query_embedding: List[float],
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 10,
        scale_score: bool = False,
        return_embedding: bool = False,
    ) -> List[Document]:
        """
        Asynchronous counterpart of `embedding_retrieval()`.

        Retrieves documents that are most similar to the query embedding using a vector similarity metric. The
        synchronous `embedding_retrieval()` call is submitted to `self.executor` through the event loop and its
        result awaited.

        :param query_embedding: Embedding of the query.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved Documents. Default is False.
        :param return_embedding: Whether to return the embedding of the retrieved Documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
        """
        # NOTE(review): the synchronous method falls back to `self.return_embedding` when it receives `None`;
        # this wrapper always forwards a value (False by default), so the store-level setting only applies if
        # a caller passes `None` explicitly -- confirm this difference is intended.
        retrieval_kwargs = {
            "query_embedding": query_embedding,
            "filters": filters,
            "top_k": top_k,
            "scale_score": scale_score,
            "return_embedding": return_embedding,
        }
        loop = asyncio.get_event_loop()

        def _run_retrieval():
            return self.embedding_retrieval(**retrieval_kwargs)

        similar_documents = await loop.run_in_executor(self.executor, _run_retrieval)
        return similar_documents
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc