• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

feeluown / feeluown-netease / 8986919738

07 May 2024 02:11PM UTC coverage: 22.263% (-0.06%) from 22.319%
8986919738

push

github

cosven
support toplist

4 of 24 new or added lines in 4 files covered. (16.67%)

1 existing line in 1 file now uncovered.

362 of 1626 relevant lines covered (22.26%)

0.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.39
/fuo_netease/provider.py
1
import logging
1✔
2

3
from feeluown.library import AbstractProvider, ProviderV2, ProviderFlags as PF, \
1✔
4
    CommentModel, BriefCommentModel, BriefUserModel, UserModel, \
5
    NoUserLoggedIn, LyricModel, ModelNotFound
6
from feeluown.media import Quality, Media
1✔
7
from feeluown.library import ModelType, SearchType
1✔
8
from feeluown.utils.reader import create_reader, SequentialReader
1✔
9
from .api import API, CodeShouldBe200
1✔
10

11

12
logger = logging.getLogger(__name__)
1✔
13
SOURCE = 'netease'
1✔
14

15

16
class NeteaseProvider(AbstractProvider, ProviderV2):
1✔
17
    class meta:
1✔
18
        identifier = SOURCE
1✔
19
        name = '网易云音乐'
1✔
20
        # TODO: remove
21
        flags = {
1✔
22
            ModelType.song: (PF.model_v2 | PF.similar | PF.multi_quality |
23
                             PF.get | PF.hot_comments | PF.web_url |
24
                             PF.lyric | PF.mv),
25
            ModelType.album: (PF.model_v2 | PF.get),
26
            ModelType.artist: (PF.model_v2 | PF.get | PF.songs_rd | PF.albums_rd),
27
            ModelType.video: (PF.model_v2 | PF.get | PF.multi_quality),
28
            ModelType.playlist: (PF.model_v2 | PF.get |
29
                                 PF.songs_rd | PF.add_song | PF.remove_song),
30
            ModelType.none: PF.current_user,
31
        }
32

33
    def __init__(self):
1✔
34
        super().__init__()
1✔
35
        self.api = API()
1✔
36

37
    @property
1✔
38
    def identifier(self):
1✔
39
        return SOURCE
×
40

41
    @property
1✔
42
    def name(self):
1✔
43
        return '网易云音乐'
×
44

45
    def auth(self, user):
1✔
46
        cookies, exists = user.cache_get('cookies')
×
47
        assert exists and cookies is not None
×
48
        self._user = user
×
49
        self.api.load_cookies(cookies)
×
50

51
    def has_current_user(self):
1✔
52
        return self._user is not None
×
53

54
    def get_current_user(self):
1✔
55
        if self._user is None:
×
56
            raise NoUserLoggedIn
×
57
        user = self.user_get(self._user.identifier)
×
58
        return user
×
59

60
    def user_get(self, identifier):
1✔
61
        data = self.api.user_profile(identifier)
×
62
        user = UserModel(identifier=str(identifier),
×
63
                         source=SOURCE,
64
                         name=data['nickname'],
65
                         avatar_url=data['avatarImg'])
66
        return user
×
67

68
    def current_user_fav_create_playlists_rd(self):
1✔
69
        return create_g(self.api.subscribed_djradio, NeteaseDjradioSchema, 'djRadios')
×
70

71
    def current_user_fav_create_artists_rd(self):
1✔
72
        return create_g(self.api.user_favorite_artists, V2BriefArtistSchema)
×
73

74
    def current_user_fav_create_albums_rd(self):
1✔
75
        return create_g(self.api.user_favorite_albums, V2BriefAlbumSchema)
×
76

77
    def current_user_cloud_songs(self):
1✔
78
        return create_cloud_songs_g(
×
79
            self.api.cloud_songs,
80
            self.api.cloud_songs_detail,
81
            V2SongSchemaForV3,
82
            NCloudSchema,
83
            data_key='simpleSong'
84
        )
85

86
    def current_user_playlists(self):
1✔
87
        user_id = str(self._user.identifier)
×
88
        data_playlists = self.api.user_playlists(user_id)
×
89
        playlists = []
×
90
        fav_playlists = []
×
91
        for pl in data_playlists:
×
92
            if str(pl['userId']) == user_id:
×
93
                playlists.append(pl)
×
94
            else:
95
                fav_playlists.append(pl)
×
96
        return [_deserialize(e, V2PlaylistSchema) for e in playlists], \
