首次填充文件内容

This commit is contained in:
xyb
2025-11-10 17:28:10 +08:00
Unverified
parent f5ac78c8b4
commit d5955bbbdd
16 changed files with 1351 additions and 0 deletions

View File

@@ -0,0 +1,21 @@
"""
EZProxy Core Module
==================
核心功能模块包含v2ray管理、代理管理和更新管理
"""
# 导入核心组件
from .v2ray_manager import V2RayManager
from .proxy_manager import ProxyManager
from .update_manager import UpdateManager
# 定义包的公开接口
__all__ = [
'V2RayManager',
'ProxyManager',
'UpdateManager'
]
# 包版本信息
__version__ = "1.0.0"

View File

@@ -0,0 +1,76 @@
import os
import sys
import logging
import subprocess
from pathlib import Path
from utils.system_utils import get_architecture
class ProxyManager:
def __init__(self):
self.is_enabled = False
self.http_proxy = "127.0.0.1:1081"
self.socks_proxy = "127.0.0.1:1080"
def enable(self):
"""启用系统代理"""
if self.is_enabled:
return True
try:
# 获取当前网络服务
service_cmd = "networksetup -listnetworkserviceorder | grep $(route -n get default | grep 'interface' | awk '{print $2}') -A1 | grep -o '[^ ]*$'"
service = subprocess.check_output(service_cmd, shell=True, text=True).strip()
# 设置HTTP/HTTPS代理
http_cmd = f"networksetup -setwebproxy '{service}' {self.http_proxy.split(':')[0]} {self.http_proxy.split(':')[1]}"
https_cmd = f"networksetup -setsecurewebproxy '{service}' {self.http_proxy.split(':')[0]} {self.http_proxy.split(':')[1]}"
# 设置SOCKS代理
socks_cmd = f"networksetup -setsocksfirewallproxy '{service}' {self.socks_proxy.split(':')[0]} {self.socks_proxy.split(':')[1]}"
# 使用AppleScript请求管理员权限
script = f"""
do shell script "{http_cmd}" with administrator privileges
do shell script "{https_cmd}" with administrator privileges
do shell script "{socks_cmd}" with administrator privileges
do shell script "networksetup -setwebproxystate '{service}' on" with administrator privileges
do shell script "networksetup -setsecurewebproxystate '{service}' on" with administrator privileges
do shell script "networksetup -setsocksfirewallproxystate '{service}' on" with administrator privileges
"""
subprocess.run(['osascript', '-e', script], check=True)
self.is_enabled = True
logging.info("System proxy enabled successfully")
return True
except Exception as e:
logging.error(f"Failed to enable system proxy: {str(e)}")
return False
def disable(self):
"""禁用系统代理"""
if not self.is_enabled:
return True
try:
# 获取当前网络服务
service_cmd = "networksetup -listnetworkserviceorder | grep $(route -n get default | grep 'interface' | awk '{print $2}') -A1 | grep -o '[^ ]*$'"
service = subprocess.check_output(service_cmd, shell=True, text=True).strip()
# 使用AppleScript请求管理员权限
script = f"""
do shell script "networksetup -setwebproxystate '{service}' off" with administrator privileges
do shell script "networksetup -setsecurewebproxystate '{service}' off" with administrator privileges
do shell script "networksetup -setsocksfirewallproxystate '{service}' off" with administrator privileges
"""
subprocess.run(['osascript', '-e', script], check=True)
self.is_enabled = False
logging.info("System proxy disabled successfully")
return True
except Exception as e:
logging.error(f"Failed to disable system proxy: {str(e)}")
return False
def get_proxy_status(self):
"""获取代理状态"""
return self.is_enabled

View File

@@ -0,0 +1,115 @@
import yaml
import requests
import zipfile
from pathlib import Path
from utils.config_utils import load_config, save_config
class UpdateManager:
UPDATE_URL = "https://o.nmgjg.com.cn/EZProxy/update.yaml"
CONFIG_URL = "https://o.nmgjg.com.cn/EZProxy/v2ray.json"
APP_URL = "https://o.nmgjg.com.cn/EZProxy/main.zip"
def __init__(self, app_dir):
self.app_dir = Path(app_dir)
self.local_update_yaml = self.app_dir / "conf" / "update.yaml"
self.config = load_config()
def check_updates(self):
"""检查更新"""
try:
response = requests.get(self.UPDATE_URL, timeout=10)
response.raise_for_status()
remote = yaml.safe_load(response.text)
# 检查配置更新
config_update = self._check_config_update(remote)
# 检查应用更新
app_update = self._check_app_update(remote)
return {
'config': config_update,
'app': app_update,
'changelog_url': remote.get('changelog_url', '')
}
except Exception as e:
logging.error(f"检查更新失败: {str(e)}")
return None
def _check_config_update(self, remote):
"""检查配置模板更新"""
local_version = self.config.get('config_version', '0.0')
remote_version = remote.get('config_version', '0.0')
if remote_version != local_version:
return {
'available': True,
'version': remote_version,
'force': remote.get('force_config_update', False)
}
return {'available': False}
def update_config(self):
"""更新配置模板"""
try:
response = requests.get(self.CONFIG_URL, timeout=15)
response.raise_for_status()
template_path = self.app_dir / "conf" / "v2ray_template.json"
with open(template_path, 'w') as f:
f.write(response.text)
# 更新本地版本号
self.config['config_version'] = self._get_remote_version()['config_version']
save_config(self.config)
return True
except Exception as e:
logging.error(f"更新配置失败: {str(e)}")
return False
def _prepare_update_helper(self):
"""准备更新助手"""
helper_path = self.app_dir / "update_helper.py"
temp_dir = get_temp_dir()
temp_dir.mkdir(parents=True, exist_ok=True)
# 复制更新助手到临时目录
temp_helper = temp_dir / "update_helper.py"
with open(helper_path, 'r') as src, open(temp_helper, 'w') as dst:
dst.write(src.read())
# 设置执行权限
temp_helper.chmod(0o755)
return temp_helper
def start_app_update(self):
"""启动应用更新流程"""
temp_dir = get_temp_dir()
zip_path = temp_dir / "main.zip"
try:
# 下载更新包
response = requests.get(self.APP_URL, stream=True, timeout=30)
response.raise_for_status()
with open(zip_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# 准备更新助手
helper = self._prepare_update_helper()
# 启动更新助手(在新进程中)
import subprocess
subprocess.Popen([
sys.executable,
str(helper),
str(zip_path),
str(self.app_dir),
str(temp_dir)
])
return True
except Exception as e:
logging.error(f"更新准备失败: {str(e)}")
return False

View File

@@ -0,0 +1,64 @@
import json
import logging
import subprocess
from pathlib import Path
from threading import Thread
from queue import Queue
class V2RayManager:
def __init__(self, config_path, log_path):
self.config_path = Path(config_path)
self.log_path = Path(log_path)
self.process = None
self.log_queue = Queue()
self.running = False
def start(self):
"""启动v2ray核心"""
if self.running:
return True
v2ray_bin = get_v2ray_binary_path()
cmd = [str(v2ray_bin), "-config", str(self.config_path)]
try:
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True
)
self.running = True
# 启动日志监听线程
Thread(target=self._log_reader, daemon=True).start()
return True
except Exception as e:
logging.error(f"启动v2ray失败: {str(e)}")
return False
def stop(self):
"""停止v2ray核心"""
if not self.running:
return
if self.process:
self.process.terminate()
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.process.kill()
self.running = False
def _log_reader(self):
"""读取v2ray日志"""
while self.running and self.process:
line = self.process.stdout.readline()
if not line:
break
self.log_queue.put(line.strip())
with open(self.log_path, 'a') as f:
f.write(line)
self.running = False

57
main.py
View File

