Files
Class-Widgets/network_thread.py

484 lines
19 KiB
Python
Raw Permalink Normal View History

2025-05-29 22:29:58 +08:00
import json
import os
import shutil
import zipfile # 解压插件zip
from datetime import datetime
import requests
from PyQt5.QtCore import QThread, pyqtSignal, QEventLoop
from loguru import logger
from packaging.version import Version
import conf
import utils
import weather_db as db
from conf import base_directory
from file import config_center
# Default request headers sent with plugin-plaza requests.
headers = {"User-Agent": "Mozilla/5.0", "Cache-Control": "no-cache"}
# proxies = {"http": "http://127.0.0.1:10809", "https": "http://127.0.0.1:10809"}  # local proxy to speed up access
proxies = {"http": None, "https": None}  # no proxy configured for either scheme

MIRROR_PATH = f"{base_directory}/config/mirror.json"
PLAZA_REPO_URL = "https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/"
PLAZA_REPO_DIR = "https://api.github.com/repos/Class-Widgets/plugin-plaza/contents/"
threads = []

# Load the mirror configuration.
# mirror_dict is bound up-front so that a missing/corrupt mirror.json (or a
# file without the 'gh_mirror' key) leaves an empty mapping instead of an
# unbound name that would crash the module import below.
mirror_dict = {}
try:
    with open(MIRROR_PATH, 'r', encoding='utf-8') as file:
        mirror_dict = json.load(file).get('gh_mirror') or {}
except Exception as e:
    logger.error(f"读取镜像配置失败: {e}")

# Names of all known mirrors, in file order.
mirror_list = list(mirror_dict)

if not mirror_list:
    # Nothing to fall back to; indexing mirror_list[0] would raise IndexError.
    logger.error("镜像列表为空,无法设置默认镜像")
elif config_center.read_conf('Plugin', 'mirror') not in mirror_list:
    # The configured mirror is unknown: fall back to the first mirror in the list.
    logger.warning(f"当前配置不在镜像列表中,设置为默认镜像: {mirror_list[0]}")
    config_center.write_conf('Plugin', 'mirror', mirror_list[0])
class getRepoFileList(QThread):  # Fetch the repository file listing
    """Worker thread that downloads a JSON document through the configured mirror.

    The parsed JSON, or an ``{"error": ...}`` dict when the request fails, is
    delivered through ``repo_signal``.
    """

    repo_signal = pyqtSignal(dict)

    def __init__(
        self, url='https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/main/Banner/banner.json'
    ):
        super().__init__()
        self.download_url = url

    def run(self):
        """Fetch the document and emit it; any failure is logged instead of raised."""
        try:
            self.repo_signal.emit(self.get_plugin_info())
        except Exception as e:
            logger.error(f"触发banner信息失败: {e}")

    def get_plugin_info(self):
        """Return the parsed JSON, or ``{"error": <status code or exception>}``."""
        try:
            selected_mirror = config_center.read_conf('Plugin', 'mirror')
            target = f"{mirror_dict[selected_mirror]}{self.download_url}"
            reply = requests.get(target, proxies=proxies, headers=headers)
            # Guard clause: anything other than HTTP 200 is reported as an error.
            if reply.status_code != 200:
                logger.error(f"获取banner信息失败{reply.status_code}")
                return {"error": reply.status_code}
            return reply.json()
        except Exception as e:
            logger.error(f"获取banner信息失败{e}")
            return {"error": e}
class getPluginInfo(QThread):  # Fetch plugin information (JSON)
    """Worker thread that downloads the plugin list JSON through the configured mirror.

    The parsed JSON is delivered through ``repo_signal``; an empty dict is
    delivered when the request fails.
    """

    repo_signal = pyqtSignal(dict)

    def __init__(
        self, url='https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/main/Plugins/plugin_list.json'
    ):
        super().__init__()
        self.download_url = url

    def run(self):
        """Fetch the plugin list and emit it; any failure is logged instead of raised."""
        try:
            self.repo_signal.emit(self.get_plugin_info())
        except Exception as e:
            logger.error(f"触发插件信息失败: {e}")

    def get_plugin_info(self):
        """Return the parsed plugin list, or ``{}`` on any failure."""
        try:
            mirror_prefix = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
            reply = requests.get(
                f"{mirror_prefix}{self.download_url}", proxies=proxies, headers=headers
            )
            if reply.status_code != 200:
                logger.error(f"获取插件信息失败:{reply.status_code}")
                return {}
            return reply.json()
        except Exception as e:
            logger.error(f"获取插件信息失败:{e}")
            return {}
