• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deepset-ai / haystack / 8096865523

29 Feb 2024 01:31PM UTC coverage: 89.905% (-0.2%) from 90.144%
8096865523

push

github

web-flow
chore: enforce kwarg logging (#7207)

* chore: add logger which eases logging of extras

* chore: start migrating to key value

* fix: import fixes

* tests: temporarily comment out breaking test

* refactor: move to kwarg based logging

* style: fix import order

* chore: implement self-review comments

* test: drop failing test

* chore: fix more import orders

* docs: add changelog

* tests: fix broken tests

* chore: fix getting the frames

* chore: add comment

* chore: cleanup

* chore: adapt remaining `%s` usages

5281 of 5874 relevant lines covered (89.9%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.36
haystack/document_stores/in_memory/document_store.py
1
import re
1✔
2
from typing import Any, Dict, Iterable, List, Literal, Optional
1✔
3

4
import numpy as np
1✔
5
from haystack_bm25 import rank_bm25
1✔
6
from tqdm.auto import tqdm
1✔
7

8
from haystack import default_from_dict, default_to_dict, logging
1✔
9
from haystack.dataclasses import Document
1✔
10
from haystack.document_stores.errors import DocumentStoreError, DuplicateDocumentError
1✔
11
from haystack.document_stores.types import DuplicatePolicy
1✔
12
from haystack.utils import expit
1✔
13
from haystack.utils.filters import convert, document_matches_filter
1✔
14

15
logger = logging.getLogger(__name__)
1✔
16

17
# document scores are essentially unbounded and will be scaled to values between 0 and 1 if scale_score is set to
18
# True (default). Scaling uses the expit function (inverse of the logit function) after applying a scaling factor
19
# (e.g., BM25_SCALING_FACTOR for the bm25_retrieval method).
20
# Larger scaling factor decreases scaled scores. For example, an input of 10 is scaled to 0.99 with BM25_SCALING_FACTOR=2
21
# but to 0.78 with BM25_SCALING_FACTOR=8 (default). The defaults were chosen empirically. Increase the default if most
22
# unscaled scores are larger than expected (>30) and otherwise would incorrectly all be mapped to scores ~1.
23
BM25_SCALING_FACTOR = 8
1✔
24
DOT_PRODUCT_SCALING_FACTOR = 100
1✔
25

26

27
class InMemoryDocumentStore:
1✔
28
    """
29
    Stores data in-memory. It's ephemeral and cannot be saved to disk.
30
    """
31

32
    def __init__(
        self,
        bm25_tokenization_regex: str = r"(?u)\b\w\w+\b",
        bm25_algorithm: Literal["BM25Okapi", "BM25L", "BM25Plus"] = "BM25L",
        bm25_parameters: Optional[Dict] = None,
        embedding_similarity_function: Literal["dot_product", "cosine"] = "dot_product",
    ):
        """
        Initializes the DocumentStore.

        :param bm25_tokenization_regex: The regular expression used to tokenize the text for BM25 retrieval.
        :param bm25_algorithm: The BM25 algorithm to use. One of "BM25Okapi", "BM25L", or "BM25Plus".
        :param bm25_parameters: Parameters for BM25 implementation in a dictionary format.
                                For example: {'k1':1.5, 'b':0.75, 'epsilon':0.25}
                                You can learn more about these parameters by visiting https://github.com/dorianbrown/rank_bm25.
                                By default, no parameters are set.
        :param embedding_similarity_function: The similarity function used to compare Documents embeddings.
                                              One of "dot_product" (default) or "cosine".
                                              To choose the most appropriate function, look for information about your embedding model.
        :raises ValueError: If `bm25_algorithm` is not the name of an attribute of `rank_bm25`.
        """
        self.storage: Dict[str, Document] = {}
        self._bm25_tokenization_regex = bm25_tokenization_regex
        self.tokenizer = re.compile(bm25_tokenization_regex).findall
        # Without a default, getattr raises AttributeError for an unknown name and the check below can never
        # trigger. Passing None lets an unknown algorithm surface as the intended ValueError.
        algorithm_class = getattr(rank_bm25, bm25_algorithm, None)
        if algorithm_class is None:
            raise ValueError(f"BM25 algorithm '{bm25_algorithm}' not found.")
        self.bm25_algorithm = algorithm_class
        self.bm25_parameters = bm25_parameters or {}
        self.embedding_similarity_function = embedding_similarity_function
1✔
61

62
    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes this DocumentStore to a dictionary.

        :returns:
            The dictionary returned by `default_to_dict` for this instance and its constructor arguments.
        """
        init_parameters = {
            "bm25_tokenization_regex": self._bm25_tokenization_regex,
            # The algorithm is stored as a class; its name is what the constructor accepts.
            "bm25_algorithm": self.bm25_algorithm.__name__,
            "bm25_parameters": self.bm25_parameters,
            "embedding_similarity_function": self.embedding_similarity_function,
        }
        return default_to_dict(self, **init_parameters)
76

77
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "InMemoryDocumentStore":
        """
        Builds a DocumentStore from its dictionary representation.

        :param data:
            The dictionary to deserialize from.
        :returns:
            The instance created by `default_from_dict` for this class.
        """
        store = default_from_dict(cls, data)
        return store
1✔
88

89
    def count_documents(self) -> int:
1✔
90
        """
91
        Returns the number of how many documents are present in the DocumentStore.
92
        """
93
        return len(self.storage.keys())
1✔
94

95
    def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
        """
        Returns the stored documents that satisfy `filters`.

        For a detailed specification of the filters, refer to the DocumentStore.filter_documents() protocol documentation.

        :param filters: The filters to apply to the document list. When empty or None, every document is returned.
        :returns: A list of Documents that match the given filters.
        """
        if not filters:
            return list(self.storage.values())

        # Filters that carry neither key are passed through `convert` before matching.
        needs_conversion = "operator" not in filters and "conditions" not in filters
        if needs_conversion:
            filters = convert(filters)

        matching_documents = []
        for candidate in self.storage.values():
            if document_matches_filter(filters=filters, document=candidate):
                matching_documents.append(candidate)
        return matching_documents
1✔
109

110
    def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
        """
        Refer to the DocumentStore.write_documents() protocol documentation.

        If `policy` is set to `DuplicatePolicy.NONE` defaults to `DuplicatePolicy.FAIL`.

        :param documents: The Documents to write. Any non-string iterable of Documents is accepted.
        :param policy: How to handle a Document whose ID is already present in the store.
        :returns: The number of Documents written; Documents skipped as duplicates are not counted.
        :raises ValueError: If `documents` is not an iterable consisting only of Documents.
        :raises DuplicateDocumentError: If an ID already exists and the effective policy is `DuplicatePolicy.FAIL`.
            Documents processed before the duplicate remain written.
        """
        if not isinstance(documents, Iterable) or isinstance(documents, str):
            raise ValueError("Please provide a list of Documents.")

        # Materialize once: validating a one-shot iterable (e.g. a generator) would otherwise exhaust it before
        # the write loop runs, and such iterables do not support len().
        documents = list(documents)
        if any(not isinstance(doc, Document) for doc in documents):
            raise ValueError("Please provide a list of Documents.")

        if policy == DuplicatePolicy.NONE:
            policy = DuplicatePolicy.FAIL

        written_documents = 0
        for document in documents:
            if policy != DuplicatePolicy.OVERWRITE and document.id in self.storage:
                if policy == DuplicatePolicy.FAIL:
                    raise DuplicateDocumentError(f"ID '{document.id}' already exists.")
                if policy == DuplicatePolicy.SKIP:
                    logger.warning("ID '{document_id}' already exists", document_id=document.id)
                    continue
            self.storage[document.id] = document
            written_documents += 1
        return written_documents
1✔
137

138
    def delete_documents(self, document_ids: List[str]) -> None:
1✔
139
        """
140
        Deletes all documents with matching document_ids from the DocumentStore.
141
        :param document_ids: The object_ids to delete.
142
        """
143
        for doc_id in document_ids:
1✔
144
            if doc_id not in self.storage.keys():
1✔
145
                continue
1✔
146
            del self.storage[doc_id]
1✔
147

148
    def bm25_retrieval(
        self, query: str, filters: Optional[Dict[str, Any]] = None, top_k: int = 10, scale_score: bool = False
    ) -> List[Document]:
        """
        Retrieves documents that are most relevant to the query using BM25 algorithm.

        :param query: The query string.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10. Non-positive values return an empty list.
        :param scale_score: Whether to scale the scores of the retrieved documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
        :raises ValueError: If `query` is empty.
        """
        if not query:
            raise ValueError("Query should be a non-empty string")

        # The `[-top_k:]` slice used below selects the entire ranking when top_k is 0 and an unrelated tail when it
        # is negative, so non-positive values are answered with no results up front.
        if top_k <= 0:
            return []

        content_type_filter = {
            "operator": "OR",
            "conditions": [
                {"field": "content", "operator": "!=", "value": None},
                {"field": "dataframe", "operator": "!=", "value": None},
            ],
        }
        if filters:
            if "operator" not in filters:
                filters = convert(filters)
            filters = {"operator": "AND", "conditions": [content_type_filter, filters]}
        else:
            filters = content_type_filter
        all_documents = self.filter_documents(filters=filters)

        # Lowercase all documents. `scored_documents` records which Document each text came from, so score
        # positions still map to the right Document if one is skipped.
        scored_documents = []
        lower_case_documents = []
        for doc in all_documents:
            if doc.content is not None:
                if doc.dataframe is not None:
                    logger.warning(
                        "Document '{document_id}' has both text and dataframe content. "
                        "Using text content and skipping dataframe content.",
                        document_id=doc.id,
                    )
                text = doc.content
            elif doc.dataframe is not None:
                str_content = doc.dataframe.astype(str)
                text = str_content.to_csv(index=False)
            else:
                logger.info(
                    "Document '{document_id}' has no text or dataframe content. Skipping it.", document_id=doc.id
                )
                continue
            scored_documents.append(doc)
            lower_case_documents.append(text.lower())

        # Tokenize the entire content of the DocumentStore
        tokenized_corpus = [
            self.tokenizer(doc) for doc in tqdm(lower_case_documents, unit=" docs", desc="Ranking by BM25...")
        ]
        if len(tokenized_corpus) == 0:
            logger.info("No documents found for BM25 retrieval. Returning empty list.")
            return []

        # initialize BM25
        bm25_scorer = self.bm25_algorithm(tokenized_corpus, **self.bm25_parameters)
        # tokenize query
        tokenized_query = self.tokenizer(query.lower())
        # get scores for the query against the corpus
        docs_scores = bm25_scorer.get_scores(tokenized_query)
        if scale_score:
            docs_scores = [expit(float(score / BM25_SCALING_FACTOR)) for score in docs_scores]
        # get the last top_k indexes and reverse them
        top_docs_positions = np.argsort(docs_scores)[-top_k:][::-1]

        # BM25Okapi can return meaningful negative values, so they should not be filtered out when scale_score is False.
        # It's the only algorithm supported by rank_bm25 at the time of writing (2024) that can return negative scores.
        # see https://github.com/deepset-ai/haystack/pull/6889 for more context.
        negatives_are_valid = self.bm25_algorithm is rank_bm25.BM25Okapi and not scale_score

        # Create documents with the BM25 score to return them
        # NOTE(review): the `score <= 0.0` check runs on the scaled value when scale_score is True; presumably
        # expit never yields a non-positive value, in which case nothing is filtered in that mode - confirm intent.
        return_documents = []
        for i in top_docs_positions:
            doc = scored_documents[i]
            score = docs_scores[i]
            if not negatives_are_valid and score <= 0.0:
                continue
            doc_fields = doc.to_dict()
            doc_fields["score"] = score
            return_document = Document.from_dict(doc_fields)
            return_documents.append(return_document)
        return return_documents
1✔
236

237
    def embedding_retrieval(
        self,
        query_embedding: List[float],
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 10,
        scale_score: bool = False,
        return_embedding: bool = False,
    ) -> List[Document]:
        """
        Ranks the stored documents by similarity between their embedding and the query embedding.

        :param query_embedding: Embedding of the query.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved Documents. Default is False.
        :param return_embedding: Whether to return the embedding of the retrieved Documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
        :raises ValueError: If `query_embedding` is empty or its first element is not a float.
        """
        if len(query_embedding) == 0 or not isinstance(query_embedding[0], float):
            raise ValueError("query_embedding should be a non-empty list of floats.")

        candidates = self.filter_documents(filters=filters or {})
        embedded_documents = [candidate for candidate in candidates if candidate.embedding is not None]

        if not embedded_documents:
            logger.warning(
                "No Documents found with embeddings. Returning empty list. "
                "To generate embeddings, use a DocumentEmbedder."
            )
            return []
        if len(embedded_documents) < len(candidates):
            logger.info(
                "Skipping some Documents that don't have an embedding. "
                "To generate embeddings, use a DocumentEmbedder."
            )

        scores = self._compute_query_embedding_similarity_scores(
            embedding=query_embedding, documents=embedded_documents, scale_score=scale_score
        )

        # Highest score first; each result is a new Document built from the stored one plus its score.
        ranked = sorted(zip(embedded_documents, scores), key=lambda pair: pair[1], reverse=True)
        top_documents = []
        for document, score in ranked[:top_k]:
            doc_fields = document.to_dict()
            doc_fields["score"] = score
            if return_embedding is False:
                doc_fields["embedding"] = None
            top_documents.append(Document.from_dict(doc_fields))
        return top_documents
1✔
288

289
    def _compute_query_embedding_similarity_scores(
1✔
290
        self, embedding: List[float], documents: List[Document], scale_score: bool = False
291
    ) -> List[float]:
292
        """
293
        Computes the similarity scores between the query embedding and the embeddings of the documents.
294

295
        :param embedding: Embedding of the query.
296
        :param documents: A list of Documents.
297
        :param scale_score: Whether to scale the scores of the Documents. Default is False.
298
        :returns: A list of scores.
299
        """
300

301
        query_embedding = np.array(embedding)
1✔
302
        if query_embedding.ndim == 1:
1✔
303
            query_embedding = np.expand_dims(a=query_embedding, axis=0)
1✔
304

305
        try:
1✔
306
            document_embeddings = np.array([doc.embedding for doc in documents])
1✔
307
        except ValueError as e:
1✔
308
            if "inhomogeneous shape" in str(e):
1✔
309
                raise DocumentStoreError(
1✔
310
                    "The embedding size of all Documents should be the same. "
311
                    "Please make sure that the Documents have been embedded with the same model."
312
                ) from e
313
            raise e
×
314
        if document_embeddings.ndim == 1:
1✔
315
            document_embeddings = np.expand_dims(a=document_embeddings, axis=0)
×
316

317
        if self.embedding_similarity_function == "cosine":
1✔
318
            # cosine similarity is a normed dot product
319
            query_embedding /= np.linalg.norm(x=query_embedding, axis=1, keepdims=True)
1✔
320
            document_embeddings /= np.linalg.norm(x=document_embeddings, axis=1, keepdims=True)
1✔
321

322
        try:
1✔
323
            scores = np.dot(a=query_embedding, b=document_embeddings.T)[0].tolist()
1✔
324
        except ValueError as e:
1✔
325
            if "shapes" in str(e) and "not aligned" in str(e):
1✔
326
                raise DocumentStoreError(
1✔
327
                    "The embedding size of the query should be the same as the embedding size of the Documents. "
328
                    "Please make sure that the query has been embedded with the same model as the Documents."
329
                ) from e
330
            raise e
×
331

332
        if scale_score:
1✔
333
            if self.embedding_similarity_function == "dot_product":
1✔
334
                scores = [expit(float(score / DOT_PRODUCT_SCALING_FACTOR)) for score in scores]
1✔
335
            elif self.embedding_similarity_function == "cosine":
×
336
                scores = [(score + 1) / 2 for score in scores]
×
337

338
        return scores
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc