"""Media tracker REST API.

A small Flask + Flask-SQLAlchemy service that stores media entries (games,
books, movies, anime, ...) in a SQLite database and exposes CRUD, paging and
database backup/restore endpoints. Every endpoint is protected by HTTP Basic
authentication and answers with a JSON envelope of the form
``{"code": <int>, "data": <payload>, "message": <str>}``.
"""
from flask import Flask, request, jsonify, send_file
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from datetime import datetime
import os
import shutil
import json
from functools import wraps
import base64
import hmac

app = Flask(__name__)
CORS(app)

# Authentication configuration. The defaults are kept for backward
# compatibility, but they are insecure: override them through the environment
# for any deployment that is reachable by other people.
USERNAME = os.environ.get('MEDIA_API_USERNAME', 'admin')
PASSWORD = os.environ.get('MEDIA_API_PASSWORD', 'admin')

# Directory (relative to the working directory) that receives database copies.
BACKUP_DIR = 'backups'
# Fallback database file used when the engine URL cannot be inspected.
DEFAULT_DB_FILE = 'media.db'
# Wire format of every date accepted by the API.
DATE_FORMAT = '%Y-%m-%d'


def _credentials_match(supplied, expected):
    """Return True when *supplied* equals *expected*, compared in constant time."""
    if supplied is None:
        return False
    # Encode to bytes: compare_digest() rejects non-ASCII str arguments.
    return hmac.compare_digest(supplied.encode('utf-8'), expected.encode('utf-8'))


def auth_required(f):
    """Decorator that rejects requests lacking valid HTTP Basic credentials.

    On failure a 401 JSON response is returned and the ``WWW-Authenticate``
    header is removed from it.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if auth:
            # Evaluate both comparisons so timing does not reveal which one failed.
            user_ok = _credentials_match(auth.username, USERNAME)
            pass_ok = _credentials_match(auth.password, PASSWORD)
            authorized = user_ok and pass_ok
        else:
            authorized = False

        if not authorized:
            response = jsonify({
                'code': 1,
                'message': 'Invalid username or password'
            })
            response.status_code = 401
            # Remove the WWW-Authenticate header.
            response.headers.pop('WWW-Authenticate', None)
            return response
        return f(*args, **kwargs)
    return decorated


# Database configuration.
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///media.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)


# Data model.
class Media(db.Model):
    """A single tracked media entry."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    type = db.Column(db.String(50), nullable=False)  # game, book, movie, anime, other
    rating = db.Column(db.Float)
    notes = db.Column(db.Text)
    platform = db.Column(db.String(100))  # platform information
    date = db.Column(db.Date)  # completion / viewing date
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)


# Create all database tables.
with app.app_context():
    db.create_all()


def _json_error(message, status, code=1):
    """Build the standard error envelope together with its HTTP status."""
    return jsonify({"code": code, "data": {}, "message": message}), status


def _serialize_media(media):
    """Convert a ``Media`` row into the JSON-ready dict used by every endpoint."""
    return {
        'id': media.id,
        'title': media.title,
        'type': media.type,
        'rating': media.rating,
        'notes': media.notes,
        'platform': media.platform,
        'date': media.date.isoformat() if media.date else None,
        'created_at': media.created_at.isoformat(),
        'updated_at': media.updated_at.isoformat()
    }


def _parse_date(value):
    """Parse a ``YYYY-MM-DD`` string into a ``date``.

    Raises:
        ValueError: if *value* does not match the expected format.
        TypeError: if *value* is not a string.
    """
    return datetime.strptime(value, DATE_FORMAT).date()


def _find_media(media_id):
    """Return the ``Media`` row with the given id, or None when it does not exist."""
    return Media.query.filter_by(id=media_id).first()


def _database_path():
    """Return the path of the SQLite file actually used by the engine.

    Flask-SQLAlchemy resolves relative SQLite URIs against an application
    folder rather than the working directory, so the engine URL is the
    reliable source. Outside an application context the historical
    working-directory-relative name is used instead.
    """
    try:
        return db.engine.url.database or DEFAULT_DB_FILE
    except RuntimeError:
        return DEFAULT_DB_FILE