×
97
            [_deserialize(e, V2PlaylistSchema) for e in fav_playlists]
98

99
    def current_user_get_radio_songs(self):
1✔
100
        songs_data = self.api.get_radio_music()
×
101
        if songs_data is None:
×
102
            logger.error('data should not be None')
×
103
            return None
×
104
        return [_deserialize(song_data, V2SongSchema)
×
105
                for song_data in songs_data]
106

107
    def rec_list_daily_playlists(self):
1✔
108
        if not self.has_current_user():
×
109
            return []
×
110

111
        playlists_data = self.api.get_recommend_playlists()
×
112
        rec_playlists = []
×
113
        for playlist_data in playlists_data:
×
114
            # FIXME: GUI模式下无法显示歌单描述
115
            playlist_data['coverImgUrl'] = playlist_data['picUrl']
×
116
            playlist_data['description'] = None
×
117
            playlist = _deserialize(playlist_data, V2PlaylistSchema)
×
118
            rec_playlists.append(playlist)
×
119
        return rec_playlists
×
120

121
    def rec_list_daily_songs(self):
1✔
122
        songs_data = self.api.get_recommend_songs()
×
123
        return [_deserialize(song_data, V2SongSchemaForV3)
×
124
                for song_data in songs_data]
125

126
    def toplist_list(self):
1✔
NEW
127
        return [_deserialize(each, V2PlaylistSchema) for each in self.api.list_toplist()]
×
128

129
    def toplist_get(self, identifier):
1✔
NEW
130
        return self.playlist_get(identifier)
×
131

132
    def song_get(self, identifier):
1✔
133
        data = self.api.song_detail(int(identifier))
×
134
        return _deserialize(data, V2SongSchema)
×
135

136
    def song_list_similar(self, song):
1✔
137
        songs = self.api.get_similar_song(song.identifier)
×
138
        return [_deserialize(song, V2SongSchema) for song in songs]
×
139

140
    def song_get_lyric(self, song):
1✔
141
        data = self.api.get_lyric_by_songid(song.identifier)
×
142
        return LyricModel(
×
143
            source=SOURCE,
144
            identifier=self.identifier,
145
            content=data.get('lrc', {}).get('lyric', ''),
146
            trans_content=data.get('tlyric', {}).get('lyric', ''),
147
        )
148

149
    def song_get_mv(self, song):
1✔
150
        cache_key = 'mv_id'
×
151
        mvid, exists = song.cache_get(cache_key)
×
152
        if exists is not True:
×
153
            # FIXME: the following implicitly get mv_id attribute
154
            upgraded_song = self.song_get(song.identifier)
×
155
            mvid, exists = upgraded_song.cache_get(cache_key)
×
156
            assert exists is True
×
157
            song.cache_set(cache_key, mvid)
×
158

159
        if mvid:  # if mvid is valid
×
160
            data = self.api.get_mv_detail(mvid)
×
161
            mv = _deserialize(data, V2MvSchema)
×
162
            return mv
×
163
        return None
×
164

165
    def upload_song(self, path: str) -> bool:
1✔
166
        return self.api.cloud_song_upload(path) == 'STATUS_SUCCEEDED'
×
167

168
    def song_list_quality(self, song):
1✔
169
        return list(self._song_get_q_media_mapping(song))
×
170

171
    def song_list_hot_comments(self, song):
1✔
172
        comment_thread_id = self._model_cache_get_or_fetch(song, 'comment_thread_id')
×
173
        data = self.api.get_comment(comment_thread_id)
×
174
        hot_comments_data = data['hotComments']
×
175
        hot_comments = []
×
176
        for comment_data in hot_comments_data:
×
177
            user_data = comment_data['user']
×
178
            user = BriefUserModel(identifier=str(user_data['userId']),
×
179
                                  source=SOURCE,
180
                                  name=user_data['nickname'])
181
            be_replied = comment_data['beReplied']
×
182
            if be_replied:
×
183
                replied_comment_data = be_replied[0]
×
184
                parent = BriefCommentModel(
×
185
                    identifier=replied_comment_data['beRepliedCommentId'],
186
                    user_name=replied_comment_data['user']['nickname'],
187
                    content=replied_comment_data['content']
188
                )
189
            else:
190
                parent = None
