
gcivil-nyu-org / Wednesday-Fall2023-Team-1 · build #614980386 (travis-ci)

13 Dec 2023 12:50AM UTC · coverage: 86.973% · first build

Pull Request #257: Final sprint

702 of 804 new or added lines in 23 files covered (87.31%)

1382 of 1589 relevant lines covered (86.97%)

0.87 hits per line
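
These percentages are straight ratios of the line counts above; a quick sanity check in Python (values copied from this report):

    new_covered, new_total = 702, 804
    relevant_covered, relevant_total = 1382, 1589
    print(round(100 * new_covered / new_total, 2))            # 87.31  (new or added lines)
    print(round(100 * relevant_covered / relevant_total, 3))  # 86.973 (overall coverage)
    # The 0.87 hits-per-line figure matches 1382 / 1589, i.e. each covered
    # line counted once on this first build.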

Source File

/dashboard/vibe_calc.py · 66.3% file coverage

from django.utils import timezone
from utils import vibe_calc_threads
import lyricsgenius
import os
import openai
import time
from gradio_client import Client
import numpy as np
import re
from dashboard.models import TrackVibe, EmotionVector
from user_profile.models import Vibe
import pandas as pd
from collections import Counter
from django.apps import apps

MAX_RETRIES = 2

client = Client("https://alfredo273-vibecheck-fasttext.hf.space/", serialize=False)


def calculate_vibe_async(
    track_names, track_artists, track_ids, audio_features_list, user_id
):
    audio_vibe, lyric_vibe = check_vibe(
        track_names, track_artists, track_ids, audio_features_list
    )
    vibe_result = audio_vibe
    if lyric_vibe:
        vibe_result += " " + lyric_vibe

    current_time = timezone.now().astimezone(timezone.utc)

    if len(track_artists) > 1:
        artist_string = ",".join(track_artists)
    else:
        artist_string = track_artists[0]
    description = vibe_description(vibe_result, artist_string)

    vibe_data = Vibe(
        user_id=user_id,
        vibe_time=current_time,
        user_lyrics_vibe=lyric_vibe,
        user_audio_vibe=audio_vibe,
        recent_track=track_ids,
        user_acousticness=get_feature_average(audio_features_list, "acousticness"),
        user_danceability=get_feature_average(audio_features_list, "danceability"),
        user_energy=get_feature_average(audio_features_list, "energy"),
        user_valence=get_feature_average(audio_features_list, "valence"),
        description=description,
    )
    vibe_data.save()

    # Thread is finished calculating, delete from current thread dictionary
    vibe_calc_threads.pop(user_id, None)
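
calculate_vibe_async is written as a thread target: vibe_calc_threads (imported from utils) evidently maps a user_id to the in-flight calculation, and the pop() above clears that entry when the work is done. A minimal sketch of how a caller might start it (the helper below is illustrative, not part of this module):

    import threading

    def start_vibe_calculation(track_names, track_artists, track_ids, audio_features_list, user_id):
        # Do nothing if a calculation is already registered for this user.
        if user_id in vibe_calc_threads:
            return
        thread = threading.Thread(
            target=calculate_vibe_async,
            args=(track_names, track_artists, track_ids, audio_features_list, user_id),
        )
        vibe_calc_threads[user_id] = thread
        thread.start()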


def check_vibe(track_names, track_artists, track_ids, audio_features_list):
    # Fetch existing vibes from the database
    existing_vibes = TrackVibe.objects.filter(track_id__in=track_ids)
    existing_vibes_dict = {vibe.track_id: vibe for vibe in existing_vibes}

    # Ids and features of new tracks that need audio analysis
    track_needing_audio = []
    # Audio vibes of old tracks that had analysis already
    track_has_audio = []

    # Ids of new tracks that need lyric analysis
    tracks_needing_lyrics = []
    # Lyric vibes of old tracks that had analysis already
    tracks_has_lyrics = []

    for name, artist, track_id, audio_features in zip(
        track_names, track_artists, track_ids, audio_features_list
    ):
        track_vibe = existing_vibes_dict.get(track_id)
        if not track_vibe:
            track_needing_audio.append((track_id, audio_features))
            tracks_needing_lyrics.append((name, artist, track_id))
        else:
            track_has_audio.append(track_vibe.track_audio_vibe)
            if track_vibe.track_lyrics_vibe is None:
                tracks_needing_lyrics.append((name, artist, track_id))
            else:
                tracks_has_lyrics.append(track_vibe.track_lyrics_vibe)

    # Audio vibe analysis for tracks that need it, also saves track audio vibes into database
    audio_vibes_new = (
        deduce_audio_vibe(*zip(*track_needing_audio)) if track_needing_audio else []
    )
    # Get final audio vibe with new audio vibes and audio vibes already in database
    audio_final_vibe = get_most_count(audio_vibes_new + track_has_audio)

    # Lyric vibe analysis for tracks that need it, also saves track lyric vibes into database
    # (guard before unpacking so an empty list does not raise in zip)
    lyrics_vibes_new = (
        deduce_lyrics(*zip(*tracks_needing_lyrics)) if tracks_needing_lyrics else []
    )
    # Get final lyric vibe with new lyric vibes and lyric vibes already in database
    lyrics_final_vibe = lyrics_vectorize(lyrics_vibes_new + tracks_has_lyrics)

    return audio_final_vibe, lyrics_final_vibe


def deduce_audio_vibe(track_ids, audio_features_list):
    # Create a DataFrame from the list of audio features dictionaries
    spotify_data = pd.DataFrame(audio_features_list)

    # Rename 'duration_ms' to 'length' and normalize by dividing by the maximum value
    spotify_data.rename(columns={"duration_ms": "length"}, inplace=True)
    if not spotify_data["length"].empty:
        max_length = spotify_data["length"].max()
        spotify_data["length"] = spotify_data["length"] / max_length

    # Reorder columns based on the model's expectations
    ordered_features = [
        "length",
        "danceability",
        "acousticness",
        "energy",
        "instrumentalness",
        "liveness",
        "valence",
        "loudness",
        "speechiness",
        "tempo",
        "key",
        "time_signature",
    ]

    # Ensure the DataFrame has all the required columns in the correct order
    spotify_data = spotify_data[ordered_features]

    # Predict the moods using the model
    model = apps.get_app_config("dashboard").model
    pred = model.predict(spotify_data)

    # Define the mood dictionary
    mood_dict = {
        0: "happy",
        1: "sad",
        2: "energetic",
        3: "calm",
        4: "anxious",
        5: "cheerful",
        6: "gloomy",
        7: "content",
    }

    audio_vibes = []

    # Save track audio into database
    for track_id, prediction in zip(track_ids, pred):
        mood = mood_dict[prediction]

        existing = TrackVibe.objects.filter(track_id=track_id).first()
        if not existing:
            track_data = TrackVibe(
                track_id=track_id,
                track_audio_vibe=mood,
            )
            track_data.save()

        audio_vibes.append(mood)

    return audio_vibes
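
deduce_audio_vibe expects one Spotify audio-features dictionary per track; the duration_ms rename and the ordered_features list above imply the keys it needs. A hypothetical call with made-up values (not real Spotify data):

    example_features = {
        "duration_ms": 215000,
        "danceability": 0.62,
        "acousticness": 0.11,
        "energy": 0.78,
        "instrumentalness": 0.0,
        "liveness": 0.15,
        "valence": 0.45,
        "loudness": -6.3,
        "speechiness": 0.04,
        "tempo": 122.0,
        "key": 5,
        "time_signature": 4,
    }
    deduce_audio_vibe(["track_id_1"], [example_features])  # e.g. ["energetic"]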


def get_most_count(vibes):
    # Return the word that appears most often in a list of words

    vibe_counts = Counter(vibes)
    most_common_vibe = vibe_counts.most_common(1)[0][0]
    return most_common_vibe
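
get_most_count is a simple majority vote over a list of labels; with equal counts, Counter keeps the label it saw first:

    get_most_count(["happy", "sad", "happy", "calm"])  # -> "happy"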


def deduce_lyrics(track_names, track_artists, track_ids):
    genius = lyricsgenius.Genius(os.getenv("GENIUS_CLIENT_ACCESS_TOKEN"))
    genius.timeout = 15

    lyrics_vibes = []

    lyrics_data = {}
    for track, artist, id in zip(track_names, track_artists, track_ids):
        genius_retries = 0
        while genius_retries < MAX_RETRIES:
            try:
                query = f'"{track}" "{artist}"'
                song = genius.search_song(query)

            except Exception as e:
                print(f"Error getting genius for {track}: {e}")
                genius_retries += 1
                continue

            if song is not None:
                # Genius song object sometimes has trailing space, so need to strip
                geniusTitle = song.title.lower().replace("\u200b", " ").strip()
                geniusArtist = song.artist.lower().replace("\u200b", " ").strip()
                if geniusTitle == track.lower() and geniusArtist == artist.lower():
                    print("Inputting lyrics..")
                    lyrics_data[(track, artist, id)] = song.lyrics

            break

    openai.api_key = os.getenv("OPEN_AI_TOKEN")

    for (track, artist, id), lyrics in lyrics_data.items():
        short_lyrics = lyrics[:2048]
        retries = 0
        while retries < MAX_RETRIES:
            try:
                print(f"Processing song. Track: {track}, Artist: {artist}, ID: {id}")
                print(f"Lyrics: {short_lyrics[:200]}")
                response = openai.ChatCompletion.create(
                    model="gpt-3.5-turbo",
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant."},
                        {
                            "role": "user",
                            "content": f"You are a mood analyzer that can only return a single word. Based on these song lyrics, return a single word that matches this song's mood: '{short_lyrics}'",
                        },
                    ],
                    request_timeout=5,
                )
                vibe = response.choices[0].message["content"].strip()
                checkLength = vibe.split()
                if len(checkLength) == 1:
                    lyrics_vibes.append(vibe.lower())
                    track_entry = TrackVibe.objects.filter(track_id=id).first()
                    if track_entry:
                        # track_entry should always exist since we did audio analysis first!
                        track_entry.track_lyrics_vibe = vibe.lower()
                        track_entry.save()

                print(f"The vibe for {track} is: {vibe}")

                break
            except Exception as e:
                print(f"Error processing the vibe for {track}: {e}")
                retries += 1

            if retries >= MAX_RETRIES:
                print(f"Retries maxed out processing the vibe for {track}.")
                break
            else:
                time.sleep(1)

    return lyrics_vibes


def lyrics_vectorize(lyrics_vibes):
    if lyrics_vibes:
        for i in lyrics_vibes:
            print("Lyrics vibes: " + i)
        avg_lyr_vibe = average_vector(lyrics_vibes)
        closest_emotion = find_closest_emotion(avg_lyr_vibe)
        return str(closest_emotion)
    else:
        return None


def average_vector(words):
    # Compute the average vector for a list of words.
    vectors = []
    for word in words:
        try:
            str_vector = client.predict("get_vector", word, api_name="/predict")
            vector = string_to_vector(str_vector)
            vectors.append(vector)
        except Exception as e:
            print(f"Error processing word '{word}': {e}")

    if vectors:
        return np.mean(vectors, axis=0)
    else:
        # Return a zeros vector of 300 dimensions
        return np.zeros(300)


def string_to_vector(vector_str):
    clean = re.sub(r"[\[\]\n\t]", "", vector_str)
    clean = clean.split()
    clean = [float(e) for e in clean]
    return clean
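
string_to_vector undoes the plain-text vector format the fastText space evidently returns: strip brackets, newlines and tabs, split on whitespace, cast to float:

    string_to_vector("[0.12 -0.05\n 0.33]")  # -> [0.12, -0.05, 0.33]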


def find_closest_emotion(final_vibe):
    emotion_words = [
        "happy",
        "sad",
        "angry",
        "anxious",
        "content",
        "excited",
        "bored",
        "nostalgic",
        "frustrated",
        "hopeful",
        "afraid",
        "confident",
        "jealous",
        "grateful",
        "lonely",
        "rebellious",
        "relaxed",
        "amused",
        "curious",
        "ashamed",
        "sympathetic",
        "disappointed",
        "proud",
        "enthusiastic",
        "empathetic",
        "shocked",
        "calm",
        "inspired",
        "indifferent",
        "romantic",
        "tense",
        "euphoric",
        "restless",
        "serene",
        "sensual",
        "reflective",
        "playful",
        "dark",
        "optimistic",
        "mysterious",
        "seductive",
        "regretful",
        "detached",
        "melancholic",
    ]

    max_similarity = -1
    closest_emotion = None
    for word in emotion_words:
        word_vec = get_emotion_vector(word)
        similarity = cosine_similarity(final_vibe, word_vec)
        if similarity > max_similarity:
            max_similarity = similarity
            closest_emotion = word
    return closest_emotion


def get_emotion_vector(input_emotion):
    input_emotion = input_emotion.lower()
    vector_str = EmotionVector.objects.filter(emotion=input_emotion).first()

    if not vector_str:
        # The vector string should always be stored in our database,
        # but if somehow it is not there..
        try:
            vector_str = client.predict(
                "get_vector", input_emotion, api_name="/predict"
            )
        except Exception:
            return np.zeros(300)

    else:
        vector_str = vector_str.vector

    return string_to_vector(vector_str)


def cosine_similarity(vec_a, vec_b):
    return np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
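
cosine_similarity is the textbook definition, dot(a, b) / (|a| * |b|): orthogonal vectors score 0, parallel vectors score 1:

    cosine_similarity(np.array([1.0, 0.0]), np.array([0.0, 1.0]))  # 0.0
    cosine_similarity(np.array([2.0, 0.0]), np.array([1.0, 0.0]))  # 1.0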


def vibe_description(final_vibe, artist_string):
    openai.api_key = os.getenv("OPEN_AI_TOKEN")

    try:
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {
                    "role": "user",
                    "content": f"This is the output of a program that takes Spotify listening history of a person "
                    f"and their lyrics and classifies a daily vibe. Take the daily vibe, this being: '{final_vibe}', "
                    f"and describe this person's music vibe and energy today as if you were talking to them. "
                    f"We know this person listens to the following artists: '{artist_string}'. Only mention a few of the artists you actually have knowledge about. "
                    f"Use pop culture terms and be brief but precise, under 185 words. Make sure to describe their daily vibe. ",
                },
            ],
            request_timeout=50,
        )

        response = response.choices[0].message["content"].strip()
        return response

    except Exception:
        return None


def get_feature_average(features_list, feature):
    total = sum(track[feature] for track in features_list)
    average = total / len(features_list)
    return average
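
get_feature_average is a plain arithmetic mean of one audio feature across the listened tracks, e.g.:

    get_feature_average([{"energy": 0.25}, {"energy": 0.75}], "energy")  # -> 0.5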