# Database backup support.
def backup_database():
    """Copy the database file into ``backups/`` and return the copy's path."""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    os.makedirs(BACKUP_DIR, exist_ok=True)
    backup_file = f'{BACKUP_DIR}/media_{timestamp}.db'
    shutil.copy2(_database_path(), backup_file)
    return backup_file


@app.route('/api/backup', methods=['POST'])
@auth_required
def create_backup():
    """Create a timestamped backup of the database."""
    try:
        backup_file = backup_database()
        return jsonify({
            "code": 0,
            "data": {
                "backup_file": backup_file,
                "timestamp": datetime.now().isoformat()
            },
            "message": "Backup created successfully"
        })
    except Exception as e:
        return _json_error(str(e), 500)


@app.route('/api/backup/list', methods=['GET'])
@auth_required
def list_backups():
    """List the ``.db`` files in the backup directory with size and creation time."""
    try:
        if not os.path.exists(BACKUP_DIR):
            return jsonify({
                "code": 0,
                "data": {"backups": []},
                "message": "No backups found"
            })

        backups = []
        # Sorted so the response order is deterministic across platforms.
        for file_name in sorted(os.listdir(BACKUP_DIR)):
            if not file_name.endswith('.db'):
                continue
            file_path = os.path.join(BACKUP_DIR, file_name)
            backups.append({
                "filename": file_name,
                "size": os.path.getsize(file_path),
                "created_at": datetime.fromtimestamp(os.path.getctime(file_path)).isoformat()
            })

        return jsonify({
            "code": 0,
            "data": {"backups": backups},
            "message": "Success"
        })
    except Exception as e:
        return _json_error(str(e), 500)


@app.route('/api/backup/restore/<filename>', methods=['POST'])
@auth_required
def restore_backup(filename):
    """Replace the live database with the named backup.

    The current database is backed up first so the restore can be undone.

    Args:
        filename: plain file name of a ``.db`` file inside the backup directory.
    """
    # Only accept a bare "*.db" file name so the request cannot point outside
    # the backup directory.
    if os.path.basename(filename) != filename or not filename.endswith('.db'):
        return _json_error("Invalid backup file name", 400)

    try:
        backup_file = os.path.join(BACKUP_DIR, filename)
        if not os.path.isfile(backup_file):
            return _json_error("Backup file not found", 404)

        # Back up the current database first.
        current_backup = backup_database()

        # Release pooled connections before the file underneath them is replaced.
        target = _database_path()
        db.session.remove()
        db.engine.dispose()

        # Restore the selected backup.
        shutil.copy2(backup_file, target)

        return jsonify({
            "code": 0,
            "data": {
                "restored_file": filename,
                "current_backup": current_backup
            },
            "message": "Backup restored successfully"
        })
    except Exception as e:
        return _json_error(str(e), 500)


# API routes.
@app.route('/api/media/list', methods=['GET'])
@auth_required
def get_all_media():
    """Return every media entry."""
    try:
        return jsonify({
            "code": 0,
            "data": [_serialize_media(media) for media in Media.query.all()],
            "message": "Success"
        })
    except Exception as e:
        return _json_error(str(e), 500)


@app.route('/api/media/create', methods=['POST'])
@auth_required
def create_media():
    """Create a media entry from a JSON body; ``title`` and ``type`` are required."""
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return _json_error("Request body must be a JSON object", 400)

    missing = [field for field in ('title', 'type') if not data.get(field)]
    if missing:
        return _json_error(f"Missing required field(s): {', '.join(missing)}", 400)

    try:
        entry_date = _parse_date(data['date']) if data.get('date') else None
    except (ValueError, TypeError):
        return _json_error("Invalid date, expected YYYY-MM-DD", 400)

    try:
        new_media = Media(
            title=data['title'],
            type=data['type'],
            rating=data.get('rating'),
            notes=data.get('notes'),
            platform=data.get('platform'),
            date=entry_date
        )
        db.session.add(new_media)
        db.session.commit()
        return jsonify({
            "code": 0,
            "data": _serialize_media(new_media),
            "message": "Created successfully"
        }), 201
    except Exception as e:
        db.session.rollback()
        return _json_error(str(e), 500, code=2)