class getTags(QThread):  # Fetch plugin tags (JSON)
    """Worker thread that downloads the plaza detail JSON through the configured mirror.

    The parsed JSON is delivered through ``repo_signal``; an empty dict is
    delivered when the request fails.
    """

    repo_signal = pyqtSignal(dict)

    def __init__(
        self, url='https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/main/Plugins/plaza_detail.json'
    ):
        super().__init__()
        self.download_url = url

    def run(self):
        """Fetch the tag data and emit it; any failure is logged instead of raised."""
        try:
            tag_data = self.get_plugin_info()
            self.repo_signal.emit(tag_data)
        except Exception as e:
            logger.error(f"触发Tag信息失败: {e}")

    def get_plugin_info(self):
        """Return the parsed tag JSON, or ``{}`` on any failure."""
        result = {}
        try:
            mirror_prefix = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
            full_url = f"{mirror_prefix}{self.download_url}"
            reply = requests.get(full_url, proxies=proxies, headers=headers)
            if reply.status_code == 200:
                result = reply.json()
            else:
                logger.error(f"获取Tag信息失败{reply.status_code}")
        except Exception as e:
            logger.error(f"获取Tag信息失败{e}")
            return {}
        return result
class getImg(QThread):  # Fetch an image
    """Worker thread that downloads an image through the configured mirror.

    The raw bytes are delivered through ``repo_signal``. When the download
    fails, the bundled placeholder ``img/plaza/banner_pre.png`` is sent instead.
    """

    repo_signal = pyqtSignal(bytes)

    def __init__(self, url='https://raw.githubusercontent.com/Class-Widgets/plugin-plaza/main/Banner/banner_1.png'):
        super().__init__()
        self.download_url = url

    def run(self):
        """Emit the downloaded image, or the default image if the download failed."""
        try:
            image_bytes = self.get_banner()
            if image_bytes is None:
                # Fall back to the default picture shipped with the application.
                with open(f"{base_directory}/img/plaza/banner_pre.png", 'rb') as default_img:
                    self.repo_signal.emit(default_img.read())
                return
            self.repo_signal.emit(image_bytes)
        except Exception as e:
            logger.error(f"触发图片失败: {e}")

    def get_banner(self):
        """Return the image bytes, or ``None`` on any failure."""
        try:
            mirror_prefix = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
            reply = requests.get(
                f"{mirror_prefix}{self.download_url}", proxies=proxies, headers=headers
            )
            if reply.status_code != 200:
                logger.error(f"获取图片失败:{reply.status_code}")
                return None
            return reply.content
        except Exception as e:
            logger.error(f"获取图片失败:{e}")
            return None
class getReadme(QThread):  # Fetch a README
    """Worker thread that downloads a README through the configured mirror.

    The response text is delivered through ``html_signal``; an empty string is
    delivered when the request fails.
    """

    html_signal = pyqtSignal(str)

    def __init__(self, url='https://raw.githubusercontent.com/Class-Widgets/Class-Widgets/main/README.md'):
        super().__init__()
        self.download_url = url

    def run(self):
        """Fetch the README and emit its text; any failure is logged instead of raised."""
        try:
            self.html_signal.emit(self.get_readme())
        except Exception as e:
            logger.error(f"触发README失败: {e}")

    def get_readme(self):
        """Return the README text, or ``''`` on any failure."""
        try:
            mirror_prefix = mirror_dict[config_center.read_conf('Plugin', 'mirror')]
            full_url = f"{mirror_prefix}{self.download_url}"
            # Unlike the other fetchers in this file, no custom headers are sent here.
            reply = requests.get(full_url, proxies=proxies)
            if reply.status_code != 200:
                logger.error(f"获取README失败{reply.status_code}")
                return ''
            return reply.text
        except Exception as e:
            logger.error(f"获取README失败{e}")
            return ''
