""" 歌词处理模块 负责歌词的解析、格式化、存储和获取 """ import os import re import logging import requests import sqlite3 from urllib.parse import quote from Crypto.Cipher import AES import base64 import random import codecs import json logger = logging.getLogger(__name__) # 配置应用支持目录 APP_SUPPORT_DIR = os.path.expanduser('~/Library/Application Support/lyroc') os.makedirs(APP_SUPPORT_DIR, exist_ok=True) # SQLite数据库配置 DB_PATH = os.path.join(APP_SUPPORT_DIR, 'lyroc.db') def init_db(): """初始化SQLite数据库""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # 创建歌词表 cursor.execute(''' CREATE TABLE IF NOT EXISTS lyrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, track_name TEXT NOT NULL, artist TEXT NOT NULL, lyrics_content TEXT NOT NULL, source TEXT DEFAULT 'netease', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(track_name, artist) ) ''') conn.commit() conn.close() logger.debug(f"数据库初始化完成: {DB_PATH}") # 初始化数据库 init_db() def clear_lyrics_cache(): """清空歌词缓存""" global _lyrics_cache _lyrics_cache = { "track_name": None, "artist": None, "lyrics_text": None, "source": None, "deleted": False } # 歌词缓存 clear_lyrics_cache() # 网易云音乐API配置 NETEASE_API_BASE = "https://music.163.com/weapi" NETEASE_SEARCH_URL = f"{NETEASE_API_BASE}/search/get" NETEASE_LYRIC_URL = f"{NETEASE_API_BASE}/song/lyric" # 加密相关配置 MODULUS = '00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7' NONCE = '0CoJUm6Qyw8W8jud' PUBKEY = '010001' IV = '0102030405060708' class LyricLine: def __init__(self, time, text): self.time = time # 时间(秒) self.text = text # 歌词文本 def load_lyrics_from_db(track_name, artist): """从数据库加载歌词""" try: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' SELECT lyrics_content FROM lyrics WHERE track_name = ? AND artist = ? ''', (track_name, artist)) result = cursor.fetchone() conn.close() if result: return result[0] except Exception as e: logger.error(f"加载数据库歌词时出错: {e}", exc_info=True) return None def parse_time(time_str): """解析时间标记 [mm:ss.xxx]""" try: # 移除方括号 time_str = time_str.strip('[]') # 分离分钟和秒 minutes, seconds = time_str.split(':') # 转换为秒 total_seconds = float(minutes) * 60 + float(seconds) return total_seconds except: return 0 def format_lrc_lyrics(lyrics_text, current_position=None): """格式化 LRC 歌词,找出当前、下一句和下下句歌词""" if not lyrics_text: return None, None, None, None # 解析歌词行 lyric_lines = [] for line in lyrics_text.split('\n'): line = line.strip() if not line: continue # 查找所有时间标记 time_tags = re.findall(r'\[([0-9:.]+)\]', line) if not time_tags: continue text = re.sub(r'\[[0-9:.]+\]', '', line).strip() if not text: continue # 一行可能有多个时间标记 for time_tag in time_tags: try: time_seconds = parse_time(time_tag) lyric_lines.append({"time": time_seconds, "text": text}) except Exception as e: logger.error(f"解析时间标记出错: {time_tag}, 错误: {e}") # 按时间排序 lyric_lines.sort(key=lambda x: x["time"]) # 如果没有有效歌词,返回空 if not lyric_lines: return None, None, None, None if not current_position: # 如果没有提供当前位置,返回前三句歌词 current = lyric_lines[0]["text"] if len(lyric_lines) > 0 else None next_ = lyric_lines[1]["text"] if len(lyric_lines) > 1 else None next_next = lyric_lines[2]["text"] if len(lyric_lines) > 2 else None return current, next_, next_next, lyric_lines[0]["time"] # 找出当前歌词 current_index = -1 for i, line in enumerate(lyric_lines): if line["time"] > current_position: if i > 0: current_index = i - 1 break # 处理当前位置超过最后一行歌词的情况 if current_index == -1 and lyric_lines: # 如果遍历完所有歌词都没找到大于当前位置的,说明当前位置超过了最后一行歌词 if current_position >= lyric_lines[-1]["time"]: # 返回最后三句歌词,将最后一句作为当前歌词 last_index = len(lyric_lines) - 1 if last_index >= 2: return lyric_lines[last_index]["text"], None, None, lyric_lines[last_index]["time"] elif last_index == 1: return lyric_lines[1]["text"], None, None, lyric_lines[1]["time"] else: return lyric_lines[0]["text"], None, None, lyric_lines[0]["time"] # 如果找到了当前歌词 if current_index >= 0: current = lyric_lines[current_index]["text"] next_ = lyric_lines[current_index + 1]["text"] if current_index + 1 < len(lyric_lines) else None next_next = lyric_lines[current_index + 2]["text"] if current_index + 2 < len(lyric_lines) else None return current, next_, next_next, lyric_lines[current_index]["time"] # 如果当前位置在所有歌词之前 if lyric_lines: return None, lyric_lines[0]["text"], lyric_lines[1]["text"] if len(lyric_lines) > 1 else None, lyric_lines[0]["time"] return None, None, None, None def get_lyrics_data(status): """获取当前歌词(先获取状态,再从缓存或存储中获取歌词)""" global _lyrics_cache # 如果没有播放,清空缓存并返回空 if status.get("status") not in ["playing", "paused"]: clear_lyrics_cache() return { "current_lyric_time": None, "current_lyric": None, "next_lyric": None, "next_next_lyric": None, "track_name": None, "artist_name": None } track_name = status.get("track_name") artist = status.get("artist") position = status.get("position", 0) # 检查是否切换了歌曲 if _lyrics_cache["track_name"] != track_name or _lyrics_cache["artist"] != artist: logger.debug(f"歌曲切换,重新获取歌词: {track_name} - {artist}") # 清空缓存 _lyrics_cache["track_name"] = track_name _lyrics_cache["artist"] = artist _lyrics_cache["lyrics_text"] = None _lyrics_cache["deleted"] = False # 1. 先尝试从数据库获取 lyrics_text = load_lyrics_from_db(track_name, artist) if lyrics_text: _lyrics_cache["lyrics_text"] = lyrics_text _lyrics_cache["source"] = "db" # 2. 如果本地没有,且不是被删除的歌词,尝试从网易云音乐API获取歌词 elif not _lyrics_cache["lyrics_text"] and not _lyrics_cache.get("deleted", False): lyrics_text = search_lyrics(track_name, artist) if lyrics_text: # 保存到数据库 save_lyrics(track_name, artist, lyrics_text) _lyrics_cache["lyrics_text"] = lyrics_text _lyrics_cache["source"] = "netease" # 如果有缓存的歌词,直接解析 if _lyrics_cache["lyrics_text"]: if _lyrics_cache.get("deleted", False): return { "current_lyric_time": None, "current_lyric": None, "next_lyric": None, "next_next_lyric": None, } # 根据播放位置解析当前歌词 current, next_, next_next, current_time = format_lrc_lyrics(_lyrics_cache["lyrics_text"], position) if current or next_ or next_next: return { "current_lyric_time": current_time, "current_lyric": current, "next_lyric": next_, "next_next_lyric": next_next, "track_name": track_name, "artist_name": artist, "lyrics_source": _lyrics_cache["source"] # 标记歌词来源 } # 都没找到返回空 return { "current_lyric_time": None, "current_lyric": None, "next_lyric": None, "next_next_lyric": None, "track_name": track_name, "artist_name": artist, "lyrics_source": "none" # 标记歌词来源 } def create_secret_key(size): """生成随机密钥""" return ''.join(random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(size)) def aes_encrypt(text, key): """AES加密""" pad = 16 - len(text) % 16 text = text + chr(pad) * pad encryptor = AES.new(key.encode('utf-8'), AES.MODE_CBC, IV.encode('utf-8')) encrypt_text = encryptor.encrypt(text.encode('utf-8')) encrypt_text = base64.b64encode(encrypt_text).decode('utf-8') return encrypt_text def rsa_encrypt(text, pubkey, modulus): """RSA加密""" text = text[::-1] rs = pow(int(codecs.encode(text.encode('utf-8'), 'hex'), 16), int(pubkey, 16), int(modulus, 16)) return format(rs, 'x').zfill(256) headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36', 'Referer': 'https://music.163.com/', 'Origin': 'https://music.163.com', 'Content-Type': 'application/x-www-form-urlencoded' } def encrypt_request(text): """加密请求数据""" secret_key = create_secret_key(16) params = aes_encrypt(text, NONCE) params = aes_encrypt(params, secret_key) encSecKey = rsa_encrypt(secret_key, PUBKEY, MODULUS) return { 'params': params, 'encSecKey': encSecKey } def get_lyrics_from_netease(track_name, artist_name): # 构建搜索关键词 search_keyword = f"{track_name} {artist_name}" # 搜索歌曲 data = { 's': search_keyword, 'type': 1, # 1: 单曲, 10: 专辑, 100: 歌手, 1000: 歌单 'limit': 5, 'offset': 0, 'hlpretag': '', 'hlposttag': '', 'total': True, 'csrf_token': '' } encrypted_data = encrypt_request(json.dumps(data)) response = requests.post(NETEASE_SEARCH_URL, data=encrypted_data, headers=headers) logger.debug(f"网易云音乐搜索响应: {response.text}") response.raise_for_status() result = response.json() return result def get_lyrics_from_id(id): """从网易云音乐获取歌词""" # 获取歌词 data = { 'id': id, 'lv': 1, # 获取歌词 'kv': 1, # 获取翻译 'tv': -1, # 不获取罗马音 'csrf_token': '' } encrypted_data = encrypt_request(json.dumps(data)) response = requests.post(NETEASE_LYRIC_URL, data=encrypted_data, headers=headers) response.raise_for_status() result = response.json() return result def search_lyrics(track_name, artist_name): """从网易云音乐搜索并获取歌词""" try: result = get_lyrics_from_netease(track_name, artist_name) if result['code'] == 200 and result['result']['songCount'] > 0: song = result['result']['songs'][0] id = song['id'] lrc_result = get_lyrics_from_id(id) if lrc_result['code'] == 200 and 'lrc' in lrc_result: logger.debug(f"网易云音乐歌词: {lrc_result['lrc']['lyric']}") return lrc_result['lrc']['lyric'] return None except Exception as e: logger.error(f"从网易云音乐获取歌词时出错: {e}", exc_info=True) return None def format_lyrics(lyrics): """格式化歌词文本,保留时间信息""" if not lyrics: logger.debug("歌词内容为空") return [] lyric_lines = [] for line in lyrics.split('\n'): line = line.strip() if not line: continue # 查找所有时间标记 time_tags = re.findall(r'\[([0-9:.]+)\]', line) if not time_tags: continue text = re.sub(r'\[[0-9:.]+\]', '', line).strip() if not text: continue # 可能一行有多个时间标记 for time_tag in time_tags: try: time_seconds = parse_time(time_tag) lyric_lines.append(LyricLine(time_seconds, text)) except Exception as e: logger.error(f"解析时间标记出错: {time_tag}, 错误: {e}") # 按时间排序 sorted_lyrics = sorted(lyric_lines, key=lambda x: x.time) logger.debug(f"解析完成,共 {len(sorted_lyrics)} 行歌词") return sorted_lyrics def save_lyrics(track_name, artist, lyrics): """保存歌词到本地文件,如果已存在则更新""" try: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' INSERT INTO lyrics (track_name, artist, lyrics_content) VALUES (?, ?, ?) ON CONFLICT(track_name, artist) DO UPDATE SET lyrics_content = excluded.lyrics_content, created_at = CURRENT_TIMESTAMP ''', (track_name, artist, lyrics)) conn.commit() conn.close() logger.debug(f"歌词已保存/更新到数据库: {track_name} - {artist}") except Exception as e: logger.error(f"保存歌词时出错: {e}", exc_info=True) def delete_lyrics(track_name, artist): """从数据库中删除歌词""" try: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' DELETE FROM lyrics WHERE track_name = ? AND artist = ? ''', (track_name, artist)) conn.commit() deleted = cursor.rowcount > 0 conn.close() if deleted: logger.info(f"已删除歌词: {track_name} - {artist}") # 标记这首歌的歌词已被删除,避免重新获取 _lyrics_cache["deleted"] = True return True else: logger.warning(f"未找到要删除的歌词: {track_name} - {artist}") return False except Exception as e: logger.error(f"删除歌词时出错: {e}", exc_info=True) return False