@app.route('/api/media/updateById/<int:media_id>', methods=['PUT'])
@auth_required
def update_media(media_id):
    """Update the fields present in the JSON body on the entry with this id."""
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return _json_error("Request body must be a JSON object", 400)

    try:
        new_date = _parse_date(data['date']) if data.get('date') else None
    except (ValueError, TypeError):
        return _json_error("Invalid date, expected YYYY-MM-DD", 400)

    try:
        media = _find_media(media_id)
        if media is None:
            return _json_error("Media not found", 404)

        media.title = data.get('title', media.title)
        media.type = data.get('type', media.type)
        media.rating = data.get('rating', media.rating)
        media.notes = data.get('notes', media.notes)
        media.platform = data.get('platform', media.platform)
        # An absent or empty date leaves the stored date untouched.
        if new_date is not None:
            media.date = new_date

        db.session.commit()
        return jsonify({
            "code": 0,
            "data": _serialize_media(media),
            "message": "Updated successfully"
        })
    except Exception as e:
        db.session.rollback()
        return _json_error(str(e), 500, code=2)


@app.route('/api/media/deleteById/<int:media_id>', methods=['DELETE'])
@auth_required
def delete_media(media_id):
    """Delete the entry with this id."""
    try:
        media = _find_media(media_id)
        if media is None:
            return _json_error("Media not found", 404)

        db.session.delete(media)
        db.session.commit()
        return jsonify({"code": 0, "data": {}, "message": "Deleted successfully"})
    except Exception as e:
        db.session.rollback()
        return _json_error(str(e), 500, code=2)


@app.route('/api/media/page', methods=['GET'])
@auth_required
def get_media_page():
    """Return one page of entries of a given type.

    Query parameters: ``type`` (required), ``currentPage`` (default 1),
    ``pageSize`` (default 10), ``title`` (substring filter), ``startDate`` /
    ``endDate`` (``YYYY-MM-DD``, inclusive), ``sortBy`` (``date`` or ``score``,
    default ``date``) and ``sortType`` (``asc`` or anything else for
    descending, default ``desc``).
    """
    args = request.args

    media_type = args.get('type')
    if not media_type:
        return _json_error("Type is required", 400)

    try:
        current_page = int(args.get('currentPage', 1))
        page_size = int(args.get('pageSize', 10))
    except ValueError:
        return _json_error("currentPage and pageSize must be integers", 400)

    try:
        start_date = _parse_date(args['startDate']) if args.get('startDate') else None
        end_date = _parse_date(args['endDate']) if args.get('endDate') else None
    except ValueError:
        return _json_error("Invalid date, expected YYYY-MM-DD", 400)

    title = args.get('title', '')
    sort_by = args.get('sortBy', 'date')
    sort_type = args.get('sortType', 'desc')

    try:
        query = Media.query.filter(Media.type == media_type)

        if title:
            query = query.filter(Media.title.like(f'%{title}%'))
        if start_date is not None:
            query = query.filter(Media.date >= start_date)
        if end_date is not None:
            query = query.filter(Media.date <= end_date)

        # Unknown sortBy values leave the query unordered.
        sort_columns = {'date': Media.date, 'score': Media.rating}
        column = sort_columns.get(sort_by)
        if column is not None:
            query = query.order_by(column.asc() if sort_type == 'asc' else column.desc())

        pagination = query.paginate(page=current_page, per_page=page_size, error_out=False)

        return jsonify({
            "code": 0,
            "data": {
                "list": [_serialize_media(media) for media in pagination.items],
                "total": pagination.total,
                "currentPage": current_page,
                "pageSize": page_size
            },
            "message": "Success"
        })
    except Exception as e:
        return _json_error(str(e), 500)


if __name__ == '__main__':
    # The Werkzeug debugger allows arbitrary code execution, so it is opt-in.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled)