# score-backend/app.py

import base64
import hmac
import json
import os
import shutil
from datetime import datetime
from functools import wraps

from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
CORS(app)

# Authentication configuration.
# The credentials can be overridden through the environment so a deployment
# does not have to run with the values baked into the source. When the
# variables are unset the original 'admin' / 'admin' pair is used.
USERNAME = os.environ.get('MEDIA_API_USERNAME', 'admin')
PASSWORD = os.environ.get('MEDIA_API_PASSWORD', 'admin')
def auth_required(f):
    """Protect a view with HTTP Basic authentication.

    The wrapped view is only called when the request carries credentials
    equal to the module-level ``USERNAME`` and ``PASSWORD``. Otherwise a JSON
    body with ``code`` 1 and HTTP status 401 is returned and the view is not
    invoked.

    Args:
        f: The view function to protect.

    Returns:
        The wrapping function, carrying ``f``'s metadata via ``functools.wraps``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        supplied_user = auth.username if auth else None
        supplied_pass = auth.password if auth else None

        # Compare as bytes with a constant-time primitive so the response
        # time does not reveal how many leading characters matched. Both
        # checks are always evaluated (no short-circuit between them).
        user_ok = supplied_user is not None and hmac.compare_digest(
            supplied_user.encode('utf-8'), USERNAME.encode('utf-8')
        )
        pass_ok = supplied_pass is not None and hmac.compare_digest(
            supplied_pass.encode('utf-8'), PASSWORD.encode('utf-8')
        )

        if not (user_ok and pass_ok):
            response = jsonify({
                'code': 1,
                'message': 'Invalid username or password'
            })
            response.status_code = 401
            # Remove the WWW-Authenticate header if one is present
            # (presumably to keep browsers from showing their native login
            # prompt -- confirm with the frontend).
            response.headers.pop('WWW-Authenticate', None)
            return response
        return f(*args, **kwargs)
    return decorated
# Database configuration: a SQLite file named media.db, with SQLAlchemy's
# modification-tracking setting switched off.
app.config.update(
    SQLALCHEMY_DATABASE_URI='sqlite:///media.db',
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)
db = SQLAlchemy(app)
# Data model definition
class Media(db.Model):
    """ORM model for a single tracked media entry."""

    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Required title, up to 200 characters.
    title = db.Column(db.String(200), nullable=False)
    type = db.Column(db.String(50), nullable=False) # game, book, movie, anime, other
    status = db.Column(db.String(50), nullable=False) # completed, in_progress, plan_to_watch
    # Optional numeric rating.
    rating = db.Column(db.Float)
    # Optional free-form notes.
    notes = db.Column(db.Text)
    platform = db.Column(db.String(100)) # platform information
    date = db.Column(db.Date) # completion / viewing date
    # Defaults to datetime.utcnow when the row is inserted.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Defaults to datetime.utcnow on insert and is refreshed through onupdate.
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Create all database tables. This runs at import time inside an application
# context, so the tables exist before any request is served.
with app.app_context():
    db.create_all()
# Database backup support
def backup_database():
    """Copy ``media.db`` into the ``backups`` directory under a timestamped name.

    The backup directory is created when it does not exist yet. The file name
    embeds the local time with one-second resolution, so two backups taken
    within the same second share a name and the later one overwrites the
    earlier one.

    Returns:
        The relative path of the backup file, e.g.
        ``backups/media_20240101_120000.db``.

    Raises:
        OSError: If ``media.db`` is missing or the copy fails.
    """
    # NOTE(review): the source path is relative to the current working
    # directory. Some Flask-SQLAlchemy releases resolve relative SQLite URIs
    # against the application's instance folder instead -- confirm that this
    # is really the file the ORM is using.
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_dir = 'backups'
    # exist_ok avoids the check-then-create race when two requests trigger a
    # backup at the same time.
    os.makedirs(backup_dir, exist_ok=True)
    backup_file = f'{backup_dir}/media_{timestamp}.db'
    shutil.copy2('media.db', backup_file)
    return backup_file
@app.route('/api/backup', methods=['POST'])
@auth_required
def create_backup():
    """Create a database backup and report the path it was written to.

    Responds with ``code`` 0 plus the backup path and a timestamp on success,
    or with ``code`` 1, the error text and HTTP 500 when anything fails.
    """
    try:
        created_path = backup_database()
        details = {
            "backup_file": created_path,
            "timestamp": datetime.now().isoformat()
        }
        payload = {
            "code": 0,
            "data": details,
            "message": "Backup created successfully"
        }
        return jsonify(payload)
    except Exception as exc:
        failure = {"code": 1, "data": {}, "message": str(exc)}
        return jsonify(failure), 500
@app.route('/api/backup/list', methods=['GET'])
@auth_required
def list_backups():
    """List the ``.db`` files found in the ``backups`` directory.

    Each entry carries the file name, its size in bytes and a timestamp
    derived from ``os.path.getctime``. A missing directory yields an empty
    list; any error yields ``code`` 1 with HTTP 500.
    """
    try:
        backup_dir = 'backups'

        def describe(name):
            # Build the JSON description of one backup file.
            path = os.path.join(backup_dir, name)
            return {
                "filename": name,
                "size": os.path.getsize(path),
                "created_at": datetime.fromtimestamp(os.path.getctime(path)).isoformat()
            }

        if os.path.exists(backup_dir):
            backups = [
                describe(name)
                for name in os.listdir(backup_dir)
                if name.endswith('.db')
            ]
            return jsonify({
                "code": 0,
                "data": {"backups": backups},
                "message": "Success"
            })

        return jsonify({
            "code": 0,
            "data": {"backups": []},
            "message": "No backups found"
        })
    except Exception as exc:
        return jsonify({"code": 1, "data": {}, "message": str(exc)}), 500
@app.route('/api/backup/restore/<filename>', methods=['POST'])
@auth_required
def restore_backup(filename):
    """Replace ``media.db`` with a previously created backup.

    The current database is backed up first so the restore can be undone.

    Args:
        filename: Name of a file inside the ``backups`` directory. It must be
            a bare file name without any path component.

    Returns:
        JSON with ``code`` 0 naming the restored file and the safety backup;
        HTTP 400 for an unacceptable name, 404 when the backup does not
        exist, 500 when backing up or copying fails.
    """
    # The name comes straight from the URL. Refuse anything that could step
    # outside the backup directory before it is joined into a path.
    has_path_component = (
        filename in ('.', '..')
        or '\\' in filename
        or os.path.basename(filename) != filename
    )
    if has_path_component:
        return jsonify({"code": 1, "data": {}, "message": "Invalid backup filename"}), 400

    try:
        backup_file = os.path.join('backups', filename)
        # isfile (rather than exists) so a directory is reported as missing
        # instead of failing later inside the copy.
        if not os.path.isfile(backup_file):
            return jsonify({"code": 1, "data": {}, "message": "Backup file not found"}), 404
        # Back up the current database first
        current_backup = backup_database()
        # Restore the selected backup
        # NOTE(review): the file is replaced while the application may still
        # hold open database connections -- confirm this is acceptable.
        shutil.copy2(backup_file, 'media.db')
        return jsonify({
            "code": 0,
            "data": {
                "restored_file": filename,
                "current_backup": current_backup
            },
            "message": "Backup restored successfully"
        })
    except Exception as e:
        return jsonify({"code": 1, "data": {}, "message": str(e)}), 500
# API routes
@app.route('/api/media/list', methods=['GET'])
@auth_required
def get_all_media():
    """Return every media record as a JSON list.

    Responds with ``code`` 0 and the serialized rows, or with ``code`` 1, the
    error text and HTTP 500 when the query or serialization fails.
    """
    def serialize(item):
        # Convert one Media row into its JSON representation.
        return {
            'id': item.id,
            'title': item.title,
            'type': item.type,
            'status': item.status,
            'rating': item.rating,
            'notes': item.notes,
            'platform': item.platform,
            'date': item.date.isoformat() if item.date else None,
            'created_at': item.created_at.isoformat(),
            'updated_at': item.updated_at.isoformat()
        }

    try:
        rows = Media.query.all()
        serialized = [serialize(row) for row in rows]
        return jsonify({
            "code": 0,
            "data": serialized,
            "message": "Success"
        })
    except Exception as exc:
        return jsonify({"code": 1, "data": {}, "message": str(exc)}), 500
@app.route('/api/media/create', methods=['POST'])
@auth_required
def create_media():
    """Create a media record from the JSON request body.

    Required fields: ``title``, ``type``, ``status``. Optional fields:
    ``rating``, ``notes``, ``platform`` and ``date`` (``YYYY-MM-DD``).

    Returns:
        HTTP 201 with the stored record on success; HTTP 400 (``code`` 1) when
        the body is not a JSON object, a required field is missing or the
        date is malformed; HTTP 500 (``code`` 2) when persisting fails.
    """
    def bad_request(message):
        # Client errors use the same envelope as the rest of the API.
        return jsonify({"code": 1, "data": {}, "message": message}), 400

    # silent=True yields None instead of raising for a missing or invalid
    # JSON body, so the problem is reported as a 400 rather than a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return bad_request("Request body must be a JSON object")

    missing = [name for name in ('title', 'type', 'status') if data.get(name) is None]
    if missing:
        return bad_request(f"Missing required field(s): {', '.join(missing)}")

    parsed_date = None
    if data.get('date'):
        try:
            parsed_date = datetime.strptime(data['date'], '%Y-%m-%d').date()
        except (TypeError, ValueError):
            return bad_request("date must be in YYYY-MM-DD format")

    try:
        new_media = Media(
            title=data['title'],
            type=data['type'],
            status=data['status'],
            rating=data.get('rating'),
            notes=data.get('notes'),
            platform=data.get('platform'),
            date=parsed_date
        )
        db.session.add(new_media)
        db.session.commit()
        return jsonify({
            "code": 0,
            "data": {
                'id': new_media.id,
                'title': new_media.title,
                'type': new_media.type,
                'status': new_media.status,
                'rating': new_media.rating,
                'notes': new_media.notes,
                'platform': new_media.platform,
                'date': new_media.date.isoformat() if new_media.date else None,
                'created_at': new_media.created_at.isoformat(),
                'updated_at': new_media.updated_at.isoformat()
            },
            "message": "Created successfully"
        }), 201
    except Exception as e:
        # Leave the session usable for the next request.
        db.session.rollback()
        return jsonify({"code": 2, "data": {}, "message": str(e)}), 500
@app.route('/api/media/updateById/<int:media_id>', methods=['PUT'])
@auth_required
def update_media(media_id):
    """Update the media record with the given id from the JSON request body.

    Fields absent from the body keep their current value. ``date`` is only
    changed when a non-empty ``YYYY-MM-DD`` value is supplied.

    Args:
        media_id: Primary key of the record to update.

    Returns:
        The updated record with ``code`` 0; HTTP 400 (``code`` 1) for a body
        that is not a JSON object or a malformed date; HTTP 404 (``code`` 1)
        when no such record exists; HTTP 500 (``code`` 2) when persisting
        fails.
    """
    def client_error(message, status):
        # Client errors use the same envelope as the rest of the API.
        return jsonify({"code": 1, "data": {}, "message": message}), status

    # silent=True yields None instead of raising for a missing or invalid
    # JSON body, so the problem is reported as a 400 rather than a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return client_error("Request body must be a JSON object", 400)

    new_date = None
    if data.get('date'):
        try:
            new_date = datetime.strptime(data['date'], '%Y-%m-%d').date()
        except (TypeError, ValueError):
            return client_error("date must be in YYYY-MM-DD format", 400)

    try:
        # Look the row up without get_or_404: the NotFound it raises is an
        # Exception subclass, so the handler below used to turn a missing
        # record into a 500.
        media = Media.query.get(media_id)
        if media is None:
            return client_error("Media not found", 404)

        media.title = data.get('title', media.title)
        media.type = data.get('type', media.type)
        media.status = data.get('status', media.status)
        media.rating = data.get('rating', media.rating)
        media.notes = data.get('notes', media.notes)
        media.platform = data.get('platform', media.platform)
        if new_date is not None:
            media.date = new_date
        db.session.commit()
        return jsonify({
            "code": 0,
            "data": {
                'id': media.id,
                'title': media.title,
                'type': media.type,
                'status': media.status,
                'rating': media.rating,
                'notes': media.notes,
                'platform': media.platform,
                'date': media.date.isoformat() if media.date else None,
                'created_at': media.created_at.isoformat(),
                'updated_at': media.updated_at.isoformat()
            },
            "message": "Updated successfully"
        })
    except Exception as e:
        # Leave the session usable for the next request.
        db.session.rollback()
        return jsonify({"code": 2, "data": {}, "message": str(e)}), 500
@app.route('/api/media/deleteById/<int:media_id>', methods=['DELETE'])
@auth_required
def delete_media(media_id):
    """Delete the media record with the given id.

    Args:
        media_id: Primary key of the record to delete.

    Returns:
        HTTP 204 on success; HTTP 404 (``code`` 1) when no such record
        exists; HTTP 500 (``code`` 2) when the deletion fails.
    """
    try:
        # Look the row up without get_or_404: the NotFound it raises is an
        # Exception subclass, so the handler below used to turn a missing
        # record into a 500.
        media = Media.query.get(media_id)
        if media is None:
            return jsonify({"code": 1, "data": {}, "message": "Media not found"}), 404

        db.session.delete(media)
        db.session.commit()
        # NOTE(review): a 204 response is not supposed to carry a body, so
        # clients may never see this JSON. The status is left unchanged to
        # avoid altering what existing callers observe; consider 200.
        return jsonify({"code": 0, "data": {}, "message": "Deleted successfully"}), 204
    except Exception as e:
        # Leave the session usable for the next request.
        db.session.rollback()
        return jsonify({"code": 2, "data": {}, "message": str(e)}), 500
@app.route('/api/media/page', methods=['GET'])
@auth_required
def get_media_page():
    """Return one page of media records of a given type.

    Query parameters:
        type: Required media type to filter on.
        currentPage: 1-based page number (default 1).
        pageSize: Number of records per page (default 10).
        title: Optional substring to match against the title.
        startDate / endDate: Optional inclusive ``YYYY-MM-DD`` bounds on ``date``.
        sortBy: ``date`` (default) or ``score`` (sorts on ``rating``); any
            other value leaves the result unordered.
        sortType: ``asc`` or anything else for descending (default ``desc``).

    Returns:
        JSON with ``list``, ``total``, ``currentPage`` and ``pageSize``;
        HTTP 400 (``code`` 1) for a missing type, non-integer paging values
        or malformed dates; HTTP 500 (``code`` 1) for unexpected failures.
    """
    def bad_request(message):
        # Client errors use the same envelope as the rest of the API.
        return jsonify({"code": 1, "data": {}, "message": message}), 400

    try:
        # Named media_type so the builtin ``type`` is not shadowed.
        media_type = request.args.get('type')
        title = request.args.get('title', '')
        sort_by = request.args.get('sortBy', 'date')
        sort_type = request.args.get('sortType', 'desc')

        try:
            current_page = int(request.args.get('currentPage', 1))
            page_size = int(request.args.get('pageSize', 10))
        except ValueError:
            return bad_request("currentPage and pageSize must be integers")

        if not media_type:
            return bad_request("Type is required")

        # Parse both date bounds up front so malformed input is a 400.
        date_bounds = {}
        for arg_name in ('startDate', 'endDate'):
            raw_value = request.args.get(arg_name)
            if raw_value:
                try:
                    date_bounds[arg_name] = datetime.strptime(raw_value, '%Y-%m-%d').date()
                except ValueError:
                    return bad_request(f"{arg_name} must be in YYYY-MM-DD format")

        query = Media.query.filter(Media.type == media_type)
        if title:
            query = query.filter(Media.title.like(f'%{title}%'))
        if 'startDate' in date_bounds:
            query = query.filter(Media.date >= date_bounds['startDate'])
        if 'endDate' in date_bounds:
            query = query.filter(Media.date <= date_bounds['endDate'])

        sort_columns = {'date': Media.date, 'score': Media.rating}
        sort_column = sort_columns.get(sort_by)
        # Compare against None explicitly: column objects do not define a
        # usable truth value.
        if sort_column is not None:
            primary = sort_column.asc() if sort_type == 'asc' else sort_column.desc()
            # The id tie-breaker keeps rows with equal (or NULL) sort values
            # in a fixed order, so consecutive pages neither repeat nor skip
            # records.
            query = query.order_by(primary, Media.id.asc())

        pagination = query.paginate(page=current_page, per_page=page_size, error_out=False)
        media_list = pagination.items
        return jsonify({
            "code": 0,
            "data": {
                "list": [{
                    'id': media.id,
                    'title': media.title,
                    'type': media.type,
                    'status': media.status,
                    'rating': media.rating,
                    'notes': media.notes,
                    'platform': media.platform,
                    'date': media.date.isoformat() if media.date else None,
                    'created_at': media.created_at.isoformat(),
                    'updated_at': media.updated_at.isoformat()
                } for media in media_list],
                "total": pagination.total,
                "currentPage": current_page,
                "pageSize": page_size
            },
            "message": "Success"
        })
    except Exception as e:
        return jsonify({"code": 1, "data": {}, "message": str(e)}), 500
if __name__ == '__main__':
    # Debug mode stays on by default for local development, as before, but can
    # now be switched off with FLASK_DEBUG=0 (or false/no/off). The debugger
    # must not be exposed on a publicly reachable host.
    debug_enabled = os.environ.get('FLASK_DEBUG', '1').strip().lower() not in ('0', 'false', 'no', 'off')
    app.run(debug=debug_enabled)