#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import sys import subprocess import json import threading import tkinter as tk from tkinter import ttk, scrolledtext, filedialog, messagebox from pathlib import Path from urllib.request import urlopen, Request from urllib.error import URLError, HTTPError from datetime import datetime import zipfile try: import pyzipper except ImportError: pyzipper = None import shutil import time class ADKAPKGUI: def __init__(self): self.root = tk.Tk() self.root.title("长安语言安装工具") self.root.geometry("650x640") self.root.resizable(True, True) # 设置颜色主题 self.colors = { 'bg_dark': '#1e1e2e', 'bg_light': '#2a2a3e', 'accent': '#6c5ce7', 'accent_hover': '#5b4bc4', 'success': '#00b894', 'error': '#d63031', 'warning': '#fdcb6e', 'info': '#0984e3', 'text': '#dfe6e9', 'text_secondary': '#b2bec3', 'border': '#3d3d5e' } # 从 exe/py 所在目录查找资源文件 self.base_dir = Path(sys.executable).parent if getattr(sys, 'frozen', False) else Path(__file__).parent if getattr(sys, 'frozen', False): self.adb = str(Path(sys._MEIPASS) / 'adb.exe') self.sz = str(Path(sys._MEIPASS) / '7za.exe') else: self.adb = 'adb' self.sz = str(self.base_dir / '7za.exe') self.package_file = self.base_dir / "package.bin" self.extract_password = None self.apps_dir = None self.priv_apps_dir = None self.temp_dir = None self.api_url = "https://api.changan.softwindy.cn/api/authorizations/auth-check" self.vin = None self.device_connected = False self._refreshing = False # 防止并发刷新 self.debug_mode = False # 调试模式 # 设置样式 self.setup_styles() self.setup_ui() self.center_window() # 检查环境 self.check_environment() # 启动设备状态监控 self.start_device_monitor() def setup_styles(self): """设置自定义样式""" style = ttk.Style() style.theme_use('clam') # 配置主颜色 style.configure('TFrame', background=self.colors['bg_dark']) style.configure('TLabel', background=self.colors['bg_dark'], foreground=self.colors['text']) style.configure('TLabelframe', background=self.colors['bg_dark'], foreground=self.colors['text']) style.configure('TLabelframe.Label', background=self.colors['bg_dark'], foreground=self.colors['accent']) # 配置进度条 style.configure('TProgressbar', background=self.colors['accent'], troughcolor=self.colors['bg_light'], borderwidth=0) def setup_ui(self): """设置UI界面""" # 配置根窗口 self.root.configure(bg=self.colors['bg_dark']) # 创建主框架 main_frame = tk.Frame(self.root, bg=self.colors['bg_dark']) main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) # 顶部标题栏 title_frame = tk.Frame(main_frame, bg=self.colors['bg_dark'], height=65) title_frame.pack(fill=tk.X, pady=(0, 10)) title_frame.pack_propagate(False) # 标题 title_label = tk.Label(title_frame, text="🚀 适用于启源Q07多语言安装", font=('Microsoft YaHei', 18, 'bold'), fg=self.colors['accent'], bg=self.colors['bg_dark']) title_label.pack() subtitle_label = tk.Label(title_frame, text="宜宾科宜科技有限公司 - 智能设备管理平台", font=('Microsoft YaHei', 9), fg=self.colors['text_secondary'], bg=self.colors['bg_dark']) subtitle_label.pack() # 按钮区域(两排,每排5个) button_frame = tk.Frame(main_frame, bg=self.colors['bg_light'], relief=tk.RAISED, bd=1) button_frame.pack(fill=tk.X, pady=(0, 10), padx=5) # 按钮样式参数 btn_params = { 'font': ('Microsoft YaHei', 9), 'fg': 'white', 'relief': tk.FLAT, 'cursor': 'hand2', 'height': 1, 'width': 14 } # 第一排按钮 row1_frame = tk.Frame(button_frame, bg=self.colors['bg_light']) row1_frame.pack(pady=(8, 4)) self.btn_root = tk.Button(row1_frame, text="🔓 获取权限", command=self.get_root_permission, bg=self.colors['success'], **btn_params) self.btn_root.pack(side=tk.LEFT, padx=4) self.btn_push = tk.Button(row1_frame, text="📦 刷入语言包", command=self.push_all_apks, bg=self.colors['accent'], **btn_params) self.btn_push.pack(side=tk.LEFT, padx=4) self.btn_install_all = tk.Button(row1_frame, text="📱 安装App", command=self.install_apps, bg=self.colors['accent'], **btn_params) self.btn_install_all.pack(side=tk.LEFT, padx=4) self.btn_language = tk.Button(row1_frame, text="🌐 语言设置", command=self.open_language_quick_set, bg=self.colors['accent'], **btn_params) self.btn_language.pack(side=tk.LEFT, padx=4) # 第二排按钮 row2_frame = tk.Frame(button_frame, bg=self.colors['bg_light']) row2_frame.pack(pady=(4, 8)) self.btn_timezone = tk.Button(row2_frame, text="⏰ 时区设置", command=self.open_timezone_settings, bg=self.colors['accent'], **btn_params) self.btn_timezone.pack(side=tk.LEFT, padx=4) self.btn_settings = tk.Button(row2_frame, text="⚙️ 安卓设置", command=self.open_android_settings, bg=self.colors['accent'], **btn_params) self.btn_settings.pack(side=tk.LEFT, padx=4) self.btn_reboot = tk.Button(row2_frame, text="🔄 重启设备", command=self.reboot_device, bg=self.colors['warning'], **btn_params) self.btn_reboot.pack(side=tk.LEFT, padx=4) self.btn_exit = tk.Button(row2_frame, text="❌ 禁用升级", command=self.on_disable_upgrade, bg=self.colors['error'], **btn_params) self.btn_exit.pack(side=tk.LEFT, padx=4) # 设备状态栏(横条) status_bar_frame = tk.Frame(main_frame, bg=self.colors['bg_light'], relief=tk.RAISED, bd=1) status_bar_frame.pack(fill=tk.X, pady=(0, 5)) # 状态指示器 status_indicator_frame = tk.Frame(status_bar_frame, bg=self.colors['bg_light']) status_indicator_frame.pack(side=tk.LEFT, padx=10, pady=5) self.status_indicator = tk.Canvas(status_indicator_frame, width=10, height=10, bg=self.colors['bg_light'], highlightthickness=0) self.status_indicator.pack(side=tk.LEFT) self.status_dot = self.status_indicator.create_oval(2, 2, 8, 8, fill='#636e72') tk.Label(status_indicator_frame, text="设备:", font=('Microsoft YaHei', 9), fg=self.colors['text'], bg=self.colors['bg_light']).pack(side=tk.LEFT, padx=(5, 3)) self.device_status_label = tk.Label(status_indicator_frame, text="未检测", font=('Microsoft YaHei', 9, 'bold'), fg='#636e72', bg=self.colors['bg_light'], anchor='w', width=4) self.device_status_label.pack(side=tk.LEFT) # VIN信息 vin_frame = tk.Frame(status_bar_frame, bg=self.colors['bg_light']) vin_frame.pack(side=tk.LEFT, padx=20, pady=5) tk.Label(vin_frame, text="VIN码:", font=('Microsoft YaHei', 9), fg=self.colors['text'], bg=self.colors['bg_light']).pack(side=tk.LEFT) self.vin_label = tk.Label(vin_frame, text="未获取", font=('Microsoft YaHei', 9, 'bold'), fg='#636e72', bg=self.colors['bg_light'], anchor='w', width=17) self.vin_label.pack(side=tk.LEFT, padx=(5, 0)) # 授权状态 auth_frame = tk.Frame(status_bar_frame, bg=self.colors['bg_light']) auth_frame.pack(side=tk.LEFT, padx=20, pady=5) tk.Label(auth_frame, text="授权:", font=('Microsoft YaHei', 9), fg=self.colors['text'], bg=self.colors['bg_light']).pack(side=tk.LEFT) self.auth_label = tk.Label(auth_frame, text="未验证", font=('Microsoft YaHei', 9, 'bold'), fg='#636e72', bg=self.colors['bg_light'], anchor='w', width=4) self.auth_label.pack(side=tk.LEFT, padx=(5, 0)) # 刷新按钮 refresh_btn = tk.Button(status_bar_frame, text="🔄 检查", command=self.refresh_device_status, font=('Microsoft YaHei', 8), fg=self.colors['accent'], bg=self.colors['bg_light'], relief=tk.FLAT, cursor='hand2') refresh_btn.pack(side=tk.RIGHT, padx=10, pady=5) # 提示信息区域(设备状态下方) tips_frame = tk.Frame(main_frame, bg=self.colors['bg_light'], relief=tk.RAISED, bd=1) tips_frame.pack(fill=tk.X, pady=(5, 5), padx=5) tips = [ "1. 安装语言过程中请保持车辆和电脑的电量充足,不可中途停止。", "2. 获取权限以后,车辆自动重启以后再进入语言刷入。", "3. 部分语言需要重启后生效,可以一切工作完成以后再重启。", ] for i, tip in enumerate(tips): tip_row = tk.Frame(tips_frame, bg=self.colors['bg_light']) tip_row.pack(fill=tk.X, padx=10, pady=(5 if i == 0 else 0, 5 if i == len(tips) - 1 else 0)) tk.Label(tip_row, text=tip, font=('Microsoft YaHei', 9), fg=self.colors['warning'], bg=self.colors['bg_light'], wraplength=600, justify=tk.LEFT).pack(side=tk.LEFT) # 解压进度条框架 progress_frame = tk.Frame(main_frame, bg=self.colors['bg_dark']) progress_frame.pack(fill=tk.X, pady=(5, 5)) self.progress_label = tk.Label(progress_frame, text="", font=('Microsoft YaHei', 9), fg=self.colors['text_secondary'], bg=self.colors['bg_dark']) self.progress_label.pack() self.progress = ttk.Progressbar(progress_frame, mode='determinate', style='TProgressbar') self.progress.pack(fill=tk.X, pady=(2, 0)) # 推送进度条 self.push_progress_label = tk.Label(progress_frame, text="", font=('Microsoft YaHei', 9), fg=self.colors['text_secondary'], bg=self.colors['bg_dark']) self.push_progress = ttk.Progressbar(progress_frame, mode='determinate', style='TProgressbar') # 日志区域(下方) log_card = tk.Frame(main_frame, bg=self.colors['bg_light'], relief=tk.RAISED, bd=1) log_card.pack(fill=tk.BOTH, expand=True, pady=(5, 0)) # 日志标题栏 log_title_frame = tk.Frame(log_card, bg=self.colors['bg_dark'], height=30) log_title_frame.pack(fill=tk.X) log_title_frame.pack_propagate(False) tk.Label(log_title_frame, text="📋 运行日志", font=('Microsoft YaHei', 10, 'bold'), fg=self.colors['accent'], bg=self.colors['bg_dark']).pack(side=tk.LEFT, padx=10) self.btn_clear = tk.Button(log_title_frame, text="🗑 清空日志", command=self.clear_log, font=('Microsoft YaHei', 8), fg=self.colors['text_secondary'], bg=self.colors['bg_dark'], relief=tk.FLAT, cursor='hand2') self.btn_clear.pack(side=tk.RIGHT, padx=10) # 日志文本框 text_frame = tk.Frame(log_card, bg=self.colors['bg_light']) text_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) self.log_text = scrolledtext.ScrolledText(text_frame, height=12, wrap=tk.WORD, font=('Consolas', 9), bg='#2d2d3d', fg='#e0e0e0', insertbackground='white', relief=tk.FLAT, borderwidth=0) self.log_text.pack(fill=tk.BOTH, expand=True) # 配置日志颜色标签 self.log_text.tag_config('INFO', foreground='#74b9ff') self.log_text.tag_config('SUCCESS', foreground='#55efc4') self.log_text.tag_config('ERROR', foreground='#ff7675') self.log_text.tag_config('WARNING', foreground='#ffeaa7') self.log_text.tag_config('CMD', foreground='#a29bfe') # 底部状态栏 bottom_status = tk.Frame(main_frame, bg=self.colors['bg_light'], height=22) bottom_status.pack(fill=tk.X, pady=(5, 0)) bottom_status.pack_propagate(False) self.status_text = tk.Label(bottom_status, text="就绪", font=('Microsoft YaHei', 8), fg=self.colors['text_secondary'], bg=self.colors['bg_light']) self.status_text.pack(side=tk.LEFT, padx=10) # 调试模式快捷键 self.root.bind('', self._toggle_debug) # 绑定悬停效果 self.bind_hover_effects() def bind_hover_effects(self): """绑定按钮悬停效果""" buttons = [self.btn_root, self.btn_push, self.btn_install_all, self.btn_language, self.btn_timezone, self.btn_settings, self.btn_reboot, self.btn_clear, self.btn_exit] for btn in buttons: original_bg = btn.cget('bg') def on_enter(e, btn=btn, bg=original_bg): btn.config(bg=self.lighten_color(bg)) def on_leave(e, btn=btn, bg=original_bg): btn.config(bg=bg) btn.bind('', on_enter) btn.bind('', on_leave) def lighten_color(self, color): """调亮颜色""" if color == self.colors['accent']: return self.colors['accent_hover'] elif color == self.colors['warning']: return '#feca57' elif color == self.colors['info']: return '#0984e3' elif color == self.colors['error']: return '#e17055' elif color == self.colors['success']: return '#00a884' return color def center_window(self): """将窗口居中显示在屏幕上""" self.root.update_idletasks() screen_w = self.root.winfo_screenwidth() screen_h = self.root.winfo_screenheight() win_w = self.root.winfo_reqwidth() win_h = self.root.winfo_reqheight() x = (screen_w - win_w) // 2 y = (screen_h - win_h) // 2 self.root.geometry(f"+{x}+{y}") def run_on_ui_thread(self, func, *args, **kwargs): """将函数调度到主线程执行,确保线程安全""" self.root.after(0, func, *args, **kwargs) def _log_impl(self, message, level="INFO"): """日志写入的实际实现(必须在主线程调用)""" timestamp = datetime.now().strftime("%H:%M:%S") log_entry = f"[{timestamp}] [{level}] {message}\n" self.log_text.insert(tk.END, log_entry, level) self.log_text.see(tk.END) def log(self, message, level="INFO"): """添加日志(线程安全)""" self.run_on_ui_thread(self._log_impl, message, level) def clear_log(self): """清空日志""" self.log_text.delete(1.0, tk.END) self.log("日志已清空", "INFO") def _show_progress_impl(self, show=True, is_push=False): """显示/隐藏进度条的实际实现(必须在主线程调用)""" if is_push: if show: self.push_progress_label.pack() self.push_progress.pack(fill=tk.X, pady=(2, 0)) self.push_progress['value'] = 0 else: self.push_progress_label.pack_forget() self.push_progress.pack_forget() else: if show: self.progress_label.pack() self.progress.pack(fill=tk.X, pady=(2, 0)) self.progress['value'] = 0 else: self.progress_label.pack_forget() self.progress.pack_forget() def show_progress(self, show=True, is_push=False): """显示/隐藏进度条(线程安全)""" self.run_on_ui_thread(self._show_progress_impl, show, is_push) def _update_progress_impl(self, value, max_value=100, label="", is_push=False): """更新进度条的实际实现(必须在主线程调用)""" if is_push: percent = (value / max_value) * 100 self.push_progress['value'] = percent self.push_progress_label.config(text=f"{label}: {value}/{max_value} ({percent:.1f}%)") else: percent = (value / max_value) * 100 self.progress['value'] = percent self.progress_label.config(text=f"{label}: {value}/{max_value} ({percent:.1f}%)") self.root.update_idletasks() def update_progress(self, value, max_value=100, label="", is_push=False): """更新进度条(线程安全)""" self.run_on_ui_thread(self._update_progress_impl, value, max_value, label, is_push) def update_device_status(self, connected, vin=None, authorized=False): """更新设备状态显示(线程安全:立即设状态变量,UI走主线程)""" self.device_connected = connected if vin is not None: self.vin = vin self.run_on_ui_thread(self._update_device_status_impl, connected, vin, authorized) def _update_device_status_impl(self, connected, vin, authorized): """设备状态UI更新的实际实现(必须在主线程调用)""" if connected: self.status_indicator.itemconfig(self.status_dot, fill=self.colors['success']) self.device_status_label.config(text="已连接", fg=self.colors['success']) if vin: self.vin_label.config(text=vin, fg=self.colors['success']) if authorized: self.auth_label.config(text="已授权", fg=self.colors['success']) else: self.auth_label.config(text="未授权", fg=self.colors['error']) else: self.vin_label.config(text="未获取", fg=self.colors['error']) self.auth_label.config(text="未验证", fg=self.colors['error']) else: self.status_indicator.itemconfig(self.status_dot, fill=self.colors['error']) self.device_status_label.config(text="未连接", fg=self.colors['error']) self.vin_label.config(text="未获取", fg=self.colors['error']) self.auth_label.config(text="未验证", fg=self.colors['error']) def check_device_connection(self): """检查设备是否连接""" if self.debug_mode: return True if not self.device_connected: messagebox.showwarning("设备未连接", "请先连接设备并点击「检查」按钮刷新状态!") return False return True def start_device_monitor(self): """启动设备状态监控(每5秒检查一次)""" def monitor(): while True: try: result = subprocess.run(f'{self.adb} -d devices', shell=True, capture_output=True, text=True) lines = result.stdout.strip().split('\n') devices = [line for line in lines[1:] if line.strip() and 'device' in line and 'offline' not in line] if devices and not self.device_connected and not self._refreshing: # 设备新连接,刷新状态 self.refresh_device_status() elif not devices and self.device_connected: # 设备断开连接 self.update_device_status(False) self.log("设备已断开连接", "WARNING") time.sleep(5) except: time.sleep(5) threading.Thread(target=monitor, daemon=True).start() def get_root_permission(self): """获取 root 权限,首次获取需重启""" if not self.check_device_connection(): return def get_root(): self.show_progress(True, is_push=False) # 执行 adb -d root ok_root, out_root = self.run_adb_command('adb -d root') if not ok_root: self.log("获取 root 失败", "ERROR") self.show_progress(False, is_push=False) return time.sleep(1) # 执行 adb -d remount(需同时捕获 stdout 和 stderr) remount_result = subprocess.run(f'{self.adb} -d remount', shell=True, capture_output=True, text=True) if remount_result.returncode != 0: self.log("获取权限失败", "ERROR") self.show_progress(False, is_push=False) return # 判断是否需要重启:首次 remount 返回 "Now reboot your device for settings to take effect" combined_output = (remount_result.stdout + remount_result.stderr).lower() if 'now reboot your device' in combined_output: self.log("首次获取权限,需要重启设备...", "INFO") ok, _ = self.run_adb_command('adb -d shell reboot') if ok: self.log("设备即将重启,重启后权限生效", "INFO") self.update_device_status(False) else: self.log("重启失败", "ERROR") else: self.log("已获取权限", "SUCCESS") self.show_progress(False, is_push=False) threading.Thread(target=get_root, daemon=True).start() def check_package_extracted(self): """检查语言包是否已解压""" has_app = self.apps_dir and self.apps_dir.exists() and len(list(self.apps_dir.glob("*.apk"))) > 0 has_priv = self.priv_apps_dir and self.priv_apps_dir.exists() and len(list(self.priv_apps_dir.glob("*.apk"))) > 0 return has_app or has_priv def extract_package_silent(self): """静默解压语言包(带进度)""" if not self.package_file.exists(): self.log(f"未找到资源包 ({self.package_file.name})", "ERROR") return False try: # 使用用户目录,无需管理员权限 local_appdata = os.environ.get('LOCALAPPDATA', os.path.expanduser('~\\AppData\\Local')) hidden_path = Path(local_appdata) / ".cache" / "system" / ".android" hidden_path.mkdir(parents=True, exist_ok=True) self.temp_dir = hidden_path / "apps_cache_Q07" # 如果已存在,先清理 if self.temp_dir.exists(): shutil.rmtree(self.temp_dir, ignore_errors=True) time.sleep(0.5) self.temp_dir.mkdir(parents=True, exist_ok=True) # 设置隐藏属性(Windows) if sys.platform == 'win32': subprocess.run(f'attrib +h "{self.temp_dir.parent}"', shell=True, capture_output=True) subprocess.run(f'attrib +h "{self.temp_dir}"', shell=True, capture_output=True) # 使用 7za 高速解压 self.update_progress(0, 1, "资源加载中...") result = subprocess.run( [self.sz, 'x', str(self.package_file), f'-p{self.extract_password}', f'-o{self.temp_dir}', '-y'], capture_output=True, text=True, creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0 ) if result.returncode != 0: stderr = result.stderr.strip() if stderr: self.log(f"解压失败: {stderr[:200]}", "ERROR") else: self.log("解压失败,请检查密码是否正确", "ERROR") return False self.update_progress(1, 1, "资源加载完成") # 查找app和priv-app目录 self.apps_dir = None self.priv_apps_dir = None app_candidates = list(self.temp_dir.rglob("app")) or list(self.temp_dir.rglob("apps")) if app_candidates: self.apps_dir = app_candidates[0] priv_app_candidates = list(self.temp_dir.rglob("priv-app")) or list(self.temp_dir.rglob("priv-apps")) if priv_app_candidates: self.priv_apps_dir = priv_app_candidates[0] if not self.apps_dir and not self.priv_apps_dir: self.log("未找到 app/priv-app 目录", "WARNING") return False return True except Exception as e: self.log(f"资源准备失败", "ERROR") return False def check_environment(self): """检查环境""" try: result = subprocess.run(f'{self.adb} version', shell=True, capture_output=True, text=True) if result.returncode == 0: self.refresh_device_status() if not self.package_file.exists(): self.log("未找到资源包文件", "WARNING") else: self._try_reuse_extracted() else: self.log("未找到adb命令,请将ADB文件放入本目录", "ERROR") except FileNotFoundError: self.log("未找到adb命令,请将ADB文件放入本目录", "ERROR") def _try_reuse_extracted(self): """检查磁盘上是否已有解压好的资源,有则直接复用""" local_appdata = os.environ.get('LOCALAPPDATA', os.path.expanduser('~\\AppData\\Local')) cache_dir = Path(local_appdata) / ".cache" / "system" / ".android" / "apps_cache_Q07" if not cache_dir.exists(): return app_candidates = list(cache_dir.rglob("app")) or list(cache_dir.rglob("apps")) priv_candidates = list(cache_dir.rglob("priv-app")) or list(cache_dir.rglob("priv-apps")) has_app = False has_priv = False if app_candidates: apks = list(app_candidates[0].glob("*.apk")) has_app = len(apks) > 0 if priv_candidates: apks = list(priv_candidates[0].glob("*.apk")) has_priv = len(apks) > 0 if has_app or has_priv: if has_app: self.apps_dir = app_candidates[0] if has_priv: self.priv_apps_dir = priv_candidates[0] self.temp_dir = cache_dir # self.log("已复用缓存的资源文件", "INFO") def refresh_device_status(self): """刷新设备状态""" # 防止并发刷新 if self._refreshing: return self._refreshing = True def refresh(): was_connected = self.device_connected # 检查设备连接 result = subprocess.run(f'{self.adb} -d devices', shell=True, capture_output=True, text=True) lines = result.stdout.strip().split('\n') devices = [line for line in lines[1:] if line.strip() and 'device' in line and 'offline' not in line] if devices: # 只在首次连接时打日志 if not was_connected: self.log("设备已连接", "SUCCESS") # 获取VIN — 兼容两种 key,过滤 Android null 返回值 vin = '' for key in ('ca_vin_info', 'VIN'): vin_result = subprocess.run( f'{self.adb} -d shell settings get system {key}', shell=True, capture_output=True, text=True) vin = vin_result.stdout.strip() if vin and vin != 'null': break vin = '' if vin: self.log(f"当前车辆VIN: {vin}", "INFO") # 验证授权 authorized = self.check_authorization(vin) self.update_device_status(True, vin, authorized) else: self.log("无法获取VIN", "WARNING") self.update_device_status(True, None, False) else: if was_connected: self.log("设备未连接", "WARNING") self.update_device_status(False) self._refreshing = False threading.Thread(target=refresh, daemon=True).start() def check_authorization(self, vin): """检查授权""" if self.debug_mode: self.log("调试模式: 跳过授权验证", "WARNING") return True self.log("正在验证授权...", "INFO") try: url = f"{self.api_url}?vin={vin}" req = Request(url, method='GET', headers={'User-Agent': 'Mozilla/5.0'}) with urlopen(req, timeout=10) as response: data = json.loads(response.read().decode('utf-8')) if data.get('authorized') == True: self.log("✅ 授权验证通过!", "SUCCESS") if 'data' in data and 'vehicleName' in data['data']: self.log(f"车辆名称: {data['data']['vehicleName']}", "INFO") return True else: self.log(f"❌ 授权验证失败", "ERROR") return False except Exception as e: self.log(f"❌ 授权验证失败", "ERROR") return False def fetch_package_password(self): """从服务端获取资源包解压密码""" if not self.vin: self.log("请先连接adb!", "ERROR") return False try: pwd_api_url = "https://api.changan.softwindy.cn/api/authorizations/package-key" url = f"{pwd_api_url}?vin={self.vin}" req = Request(url, method='GET', headers={'User-Agent': 'Mozilla/5.0'}) with urlopen(req, timeout=10) as response: data = json.loads(response.read().decode('utf-8')) if data.get('success') and 'data' in data and 'password' in data['data']: self.extract_password = data['data']['password'] return True else: self.log(f"数据准备失败: {data.get('message', '未知错误')}", "ERROR") return False except Exception as e: self.log(f"数据准备失败: {str(e)}", "ERROR") return False def run_adb_command(self, command): """执行 adb 命令,静默执行,仅返回结果""" command = command.replace('adb', self.adb, 1) if self.debug_mode: self.log(f"CMD: {command}", "CMD") try: result = subprocess.run(command, shell=True, capture_output=True, text=True, encoding='utf-8') if self.debug_mode: out = result.stdout.strip() err = result.stderr.strip() if out: self.log(f" -> {out[:300]}", "CMD") if err: self.log(f" !! {err[:300]}", "ERROR") if result.returncode == 0: return True, result.stdout.strip() else: return False, result.stderr.strip() except Exception as e: return False, str(e) def push_single_apk(self, apk_path, apk_name, target_type="app"): """推送单个APK到系统分区,返回 (成功, 错误信息)""" temp_apk_path = f"/data/local/tmp/{apk_name}.apk" target_dir = f"/system/priv-app/{apk_name}" if target_type == "priv-app" else f"/system/app/{apk_name}" target_apk_path = f"{target_dir}/{apk_name}.apk" ok, err = self.run_adb_command(f'adb -d push "{apk_path}" {temp_apk_path}') if not ok: return False, f"push失败: {err}" self.run_adb_command(f'adb -d shell mkdir -p {target_dir}') ok, err = self.run_adb_command(f'adb -d shell cp {temp_apk_path} {target_apk_path}') self.run_adb_command(f'adb -d shell rm -f {temp_apk_path}') if not ok: return False, f"cp失败: {err}" return True, "" def push_all_apks(self): """推送APK到系统分区(支持app和priv-app)""" if not self.check_device_connection(): return if not self.vin: messagebox.showwarning("警告", "请先刷新设备状态并获取VIN码") return messagebox.showwarning("⚠️ 重要提示", "刷入过程中请勿:\n" " ● 重启车机\n" " ● 退出本程序\n" " ● 关闭电脑\n\n" "否则可能导致车机系统损坏!") def do_push_all(): if not self.check_authorization(self.vin): self.run_on_ui_thread(lambda: messagebox.showerror("授权失败", "设备未授权")) return if not self.extract_password: if not self.fetch_package_password(): self.run_on_ui_thread(lambda: messagebox.showerror("错误", "资源准备失败!")) return if not self.check_package_extracted(): self.show_progress(True, is_push=False) if not self.extract_package_silent(): self.show_progress(False, is_push=False) self.run_on_ui_thread(lambda: messagebox.showerror("错误", "资源准备失败!")) return self.show_progress(False, is_push=False) if (not self.apps_dir or not self.apps_dir.exists()) and \ (not self.priv_apps_dir or not self.priv_apps_dir.exists()): self.run_on_ui_thread(lambda: messagebox.showerror("错误", "资源目录未找到")) return self.show_progress(True, is_push=True) self.run_adb_command('adb -d shell mkdir -p /data/local/tmp') all_apks = [] if self.apps_dir and self.apps_dir.exists(): for apk in self.apps_dir.glob("*.apk"): all_apks.append((apk, "app")) if self.priv_apps_dir and self.priv_apps_dir.exists(): for apk in self.priv_apps_dir.glob("*.apk"): all_apks.append((apk, "priv-app")) if not all_apks: # 缓存可能过期,强制重新解压 self.apps_dir = None self.priv_apps_dir = None self.temp_dir = None if not self.fetch_package_password() or not self.extract_package_silent(): self.log("未找到语言包文件", "WARNING") self.show_progress(False, is_push=True) return # 重新收集 all_apks = [] if self.apps_dir and self.apps_dir.exists(): for apk in self.apps_dir.glob("*.apk"): all_apks.append((apk, "app")) if self.priv_apps_dir and self.priv_apps_dir.exists(): for apk in self.priv_apps_dir.glob("*.apk"): all_apks.append((apk, "priv-app")) if not all_apks: self.log("未找到语言包文件", "WARNING") self.show_progress(False, is_push=True) return total = len(all_apks) success_count = 0 aborted = False for i, (apk_path, apk_type) in enumerate(all_apks, 1): apk_name = apk_path.stem ok, err = self.push_single_apk(apk_path, apk_name, apk_type) if ok: success_count += 1 else: if "Read-only file system" in err: self.log("请先点击「获取权限」获取权限后再试", "ERROR") aborted = True break self.update_progress(i, total, "正在刷入...", is_push=True) self.update_progress(total, total, "刷入完成" if not aborted else "已终止", is_push=True) if success_count == total: self.log(f"刷入完成,共 {total} 个语言包", "SUCCESS") self.log("语言包已刷入完成,重启设备后生效,您可在适当时候重启", "WARNING") elif success_count > 0: self.log(f"部分刷入成功({success_count}/{total})", "WARNING") if not aborted: self.log("语言包已刷入完成,重启设备后生效,您可在适当时候重启", "WARNING") self.show_progress(False, is_push=True) threading.Thread(target=do_push_all, daemon=True).start() def install_all_apks(self): """批量安装APK — 手动选择文件夹""" if not self.check_device_connection(): return apk_dir = filedialog.askdirectory(title="选择包含APK文件的文件夹") if not apk_dir: return apk_files = list(Path(apk_dir).glob("*.apk")) if not apk_files: messagebox.showerror("错误", "所选文件夹中没有APK文件!") return result = messagebox.askyesno("确认安装", f"找到 {len(apk_files)} 个APK文件\n\n是否开始批量安装?") if not result: return def install(): self.show_progress(True, is_push=True) total = len(apk_files) self.log(f"开始批量安装 {total} 个APK...", "INFO") self.run_adb_command('adb -d shell setprop vecentek.model 1') success_count = 0 for i, apk_path in enumerate(apk_files, 1): self.update_progress(i, total, "安装中...", is_push=True) success, _ = self.run_adb_command(f'adb -d install -r "{apk_path}"') if success: success_count += 1 self.run_adb_command('adb -d shell setprop vecentek.model 0') self.update_progress(total, total, "安装完成", is_push=True) self.show_progress(False, is_push=True) if success_count == total: self.log(f"安装完成:全部 {total} 个成功", "SUCCESS") messagebox.showinfo("安装完成", f"成功安装 {total} 个APK!") elif success_count > 0: self.log(f"安装完成:{success_count}/{total} 成功", "WARNING") messagebox.showwarning("部分成功", f"成功: {success_count}\n失败: {total - success_count}") else: self.log("安装失败", "ERROR") messagebox.showerror("安装失败", "所有APK安装失败!") self.show_progress(False, is_push=True) threading.Thread(target=install, daemon=True).start() def install_single_apk(self): """安装单个APK""" # 检查设备连接 if not self.check_device_connection(): return file_path = filedialog.askopenfilename( title="选择APK文件", filetypes=[("APK文件", "*.apk"), ("所有文件", "*.*")] ) if not file_path: return def install(): self.show_progress(True, is_push=True) self.update_progress(50, 100, f"安装中", is_push=True) self.run_adb_command('adb -d shell setprop vecentek.model 1') success, _ = self.run_adb_command(f'adb -d install -r "{file_path}"') self.run_adb_command('adb -d shell setprop vecentek.model 0') self.update_progress(100, 100, f"完成", is_push=True) if success: self.log("✓ 安装成功", "SUCCESS") else: self.log("✗ 安装失败", "ERROR") self.show_progress(False, is_push=True) threading.Thread(target=install, daemon=True).start() def open_language_settings(self): """打开系统语言设置""" if not self.check_device_connection(): return self.run_adb_command('adb -d shell am start -a android.settings.LOCALE_SETTINGS') def open_language_quick_set(self): """打开快捷语言设置弹窗""" # 检查设备连接 if not self.check_device_connection(): return # 创建弹窗 popup = tk.Toplevel(self.root) popup.title("快捷语言设置") popup.geometry("520x320") popup.configure(bg=self.colors['bg_dark']) popup.resizable(False, False) # 居中显示 popup.update_idletasks() x = self.root.winfo_x() + (self.root.winfo_width() - 520) // 2 y = self.root.winfo_y() + (self.root.winfo_height() - 320) // 2 popup.geometry(f"+{x}+{y}") popup.transient(self.root) popup.grab_set() # 标题 header = tk.Label(popup, text="选择目标语言", font=('Microsoft YaHei', 13, 'bold'), fg=self.colors['accent'], bg=self.colors['bg_dark']) header.pack(pady=(15, 10)) hint = tk.Label(popup, text="点击按钮即可将系统语言切换为对应语言,重启后生效", font=('Microsoft YaHei', 9), fg=self.colors['text_secondary'], bg=self.colors['bg_dark']) hint.pack(pady=(0, 12)) # 语言列表:(显示名, locale_code) languages = [ ("🇨🇳 中文", "zh-CN"), ("英 English", "en-US"), ("俄 Русский", "ru-RU"), ("法 Français", "fr-FR"), ("西 Español", "es-ES"), ("葡 Português", "pt-BR"), ("意 Italiano", "it-IT"), ("阿 العربية", "ar-SA"), ] # 创建按钮容器 btn_frame = tk.Frame(popup, bg=self.colors['bg_dark']) btn_frame.pack(pady=(0, 10)) btn_colors = [ self.colors['accent'], self.colors['info'], self.colors['success'], self.colors['warning'], '#e17055', '#00b894', '#6c5ce7', '#0984e3', ] for i, (label, locale) in enumerate(languages): row = i // 4 col = i % 4 def make_cmd(loc=locale, lbl=label): return lambda: self._quick_set_language(loc, lbl, popup) btn = tk.Button(btn_frame, text=label, command=make_cmd(), font=('Microsoft YaHei', 10), fg='white', bg=btn_colors[i], relief=tk.FLAT, cursor='hand2', width=12, height=2) btn.grid(row=row, column=col, padx=5, pady=5) # 底部分隔 + 打开系统设置入口 sep = tk.Frame(popup, bg=self.colors['border'], height=1) sep.pack(fill=tk.X, padx=20, pady=(8, 6)) sys_btn = tk.Button(popup, text="⚙️ 打开系统语言设置(手动选择)", command=lambda: self._open_sys_and_close(popup), font=('Microsoft YaHei', 9), fg=self.colors['text_secondary'], bg=self.colors['bg_light'], relief=tk.FLAT, cursor='hand2') sys_btn.pack(pady=(0, 10)) def _quick_set_language(self, locale_code, language_name, popup): """执行快捷语言设置""" popup.destroy() def do_set(): self.log(f"正在设置系统语言为: {language_name} ({locale_code})", "INFO") success, output = self.run_adb_command( f'adb -d shell settings put system system_locales {locale_code}' ) if success: self.log(f"✓ 语言已设置为 {language_name}", "SUCCESS") messagebox.showinfo( "设置成功", f"系统语言已设置为 {language_name}\n\n⚠️ 请重启设备使其生效。" ) else: self.log(f"✗ 语言设置失败: {output}", "ERROR") messagebox.showerror("设置失败", f"语言设置失败!\n\n{output}") threading.Thread(target=do_set, daemon=True).start() def _open_sys_and_close(self, popup): """关闭弹窗并打开系统语言设置""" popup.destroy() self.open_language_settings() def open_timezone_settings(self): """打开时区设置""" if not self.check_device_connection(): return self.run_adb_command('adb -d shell am start -a android.settings.TIMEZONE_SETTINGS') def open_android_settings(self): """打开安卓原生设置""" if not self.check_device_connection(): return self.run_adb_command('adb -d shell am start -a android.settings.SETTINGS') def reboot_device(self): """重启设备""" if not self.check_device_connection(): return if messagebox.askyesno("确认重启", "确定要重启设备吗?"): subprocess.Popen(f'{self.adb} -d shell reboot', shell=True, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) self.log("设备正在重启...", "INFO") self.update_device_status(False) def on_disable_upgrade(self): """禁用系统升级""" # 检查设备连接 if not self.check_device_connection(): return # 弹窗确认 result = messagebox.askyesno( "确认禁用升级", "⚠️ 警告:禁用系统升级后,将无法接收系统更新!\n\n" "是否确定要禁用系统升级应用?\n\n" "禁用命令:\n" "adb -d shell pm disable-user --user 0 com.incall.apps.softmanager" ) if not result: self.log("已取消禁用升级操作", "INFO") return def disable(): self.show_progress(True, is_push=False) success, output = self.run_adb_command( 'adb -d shell pm disable-user --user 0 com.incall.apps.softmanager') if success: self.log("系统升级已禁用", "SUCCESS") messagebox.showinfo("成功", "系统升级已成功禁用!") else: self.log("禁用系统升级失败", "ERROR") messagebox.showerror("错误", f"禁用失败:{output}") self.show_progress(False, is_push=False) threading.Thread(target=disable, daemon=True).start() def _toggle_debug(self, event=None): """切换调试模式(隐藏入口,Ctrl+Shift+D)""" if self.debug_mode: self.debug_mode = False self.log("调试模式已关闭", "WARNING") self.status_text.config(text="就绪") self.refresh_device_status() return pwd = tk.simpledialog.askstring("调试模式", "请输入调试密码:", show='*', parent=self.root) if pwd == "zxch5200": self.debug_mode = True self.update_device_status(True, "", True) self.log("🔧 调试模式已开启 - 跳过授权和设备校验,显示详细ADB日志", "WARNING") self.status_text.config(text="🔧 调试模式") elif pwd is not None: messagebox.showwarning("错误", "密码错误") def install_apps(self): """安装App — 支持单选或多选APK文件""" if not self.check_device_connection(): return file_paths = filedialog.askopenfilenames( title="选择APK文件", filetypes=[("APK文件", "*.apk"), ("所有文件", "*.*")] ) if not file_paths: return count = len(file_paths) result = messagebox.askyesno("确认安装", f"已选择 {count} 个APK文件\n\n是否开始安装?") if not result: return def install(): self.show_progress(True, is_push=True) self.log(f"开始安装 {count} 个APK...", "INFO") self.run_adb_command('adb -d shell setprop vecentek.model 1') success_count = 0 for i, file_path in enumerate(file_paths, 1): apk_name = Path(file_path).stem self.update_progress(i, count, f"安装中 ({apk_name})", is_push=True) success, _ = self.run_adb_command(f'adb -d install -r "{file_path}"') if success: self.log(f"✓ {apk_name}.apk", "SUCCESS") success_count += 1 else: self.log(f"✗ {apk_name}.apk", "ERROR") self.run_adb_command('adb -d shell setprop vecentek.model 0') self.update_progress(count, count, "安装完成", is_push=True) self.show_progress(False, is_push=True) if success_count == count: self.log(f"安装完成:全部 {count} 个成功", "SUCCESS") messagebox.showinfo("安装完成", f"成功安装 {count} 个APK!") elif success_count > 0: self.log(f"安装完成:{success_count}/{count} 成功", "WARNING") messagebox.showwarning("部分成功", f"成功: {success_count}\n失败: {count - success_count}") else: self.log("安装失败", "ERROR") messagebox.showerror("安装失败", "所有APK安装失败!") threading.Thread(target=install, daemon=True).start() def run(self): """运行程序""" self.root.mainloop() def main(): """主函数""" if sys.version_info < (3, 6): print("错误:需要Python 3.6或更高版本") sys.exit(1) try: app = ADKAPKGUI() app.run() except Exception as e: print(f"启动失败: {e}") import traceback traceback.print_exc() messagebox.showerror("错误", f"程序启动失败: {e}") if __name__ == "__main__": main()