@@ -0,0 +1,57 @@
import sys
import os
import logging
from pathlib import Path
from PyQt5.QtWidgets import QApplication
from ui.main_window import MainWindow
from utils.config_utils import init_config, load_config
class AppContext:
def __init__(self):
self.app_dir = Path(__file__).parent.resolve()
self.conf_dir = self.app_dir / "conf"
self.log_dir = self.app_dir / "logs"
self.resources = self.app_dir / "resources"
# 初始化目录
self.conf_dir.mkdir(exist_ok=True)
self.log_dir.mkdir(exist_ok=True)
# 初始化配置
init_config(self.conf_dir)
self.config = load_config()
# 配置日志
logging.basicConfig(
filename=self.log_dir / "app.log",
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def validate_config(self):
"""验证配置有效性"""
return bool(self.config.get('proxy_url'))
if __name__ == "__main__":
# 设置macOS应用属性
if sys.platform == "darwin":
from Foundation import NSBundle
bundle = NSBundle.mainBundle()
info = bundle.localizedInfoDictionary() or bundle.infoDictionary()
info["CFBundleName"] = "EZProxy"
info["CFBundleIconFile"] = "icon.icns"
app = QApplication(sys.argv)
app.setQuitOnLastWindowClosed(False)
context = AppContext()
window = MainWindow(context)
# 首次运行检查代理配置
if not context.validate_config():
from ui.setup_dialog import SetupDialog
setup = SetupDialog(context)
if setup.exec_() != QDialog.Accepted:
sys.exit(0)
sys.exit(app.exec_())

View File

@@ -0,0 +1,5 @@
PyQt5==5.15.9
PyYAML==6.0.1
requests==2.31.0
psutil==5.9.5
pyobjc==9.2 # 用于macOS系统集成

View File

@@ -0,0 +1,23 @@
"""
EZProxy UI Module
================
用户界面模块提供所有GUI组件
"""
# 导入UI组件
from .main_window import MainWindow
from .setup_dialog import SetupDialog
from .traffic_widget import TrafficWidget
from .update_dialog import UpdateDialog
# 定义包的公开接口
__all__ = [
'MainWindow',
'SetupDialog',
'TrafficWidget',
'UpdateDialog'
]
# 包版本信息
__version__ = "1.0.0"

View File

@@ -0,0 +1,185 @@
from PyQt5.QtWidgets import (
QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QPushButton, QTextEdit, QLabel, QSplitter,
QMessageBox, QSystemTrayIcon, QMenu
)
from PyQt5.QtCore import Qt, QTimer
from PyQt5.QtGui import QIcon, QTextCursor
from .traffic_widget import TrafficWidget
from core.v2ray_manager import V2RayManager
from core.proxy_manager import ProxyManager
class MainWindow(QMainWindow):
def __init__(self, app_context):
super().__init__()
self.app_context = app_context
self.v2ray = V2RayManager(
app_context.config['v2ray_config_path'],
app_context.config['v2ray_log_path']
)
self.proxy = ProxyManager()
self.tray_icon = None
self.init_ui()
self.init_tray()
self.init_timers()
def init_ui(self):
# 主窗口设置
self.setWindowTitle("EZProxy")
self.setWindowIcon(QIcon(str(self.app_context.resources / "icon.icns")))
self.setMinimumSize(800, 600)
# 创建中央部件
central_widget = QWidget()
main_layout = QVBoxLayout(central_widget)
# 顶部控制栏
control_layout = QHBoxLayout()
self.toggle_btn = QPushButton("启用代理")
self.toggle_btn.clicked.connect(self.toggle_proxy)
self.update_btn = QPushButton("检查更新")
self.update_btn.clicked.connect(self.check_updates)
control_layout.addWidget(self.toggle_btn)
control_layout.addWidget(self.update_btn)
control_layout.addStretch()
# 日志显示区域
self.log_view = QTextEdit()
self.log_view.setReadOnly(True)
self.log_view.setFontFamily("Menlo")
# 流量监控组件
self.traffic_widget = TrafficWidget()
# 创建分割器
splitter = QSplitter(Qt.Vertical)
splitter.addWidget(self.log_view)
splitter.addWidget(self.traffic_widget)
splitter.setSizes([400, 200])
# 添加到主布局
main_layout.addLayout(control_layout)
main_layout.addWidget(splitter)
self.setCentralWidget(central_widget)
def init_tray(self):
"""初始化系统托盘"""
self.tray_icon = QSystemTrayIcon(self)
self.tray_icon.setIcon(QIcon(str(self.app_context.resources / "icon.icns")))
tray_menu = QMenu()
toggle_action = tray_menu.addAction("启用代理")
toggle_action.triggered.connect(self.toggle_proxy)
update_action = tray_menu.addAction("检查更新")
update_action.triggered.connect(self.check_updates)
quit_action = tray_menu.addAction("退出")
quit_action.triggered.connect(self.confirm_exit)
self.tray_icon.setContextMenu(tray_menu)
self.tray_icon.activated.connect(self.tray_activated)
self.tray_icon.show()
# 默认隐藏到托盘
self.hide()
def init_timers(self):
"""初始化定时器"""
# 日志更新定时器
self.log_timer = QTimer()
self.log_timer.timeout.connect(self.update_logs)
self.log_timer.start(500)
# 流量更新定时器
self.traffic_timer = QTimer()
self.traffic_timer.timeout.connect(self.update_traffic)
self.traffic_timer.start(1000)
# 更新检查定时器 (1小时)
self.update_timer = QTimer()
self.update_timer.timeout.connect(self.auto_check_updates)
self.update_timer.start(3600000) # 1小时
def toggle_proxy(self):
"""切换代理状态"""
if self.v2ray.running:
self.stop_proxy()
else:
self.start_proxy()
def start_proxy(self):
"""启动代理服务"""
# 验证配置
if not self.app_context.validate_config():
QMessageBox.warning(self, "配置错误", "请先设置有效的代理地址")
return
# 启动v2ray
if not self.v2ray.start():
QMessageBox.critical(self, "启动失败", "无法启动v2ray核心")
return
# 配置系统代理
if not self.proxy.enable():
QMessageBox.warning(self, "代理警告", "系统代理配置失败但v2ray仍在运行")
self.toggle_btn.setText("停止代理")
self.tray_icon.setToolTip("EZProxy - 已启用")
def stop_proxy(self):
"""停止代理服务"""
self.proxy.disable()
self.v2ray.stop()
self.toggle_btn.setText("启用代理")
self.tray_icon.setToolTip("EZProxy - 已停止")
def update_logs(self):
"""更新日志显示"""
while not self.v2ray.log_queue.empty():
log_line = self.v2ray.log_queue.get()
self.log_view.append(log_line)
# 保持滚动到底部
scrollbar = self.log_view.verticalScrollBar()
scrollbar.setValue(scrollbar.maximum())
def update_traffic(self):
"""更新流量统计"""
if self.v2ray.running:
stats = self.v2ray.get_traffic_stats()
self.traffic_widget.update_stats(stats)
def closeEvent(self, event):
"""关闭事件处理"""
event.ignore()
self.hide()
self.tray_icon.showMessage(
"EZProxy",
"应用已最小化到系统托盘",
QSystemTrayIcon.Information,
2000
)
def confirm_exit(self):
"""确认退出"""
if self.v2ray.running:
reply = QMessageBox.question(
self, "确认退出",
"代理服务正在运行,确定要退出吗?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.No:
return
self.stop_proxy()
QApplication.quit()
def tray_activated(self, reason):
"""托盘图标激活"""
if reason == QSystemTrayIcon.DoubleClick:
self.show()

View File

@@ -0,0 +1,85 @@
from PyQt5.QtWidgets import (
QDialog, QVBoxLayout, QHBoxLayout, QLabel,
QLineEdit, QPushButton, QMessageBox
)
from PyQt5.QtCore import Qt
from urllib.parse import urlparse
class SetupDialog(QDialog):
def __init__(self, app_context, parent=None):
super().__init__(parent)
self.app_context = app_context
self.setWindowTitle("设置代理地址")
self.setMinimumWidth(500)
layout = QVBoxLayout()
# 说明标签
desc_label = QLabel("请输入您的VLESS代理地址 (格式: vless://...)")
desc_label.setWordWrap(True)
layout.addWidget(desc_label)
# 代理地址输入
self.url_input = QLineEdit()
self.url_input.setPlaceholderText("vless://uuid@server:port?type=tcp&security=reality&...")
layout.addWidget(self.url_input)
# 按钮布局
btn_layout = QHBoxLayout()
self.test_btn = QPushButton("测试连接")
self.test_btn.clicked.connect(self.test_connection)
self.save_btn = QPushButton("保存并继续")
self.save_btn.clicked.connect(self.save_and_continue)
self.cancel_btn = QPushButton("取消")
self.cancel_btn.clicked.connect(self.reject)
btn_layout.addWidget(self.test_btn)
btn_layout.addStretch()
btn_layout.addWidget(self.save_btn)
btn_layout.addWidget(self.cancel_btn)
layout.addLayout(btn_layout)
self.setLayout(layout)
def test_connection(self):
"""测试代理连接"""
url = self.url_input.text().strip()
if not url:
QMessageBox.warning(self, "输入错误", "请输入代理地址")
return
if not url.startswith('vless://'):
QMessageBox.warning(self, "格式错误", "代理地址必须以 vless:// 开头")
return
try:
parsed = urlparse(url)
if not parsed.hostname or not parsed.port:
raise ValueError("无效的服务器地址或端口")
QMessageBox.information(self, "测试成功",
f"代理地址解析成功:\n服务器: {parsed.hostname}\n端口: {parsed.port}")
except Exception as e:
QMessageBox.warning(self, "测试失败", f"代理地址格式错误: {str(e)}")
def save_and_continue(self):
"""保存配置并继续"""
url = self.url_input.text().strip()
if not url:
QMessageBox.warning(self, "输入错误", "请输入代理地址")
return
if not url.startswith('vless://'):
QMessageBox.warning(self, "格式错误", "代理地址必须以 vless:// 开头")
return
# 保存到配置
config = self.app_context.config
config['proxy_url'] = url
# 保存配置文件
if not save_config(config):
QMessageBox.critical(self, "保存失败", "无法保存配置文件")
return
self.accept()

View File

@@ -0,0 +1,163 @@
from PyQt5.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QProgressBar, QGroupBox
)
from PyQt5.QtCore import Qt, QTimer
from PyQt5.QtCharts import (
QChart, QChartView, QLineSeries,
QValueAxis, QBarSet, QBarSeries, QBarCategoryAxis
)
from PyQt5.QtGui import QPainter, QColor, QPen
class TrafficWidget(QWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.init_ui()
self.init_chart()
self.reset_stats()
# 定时器用于模拟数据
self.timer = QTimer()
self.timer.timeout.connect(self.update_fake_data)
self.timer.start(1000)
def init_ui(self):
"""初始化UI"""
layout = QVBoxLayout()
# 速度显示
speed_layout = QHBoxLayout()
self.download_label = QLabel("下载: 0.00 KB/s")
self.upload_label = QLabel("上传: 0.00 KB/s")
self.total_label = QLabel("总计: 0.00 MB")
speed_layout.addWidget(self.download_label)
speed_layout.addWidget(self.upload_label)
speed_layout.addWidget(self.total_label)
speed_layout.addStretch()
layout.addLayout(speed_layout)
# 进度条
self.progress = QProgressBar()
self.progress.setRange(0, 100)
layout.addWidget(self.progress)
# 图表
self.chart_view = QChartView()
self.chart_view.setRenderHint(QPainter.Antialiasing)
layout.addWidget(self.chart_view)
self.setLayout(layout)
def init_chart(self):
"""初始化图表"""
self.chart = QChart()
self.chart.setTitle("实时流量统计")
self.chart.legend().setVisible(True)
# 创建系列
self.download_series = QLineSeries()
self.download_series.setName("下载")
self.download_series.setColor(QColor(52, 152, 219))
self.upload_series = QLineSeries()
self.upload_series.setName("上传")
self.upload_series.setColor(QColor(46, 204, 113))
self.chart.addSeries(self.download_series)
self.chart.addSeries(self.upload_series)
# 创建坐标轴
self.axis_x = QValueAxis()
self.axis_x.setTitleText("时间 (秒)")
self.axis_x.setRange(0, 60)
self.axis_x.setTickCount(7)
self.axis_y = QValueAxis()
self.axis_y.setTitleText("流量 (KB)")
self.axis_y.setRange(0, 100)
self.chart.addAxis(self.axis_x, Qt.AlignBottom)
self.chart.addAxis(self.axis_y, Qt.AlignLeft)
self.download_series.attachAxis(self.axis_x)
self.download_series.attachAxis(self.axis_y)
self.upload_series.attachAxis(self.axis_x)
self.upload_series.attachAxis(self.axis_y)
self.chart_view.setChart(self.chart)
def reset_stats(self):
"""重置统计数据"""
self.download_speed = 0.0
self.upload_speed = 0.0
self.total_download = 0.0
self.total_upload = 0.0
self.time_points = []
self.download_points = []
self.upload_points = []
def update_stats(self, stats):
"""更新统计数据"""
self.download_speed = stats.get('download_speed', 0.0)
self.upload_speed = stats.get('upload_speed', 0.0)
self.total_download = stats.get('total_download', 0.0)
self.total_upload = stats.get('total_upload', 0.0)
# 更新显示
self.download_label.setText(f"下载: {self.download_speed:.2f} KB/s")
self.upload_label.setText(f"上传: {self.upload_speed:.2f} KB/s")
self.total_label.setText(f"总计: {(self.total_download + self.total_upload) / 1024:.2f} MB")
# 更新进度条
total_speed = self.download_speed + self.upload_speed
progress = min(100, int(total_speed / 1024)) # 假设1MB/s为满
self.progress.setValue(progress)
self.progress.setFormat(f"总流量: {total_speed:.1f} KB/s")
# 更新图表
self.add_data_point()
def add_data_point(self):
"""添加数据点到图表"""
current_time = len(self.time_points)
# 保留最近60秒的数据
if len(self.time_points) >= 60:
self.time_points.pop(0)
self.download_points.pop(0)
self.upload_points.pop(0)
self.time_points.append(current_time)
self.download_points.append(self.download_speed)
self.upload_points.append(self.upload_speed)
# 更新系列数据
self.download_series.clear()
self.upload_series.clear()
for i, (t, d, u) in enumerate(zip(self.time_points, self.download_points, self.upload_points)):
self.download_series.append(i, d)
self.upload_series.append(i, u)
# 调整Y轴范围
max_value = max(max(self.download_points), max(self.upload_points), 100) * 1.1
self.axis_y.setRange(0, max_value)
def update_fake_data(self):
"""模拟数据更新(实际应用中应替换为真实数据)"""
import random
fake_download = random.uniform(10, 500)
fake_upload = random.uniform(5, 100)
fake_total_download = self.total_download + fake_download
fake_total_upload = self.total_upload + fake_upload
stats = {
'download_speed': fake_download,
'upload_speed': fake_upload,
'total_download': fake_total_download,
'total_upload': fake_total_upload
}
self.update_stats(stats)

View File

@@ -0,0 +1,143 @@
from PyQt5.QtWidgets import (
QDialog, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QProgressBar, QTextEdit, QMessageBox
)
from PyQt5.QtCore import Qt, QThread, pyqtSignal
import webbrowser
import logging
class UpdateThread(QThread):
progress_updated = pyqtSignal(int, str)
finished = pyqtSignal(bool, str)
def __init__(self, update_manager, update_type):
super().__init__()
self.update_manager = update_manager
self.update_type = update_type
def run(self):
try:
self.progress_updated.emit(10, "准备更新...")
if self.update_type == 'config':
success = self.update_manager.update_config()
message = "配置模板更新成功" if success else "配置模板更新失败"
elif self.update_type == 'app':
success = self.update_manager.start_app_update()
message = "应用更新已启动,程序将自动重启" if success else "应用更新启动失败"
else:
success = False
message = "未知更新类型"
self.progress_updated.emit(100, "更新完成")
self.finished.emit(success, message)
except Exception as e:
logging.error(f"Update failed: {str(e)}")
self.finished.emit(False, f"更新失败: {str(e)}")
class UpdateDialog(QDialog):
def __init__(self, update_info, update_manager, parent=None):
super().__init__(parent)
self.update_info = update_info
self.update_manager = update_manager
self.setWindowTitle("更新可用")
self.setMinimumSize(500, 300)
layout = QVBoxLayout()
# 版本信息
if update_info['config']['available']:
config_label = QLabel(f"配置模板更新: {update_info['config']['version']}")
layout.addWidget(config_label)
if update_info['app']['available']:
app_label = QLabel(f"应用更新: {update_info['app']['version']}")
layout.addWidget(app_label)
# 更新日志
self.changelog_view = QTextEdit()
self.changelog_view.setReadOnly(True)
self.changelog_view.setHtml("<p>点击'查看详情'查看更新日志</p>")
layout.addWidget(QLabel("更新内容:"))
layout.addWidget(self.changelog_view)
# 按钮布局
btn_layout = QHBoxLayout()
self.details_btn = QPushButton("查看详情")
self.details_btn.clicked.connect(self.show_changelog)
self.skip_btn = QPushButton("跳过本次更新")
self.skip_btn.setEnabled(not (update_info['config'].get('force', False) or
update_info['app'].get('force', False)))
self.skip_btn.clicked.connect(self.skip_update)
self.update_btn = QPushButton("立即更新")
self.update_btn.clicked.connect(self.start_update)
btn_layout.addWidget(self.details_btn)
btn_layout.addStretch()
btn_layout.addWidget(self.skip_btn)
btn_layout.addWidget(self.update_btn)
layout.addLayout(btn_layout)
# 进度条
self.progress = QProgressBar()
self.progress.setVisible(False)
layout.addWidget(self.progress)
self.setLayout(layout)
def show_changelog(self):
"""在浏览器中打开更新日志"""
url = self.update_info.get('changelog_url', '')
if url:
webbrowser.open(url)
else:
QMessageBox.information(self, "无更新日志", "没有提供更新日志链接")
def skip_update(self):
"""跳过更新"""
reply = QMessageBox.question(
self, "跳过更新",
"您确定要跳过本次更新吗?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.Yes:
self.reject()
def start_update(self):
"""开始更新"""
# 禁用按钮
self.details_btn.setEnabled(False)
self.skip_btn.setEnabled(False)
self.update_btn.setEnabled(False)
# 显示进度条
self.progress.setVisible(True)
# 创建更新线程
self.update_thread = UpdateThread(self.update_manager, 'both')
self.update_thread.progress_updated.connect(self.update_progress)
self.update_thread.finished.connect(self.update_finished)
self.update_thread.start()
def update_progress(self, value, message):
"""更新进度"""
self.progress.setValue(value)
self.progress.setFormat(message)
def update_finished(self, success, message):
"""更新完成"""
if success:
QMessageBox.information(self, "更新成功", message)
self.accept()
else:
QMessageBox.critical(self, "更新失败", message)
self.details_btn.setEnabled(True)
self.skip_btn.setEnabled(True)
self.update_btn.setEnabled(True)
self.progress.setVisible(False)

View File

@@ -0,0 +1,63 @@
#!/usr/bin/env python3
import sys
import time
import shutil
import zipfile
from pathlib import Path
def main():
if len(sys.argv) != 4:
print("Usage: update_helper.py <zip_path> <app_dir> <temp_dir>")
sys.exit(1)
zip_path = Path(sys.argv[1])
app_dir = Path(sys.argv[2])
temp_dir = Path(sys.argv[3])
try:
# 解压更新包到临时目录
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# 备份当前应用
backup_dir = app_dir.parent / f"{app_dir.name}_backup"
if backup_dir.exists():
shutil.rmtree(backup_dir)
shutil.copytree(app_dir, backup_dir)
# 复制更新文件 (跳过conf目录)
for item in temp_dir.iterdir():
if item.name == "conf":
continue
dest = app_dir / item.name
if item.is_dir():
if dest.exists():
shutil.rmtree(dest)
shutil.copytree(item, dest)
else:
if dest.exists():
dest.unlink()
shutil.copy2(item, dest)
# 设置执行权限
for bin_file in app_dir.rglob("v2ray*"):
bin_file.chmod(0o755)
# 清理
shutil.rmtree(temp_dir, ignore_errors=True)
# 重启应用
main_executable = app_dir / "main.py"
os.execv(sys.executable, [sys.executable, str(main_executable)])
except Exception as e:
print(f"Update failed: {str(e)}")
# 恢复备份
if backup_dir.exists():
shutil.rmtree(app_dir)
shutil.copytree(backup_dir, app_dir)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,56 @@
"""
EZProxy Utilities Module
========================
工具函数模块,提供各种辅助功能
"""
# 导入工具函数
from .config_utils import (
init_config,
load_config,
save_config,
parse_vless_url,
generate_v2ray_config
)
from .system_utils import (
get_architecture,
get_v2ray_binary_path,
set_system_proxy,
get_temp_dir
)
from .network_utils import (
download_file,
fetch_json,
fetch_yaml,
check_connectivity,
get_public_ip
)
# 定义包的公开接口
__all__ = [
# 配置工具
'init_config',
'load_config',
'save_config',
'parse_vless_url',
'generate_v2ray_config',
# 系统工具
'get_architecture',
'get_v2ray_binary_path',
'set_system_proxy',
'get_temp_dir',
# 网络工具
'download_file',
'fetch_json',
'fetch_yaml',
'check_connectivity',
'get_public_ip'
]
# 包版本信息
__version__ = "1.0.0"

View File

@@ -0,0 +1,161 @@
import os
import yaml
import json
import logging
from pathlib import Path
from urllib.parse import urlparse, parse_qs
def init_config(conf_dir):
"""初始化配置文件"""
config_path = Path(conf_dir) / "config.yaml"
if not config_path.exists():
default_config = {
'proxy_url': '',
'config_version': '1.0',
'app_version': '1.0',
'domains': [
'google.com',
'youtube.com',
'facebook.com',
'twitter.com',
'instagram.com',
'netflix.com',
'github.com',
'gitlab.com',
'stackoverflow.com',
'reddit.com'
],
'v2ray_config_path': str(Path(conf_dir) / "v2ray.json"),
'v2ray_template_path': str(Path(conf_dir) / "v2ray_template.json"),
'v2ray_log_path': str(Path(__file__).parent.parent / "logs" / "v2ray.log")
}
with open(config_path, 'w') as f:
yaml.dump(default_config, f)
logging.info("Default config file created")
return load_config()
def load_config():
"""加载配置文件"""
config_path = Path(__file__).parent.parent / "conf" / "config.yaml"
try:
with open(config_path, 'r') as f:
return yaml.safe_load(f)
except Exception as e:
logging.error(f"Failed to load config: {str(e)}")
return {
'proxy_url': '',
'config_version': '1.0',
'app_version': '1.0',
'domains': [],
'v2ray_config_path': str(Path(__file__).parent.parent / "conf" / "v2ray.json"),
'v2ray_template_path': str(Path(__file__).parent.parent / "conf" / "v2ray_template.json"),
'v2ray_log_path': str(Path(__file__).parent.parent / "logs" / "v2ray.log")
}
def save_config(config):
"""保存配置文件"""
config_path = Path(__file__).parent.parent / "conf" / "config.yaml"
try:
with open(config_path, 'w') as f:
yaml.dump(config, f)
return True
except Exception as e:
logging.error(f"Failed to save config: {str(e)}")
return False
def parse_vless_url(url):
"""解析vless://URL"""
try:
# 移除URL编码
url = url.replace('%2F', '/').replace('%3D', '=').replace('%3F', '?')
# 解析URL
parsed = urlparse(url)
query_params = parse_qs(parsed.query)
# 提取必要参数
uuid = parsed.username
address = parsed.hostname
port = parsed.port
name = parsed.fragment
# 提取REALITY参数
security = query_params.get('security', ['none'])[0]
encryption = query_params.get('encryption', ['none'])[0]
flow = query_params.get('flow', [''])[0]
type_ = query_params.get('type', ['tcp'])[0]
sni = query_params.get('sni', [''])[0]
fp = query_params.get('fp', ['chrome'])[0]
pbk = query_params.get('pbk', [''])[0]
sid = query_params.get('sid', [''])[0]
spx = query_params.get('spx', [''])[0]
return {
'uuid': uuid,
'address': address,
'port': port,
'name': name,
'security': security,
'encryption': encryption,
'flow': flow,
'type': type_,
'sni': sni,
'fp': fp,
'pbk': pbk,
'sid': sid,
'spx': spx
}
except Exception as e:
logging.error(f"Failed to parse vless URL: {str(e)}")
return None
def generate_v2ray_config(template_path, output_path, proxy_info, domains):
"""生成v2ray配置文件"""
try:
# 读取模板
with open(template_path, 'r') as f:
template = json.load(f)
# 替换模板中的占位符
outbound = template['outbounds'][0]
# 设置vless服务器信息
outbound['settings']['vnext'][0]['address'] = proxy_info['address']
outbound['settings']['vnext'][0]['port'] = proxy_info['port']
outbound['settings']['vnext'][0]['users'][0]['id'] = proxy_info['uuid']
outbound['settings']['vnext'][0]['users'][0]['encryption'] = proxy_info['encryption']
outbound['settings']['vnext'][0]['users'][0]['flow'] = proxy_info['flow']
# 设置streamSettings
stream_settings = outbound['streamSettings']
stream_settings['network'] = proxy_info['type']
stream_settings['security'] = proxy_info['security']
if proxy_info['security'] == 'reality':
reality_settings = {
'serverName': proxy_info['sni'],
'fingerprint': proxy_info['fp'],
'publicKey': proxy_info['pbk'],
'shortId': proxy_info['sid'],
'spiderX': proxy_info['spx']
}
stream_settings['realitySettings'] = reality_settings
# 替换域名列表
for rule in template['routing']['rules']:
if 'domain' in rule and 'REPLACE_DOMAINS' in rule['domain']:
rule['domain'] = domains
# 保存配置
with open(output_path, 'w') as f:
json.dump(template, f, indent=2)
return True
except Exception as e:
logging.error(f"Failed to generate v2ray config: {str(e)}")
return False

View File

@@ -0,0 +1,86 @@
import requests
import json
import yaml
import logging
import time
from pathlib import Path
def download_file(url, save_path, timeout=30):
"""下载文件"""
try:
response = requests.get(url, timeout=timeout, stream=True)
response.raise_for_status()
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return True
except Exception as e:
logging.error(f"Failed to download file from {url}: {str(e)}")
return False
def fetch_json(url, timeout=15):
"""获取JSON数据"""
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
return response.json()
except Exception as e:
logging.error(f"Failed to fetch JSON from {url}: {str(e)}")
return None
def fetch_yaml(url, timeout=15):
"""获取YAML数据"""
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
return yaml.safe_load(response.text)
except Exception as e:
logging.error(f"Failed to fetch YAML from {url}: {str(e)}")
return None
def check_connectivity(host="8.8.8.8", port=53, timeout=3):
"""检查网络连接"""
import socket
try:
socket.setdefaulttimeout(timeout)
socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
return True
except socket.error:
return False
def get_public_ip():
"""获取公网IP"""
try:
response = requests.get('https://api.ipify.org?format=json', timeout=10)
response.raise_for_status()
return response.json()['ip']
except Exception as e:
logging.error(f"Failed to get public IP: {str(e)}")
return None
def measure_speedtest():
"""简单测速"""
try:
# 下载测试
start_time = time.time()
response = requests.get('https://speedtest.tele2.net/10MB.zip', timeout=30, stream=True)
response.raise_for_status()
total_bytes = 0
for chunk in response.iter_content(chunk_size=8192):
total_bytes += len(chunk)
elapsed_time = time.time() - start_time
download_speed = (total_bytes * 8) / (elapsed_time * 1024 * 1024) # Mbps
return {
'download_speed': download_speed,
'elapsed_time': elapsed_time,
'total_bytes': total_bytes
}
except Exception as e:
logging.error(f"Speed test failed: {str(e)}")
return None

View File

@@ -0,0 +1,48 @@
import platform
import subprocess
from pathlib import Path
def get_architecture():
"""检测系统架构"""
machine = platform.machine().lower()
if 'arm' in machine or 'aarch64' in machine:
return 'arm64'
return 'amd64'
def get_v2ray_binary_path():
"""获取v2ray二进制路径"""
arch = get_architecture()
base_dir = Path(__file__).parent.parent
v2ray_dir = base_dir / "v2ray"
if arch == 'arm64':
return v2ray_dir / "v2ray-macos-arm64"
return v2ray_dir / "v2ray-macos-64"
def set_system_proxy(enable, host="127.0.0.1", port=1081):
"""配置macOS系统代理"""
# 获取当前网络服务
service_cmd = "networksetup -listnetworkserviceorder | grep $(route -n get default | grep 'interface' | awk '{print $2}') -A1 | grep -o '[^ ]*$'"
service = subprocess.check_output(service_cmd, shell=True).decode().strip()
if enable:
cmd = f"""
osascript -e 'do shell script "networksetup -setwebproxy \\"{service}\\" {host} {port}" with administrator privileges'
osascript -e 'do shell script "networksetup -setsecurewebproxy \\"{service}\\" {host} {port}" with administrator privileges'
"""
else:
cmd = f"""
osascript -e 'do shell script "networksetup -setwebproxystate \\"{service}\\" off" with administrator privileges'
osascript -e 'do shell script "networksetup -setsecurewebproxystate \\"{service}\\" off" with administrator privileges'
"""
try:
subprocess.run(cmd, shell=True, check=True)
return True
except subprocess.CalledProcessError:
return False
def get_temp_dir():
"""获取安全的临时目录"""
import tempfile
return Path(tempfile.gettempdir()) / f"ezproxy_update_{os.getpid()}"