×
191
            comment = CommentModel(identifier=str(comment_data['commentId']),
×
192
                                   source=SOURCE,
193
                                   user=user,
194
                                   content=comment_data['content'],
195
                                   liked_count=comment_data['likedCount'],
196
                                   time=comment_data['time'] // 1000,
197
                                   parent=parent,
198
                                   root_comment_id=str(comment_data['parentCommentId']))
199
            hot_comments.append(comment)
×
200
        return hot_comments
×
201

202
    def song_get_media(self, song, quality):
1✔
203
        q_media_mapping = self._song_get_q_media_mapping(song)
×
204
        if quality not in q_media_mapping:
×
205
            return None
×
206
        song_id = int(song.identifier)
×
207
        bitrate, url, format = q_media_mapping.get(quality)
×
208
        # None means the url is not fetched, so try to fetch it.
209
        if url is None:
×
210
            songs_data = self.api.weapi_songs_url([song_id], bitrate)
×
211
            if songs_data:
×
212
                song_data = songs_data[0]
×
213
                url = song_data['url']
×
214
                actual_bitrate = song_data['br']
×
215
                format = song_data['type']
×
216
                # Check the url bitrate while it is not empty. Api
217
                # may return a fallback bitrate when the expected bitrate
218
                # resource is not valid.
219
                if url and abs(actual_bitrate - bitrate) >= 10000:
×
220
                    logger.warning(
×
221
                        f'The actual bitrate is {actual_bitrate} '
222
                        f'while we want {bitrate}. '
223
                        f'[song:{song_id}].'
224
                    )
225
        if url:
