fastapi-template/src/utils/file_upload.py

169 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import logging
import mimetypes
import os
import random
import shutil
import time
import traceback
import urllib.request
from fastapi import UploadFile
from .qiniu_tools import QiniuYuProvider
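# http_base = ""  # base URL for external access (presumably; original comment looks like a typo) -- assigned by init()
# path_base = ""  # base directory where files are saved -- assigned by init()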
logger = logging.getLogger(__name__)
def init(baseurl, base_dir):
    """Initialise the module-level configuration.

    Binds the module globals ``http_base`` (to ``baseurl``) and ``path_base``
    (to ``base_dir``). These globals are not defined until this function
    has been called.

    :param baseurl: value stored in ``http_base``
    :param base_dir: value stored in ``path_base``
    """
    global http_base, path_base
    http_base, path_base = baseurl, base_dir
def del_cloud_file(cfg, key):
    """Delete a file from the cloud storage service.

    :param cfg: mapping expanded as keyword arguments to ``QiniuYuProvider``
    :param key: forwarded as ``key`` to the provider's ``del_file``
    :return: whatever the provider's ``del_file`` returns
    """
    return QiniuYuProvider(**cfg).del_file(key=key)
def save_cloud_file(file: UploadFile, cfg, user=None):
    """Save an uploaded file to the cloud storage service.

    The upload is first spooled to a local temporary copy under ``files/``,
    handed to ``QiniuYuProvider.upload_file`` and the temporary copy is then
    removed, whether or not the upload succeeded.

    Side effect: ``file.filename`` is overwritten with the generated name.

    :param file: the FastAPI upload; ``file.filename`` and ``file.file`` are read
    :param cfg: mapping expanded as keyword arguments to ``QiniuYuProvider``
    :param user: unused by this function
    :return: the provider's ``upload_file`` result on the normal path
        (presumably a ``(status, message, data)`` tuple -- confirm against
        ``QiniuYuProvider``); ``(403, message, None)`` for a disallowed
        extension; ``(500, message, None)`` if anything raises
    """
    allowed_exts = ('jpg', 'png', 'jpeg', 'gif', 'aac', 'mp3', 'amr', 'apk',
                    'webm', 'mp4', 'mov', 'flv', 'mkv', '3gp', 'avi')
    uploaded_file_path = None
    try:
        ext = get_file_ext(file.filename)
        if ext not in allowed_exts:
            return 403, '文件格式不合法,请上传正确的文件格式', None
        filename = build_file_name(ext)  # generated name replaces the client-supplied one
        file.filename = filename
        # The spool directory may not exist on a fresh deployment.
        os.makedirs("files", exist_ok=True)
        uploaded_file_path = os.path.join("files", filename)
        # FastAPI's UploadFile has no .save(); copy the underlying file object.
        with open(uploaded_file_path, "wb+") as buffer:
            shutil.copyfileobj(file.file, buffer)
        api = QiniuYuProvider(**cfg)
        return api.upload_file(file=uploaded_file_path, types=ext)
    except Exception:
        logger.error(traceback.format_exc())
        return 500, '保存文件失败', None
    finally:
        # Always drop the temporary copy; previously it was only removed on a
        # 200 result, so failed uploads accumulated files on disk.
        if uploaded_file_path is not None:
            try:
                if os.path.exists(uploaded_file_path):
                    os.remove(uploaded_file_path)
            except OSError:
                logger.error(traceback.format_exc())
def save_file(file, save_db=None, user=None):
    """Save an uploaded file under ``<path_base>/static/mr/<YYYYMMDD>/``.

    :param file: upload object exposing ``filename`` and either a
        ``save(path)`` method (Flask/werkzeug style) or a ``file`` attribute
        holding a readable binary file object (FastAPI ``UploadFile`` style)
    :param save_db: optional callable ``save_db(data, user)``; when truthy it
        receives a dict describing the stored file and its return value is
        returned as-is
    :param user: forwarded to ``save_db`` only
    :return: ``save_db``'s result when ``save_db`` is given; otherwise
        ``(200, "", url)``; ``(403, message, None)`` for a disallowed
        extension; ``(500, message, None)`` if anything raises
    """
    allowed_exts = ('jpg', 'png', 'jpeg', 'gif', 'aac', 'mp3', 'amr', 'apk',
                    'webm', 'mp4', 'mov', 'flv', 'mkv', '3gp', 'avi')
    try:
        ext = get_file_ext(file.filename)
        if ext not in allowed_exts:
            return 403, '文件格式不合法,请上传正确的文件格式', None
        filename = build_file_name(ext)  # generated on-disk name
        date_str = time.strftime('%Y%m%d')
        base_path = os.path.join(path_base, 'static', 'mr', date_str)
        # exist_ok avoids the check-then-create race between concurrent requests.
        os.makedirs(base_path, exist_ok=True)
        filepath = os.path.join(base_path, filename)  # actual path on disk
        if hasattr(file, 'save'):
            file.save(filepath)
        else:
            # FastAPI/Starlette UploadFile has no .save(); copy its file object.
            with open(filepath, 'wb') as buffer:
                shutil.copyfileobj(file.file, buffer)
        if save_db:
            data = {
                'http_base': http_base,
                'file_base': path_base,
                'file_name': file.filename,  # original (client-supplied) file name
                'file_path': '/static/mr/{date_str}/{filename}'.format(date_str=date_str, filename=filename),
                'file_type': ext,  # file extension
                'mime_type': mimetypes.guess_type(file.filename)[0],  # media type
                'file_size': get_file_size(filepath),
                'file_size_str': get_file_size_str(filepath)
            }
            return save_db(data, user)
        image_url = "{}/static/mr/{}/{}".format(http_base, date_str, filename)
        return 200, "", image_url
    except Exception as e:
        # logger.exception keeps the traceback that logger.error(e) dropped.
        logger.exception(e)
        return 500, '保存文件失败', None
def save_image(body):
    """Download the image referenced by ``body['url']`` and return its local URL.

    The file is stored under ``<path_base>/static/im/<YYYYMMDD>/`` with a
    generated name. If the download fails the error is logged and the
    original URL is returned unchanged; if no URL is given an empty string is
    returned.

    :param body: dict of file information; only the keys ``url`` (default
        ``''``) and ``ext`` (default ``'jpg'``) are read here
    :return: the new ``http_base``-relative URL on success, otherwise the
        original ``url`` value
    """
    url = body.get('url', '')
    ext = body.get('ext', 'jpg')
    filename = build_file_name(ext)  # generated on-disk name
    if url != '':
        date_str = time.strftime('%Y%m%d')
        base_path = os.path.join(path_base, 'static', 'im', date_str)
        # exist_ok avoids FileExistsError when two requests create the
        # day's directory at the same time.
        os.makedirs(base_path, exist_ok=True)
        filepath = os.path.join(base_path, filename)  # actual path on disk
        try:
            # NOTE(review): `url` and `ext` come straight from `body` and are not
            # validated here; urlretrieve also accepts non-HTTP schemes such as
            # file://. Confirm the caller only passes trusted values.
            urllib.request.urlretrieve(url, filepath)
            url = "{}/static/im/{}/{}".format(http_base, date_str, filename)
        except Exception as e:
            # Best effort: keep the original URL, but log with traceback.
            logger.exception(e)
    return url  # image URL
def get_file_ext(filename):
    """Return the lower-cased extension of ``filename`` without the dot.

    :param filename: file name string; may be ``None`` or empty
    :return: the text after the last ``.`` in lower case (an empty string if
        the name ends with a dot), or ``None`` when ``filename`` is
        ``None``/empty or contains no dot
    """
    # An upload may carry no filename at all; treat that as "no extension"
    # instead of raising TypeError on the membership test.
    if not filename or '.' not in filename:
        return None
    return filename.rpartition('.')[2].lower()
def build_file_name(ext):
    """Generate a file name of the form ``HHMMSS`` + six random digits + ``.ext``.

    :param ext: extension to append (without the leading dot)
    :return: the generated file name
    """
    # Slicing str(random.random()) is not always six digits: small values are
    # rendered in scientific notation ("...e-05") and short values include
    # "0.". Draw an integer and zero-pad it instead.
    suffix = '{:06d}'.format(random.randrange(10 ** 6))
    return '{}{}.{}'.format(time.strftime('%H%M%S'), suffix, ext)
def get_file_path(filename):
    """Return ``filename`` prefixed with the module global ``path_base`` and a ``/``.

    :param filename: name appended after ``path_base``
    :return: the string ``"<path_base>/<filename>"``
    """
    return f"{path_base}/{filename}"
def get_file_size(file_path):
    """Return the size in bytes of the file at ``file_path``.

    :param file_path: path of an existing file
    :return: size in bytes as reported by the file system
    :raises OSError: if the file does not exist or is inaccessible
    """
    return os.stat(file_path).st_size
def get_file_size_str(file_path):
    """Return the size of the file at ``file_path`` as a human-readable string.

    The result has the form ``"<whole>.<fff> <unit>"`` where the unit is one
    of B, KB, MB, GB, TB, PB (1024-based) and ``fff`` is the fractional part
    in thousandths, truncated.

    :param file_path: path of an existing file
    :return: formatted size, e.g. ``"1.500 KB"`` for a 1536-byte file
    :raises OSError: if the file does not exist or is inaccessible
    """
    units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
    whole = os.path.getsize(file_path)
    remainder = 0
    level = 0
    # Stop at the largest known unit so the number always matches its label.
    while whole >= 1024 and level < len(units) - 1:
        whole, remainder = divmod(whole, 1024)
        level += 1
    # `remainder` is in 1/1024ths of the current unit; convert it to
    # thousandths before printing it after the decimal point.
    thousandths = remainder * 1000 // 1024
    return '{}.{:03d} {}'.format(whole, thousandths, units[level])