Files
lyroc/backend/modules/lyrics.py
2025-05-27 14:16:48 +08:00

436 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
歌词处理模块
负责歌词的解析、格式化、存储和获取
"""
import os
import re
import logging
import requests
import sqlite3
from urllib.parse import quote
from Crypto.Cipher import AES
import base64
import random
import codecs
import json
logger = logging.getLogger(__name__)
# 配置应用支持目录
APP_SUPPORT_DIR = os.path.expanduser('~/Library/Application Support/lyroc')
os.makedirs(APP_SUPPORT_DIR, exist_ok=True)
# SQLite数据库配置
DB_PATH = os.path.join(APP_SUPPORT_DIR, 'lyroc.db')
def init_db():
"""初始化SQLite数据库"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# 创建歌词表
cursor.execute('''
CREATE TABLE IF NOT EXISTS lyrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
track_name TEXT NOT NULL,
artist TEXT NOT NULL,
lyrics_content TEXT NOT NULL,
source TEXT DEFAULT 'netease',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(track_name, artist)
)
''')
conn.commit()
conn.close()
logger.debug(f"数据库初始化完成: {DB_PATH}")
# 初始化数据库
init_db()
def clear_lyrics_cache():
"""清空歌词缓存"""
global _lyrics_cache
_lyrics_cache = {
"track_name": None,
"artist": None,
"lyrics_text": None,
"source": None,
"deleted": False
}
# 歌词缓存
clear_lyrics_cache()
# 网易云音乐API配置
NETEASE_API_BASE = "https://music.163.com/weapi"
NETEASE_SEARCH_URL = f"{NETEASE_API_BASE}/search/get"
NETEASE_LYRIC_URL = f"{NETEASE_API_BASE}/song/lyric"
# 加密相关配置
MODULUS = '00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7'
NONCE = '0CoJUm6Qyw8W8jud'
PUBKEY = '010001'
IV = '0102030405060708'
class LyricLine:
def __init__(self, time, text):
self.time = time # 时间(秒)
self.text = text # 歌词文本
def load_lyrics_from_db(track_name, artist):
"""从数据库加载歌词"""
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
SELECT lyrics_content FROM lyrics WHERE track_name = ? AND artist = ?
''', (track_name, artist))
result = cursor.fetchone()
conn.close()
if result:
return result[0]
except Exception as e:
logger.error(f"加载数据库歌词时出错: {e}", exc_info=True)
return None
def parse_time(time_str):
"""解析时间标记 [mm:ss.xxx]"""
try:
# 移除方括号
time_str = time_str.strip('[]')
# 分离分钟和秒
minutes, seconds = time_str.split(':')
# 转换为秒
total_seconds = float(minutes) * 60 + float(seconds)
return total_seconds
except:
return 0
def format_lrc_lyrics(lyrics_text, current_position=None):
"""格式化 LRC 歌词,找出当前、下一句和下下句歌词"""
if not lyrics_text:
return None, None, None, None
# 解析歌词行
lyric_lines = []
for line in lyrics_text.split('\n'):
line = line.strip()
if not line:
continue
# 查找所有时间标记
time_tags = re.findall(r'\[([0-9:.]+)\]', line)
if not time_tags:
continue
text = re.sub(r'\[[0-9:.]+\]', '', line).strip()
if not text:
continue
# 一行可能有多个时间标记
for time_tag in time_tags:
try:
time_seconds = parse_time(time_tag)
lyric_lines.append({"time": time_seconds, "text": text})
except Exception as e:
logger.error(f"解析时间标记出错: {time_tag}, 错误: {e}")
# 按时间排序
lyric_lines.sort(key=lambda x: x["time"])
# 如果没有有效歌词,返回空
if not lyric_lines:
return None, None, None, None
if not current_position:
# 如果没有提供当前位置,返回前三句歌词
current = lyric_lines[0]["text"] if len(lyric_lines) > 0 else None
next_ = lyric_lines[1]["text"] if len(lyric_lines) > 1 else None
next_next = lyric_lines[2]["text"] if len(lyric_lines) > 2 else None
return current, next_, next_next, lyric_lines[0]["time"]
# 找出当前歌词
current_index = -1
for i, line in enumerate(lyric_lines):
if line["time"] > current_position:
if i > 0:
current_index = i - 1
break
# 处理当前位置超过最后一行歌词的情况
if current_index == -1 and lyric_lines:
# 如果遍历完所有歌词都没找到大于当前位置的,说明当前位置超过了最后一行歌词
if current_position >= lyric_lines[-1]["time"]:
# 返回最后三句歌词,将最后一句作为当前歌词
last_index = len(lyric_lines) - 1
if last_index >= 2:
return lyric_lines[last_index]["text"], None, None, lyric_lines[last_index]["time"]
elif last_index == 1:
return lyric_lines[1]["text"], None, None, lyric_lines[1]["time"]
else:
return lyric_lines[0]["text"], None, None, lyric_lines[0]["time"]
# 如果找到了当前歌词
if current_index >= 0:
current = lyric_lines[current_index]["text"]
next_ = lyric_lines[current_index + 1]["text"] if current_index + 1 < len(lyric_lines) else None
next_next = lyric_lines[current_index + 2]["text"] if current_index + 2 < len(lyric_lines) else None
return current, next_, next_next, lyric_lines[current_index]["time"]
# 如果当前位置在所有歌词之前
if lyric_lines:
return None, lyric_lines[0]["text"], lyric_lines[1]["text"] if len(lyric_lines) > 1 else None, lyric_lines[0]["time"]
return None, None, None, None
def get_lyrics_data(status):
"""获取当前歌词(先获取状态,再从缓存或存储中获取歌词)"""
global _lyrics_cache
# 如果没有播放,清空缓存并返回空
if status.get("status") not in ["playing", "paused"]:
clear_lyrics_cache()
return {
"current_lyric_time": None,
"current_lyric": None,
"next_lyric": None,
"next_next_lyric": None,
"track_name": None,
"artist_name": None
}
track_name = status.get("track_name")
artist = status.get("artist")
position = status.get("position", 0)
# 检查是否切换了歌曲
if _lyrics_cache["track_name"] != track_name or _lyrics_cache["artist"] != artist:
logger.debug(f"歌曲切换,重新获取歌词: {track_name} - {artist}")
# 清空缓存
_lyrics_cache["track_name"] = track_name
_lyrics_cache["artist"] = artist
_lyrics_cache["lyrics_text"] = None
_lyrics_cache["deleted"] = False
# 1. 先尝试从数据库获取
lyrics_text = load_lyrics_from_db(track_name, artist)
if lyrics_text:
_lyrics_cache["lyrics_text"] = lyrics_text
_lyrics_cache["source"] = "db"
# 2. 如果本地没有且不是被删除的歌词尝试从网易云音乐API获取歌词
elif not _lyrics_cache["lyrics_text"] and not _lyrics_cache.get("deleted", False):
lyrics_text = search_lyrics(track_name, artist)
if lyrics_text:
# 保存到数据库
save_lyrics(track_name, artist, lyrics_text)
_lyrics_cache["lyrics_text"] = lyrics_text
_lyrics_cache["source"] = "netease"
# 如果有缓存的歌词,直接解析
if _lyrics_cache["lyrics_text"]:
if _lyrics_cache.get("deleted", False):
return {
"current_lyric_time": None,
"current_lyric": None,
"next_lyric": None,
"next_next_lyric": None,
}
# 根据播放位置解析当前歌词
current, next_, next_next, current_time = format_lrc_lyrics(_lyrics_cache["lyrics_text"], position)
if current or next_ or next_next:
return {
"current_lyric_time": current_time,
"current_lyric": current,
"next_lyric": next_,
"next_next_lyric": next_next,
"track_name": track_name,
"artist_name": artist,
"lyrics_source": _lyrics_cache["source"] # 标记歌词来源
}
# 都没找到返回空
return {
"current_lyric_time": None,
"current_lyric": None,
"next_lyric": None,
"next_next_lyric": None,
"track_name": track_name,
"artist_name": artist,
"lyrics_source": "none" # 标记歌词来源
}
def create_secret_key(size):
"""生成随机密钥"""
return ''.join(random.choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789') for _ in range(size))
def aes_encrypt(text, key):
"""AES加密"""
pad = 16 - len(text) % 16
text = text + chr(pad) * pad
encryptor = AES.new(key.encode('utf-8'), AES.MODE_CBC, IV.encode('utf-8'))
encrypt_text = encryptor.encrypt(text.encode('utf-8'))
encrypt_text = base64.b64encode(encrypt_text).decode('utf-8')
return encrypt_text
def rsa_encrypt(text, pubkey, modulus):
"""RSA加密"""
text = text[::-1]
rs = pow(int(codecs.encode(text.encode('utf-8'), 'hex'), 16), int(pubkey, 16), int(modulus, 16))
return format(rs, 'x').zfill(256)
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36',
'Referer': 'https://music.163.com/',
'Origin': 'https://music.163.com',
'Content-Type': 'application/x-www-form-urlencoded'
}
def encrypt_request(text):
"""加密请求数据"""
secret_key = create_secret_key(16)
params = aes_encrypt(text, NONCE)
params = aes_encrypt(params, secret_key)
encSecKey = rsa_encrypt(secret_key, PUBKEY, MODULUS)
return {
'params': params,
'encSecKey': encSecKey
}
def get_lyrics_from_netease(track_name, artist_name):
# 构建搜索关键词
search_keyword = f"{track_name} {artist_name}"
# 搜索歌曲
data = {
's': search_keyword,
'type': 1, # 1: 单曲, 10: 专辑, 100: 歌手, 1000: 歌单
'limit': 5,
'offset': 0,
'hlpretag': '<span class="s-fc7">',
'hlposttag': '</span>',
'total': True,
'csrf_token': ''
}
encrypted_data = encrypt_request(json.dumps(data))
response = requests.post(NETEASE_SEARCH_URL, data=encrypted_data, headers=headers)
logger.debug(f"网易云音乐搜索响应: {response.text}")
response.raise_for_status()
result = response.json()
return result
def get_lyrics_from_id(id):
"""从网易云音乐获取歌词"""
# 获取歌词
data = {
'id': id,
'lv': 1, # 获取歌词
'kv': 1, # 获取翻译
'tv': -1, # 不获取罗马音
'csrf_token': ''
}
encrypted_data = encrypt_request(json.dumps(data))
response = requests.post(NETEASE_LYRIC_URL, data=encrypted_data, headers=headers)
response.raise_for_status()
result = response.json()
return result
def search_lyrics(track_name, artist_name):
"""从网易云音乐搜索并获取歌词"""
try:
result = get_lyrics_from_netease(track_name, artist_name)
if result['code'] == 200 and result['result']['songCount'] > 0:
song = result['result']['songs'][0]
id = song['id']
lrc_result = get_lyrics_from_id(id)
if lrc_result['code'] == 200 and 'lrc' in lrc_result:
logger.debug(f"网易云音乐歌词: {lrc_result['lrc']['lyric']}")
return lrc_result['lrc']['lyric']
return None
except Exception as e:
logger.error(f"从网易云音乐获取歌词时出错: {e}", exc_info=True)
return None
def format_lyrics(lyrics):
"""格式化歌词文本,保留时间信息"""
if not lyrics:
logger.debug("歌词内容为空")
return []
lyric_lines = []
for line in lyrics.split('\n'):
line = line.strip()
if not line:
continue
# 查找所有时间标记
time_tags = re.findall(r'\[([0-9:.]+)\]', line)
if not time_tags:
continue
text = re.sub(r'\[[0-9:.]+\]', '', line).strip()
if not text:
continue
# 可能一行有多个时间标记
for time_tag in time_tags:
try:
time_seconds = parse_time(time_tag)
lyric_lines.append(LyricLine(time_seconds, text))
except Exception as e:
logger.error(f"解析时间标记出错: {time_tag}, 错误: {e}")
# 按时间排序
sorted_lyrics = sorted(lyric_lines, key=lambda x: x.time)
logger.debug(f"解析完成,共 {len(sorted_lyrics)} 行歌词")
return sorted_lyrics
def save_lyrics(track_name, artist, lyrics):
"""保存歌词到本地文件,如果已存在则更新"""
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO lyrics (track_name, artist, lyrics_content)
VALUES (?, ?, ?)
ON CONFLICT(track_name, artist)
DO UPDATE SET
lyrics_content = excluded.lyrics_content,
created_at = CURRENT_TIMESTAMP
''', (track_name, artist, lyrics))
conn.commit()
conn.close()
logger.debug(f"歌词已保存/更新到数据库: {track_name} - {artist}")
except Exception as e:
logger.error(f"保存歌词时出错: {e}", exc_info=True)
def delete_lyrics(track_name, artist):
"""从数据库中删除歌词"""
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
DELETE FROM lyrics WHERE track_name = ? AND artist = ?
''', (track_name, artist))
conn.commit()
deleted = cursor.rowcount > 0
conn.close()
if deleted:
logger.info(f"已删除歌词: {track_name} - {artist}")
# 标记这首歌的歌词已被删除,避免重新获取
_lyrics_cache["deleted"] = True
return True
else:
logger.warning(f"未找到要删除的歌词: {track_name} - {artist}")
return False
except Exception as e:
logger.error(f"删除歌词时出错: {e}", exc_info=True)
return False