×
226
            media = Media(url, bitrate=bitrate//1000, format=format)
×
227
            # update value in cache
228
            q_media_mapping[quality] = (bitrate, url, format)
×
229
            return media
×
230
        logger.error('This should not happend')
×
231
        return None
×
232

233
    def _song_get_q_media_mapping(self, song):
1✔
234
        q_media_mapping, exists = song.cache_get('q_media_mapping')
×
235
        if exists is True:
×
236
            return q_media_mapping
×
237

238
        song_id = int(song.identifier)
×
239
        songs_data = self.api.songs_detail_v3([song_id])
×
240
        if songs_data:
×
241
            q_media_mapping = {}  # {Quality.Audio: (bitrate, url, format)}
×
242
            song_data = songs_data[0]
×
243
            key_quality_mapping = {
×
244
                'h': Quality.Audio.hq,
245
                'm': Quality.Audio.sq,
246
                'l': Quality.Audio.lq,
247
            }
248

249
            # Trick: try to find the highest quality url
250
            # When the song is only for vip/paid user and current user is non-vip,
251
            # the highest bitrate is 0, which means this song is unavailable
252
            # for current user.
253
            songs_url_data = self.api.weapi_songs_url([song_id], 999000)
×
254
            assert songs_url_data, 'length should not be 0'
×
255
            song_url_data = songs_url_data[0]
×
256
            highest_bitrate = song_url_data['br']
×
257
            # When the bitrate is large than 320000, the quality is treated as
258
            # lossless. We set the threshold to 400000 here.
259
            # Note(cosven): From manual testing, the bitrate of lossless media
260
            # can be 740kbps, 883kbps, 1411kbps, 1777kbps.
261
            if song_url_data['url'] and 'privatecloud' in song_url_data['url']:
×
262
                # 对于云盘歌曲, netease会抛弃官方音乐地址, 只会返回自己上传的音乐链接
263
                # bitrate不由用户提供 由官方估算, 且不再是标准的320, 192, 128
264
                q_media_mapping[Quality.Audio.shq] = (highest_bitrate,
×
265
                                                      song_url_data['url'],
266
                                                      song_url_data['type'])
267
            else:
268
                if highest_bitrate > 400000:
×
269
                    q_media_mapping[Quality.Audio.shq] = (highest_bitrate,
×
270
                                                          song_url_data['url'],
271
                                                          song_url_data['type'])
272

273
                for key, quality in key_quality_mapping.items():
×
274
                    # Ensure the quality info exists.
275
                    if key in song_data and song_data[key] is not None:
×
276
                        # This resource is invalid for current user since the expected
277
                        # bitrate is large than the highest_bitrate
278
                        if (song_data[key]['br'] - highest_bitrate) > 10000:
×
279
                            continue
×
280
                        q_media_mapping[quality] = (song_data[key]['br'], None, None)
×
281

282
        ttl = 60 * 20
×
283
        song.cache_set('q_media_mapping', q_media_mapping, ttl)
×
284
        return q_media_mapping
×
285

286
    def song_get_web_url(self, song):
1✔
287
        return f'https://music.163.com/#/song?id={song.identifier}'
×
288

289
    def video_get(self, identifier):
1✔
290
        prefix, real_id = identifier.split('_')
×
291
        assert prefix == 'mv'
×
292
        data = self.api.get_mv_detail(real_id)
×
293
        mv = _deserialize(data, V2MvSchema)
×
294
        return mv
×
295

296
    def video_get_media(self, video, quality):
1✔
297
        q_media_mapping = self._model_cache_get_or_fetch(video, 'q_media_mapping')
×
298
        return q_media_mapping.get(quality)
×
299

300
    def video_list_quality(self, video):
1✔
301
        q_media_mapping = self._model_cache_get_or_fetch(video, 'q_media_mapping')
×
302
        return list(q_media_mapping.keys())
×
303

304
    def album_get(self, identifier):
1✔
305
        album_data = self.api.album_infos(identifier)
×
306
        if album_data is None:
×
307
            raise ModelNotFound
×
308
        description = self.api.album_desc(identifier)
×
309
        album_data['description'] = description
×
310
        album = _deserialize(album_data, V2AlbumSchema)
×
311
        return album
×
312

313
    def album_create_songs_rd(self, album):
1✔
314
        album_with_songs = self.album_get(album.identifier)
×
315
        return create_reader(album_with_songs.songs)
×
316

317
    def artist_get(self, identifier):
1✔
318
        artist_data = self.api.artist_infos(identifier)
×
319
        artist = artist_data['artist']
×
320
        artist['songs'] = artist_data['hotSongs'] or []
×
321
        description = self.api.artist_desc(identifier)
×
322
        artist['description'] = description
×
323
        artist['aliases'] = []
×
324
        model = _deserialize(artist, V2ArtistSchema)
×
325
        return model
×
326

327
    # TODO: artist create albums g
328
    # TODO: artist create songs g
329
    def artist_create_songs_rd(self, artist):
1✔
330
        data = self.api.artist_songs(artist.identifier, limit=0)
×
331
        count = int(data['total'])
×
332

333
        def g():
×
334
            offset = 0
×
335
            per = 50
×
336
            while offset < count:
×
337
                data = self.api.artist_songs(artist.identifier, offset, per)
×
338
                for song_data in data['songs']:
×
339
                    yield _deserialize(song_data, V2SongSchemaForV3)
×
340
                    # In reality, len(data['songs']) may smaller than per,
341
                    # which is a bug of netease server side, so we set
342
                    # offset to `offset + per` here.
343
                offset += per
×
344
                per = 100
×
345

346
        return SequentialReader(g(), count)
×
347

348
    def artist_create_albums_rd(self, artist):
1✔
349

350
        def g():
×
351
            data = self.api.artist_albums(artist.identifier)
×
352
            if data['code'] != 200:
×
353
                yield from ()
×
354
            else:
355
                cur = 1
×
356
                while True:
357
                    for album in data['hotAlbums']:
×
358
                        # the songs field will always be an empty list,
359
                        # we set it to None
360
                        album['songs'] = None
×
361
                        yield _deserialize(album, V2AlbumSchema)
×
362
                        cur += 1
×
363
                    if data['more']:
×
364
                        data = self.api.artist_albums(artist.identifier, offset=cur)
×
365
                    else:
366
                        break
×
367

368
        return create_reader(g())
×
369

370
    def playlist_get(self, identifier):
1✔
371
        data = self.api.playlist_detail_v3(identifier, limit=0)
1✔
372
        playlist = _deserialize(data, V2PlaylistSchema)
1✔
373
        return playlist
1✔
374

375
    def playlist_delete(self, identifier):
1✔
376
        try:
×
377
            self.api.delete_playlist(identifier)
×
378
        except CodeShouldBe200 as e:
×
379
            logger.warning(f'delete playlist failed, {e}')
×
380
            return False
×
381
        return True
×
382

383
    def playlist_create_by_name(self, name):
1✔
384
        """
385
        :raises NoUserLoggedIn:
386
        :raises CodeShouldBe200: create playlist failed
387
        """
388
        user = self.get_current_user()
×
389
        uid = user.identifier
×
390
        data = self.api.new_playlist(uid, name)
×
391
        return _deserialize(data, V2PlaylistSchema)
×
392

393
    def djradio_create_songs_rd(self, djradio_id):
1✔
394
        data = self.api.djradio_list(djradio_id, limit=1, offset=0)
×
395
        count = data.get('count', 0)
×
396

397
        def g():
×
398
            offset = 0
×
399
            per = 50  # speed up first request
×
400
            while offset < count:
×
401
                tracks_data = self.api.djradio_list(
×
402
                    djradio_id, limit=per, offset=offset)
403
                for track_data in tracks_data.get('programs', []):
×
404
                    yield _deserialize(track_data, NDjradioSchema)
×
405
                offset += per
×
406
        return create_reader(g())
×
407

408
    def playlist_create_songs_rd(self, playlist):
1✔
409
        if playlist.identifier.startswith(DjradioPrefix):
1✔
410
            return self.djradio_create_songs_rd(playlist.identifier[len(DjradioPrefix):])
×
411

412
        data = self.api.playlist_detail_v3(playlist.identifier, limit=0)
1✔
413
        track_ids = data['trackIds']  # [{'id': 1, 'v': 1}, ...]
1✔
414
        count = len(track_ids)
1✔
415

416
        def g():
1✔
417
            offset = 0
1✔
418
            # 第一次请求应该尽可能快一点,所以这里只获取少量的歌曲。
419
            # 综合页面展示以及请求速度,拍脑袋将值设置为 50。
420
            per = 50
1✔
421
            while offset < count:
1✔
422
                end = min(offset + per, count)
1✔
423
                if end <= offset:
1✔
424
                    break
×
425
                ids = [track_id['id'] for track_id in track_ids[offset: end]]
1✔
426
                # NOTE(cosven): 记忆中这里有个坑,传入的 id 个数和返回的歌曲个数
427
                # 不一定相等。比如在一个叫做“万首歌单”的歌单里面,有的歌曲是
428
                # 获取不到信息的。这也是这里为什么用 SequentialReader 而不是
429
                # RandomSequentialReader 的原因。
430
                tracks_data = self.api.songs_detail_v3(ids)
1✔
431
                for track_data in tracks_data:
1✔
432
                    yield _deserialize(track_data, V2SongSchemaForV3)
1✔
433
                offset += per
1✔
434
                # 这里设置为 800 主要是为 readall 的场景考虑的。假设这个值设置很小,
435
                # 那当这个歌单歌曲数量比较多的时候(比如一万首),需要很多个请求
436
                # 才能获取到全部的歌曲。
437
                per = 800
1✔
438

439
        return SequentialReader(g(), count)
1✔
440

441
    def playlist_remove_song(self, playlist, song):
1✔
442
        song_id = song.identifier
×
443
        rv = self.api.op_music_to_playlist(song_id, playlist.identifier, 'del')
×
444
        if rv != 1:
×
445
            return False
×
446
        return True
×
447

448
    def playlist_add_song(self, playlist, song):
1✔
449
        song_id = song.identifier
×
450
        rv = self.api.op_music_to_playlist(song_id, playlist.identifier, 'add')
×
451
        if rv == 1:
×
452
            song = provider.song_get(song_id)
×
453
            return True
×
454
        elif rv == -1:
×
455
            return True
×
456
        return False
×
457

458
    def search(self, keyword, type_, **kwargs):
1✔
459
        type_ = SearchType.parse(type_)
×
460
        type_type_map = {
×
461
            SearchType.so: 1,
462
            SearchType.al: 10,
463
            SearchType.ar: 100,
464
            SearchType.pl: 1000,
465
        }
466
        data = provider.api.search(keyword, stype=type_type_map[type_])
×
467
        data['q'] = keyword
×
468
        result = _deserialize(data, NeteaseSearchSchema)
×
469
        return result
×
470

471

472
provider = NeteaseProvider()
1✔
473

474

475
from .models import _deserialize, create_g, create_cloud_songs_g  # noqa
1✔
476
from .schemas import (  # noqa
1✔
477
    V2SongSchema,
478
    V2SongSchemaForV3,
479
    V2MvSchema,
480
    V2AlbumSchema,
481
    V2ArtistSchema,
482
    V2BriefArtistSchema,
483
    V2BriefAlbumSchema,
484
    V2PlaylistSchema,
485
    NeteaseDjradioSchema,
486
    NeteaseSearchSchema,
487
    NDjradioSchema,
488
    NCloudSchema,
489
    DjradioPrefix,
490
)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc