PyThaiNLP / pythainlp / 11625814262

01 Nov 2024 07:14AM UTC coverage: 20.782% (+20.8%) from 0.0%

Pull Request #952: Specify a limited test suite
Merge c8385dcae into 515fe7ced (github / web-flow)

45 of 80 new or added lines in 48 files covered. (56.25%)

1537 of 7396 relevant lines covered (20.78%)

0.21 hits per line

Source File: /pythainlp/summarize/keybert.py (coverage: 0.0%)

# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2016-2024 PyThaiNLP Project
# SPDX-License-Identifier: Apache-2.0
"""
Minimal re-implementation of KeyBERT.

KeyBERT is a minimal and easy-to-use keyword extraction technique
that leverages BERT embeddings to create keywords and keyphrases
that are most similar to a document.

https://github.com/MaartenGr/KeyBERT
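
The implementation here follows the same steps: generate candidate
n-grams from the tokenized document, embed the document and every
candidate with a BERT-based model, and rank the candidates by their
cosine similarity to the document embedding.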
"""
from collections import Counter
from typing import Iterable, List, Optional, Tuple, Union

import numpy as np
from transformers import pipeline

from pythainlp.corpus import thai_stopwords
from pythainlp.tokenize import word_tokenize


class KeyBERT:
    def __init__(
        self, model_name: str = "airesearch/wangchanberta-base-att-spm-uncased"
    ):
        self.ft_pipeline = pipeline(
            "feature-extraction",
            tokenizer=model_name,
            model=model_name,
            revision="main",
        )

    def extract_keywords(
        self,
        text: str,
        keyphrase_ngram_range: Tuple[int, int] = (1, 2),
        max_keywords: int = 5,
        min_df: int = 1,
        tokenizer: str = "newmm",
        return_similarity: bool = False,
        stop_words: Optional[Iterable[str]] = None,
    ) -> Union[List[str], List[Tuple[str, float]]]:
        """
        Extract Thai keywords and/or keyphrases with the KeyBERT algorithm.
        See https://github.com/MaartenGr/KeyBERT.

        :param str text: text from which keywords will be extracted
        :param Tuple[int, int] keyphrase_ngram_range: Range of n-gram lengths (in tokens) that can form a keyword.
                                The token unit varies w.r.t. `tokenizer`.
                                For instance, (1, 1) means each token (unigram) can be a keyword (e.g. "เสา", "ไฟฟ้า"),
                                (1, 2) means one and two consecutive tokens (unigram and bigram) can be keywords
                                (e.g. "เสา", "ไฟฟ้า", "เสาไฟฟ้า")  (default: (1, 2))
        :param int max_keywords: Maximum number of keywords to be returned. (default: 5)
        :param int min_df: Minimum frequency required to be a keyword. (default: 1)
        :param str tokenizer: Name of the tokenizer engine to use.
                                Refer to options in :func:`pythainlp.tokenize.word_tokenize`. (default: 'newmm')
        :param bool return_similarity: If `True`, return keyword scores. (default: False)
        :param Optional[Iterable[str]] stop_words: A list of stop words (i.e. words to be ignored).
                                If not specified, :func:`pythainlp.corpus.thai_stopwords` is used. (default: None)

        :return: list of keywords, or list of (keyword, similarity score) tuples if `return_similarity` is `True`

        :Example:
        ::

            from pythainlp.summarize.keybert import KeyBERT

            text = '''
                อาหาร หมายถึง ของแข็งหรือของเหลว
                ที่กินหรือดื่มเข้าสู่ร่างกายแล้ว
                จะทำให้เกิดพลังงานและความร้อนแก่ร่างกาย
                ทำให้ร่างกายเจริญเติบโต
                ซ่อมแซมส่วนที่สึกหรอ ควบคุมการเปลี่ยนแปลงต่างๆ ในร่างกาย
                ช่วยทำให้อวัยวะต่างๆ ทำงานได้อย่างปกติ
                อาหารจะต้องไม่มีพิษและไม่เกิดโทษต่อร่างกาย
            '''

            kb = KeyBERT()

            keywords = kb.extract_keywords(text)

            # output: ['อวัยวะต่างๆ',
            # 'ซ่อมแซมส่วน',
            # 'เจริญเติบโต',
            # 'ควบคุมการเปลี่ยนแปลง',
            # 'มีพิษ']

            keywords = kb.extract_keywords(text, max_keywords=10, return_similarity=True)

            # output: [('อวัยวะต่างๆ', 0.3228477063109462),
            # ('ซ่อมแซมส่วน', 0.31320597838000375),
            # ('เจริญเติบโต', 0.29115434699705506),
            # ('ควบคุมการเปลี่ยนแปลง', 0.2678430841321016),
            # ('มีพิษ', 0.24996827960821494),
            # ('ทำให้ร่างกาย', 0.23876962942443258),
            # ('ร่างกายเจริญเติบโต', 0.23191285218852364),
            # ('จะทำให้เกิด', 0.22425422716846247),
            # ('มีพิษและ', 0.22162962875299588),
            # ('เกิดโทษ', 0.20773497763458507)]

        """
        try:
            text = text.strip()
        except AttributeError:
            raise AttributeError(
                f"Unable to process data of type {type(text)}. "
                f"Please provide input of string type."
            )

        if not text:
            return []

        # generate all lists of keywords / keyphrases
        stop_words_ = stop_words if stop_words else thai_stopwords()
        kw_candidates = _generate_ngrams(
            text, keyphrase_ngram_range, min_df, tokenizer, stop_words_
        )

        # create document and word vectors
        doc_vector = self.embed(text)
        kw_vectors = self.embed(kw_candidates)
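        # doc_vector has shape [1, hidden_size]; kw_vectors has shape
        # [len(kw_candidates), hidden_size] (see embed() below).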

        # rank keywords
        keywords = _rank_keywords(
            doc_vector, kw_vectors, kw_candidates, max_keywords
        )

        if return_similarity:
            return keywords
        else:
            return [kw for kw, _ in keywords]

    def embed(self, docs: Union[str, List[str]]) -> np.ndarray:
        """
        Create an embedding of each input in `docs` by averaging vectors from the last hidden layer.
        """
        embs = self.ft_pipeline(docs)
139
        if isinstance(docs, str) or len(docs) == 1:
×
140
            # embed doc. return shape = [1, hidden_size]
141
            emb_mean = np.array(embs).mean(axis=1)
×
142
        else:
143
            # mean of embedding of each word
144
            # return shape = [len(docs), hidden_size]
145
            emb_mean = np.stack(
×
146
                [np.array(emb[0]).mean(axis=0) for emb in embs]
147
            )
148

149
        return emb_mean
×
150

151

152
def _generate_ngrams(
×
153
    doc: str,
154
    keyphrase_ngram_range: Tuple[int, int],
155
    min_df: int,
156
    tokenizer_engine: str,
157
    stop_words: Iterable[str],
158
) -> List[str]:
159
    assert keyphrase_ngram_range[0] >= 1, (
×
160
        f"`keyphrase_ngram_range` must start from 1. "
161
        f"current value={keyphrase_ngram_range}."
162
    )
163

164
    assert keyphrase_ngram_range[0] <= keyphrase_ngram_range[1], (
×
165
        f"The value first argument of `keyphrase_ngram_range` must not exceed the second. "
166
        f"current value={keyphrase_ngram_range}."
167
    )
168

169
    def _join_ngram(ngrams: List[Tuple[str, str]]) -> List[str]:
×
170
        ngrams_joined = []
×
171
        for ng in ngrams:
×
172
            joined = "".join(ng)
×
173
            if joined.strip() == joined:
×
174
                # ngram must not start or end with whitespace as this may cause duplication.
175
                ngrams_joined.append(joined)
×
176
        return ngrams_joined
×
177

178
    words = word_tokenize(doc, engine=tokenizer_engine)
×
179
    all_grams = []
×
180
    ngram_range = (keyphrase_ngram_range[0], keyphrase_ngram_range[1] + 1)
×
181
    for n in range(*ngram_range):
×
182
        if n == 1:
×
183
            # filter out space
184
            ngrams = [word for word in words if word.strip()]
×
185
        else:
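            # Sliding-window n-grams: zip(words, words[1:], ..., words[n-1:])
            # pairs each token with its n-1 successors; e.g. for n=2 the
            # tokens ["เสา", "ไฟฟ้า", "สูง"] give ("เสา", "ไฟฟ้า") and ("ไฟฟ้า", "สูง").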
            ngrams_tuple = zip(*[words[i:] for i in range(n)])
            ngrams = _join_ngram(ngrams_tuple)

        ngrams_cnt = Counter(ngrams)
        ngrams = [
            word
            for word, freq in ngrams_cnt.items()
            if (freq >= min_df) and (word not in stop_words)
        ]
        all_grams.extend(ngrams)

    return all_grams


def _rank_keywords(
    doc_vector: np.ndarray,
    word_vectors: np.ndarray,
    keywords: List[str],
    max_keywords: int,
) -> List[Tuple[str, float]]:
    def l2_norm(v: np.ndarray) -> np.ndarray:
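        # Divide each row vector by its Euclidean norm so that every vector
        # has unit length; dot products between unit vectors are then
        # cosine similarities.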
        vec_size = v.shape[1]
        result = np.divide(
            v,
            np.linalg.norm(v, axis=1).reshape(-1, 1).repeat(vec_size, axis=1),
        )
        assert np.isclose(
            np.linalg.norm(result, axis=1), 1
        ).all(), "Cannot normalize a vector to unit vector."
        return result

    def cosine_sim(a: np.ndarray, b: np.ndarray) -> np.ndarray:
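        # With `a` and `b` already L2-normalized, a @ b.T is the cosine
        # similarity cos(a, b) = a·b / (|a| |b|); the sum over axis 1 only
        # collapses the singleton document axis of the transposed result.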
        return (np.matmul(a, b.T).T).sum(axis=1)

    doc_vector = l2_norm(doc_vector)
    word_vectors = l2_norm(word_vectors)
    cosine_sims = cosine_sim(doc_vector, word_vectors)
    ranking_desc = np.argsort(-cosine_sims)

    final_ranks = [
        (keywords[r], cosine_sims[r]) for r in ranking_desc[:max_keywords]
    ]
    return final_ranks