PyThaiNLP / pythainlp / 11626163864

01 Nov 2024 07:49AM UTC coverage: 14.17% (+14.2%) from 0.0%

Pull Request #952: Specify a limited test suite
Merge 8f2551bc9 into 89ea62ebc (github / web-flow)

44 of 80 new or added lines in 48 files covered. (55.0%)
1048 of 7396 relevant lines covered (14.17%)
0.14 hits per line

Source file: /pythainlp/tokenize/newmm.py (coverage: 0.0%)

# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2016-2024 PyThaiNLP Project
# SPDX-License-Identifier: Apache-2.0
"""
Dictionary-based maximal matching word segmentation, constrained by
Thai Character Cluster (TCC) boundaries, with improved rules.

The code is based on the notebooks created by Korakot Chaovavanich,
with a heuristic graph size limit added to avoid exponential waiting time.

:See Also:
    * \
        https://colab.research.google.com/notebook#fileId=1V1Z657_5eSWPo8rLfVRwA0A5E4vkg7SI
    * \
        https://colab.research.google.com/drive/14Ibg-ngZXj15RKwjNwoZlOT32fQBOrBx#scrollTo=MYZ7NzAR7Dmw
"""
import re
from collections import defaultdict
from heapq import heappop, heappush
from typing import Generator, List

from pythainlp.tokenize import DEFAULT_WORD_DICT_TRIE
from pythainlp.tokenize.tcc_p import tcc_pos
from pythainlp.util import Trie

# match non-Thai tokens
# `|` is used like an "early return",
# which divides "abc123" into "abc", "123" for example.
_PAT_NONTHAI = re.compile(
    r"""(?x)
[-a-zA-Z]+|        # Latin characters
\d+([,\.]\d+)*|    # numbers
[ \t]+|            # spaces
\r?\n|             # newlines
[^\u0E00-\u0E7F \t\r\n]+  # other non-Thai characters; matching stops at a space/newline
"""
)
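
# Illustrative doctest-style sketch of the splitting behavior described in the
# comment above (the result shown is an expectation, not captured output):
#
#     >>> [m.group() for m in _PAT_NONTHAI.finditer("abc123")]
#     ['abc', '123']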

# match Thai tokens of at most two consonants
_PAT_THAI_TWOCHARS = re.compile("[ก-ฮ]{,2}$")


# maximum graph size before cutoff
_MAX_GRAPH_SIZE = 50

# window size for safe mode
_TEXT_SCAN_POINT = 120
_TEXT_SCAN_LEFT = 20
_TEXT_SCAN_RIGHT = 20
_TEXT_SCAN_BEGIN = _TEXT_SCAN_POINT - _TEXT_SCAN_LEFT
_TEXT_SCAN_END = _TEXT_SCAN_POINT + _TEXT_SCAN_RIGHT
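# with the values above, the safe-mode scan window covers offsets 100 to 140
# (_TEXT_SCAN_BEGIN = 100, _TEXT_SCAN_END = 140) of the remaining text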
del _TEXT_SCAN_POINT
del _TEXT_SCAN_LEFT
del _TEXT_SCAN_RIGHT


def _bfs_paths_graph(
    graph: defaultdict, start: int, goal: int
) -> Generator[List[int], None, None]:
    queue = [(start, [start])]
    while queue:
        (vertex, path) = queue.pop(0)
        for pos in graph[vertex]:
            if pos == goal:
                yield path + [pos]
            else:
                queue.append((pos, path + [pos]))
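
# Illustrative doctest-style sketch of the breadth-first path enumeration
# above (the toy graph and the result shown are assumptions, not captured
# output): paths from `start` to `goal` are yielded fewest-hops first.
#
#     >>> toy = defaultdict(list, {0: [2, 3], 2: [5], 3: [5]})
#     >>> next(_bfs_paths_graph(toy, 0, 5))
#     [0, 2, 5]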


def _onecut(text: str, custom_dict: Trie) -> Generator[str, None, None]:
    # main data structure:
    # - key is beginning position (int)
    # - value is possible ending positions (List[int])
    # if key is not found, value is empty list
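    # e.g. a graph of {0: [2, 5], 2: [5]} would mean that a dictionary word
    # starting at position 0 may end at position 2 or 5, and a word starting
    # at position 2 may end at position 5 (positions here are hypothetical)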
    graph = defaultdict(list)

    graph_size = 0  # keep track of graph size; if too big, force cutoff

    valid_poss = tcc_pos(text)  # breaking positions that are TCC-valid

    len_text = len(text)
    pos_list = [0]  # priority queue of possible breaking positions
    end_pos = 0
    while pos_list[0] < len_text:
        begin_pos = heappop(pos_list)
        for word in custom_dict.prefixes(text[begin_pos:]):
            end_pos_candidate = begin_pos + len(word)
            if end_pos_candidate in valid_poss:
                graph[begin_pos].append(end_pos_candidate)
                graph_size = graph_size + 1

                if end_pos_candidate not in pos_list:
                    heappush(pos_list, end_pos_candidate)

                if graph_size > _MAX_GRAPH_SIZE:
                    break

        len_pos_list = len(pos_list)
        if len_pos_list == 1:  # one candidate, no longer ambiguous
            end_pos_candidates = next(
                _bfs_paths_graph(graph, end_pos, pos_list[0])
            )
            graph_size = 0
            for pos in end_pos_candidates[1:]:
                yield text[end_pos:pos]
                end_pos = pos
        elif len_pos_list == 0:  # no candidate, deal with non-dictionary word
            m = _PAT_NONTHAI.match(text[begin_pos:])
            if m:  # non-Thai token, skip to the end
                end_pos = begin_pos + m.end()
            else:  # Thai token, find minimum skip
                for pos in range(begin_pos + 1, len_text):
                    if pos in valid_poss:
                        prefix = text[pos:]
                        words = [
                            word
                            for word in custom_dict.prefixes(prefix)
                            if (
                                (pos + len(word) in valid_poss)
                                and not _PAT_THAI_TWOCHARS.match(word)
                            )
                        ]
                        if words:  # a Thai token longer than two characters
                            end_pos = pos
                            break

                        # a non-Thai token
                        if _PAT_NONTHAI.match(prefix):
                            end_pos = pos
                            break
                else:
                    end_pos = len_text

            graph[begin_pos].append(end_pos)
            graph_size = graph_size + 1
            yield text[begin_pos:end_pos]
            heappush(pos_list, end_pos)


def segment(
    text: str,
    custom_dict: Trie = DEFAULT_WORD_DICT_TRIE,
    safe_mode: bool = False,
) -> List[str]:
    """Maximal-matching word segmentation constrained by Thai Character Cluster.

    A dictionary-based word segmentation using the maximal matching algorithm,
    constrained by Thai Character Cluster boundaries.

    A custom dictionary can be supplied.

    :param text: text to be tokenized
    :type text: str
    :param custom_dict: tokenization dictionary,\
        defaults to DEFAULT_WORD_DICT_TRIE
    :type custom_dict: Trie, optional
    :param safe_mode: reduce the chance of long processing time for long text\
        with many ambiguous breaking points, defaults to False
    :type safe_mode: bool, optional
    :return: list of tokens
    :rtype: List[str]
    """
    if not text or not isinstance(text, str):
        return []

    if not custom_dict:
        custom_dict = DEFAULT_WORD_DICT_TRIE

    if not safe_mode or len(text) < _TEXT_SCAN_END:
        return list(_onecut(text, custom_dict))

    # if the text is longer than the limit,
    # break it into smaller chunks, then tokenize each chunk
    text_parts = []
    while len(text) >= _TEXT_SCAN_END:
        sample = text[_TEXT_SCAN_BEGIN:_TEXT_SCAN_END]

        # find possible breaking positions
        cut_pos = _TEXT_SCAN_END

        # try to break at a space first
        space_idx = sample.rfind(" ")
        if space_idx >= 0:
            cut_pos = space_idx + 1
        else:
            tokens = list(_onecut(sample, custom_dict))
            token_max_idx = 0
            token_max_len = 0
            for i, token in enumerate(tokens):
                if len(token) >= token_max_len:
                    token_max_len = len(token)
                    token_max_idx = i

            # choose the position that covers the longest token
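            # (i.e. cut right before the longest token found in the window;
            # presumably this makes the chunk boundary less likely to fall
            # inside a word)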
            cut_pos = _TEXT_SCAN_BEGIN
            for i in range(0, token_max_idx):
                cut_pos = cut_pos + len(tokens[i])

        text_parts.append(text[:cut_pos])
        text = text[cut_pos:]

    # append remaining text
    if len(text):
        text_parts.append(text)

    # tokenize each text part
    tokens = []
    for text_part in text_parts:
        tokens.extend(list(_onecut(text_part, custom_dict)))

    return tokens
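
A minimal usage sketch of the segment() function listed above (illustrative only: the sample string and the expected token split are assumptions, and the actual output depends on the word dictionary in use):

    from pythainlp.tokenize.newmm import segment

    # default dictionary, no safe mode
    print(segment("ผมรักคุณ"))  # expected: ['ผม', 'รัก', 'คุณ']

    # long text with many breaking points: safe_mode breaks the input into
    # chunks around the scan window before tokenizing each chunk
    print(segment("ผมรักคุณ" * 50, safe_mode=True))

Both calls return a List[str]; a custom Trie can be passed as custom_dict to use a different dictionary.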