class getCity(QThread):
    """Worker thread that looks up the current city via an IP geolocation API.

    The (city, district) pair is passed to ``db.search_code_by_name`` and the
    result is written to the ``Weather``/``city`` configuration entry.
    """

    def __init__(self, url='https://qifu-api.baidubce.com/ip/local/geo/v1/district'):
        super().__init__()
        self.download_url = url

    def run(self):
        """Resolve the city and store its code; any failure is logged instead of raised."""
        try:
            location = self.get_city()
            city_code = db.search_code_by_name(location)
            config_center.write_conf('Weather', 'city', city_code)
        except Exception as e:
            logger.error(f"获取城市失败: {e}")

    def get_city(self):
        """Return ``(city, district)``, or ``('', '')`` on any failure."""
        not_found = ('', '')
        try:
            reply = requests.get(self.download_url, proxies=proxies)
            if reply.status_code != 200:
                logger.error(f"获取城市失败:{reply.status_code}")
                return not_found

            payload = reply.json()
            # Example payload:
            # {"code":"Success","data":{"continent":"","country":"中国","zipcode":"","owner":"","isp":"","adcode":"","prov":"","city":"","district":""},"ip":"45.192.96.246"}
            if payload['code'] != 'Success':
                logger.error(f"获取城市失败:{payload['message']}")
                return not_found

            geo = payload['data']
            logger.info(f"获取城市成功:{geo['city']}, {geo['district']}")
            return (geo['city'], geo['district'])
        except Exception as e:
            logger.error(f"获取城市失败:{e}")
            return not_found
class VersionThread(QThread):  # Fetch the latest version information
    """Worker thread that downloads ``version.json`` and emits it.

    ``version_signal`` always receives a dict: either the parsed version
    document or ``{"error": <message>}`` when the lookup failed.
    """

    version_signal = pyqtSignal(dict)
    # NOTE(review): nothing visible in this file ever sets this flag to True,
    # so is_running() currently always reports False — confirm intended use.
    _instance_running = False

    def __init__(self):
        super().__init__()

    def run(self):
        """Fetch the version document and emit it.

        Guarded like the other worker threads in this file so that an
        unexpected error cannot escape from the thread's ``run``.
        """
        try:
            version = self.get_latest_version()
            self.version_signal.emit(version)
        except Exception as e:
            logger.error(f"检查更新失败: {e}")

    @classmethod
    def is_running(cls):
        """Return the class-level running flag."""
        return cls._instance_running

    @staticmethod
    def get_latest_version():
        """Return the parsed version document, or ``{"error": ...}`` on failure.

        Failures covered: network errors, non-200 responses, a body that is
        not valid JSON, and a JSON document that is not an object.
        """
        url = "https://classwidgets.rinlit.cn/version.json"
        try:
            logger.info("正在获取版本信息")
            response = requests.get(url, proxies=proxies, timeout=30)
            logger.debug(f"更新请求响应: {response.status_code}")
            if response.status_code != 200:
                logger.error(f"无法获取版本信息 错误代码:{response.status_code},响应内容: {response.text}")
                return {'error': f"请求失败,错误代码:{response.status_code}"}
            data = response.json()
            if not isinstance(data, dict):
                # version_signal is declared with a dict payload; anything else
                # cannot be emitted, so report it as a failed lookup instead.
                logger.error(f"版本信息格式错误:{type(data).__name__}")
                return {'error': "版本信息格式错误"}
            return data
        except requests.exceptions.RequestException as e:
            logger.error(f"请求失败,错误详情:{str(e)}")
            return {"error": f"请求失败\n{str(e)}"}
        except ValueError as e:
            # Body was not valid JSON. Some requests versions raise a plain
            # ValueError here that is not a RequestException.
            logger.error(f"版本信息解析失败:{e}")
            return {"error": f"版本信息解析失败\n{e}"}
class getDownloadUrl(QThread):
    """Worker thread that resolves the asset download URLs of a repo's latest release.

    ``geturl_signal`` is emitted once per release asset with its
    ``browser_download_url``. On failure it is emitted with an error message.
    """

    # Signal carrying either a download URL or an error message.
    geturl_signal = pyqtSignal(str)

    def __init__(self, username, repo):
        super().__init__()
        self.username = username
        self.repo = repo

    @staticmethod
    def _minutes_until(reset_epoch):
        """Return whole minutes (rounded up, never negative) until ``reset_epoch``."""
        remaining = (datetime.fromtimestamp(reset_epoch) - datetime.now()).total_seconds()
        if remaining <= 0:
            return 0
        return int((remaining + 59) // 60)

    def run(self):
        """Query the GitHub releases API and emit the result."""
        try:
            url = f"https://api.github.com/repos/{self.username}/{self.repo}/releases/latest"
            response = requests.get(url, proxies=proxies)
            if response.status_code == 200:
                data = response.json()
                for asset in data['assets']:  # iterate over the download links
                    if isinstance(asset, dict) and 'browser_download_url' in asset:
                        self.geturl_signal.emit(asset['browser_download_url'])
            elif response.status_code == 403:  # API rate limit hit
                logger.warning("到达Github API限制请稍后再试")
                # Prefer the reset time already present on the 403 response;
                # only issue the extra request when the header is missing.
                reset_time = response.headers.get('X-RateLimit-Reset')
                if reset_time is None:
                    response = requests.get('https://api.github.com/users/octocat', proxies=proxies)
                    reset_time = response.headers.get('X-RateLimit-Reset')
                # The header is an epoch timestamp. Report the time left until
                # then, not the minute-of-hour component of the reset time.
                wait_minutes = self._minutes_until(int(reset_time))
                self.geturl_signal.emit(f"ERROR: 由于请求次数过多到达Github API限制请在{wait_minutes}分钟后再试")
            else:
                # NOTE(review): nothing is emitted for other HTTP errors, so
                # listeners receive no signal in this case.
                logger.error(f"网络连接错误:{response.status_code}")
        except Exception as e:
            logger.error(f"获取下载链接错误: {e}")
            self.geturl_signal.emit(f"获取下载链接错误: {e}")
class DownloadAndExtract(QThread):  # Download a plugin archive and unpack it
    """Worker thread that downloads a plugin zip through the mirror and installs it.

    ``status_signal`` reports "DOWNLOADING", "EXTRACTING" and "DONE", or an
    error message. ``progress_signal`` reports the download progress in
    percent. "DONE" is only emitted (and the plugin only auto-enabled) when
    both the download and the extraction succeeded.
    """

    progress_signal = pyqtSignal(float)  # progress
    status_signal = pyqtSignal(str)  # status

    def __init__(self, url, plugin_name='test_114'):
        super().__init__()
        self.download_url = url
        logger.debug(f"插件下载地址: {self.download_url}")
        self.cache_dir = "cache"
        self.plugin_name = plugin_name
        self.extract_dir = conf.PLUGINS_DIR  # plugin directory

    def run(self):
        """Download, extract and optionally auto-enable the plugin."""
        zip_path = os.path.join(self.cache_dir, f'{self.plugin_name}.zip')
        try:
            enabled_plugins = conf.load_plugin_config()  # currently enabled plugins
            os.makedirs(self.cache_dir, exist_ok=True)
            os.makedirs(self.extract_dir, exist_ok=True)

            self.status_signal.emit("DOWNLOADING")
            if not self.download_file(zip_path):
                # download_file has already logged and emitted the error.
                return

            self.status_signal.emit("EXTRACTING")
            if not self.extract_zip(zip_path):
                # Routed through the handler below so the failure is reported
                # instead of falling through to "DONE".
                raise RuntimeError("解压失败")

            logger.debug(f"已启用的插件: {enabled_plugins}")
            if (
                self.plugin_name not in enabled_plugins['enabled_plugins']
                and config_center.read_conf('Plugin', 'auto_enable_plugin') == '1'
            ):
                logger.info(f"自动启用插件: {self.plugin_name}")
                enabled_plugins['enabled_plugins'].append(self.plugin_name)
                conf.save_plugin_config(enabled_plugins)
            self.status_signal.emit("DONE")
        except Exception as e:
            self.status_signal.emit(f"错误: {e}")
            logger.error(f"插件下载/解压失败: {e}")
        finally:
            # Remove the cached archive whether or not the install succeeded.
            try:
                if os.path.exists(zip_path):
                    os.remove(zip_path)
            except OSError as e:
                logger.warning(f"清理缓存文件失败: {e}")

    def stop(self):
        """Forcefully stop the thread."""
        self._running = False
        self.terminate()

    def download_file(self, file_path):
        """Download the archive to ``file_path``, emitting progress.

        Returns ``True`` on success and ``False`` on failure. Failures are
        logged and reported through ``status_signal``.
        """
        try:
            # Built as a local value: rewriting self.download_url would prefix
            # the mirror a second time if this method were ever called again.
            full_url = mirror_dict[config_center.read_conf('Plugin', 'mirror')] + self.download_url
            logger.debug(f"插件下载地址: {full_url}")
            response = requests.get(full_url, stream=True, proxies=proxies)
            try:
                if response.status_code != 200:
                    logger.error(f"插件下载失败,错误代码: {response.status_code}")
                    self.status_signal.emit(f'ERROR: 网络连接错误:{response.status_code}')
                    return False

                total_size = int(response.headers.get('content-length', 0))
                downloaded_size = 0
                with open(file_path, 'wb') as file:
                    for chunk in response.iter_content(1024):
                        file.write(chunk)
                        downloaded_size += len(chunk)
                        # Progress is 0 when the server sent no content-length.
                        progress = (downloaded_size / total_size) * 100 if total_size > 0 else 0
                        self.progress_signal.emit(progress)
            finally:
                # Release the streamed connection.
                response.close()
            return True
        except Exception as e:
            self.status_signal.emit(f'ERROR: {e}')
            logger.error(f"插件下载错误: {e}")
            return False

    def extract_zip(self, zip_path):
        """Extract the archive into the plugin directory and normalise its folder name.

        A top-level folder from the archive that starts with the plugin name
        and carries a ``-suffix`` is renamed to the part before the last
        ``-``. If that folder already exists, it is merged into it instead.
        Returns ``True`` on success and ``False`` on failure (logged).
        """
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # Top-level entries of this archive. Only these are renamed,
                # so unrelated plugin folders that share the name prefix are
                # never merged or deleted.
                extracted_roots = {
                    member.lstrip('/').split('/', 1)[0] for member in zip_ref.namelist()
                }
                extracted_roots.discard('')
                zip_ref.extractall(self.extract_dir)

            for p_dir in os.listdir(self.extract_dir):
                if p_dir not in extracted_roots:
                    continue
                if not (p_dir.startswith(self.plugin_name) and len(p_dir) > len(self.plugin_name)):
                    continue
                new_name = p_dir.rsplit('-', 1)[0]
                if new_name == p_dir:
                    # No '-suffix' to strip; copying a folder onto itself would fail.
                    continue
                source = os.path.join(self.extract_dir, p_dir)
                target = os.path.join(self.extract_dir, new_name)
                if os.path.exists(target):
                    shutil.copytree(source, target, dirs_exist_ok=True)
                    shutil.rmtree(source)
                else:
                    os.rename(source, target)
            return True
        except Exception as e:
            logger.error(f"解压失败: {e}")
            return False
def check_update():
    """Start a background version check unless one is reported as running.

    The new thread is tracked in the module-level ``threads`` list and its
    result is delivered to ``check_version``.
    """
    global threads
    if VersionThread.is_running():
        logger.debug("已存在版本检查线程在运行,跳过本检查")
        return

    # Keep only the threads that are still running.
    still_running = []
    for worker in threads:
        if worker.isRunning():
            still_running.append(worker)
    threads = still_running

    # Create and start a new version-check thread.
    checker = VersionThread()
    threads.append(checker)
    checker.version_signal.connect(check_version)
    checker.start()
def check_version(version):  # Check for updates
    """Compare the server version with the local one and notify the user.

    ``version`` is the dict delivered by the version thread. Returns ``False``
    when it carries an ``'error'`` entry, otherwise ``None``. All tracked
    threads are terminated and the tracking list is reset first.
    """
    global threads
    for worker in threads:
        worker.terminate()
    threads = []

    if 'error' in version:
        utils.tray_icon.push_error_notification(
            "检查更新失败!",
            f"检查更新失败!\n{version['error']}"
        )
        return False

    # Channel 0 selects the release version; any other value selects the beta.
    channel = int(config_center.read_conf("Other", "version_channel"))
    if channel == 0:
        server_version = version['version_release']
    else:
        server_version = version['version_beta']
    local_version = config_center.read_conf("Other", "version")

    remote_parsed = Version(server_version)
    local_parsed = Version(local_version)
    logger.debug(f"服务端版本: {remote_parsed},本地版本: {local_parsed}")
    if remote_parsed > local_parsed:
        utils.tray_icon.push_update_notification(f"新版本速递:{server_version}\n请在“设置”中了解更多。")
class weatherReportThread(QThread):  # Fetch the latest weather information
    """Worker thread that downloads current weather and (optionally) alert data.

    ``weather_signal`` receives ``{'now': ..., 'alert': ...}`` on success, or
    ``{'error': {'info': {'value': ..., 'unit': ...}}}`` when the main weather
    request fails.
    """

    weather_signal = pyqtSignal(dict)

    def __init__(self):
        super().__init__()

    def run(self):
        """Fetch the weather data, emit it, then schedule this object for deletion."""
        try:
            weather_data = self.get_weather_data()
            self.weather_signal.emit(weather_data)
        except Exception as e:
            logger.error(f"触发天气信息失败: {e}")
        finally:
            self.deleteLater()

    @staticmethod
    def _fetch_alert(alert_url, location_key, key):
        """Return the alert JSON, or ``{}`` when alerts are unavailable or the request fails."""
        if alert_url == 'NotSupported':
            logger.warning("当前API不支持天气预警信息")
            return {}
        if alert_url is None:
            logger.warning("无单独天气预警信息API")
            return {}
        try:
            response_alert = requests.get(
                alert_url.format(location_key=location_key, key=key), proxies=proxies
            )
            if response_alert.status_code == 200:
                return response_alert.json()
            logger.error(f"获取天气预警信息失败:{response_alert.status_code}")
        except Exception as e:
            # Alerts are optional: a failure here must not discard the current
            # weather data, mirroring how a non-200 alert response is handled.
            logger.error(f"获取天气预警信息失败:{e}")
        return {}

    @staticmethod
    def get_weather_data():
        """Return the weather data group, or an error dict if the main request fails."""
        location_key = config_center.read_conf('Weather', 'city')
        if location_key == '0':
            # '0' triggers a city lookup; getCity writes its result back to
            # the configuration, which is re-read once the thread finishes.
            city_thread = getCity()
            loop = QEventLoop()
            city_thread.finished.connect(loop.quit)
            city_thread.start()
            loop.exec_()  # block until the lookup thread has finished
            location_key = config_center.read_conf('Weather', 'city')
        if location_key == '0' or not location_key:
            # Still unresolved or empty: use the hard-coded default location.
            # NOTE(review): presumably a default city code — confirm.
            location_key = 101010100

        days = 1
        key = config_center.read_conf('Weather', 'api_key')
        url = db.get_weather_url().format(location_key=location_key, days=days, key=key)
        alert_url = db.get_weather_alert_url()
        try:
            data_group = {'now': {}, 'alert': {}}
            response_now = requests.get(url, proxies=proxies)
            if response_now.status_code != 200:
                logger.error(f"获取天气信息失败:{response_now.status_code}")
                return {'error': {'info': {'value': '错误', 'unit': response_now.status_code}}}
            data_group['now'] = response_now.json()
            data_group['alert'] = weatherReportThread._fetch_alert(alert_url, location_key, key)
            return data_group
        except Exception as e:
            # Covers request failures as well as an unparsable response body.
            logger.error(f"获取天气信息失败:{e}")
            return {'error': {'info': {'value': '错误', 'unit': ''}}}