# 抖音下拉词采集_异步多线程
# (Douyin search dropdown-suggestion scraper — async, concurrent)
# -*- coding: utf-8 -*-
import requests
import json
import urllib3
from urllib3.exceptions import InsecureRequestWarning
import urllib.parse
import time
import os
import sys
import asyncio
import httpx
import random
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError, Error as PlaywrightError

# --- Core configuration ---
initial_keywords = ["银行", "存款", "利息", "利率", "存钱", "提前还房贷", "房贷", "手机银行", "理财", "退税", "活期", "定期", "攒钱", "存单", "银行卡", "房产契税", "银行贷款", "自动转存", "存单", "存折", "大额存单", "阶梯式存钱"] # Seed keywords for the first cycle. NOTE(review): "存单" appears twice; harmless, the list is turned into a set at startup.
output_filename = "douyin_xiala.txt"  # File the collected suggestions are written to
cookie_filename = "douyin_cookie.txt" # File used to persist login cookies between runs
start_score = 20                      # Score assigned to the top-ranked suggestion; each lower rank scores one less
num_cycles = 10                       # Total number of keyword-expansion cycles

# --- Tuning parameters ---
# 1. Cap on source keywords processed per cycle
MAX_KEYWORDS_PER_CYCLE = None  # Max source keywords processed per cycle; None = no limit.
KEYWORD_PROCESSING_STRATEGY = 'first' # Selection strategy when the backlog exceeds MAX_KEYWORDS_PER_CYCLE: 'first' (head of list) or 'random' (random sample)

# 2. Cap on suggestions promoted into the next cycle
MAX_SUGGESTIONS_TO_ADD_NEXT_CYCLE = 10 # Per source keyword, at most the first N qualifying suggestions become next-cycle candidates
MIN_SCORE_TO_ADD_NEXT_CYCLE = 1       # Only suggestions with score (start_score - index) >= this value are promoted

# --- Request / retry configuration ---
max_retries = 3                       # Max retries per keyword after a failed API request
retry_delay_time = 5.0                # Base retry delay (seconds); grows exponentially, e.g. for HTTP 429
request_timeout = 25                  # Timeout for a single HTTP request (seconds)

# --- Concurrency and pacing ---
CONCURRENCY_LIMIT = 5                 # Max simultaneous requests; tune to network and remote-server tolerance
ENABLE_MICRO_DELAY = True             # Whether each task sleeps a small random delay before releasing its semaphore slot
MICRO_DELAY_MIN = 0.5                 # Minimum per-task internal delay (seconds)
MICRO_DELAY_MAX = 1.5                 # Maximum per-task internal delay (seconds)
BATCH_DELAY_MIN = 5.0                 # Minimum pause after each concurrent batch (seconds)
BATCH_DELAY_MAX = 10.0                # Maximum pause after each concurrent batch (seconds)

# --- Login configuration ---
login_url = "https://www.douyin.com/search/%E9%93%B6%E8%A1%8C%E9%87%91%E6%9D%A1" # Initial page used to trigger the login dialog
login_button_xpath = '//*[@id="island_b69f5"]/div/div[5]' # XPath of the login button — MUST be updated to match the live page
login_check_url = "https://www.douyin.com/user/self?from_tab_name=main" # URL used to verify login state
login_check_text = "观看历史"             # Page text whose presence marks a successful login
login_wait_timeout = 30               # Total time to wait for the user to finish the QR-code login (seconds)
login_reminder_interval = 10          # Countdown reminder interval during QR login (seconds)
navigation_timeout_check = 45000      # Playwright navigation timeout for the verification page (ms)
text_wait_timeout_check = 15000       # Playwright timeout waiting for the verification text (ms)

# --- API URL configuration ---
# !!! Replace with your own, currently valid URL and keyword !!!
original_encoded_keyword = "%E9%93%B6%E8%A1%8C%E6%8E%92" # Sample value; must exactly match the part after keyword= inside base_url
base_url = 'https://www.douyin.com/aweme/v1/web/search/sug/?device_platform=webapp&aid=6383&channel=channel_pc_web&keyword=%E9%93%B6%E8%A1%8C%E6%8E%92&source=aweme_video_web&from_group_id=SOME_GROUP_ID&update_version_code=170400&pc_client_type=1&SOME_OTHER_PARAMS&msToken=YOUR_NEW_MSTOKEN&a_bogus=YOUR_NEW_A_BOGUS_SIGNATURE' # Sample structure; replace with a complete, currently valid URL

# --- 同步函数定义 (文件操作和Cookie格式化) ---
def save_cookies(cookies, filename=cookie_filename):
    """Persist the Playwright cookie list to *filename* as pretty-printed JSON.

    BUGFIX: the status messages had lost their {filename} interpolation.
    """
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(cookies, f, ensure_ascii=False, indent=4)
        print(f"Cookies 已成功保存到 {filename}")
    except Exception as e:
        print(f"保存 Cookies 到 {filename} 时出错: {e}")

def load_cookies(filename=cookie_filename):
    """Load the saved cookie list from *filename*.

    Returns the list on success, or None when the file is missing, unparsable,
    or not a list of dicts. Corrupt files are deleted (best effort) so the
    next run falls back to an interactive login.

    BUGFIX: the status messages had lost their {filename} interpolation.
    """
    if not os.path.exists(filename):
        return None

    def _remove_bad_file(reason):
        # Best-effort deletion of a corrupt cookie file; never raises.
        if os.path.exists(filename):
            try:
                os.remove(filename)
                print(f"警告:已删除{reason}的 Cookie 文件: {filename}")
            except OSError as e_del:
                print(f"警告:删除{reason}的 Cookie 文件 {filename} 时出错: {e_del}")

    try:
        with open(filename, 'r', encoding='utf-8') as f:
            cookies = json.load(f)
    except json.JSONDecodeError:
        print(f"警告:无法解析 {filename} JSON 数据,忽略。")
        _remove_bad_file("无法解析")
        return None
    except Exception as e:
        print(f"加载 Cookies 从 {filename} 时出错: {e}")
        return None

    # Playwright expects a list of cookie dicts; anything else is unusable.
    if isinstance(cookies, list) and all(isinstance(c, dict) for c in cookies):
        print(f"从 {filename} 加载 Cookies 成功。")
        return cookies

    print(f"警告:{filename} 格式错误,忽略。")
    _remove_bad_file("格式错误")
    return None

def format_cookies_for_requests(cookies):
    """Render a Playwright cookie list as a single 'Cookie' header value."""
    if not cookies:
        return ""
    pairs = (f"{c['name']}={c['value']}" for c in cookies)
    return "; ".join(pairs)

def deduplicate_output_file(filename):
    """Deduplicate the output file in place, keeping the highest score per keyword.

    Reads "keyword,score" lines, keeps the maximum score seen for each keyword,
    then rewrites the file sorted by score, descending.

    BUGFIX: the rewrite used f"{keyword},{score}n" (lost backslash), appending a
    literal 'n' to every score and making every rewritten line unparsable on the
    next pass. Also restored the lost "\n" and {filename} in the log messages.
    """
    print(f"\n--- 开始对文件 '{filename}' 进行最终去重和分数优化 ---")
    if not os.path.exists(filename):
        print(f"文件 '{filename}' 不存在,无需去重。")
        return
    highest_scores = {}
    lines_read = 0
    malformed_lines = 0
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                lines_read += 1
                line = line.strip()
                if not line:
                    continue
                # rsplit on the last comma so keywords containing commas survive
                parts = line.rsplit(',', 1)
                if len(parts) != 2:
                    malformed_lines += 1
                    continue
                keyword = parts[0].strip()
                score_str = parts[1].strip()
                if not keyword:
                    malformed_lines += 1
                    continue
                try:
                    score = int(score_str)
                except ValueError:
                    malformed_lines += 1
                    continue
                if keyword not in highest_scores or score > highest_scores[keyword]:
                    highest_scores[keyword] = score
        print(f"读取 {lines_read} 行, 格式错误 {malformed_lines} 行。最终去重后 {len(highest_scores)} 个。")
        sorted_items = sorted(highest_scores.items(), key=lambda item: item[1], reverse=True)
        with open(filename, 'w', encoding='utf-8') as f:
            for keyword, score in sorted_items:
                f.write(f"{keyword},{score}\n")
        print(f"--- 文件 '{filename}' 最终去重和分数优化完成 ---")
    except FileNotFoundError:
        print(f"错误:文件 '{filename}' 未找到。")
    except Exception as e:
        print(f"处理文件 '{filename}' 时发生错误:{e}")

# --- Playwright 异步函数 (登录和Cookie验证) ---
async def async_check_login_status(p, cookies):
    """Verify saved cookies by loading the profile page in headless Edge.

    p: async Playwright instance; cookies: cookie list loaded from disk.
    Returns True when the login-marker text becomes visible, False otherwise.

    BUGFIX: the 'channel "msedge" not found' literal used unescaped nested
    double quotes, which was a SyntaxError.
    """
    browser = None
    context = None
    verify_page = None
    try:
        print("正在使用 Playwright Async API (channel: msedge) 验证 Cookie 有效性...")
        browser = await p.chromium.launch(channel="msedge", headless=True)  # headless Edge
        context = await browser.new_context()
        if cookies:
            await context.add_cookies(cookies)  # inject saved cookies before navigating
        verify_page = await context.new_page()
        print(f"  - 正在新标签页导航到验证页面: {login_check_url}")
        await verify_page.goto(login_check_url, timeout=navigation_timeout_check, wait_until='load')
        print("  - 导航完成。")
        print(f"  - 正在查找验证文本: '{login_check_text}'...")
        try:
            # The marker text is only rendered for a logged-in session.
            login_indicator = verify_page.locator(f'*:has-text("{login_check_text}")').first
            await login_indicator.wait_for(state='visible', timeout=text_wait_timeout_check)
            print(f"  - 找到文本 '{login_check_text}'。Cookie 验证成功。")
            return True
        except PlaywrightTimeoutError:
            print(f"  - 未找到文本 '{login_check_text}'。Cookie 可能已失效。")
            return False
    except PlaywrightTimeoutError as nav_timeout_err:
        print(f"验证时页面导航超时: {nav_timeout_err}")
        return False
    except Exception as e:
        print(f"验证 Cookie 时出错: {e}")
        if 'channel "msedge" not found' in str(e):
            print("错误:找不到 Edge (msedge)。请安装或运行 'playwright install'。")
        return False
    finally:
        # Always tear down page/context/browser, even on failure paths.
        if verify_page and not verify_page.is_closed():
            try:
                await verify_page.close()
            except PlaywrightError as e_close_page:
                print(f"关闭验证页出错: {e_close_page}")
        if context:
            try:
                await context.close()
            except Exception as e_close:
                print(f"关闭 Context 出错: {e_close}")
        if browser:
            try:
                await browser.close()
            except Exception as e_close:
                print(f"关闭 Browser 出错: {e_close}")

async def async_get_cookies(p):
    """Interactive login flow: open a visible Edge window, let the user scan
    the QR code, verify the login, and return the session cookies.

    p: async Playwright instance. Returns the cookie list, or None on failure.

    BUGFIX: the 'channel "msedge" not found' literal used unescaped nested
    double quotes, which was a SyntaxError.
    """
    browser = None
    context = None
    page = None
    verify_page = None
    try:
        print("正在启动 Edge (msedge channel) 浏览器进行登录...")
        browser = await p.chromium.launch(channel="msedge", headless=False)  # visible so the user can scan the QR code
        context = await browser.new_context(user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0')
        page = await context.new_page()  # page that triggers the login dialog
        print(f"正在打开页面: {login_url}")
        await page.goto(login_url, wait_until='domcontentloaded', timeout=60000)
        print("页面已打开,尝试点击登录按钮...")
        try:
            login_button = page.locator(login_button_xpath)
            await login_button.wait_for(state="visible", timeout=20000)
            await login_button.click()
            print("登录按钮已点击,请在弹出的 Edge 浏览器中扫描二维码登录...")
        except PlaywrightTimeoutError:
            # Not fatal — the user can click the button manually while we wait.
            print(f"错误:未找到或无法点击登录按钮 (XPath: {login_button_xpath})。")
            print("请手动点击登录按钮(如果需要)。脚本将继续等待...")
        except Exception as e:
            print(f"点击登录按钮时出错: {e}")
            print("请检查浏览器窗口并手动操作(如果需要)。脚本将继续等待...")

        print(f"请在 {login_wait_timeout} 秒内完成扫码登录。")
        remaining_time = login_wait_timeout
        while remaining_time > 0:  # countdown while the user scans the QR code
            print(f"剩余登录时间: {remaining_time} 秒...")
            wait_chunk = min(remaining_time, login_reminder_interval)
            await asyncio.sleep(wait_chunk)
            remaining_time -= wait_chunk
        print("登录等待时间结束,现在开始验证登录状态...")

        login_successful = False
        try:  # verify login state in a fresh tab
            verify_page = await context.new_page()
            print(f"  - 正在新标签页导航到验证页面: {login_check_url}")
            await verify_page.goto(login_check_url, timeout=navigation_timeout_check, wait_until='load')
            print(f"  - 导航完成。")
            print(f"  - 正在查找验证文本: '{login_check_text}'...")
            login_indicator = verify_page.locator(f'*:has-text("{login_check_text}")').first
            await login_indicator.wait_for(state='visible', timeout=text_wait_timeout_check)
            print(f"  - 找到文本 '{login_check_text}'!")
            login_successful = True
        except PlaywrightTimeoutError as check_timeout_err:
            print(f"  - 验证失败(超时): {check_timeout_err}")
        except PlaywrightError as check_general_err:
            print(f"  - 验证时发生 Playwright 错误: {check_general_err}")
        except Exception as e:
            print(f"  - 验证时发生未知错误: {e}")
        finally:
            if verify_page and not verify_page.is_closed():
                try:
                    await verify_page.close()
                except PlaywrightError as e_close_page:
                    print(f"关闭验证页出错: {e_close_page}")

        if login_successful:
            print("验证成功,登录状态确认。正在获取 Cookies...")
            cookies = await context.cookies()
            print("Cookies 获取成功。")
            return cookies
        else:
            print(f"错误:未能验证登录成功状态。")
            return None
    except Exception as e:
        print(f"Playwright 登录过程中发生严重错误: {e}")
        if 'channel "msedge" not found' in str(e):
            print("错误:找不到 Edge (msedge)。请安装或运行 'playwright install'。")
        return None
    finally:
        # Always close the context and browser, even on failure paths.
        if context:
            try:
                await context.close()
                print("Playwright 上下文已关闭。")
            except Exception as e_close:
                print(f"关闭 Context 出错: {e_close}")
        if browser:
            try:
                await browser.close()
                print("Playwright (msedge) 浏览器已关闭。")
            except Exception as e_close:
                print(f"关闭 Browser 出错: {e_close}")

# --- Async Fetch Function (带延时和重试策略) ---
async def fetch_suggestions(client: httpx.AsyncClient, semaphore: asyncio.Semaphore, source_keyword: str, cookies_for_headers: str):
    """Fetch search suggestions for one source keyword.

    Returns a list of (content, score) tuples on success (possibly empty), or
    None on permanent failure (HTTP 403, JSON decode error, retries exhausted).

    Fixes vs. the original: the except block compared the saved response with
    plain truthiness (inconsistent with the `is not None` check used later);
    the redundant second raise_for_status() call was dropped.
    """
    global base_url, original_encoded_keyword, start_score, max_retries, retry_delay_time, request_timeout, cookie_seems_invalid
    global ENABLE_MICRO_DELAY, MICRO_DELAY_MIN, MICRO_DELAY_MAX

    headers = {  # Keep these in sync with a real browser request, especially any token headers.
        'accept': 'application/json, text/plain, */*',
        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'cookie': cookies_for_headers,
        'dnt': '1',
        'priority': 'u=1, i',
        'referer': 'https://www.douyin.com/search/',  # a more specific Referer may work better
        'sec-ch-ua': '"Microsoft Edge";v="125", "Chromium";v="125", "Not.A/Brand";v="24"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0',
        # NOTE(review): make sure every required header copied from the browser is present, especially token-style ones.
    }
    encoded_keyword = urllib.parse.quote(source_keyword)
    current_url = base_url.replace(original_encoded_keyword, encoded_keyword)  # swap the placeholder keyword into the URL
    suggestions_found = []
    response_obj_for_debug = None  # kept so the except blocks can report status/body

    async with semaphore:  # bound overall concurrency
        for attempt in range(max_retries):
            try:
                response = await client.get(current_url, headers=headers, timeout=request_timeout)
                response_obj_for_debug = response
                if response.status_code == 403:
                    cookie_seems_invalid = True  # signal the main loop to stop and re-login
                    print(f"    ! HTTP 403 for '{source_keyword}'. Cookie 可能已失效。")
                response.raise_for_status()  # raise HTTPStatusError for any non-2xx (including the 403 above)

                data = response.json()
                sug_list = data.get('sug_list', [])
                if sug_list:
                    for index, item in enumerate(sug_list):
                        score = start_score - index  # rank 0 scores start_score, decreasing by rank
                        if score < 1:
                            break  # remaining suggestions would score below the floor
                        if isinstance(item, dict) and 'content' in item:
                            content = item.get('content', '').strip()
                            if content:
                                suggestions_found.append((content, score))

                if ENABLE_MICRO_DELAY:
                    # Small random pause before releasing the semaphore slot, to pace requests.
                    await asyncio.sleep(random.uniform(MICRO_DELAY_MIN, MICRO_DELAY_MAX))
                return suggestions_found
            except httpx.HTTPStatusError as http_err:
                print(f"    ! HTTP Error for '{source_keyword}' (Attempt {attempt + 1}/{max_retries}): {http_err}")
                current_status_code = -1
                if response_obj_for_debug is not None:  # explicit None check, not truthiness
                    current_status_code = response_obj_for_debug.status_code
                    print(f"    ! Response Status Code (HTTPError): {current_status_code}")
                    try:
                        print(f"    ! Response Text (HTTPError, first 500 chars): {response_obj_for_debug.text[:500]}...")
                    except Exception as e_text:
                        print(f"    ! Could not get text from HTTPError response: {e_text}")

                if current_status_code == 403:
                    print(f"    ! HTTP 403 confirmed for '{source_keyword}'. Stopping retries.")
                    return None  # 403 is permanent until the cookie is refreshed
                elif current_status_code == 429:
                    # Rate limited: exponential backoff with extra jitter.
                    wait_time = retry_delay_time * (2 ** attempt) * random.uniform(3, 6)
                    print(f"    ! Received 429 for '{source_keyword}'. Waiting {wait_time:.2f}s before next attempt...")
                    await asyncio.sleep(wait_time)
                elif attempt >= max_retries - 1:
                    print(f"    ! Max retries reached for HTTP error on '{source_keyword}'. Giving up.")
                    return None
                else:
                    await asyncio.sleep(retry_delay_time * (2 ** attempt))  # generic exponential backoff
            except (httpx.TimeoutException, httpx.NetworkError, httpx.ConnectError) as req_err:
                print(f"    ! Network/Timeout Error for '{source_keyword}' (Attempt {attempt + 1}/{max_retries}): {req_err}")
                if attempt >= max_retries - 1:
                    print(f"    ! Max retries reached for '{source_keyword}'. Giving up.")
                    return None
                await asyncio.sleep(retry_delay_time * (2 ** attempt))
            except json.JSONDecodeError as json_err:
                print(f"    ! JSON Decode Error for '{source_keyword}': {json_err}")
                if response_obj_for_debug is not None:
                    print(f"    ! Response Status Code: {response_obj_for_debug.status_code}")
                    full_response_text = response_obj_for_debug.text
                    print(f"    ! Full Response Text (first 1000 chars): {full_response_text[:1000]}...")
                else:
                    print("    ! 'response' object was not available for JSONDecodeError details.")
                print(f"    ! Giving up on '{source_keyword}' due to JSON error.")
                return None  # a malformed body will not improve on retry
            except Exception as e:
                print(f"    ! Unknown Error for '{source_keyword}' (Attempt {attempt + 1}/{max_retries}): {e}")
                if attempt >= max_retries - 1:
                    print(f"    ! Max retries reached for '{source_keyword}'. Giving up.")
                    return None
                await asyncio.sleep(retry_delay_time * (2 ** attempt))
        return None  # all retries exhausted

# --- Async Main Task Function (负责调度和聚合结果) ---
async def run_async_tasks():
    """Main async driver.

    For each cycle: select the source keywords to process, fan out concurrent
    suggestion requests, append results to the output file, and collect fresh
    keywords for the next cycle.

    Fixes vs. the original:
    - Restored the line-continuation lost from the MAX_SUGGESTIONS condition
      (it was a SyntaxError).
    - failed_source_keywords_this_cycle is now initialized before the batch
      branch; previously the end-of-cycle summary raised NameError whenever a
      cycle created no tasks.
    - Restored the '\\n' escapes lost from print messages and the file write.
    """
    global current_keywords_for_cycle, next_cycle_keywords_collector, all_processed_source_keywords
    global cookie_seems_invalid, cookie_string, output_filename, num_cycles
    global MAX_KEYWORDS_PER_CYCLE, KEYWORD_PROCESSING_STRATEGY
    global MAX_SUGGESTIONS_TO_ADD_NEXT_CYCLE, MIN_SCORE_TO_ADD_NEXT_CYCLE
    global BATCH_DELAY_MIN, BATCH_DELAY_MAX, CONCURRENCY_LIMIT

    print(f"\n准备就绪,开始异步处理关键词,共计 {num_cycles} 轮循环。")
    print(f"结果将追加写入到文件: {output_filename}")
    if os.path.exists(output_filename):
        print(f"注意:文件 {output_filename} 已存在,将在其后追加内容。最终会进行去重。")

    urllib3.disable_warnings(InsecureRequestWarning)  # silence requests/urllib3 SSL warnings (if requests is used anywhere)

    semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)  # caps in-flight requests
    # One shared httpx client for the whole run (connection pooling, HTTP/2).
    async with httpx.AsyncClient(http2=True, verify=False, timeout=request_timeout) as client:
        for cycle in range(1, num_cycles + 1):
            if cookie_seems_invalid:
                print("\n警告:Cookie 可能已失效,建议停止脚本并重新登录获取新 Cookie。")
                break  # abandon remaining cycles

            print(f"\n--- 开始第 {cycle}/{num_cycles} 轮 ---")
            print(f"本轮开始时,待处理源关键词总数: {len(current_keywords_for_cycle)}")

            if not current_keywords_for_cycle:
                print("没有新的源关键词需要处理,结束循环。")
                break

            # --- 1. Cap the number of source keywords processed this cycle ---
            keywords_this_cycle_actual_process_list = list(current_keywords_for_cycle)  # default: all of them
            if MAX_KEYWORDS_PER_CYCLE is not None and len(current_keywords_for_cycle) > MAX_KEYWORDS_PER_CYCLE:
                print(f"待处理关键词数量 ({len(current_keywords_for_cycle)}) 超过上限 ({MAX_KEYWORDS_PER_CYCLE}),将进行筛选。")
                temp_list_for_selection = list(current_keywords_for_cycle)
                if KEYWORD_PROCESSING_STRATEGY == 'random':
                    keywords_this_cycle_actual_process_list = random.sample(temp_list_for_selection, MAX_KEYWORDS_PER_CYCLE)
                    print(f"已随机选取 {MAX_KEYWORDS_PER_CYCLE} 个关键词进行处理。")
                else:  # default 'first' strategy: take the head of the list
                    keywords_this_cycle_actual_process_list = temp_list_for_selection[:MAX_KEYWORDS_PER_CYCLE]
                    print(f"已选取前 {MAX_KEYWORDS_PER_CYCLE} 个关键词进行处理。")

            total_this_cycle_for_display = len(keywords_this_cycle_actual_process_list)
            processed_keywords_display_count = 0   # progress counter for skip messages
            apis_actually_called_this_cycle = 0    # API requests actually issued this cycle
            suggestions_written_this_cycle = 0     # suggestions written to file this cycle
            failed_source_keywords_this_cycle = [] # initialized up front so the summary below is always safe
            next_cycle_keywords_collector.clear()  # discard last cycle's candidates
            tasks_for_gather = []                  # concurrent tasks for this batch
            keywords_in_current_batch = []         # source keyword per task, same order

            # --- Create one task per not-yet-processed source keyword ---
            for source_keyword in keywords_this_cycle_actual_process_list:
                processed_keywords_display_count += 1
                if source_keyword in all_processed_source_keywords:  # already used as a source keyword
                    print(f"  跳过已作为源词处理过的 ({processed_keywords_display_count}/{total_this_cycle_for_display}): {source_keyword}")
                    continue
                task = asyncio.create_task(fetch_suggestions(client, semaphore, source_keyword, cookie_string))
                tasks_for_gather.append(task)
                keywords_in_current_batch.append(source_keyword)
                apis_actually_called_this_cycle += 1

            # --- Run this cycle's tasks ---
            if not tasks_for_gather:
                print("本轮筛选或过滤后没有需要请求的新关键词,进入下一轮准备。")
            else:
                print(f"本轮共创建 {len(tasks_for_gather)} 个新请求任务,开始并发执行 (并发数上限: {CONCURRENCY_LIMIT})...")
                # return_exceptions=True: one failing task does not cancel the rest.
                results_from_gather = await asyncio.gather(*tasks_for_gather, return_exceptions=True)
                print(f"本轮 {len(tasks_for_gather)} 个任务执行完毕,开始处理结果...")

                all_suggestions_to_write_this_cycle = []  # collected so the file is written once per cycle
                processed_results_display_count = 0

                for i, result_item in enumerate(results_from_gather):
                    processed_results_display_count += 1
                    current_source_keyword = keywords_in_current_batch[i]  # task order matches keyword order

                    if isinstance(result_item, Exception):  # gather delivered a raised exception
                        print(f"  处理结果出错 ({processed_results_display_count}/{apis_actually_called_this_cycle}) 源关键词 '{current_source_keyword}': {result_item}")
                        failed_source_keywords_this_cycle.append(current_source_keyword)
                    elif result_item is None:  # fetch_suggestions gave up (retries/403/JSON error)
                        print(f"  处理结果失败 ({processed_results_display_count}/{apis_actually_called_this_cycle}) 源关键词 '{current_source_keyword}' (已达最大重试次数或JSON解析/403错误)")
                        failed_source_keywords_this_cycle.append(current_source_keyword)
                    else:  # a (possibly empty) list of (content, score) tuples
                        if result_item:
                            all_suggestions_to_write_this_cycle.extend(result_item)
                            # --- 2. Cap the suggestions promoted into the next cycle ---
                            suggestions_added_to_next_count = 0
                            for content, score in result_item:  # API order ~ relevance order
                                if score >= MIN_SCORE_TO_ADD_NEXT_CYCLE:
                                    if content not in all_processed_source_keywords:  # never re-use a processed keyword
                                        next_cycle_keywords_collector.add(content)
                                        suggestions_added_to_next_count += 1
                                        if (MAX_SUGGESTIONS_TO_ADD_NEXT_CYCLE is not None
                                                and suggestions_added_to_next_count >= MAX_SUGGESTIONS_TO_ADD_NEXT_CYCLE):
                                            break  # per-source cap reached
                            if suggestions_added_to_next_count > 0:
                                print(f"  源关键词 '{current_source_keyword}' -> 添加了 {suggestions_added_to_next_count} 个有效建议到下一轮候选。")
                    all_processed_source_keywords.add(current_source_keyword)  # mark the source as processed

                # --- Single batched write of this cycle's suggestions ---
                if all_suggestions_to_write_this_cycle:
                    try:
                        with open(output_filename, 'a', encoding='utf-8') as f:
                            for content, score in all_suggestions_to_write_this_cycle:
                                f.write(f"{content},{score}\n")
                        suggestions_written_this_cycle = len(all_suggestions_to_write_this_cycle)
                    except Exception as e:
                        print(f"错误:写入文件 '{output_filename}' 时发生错误: {e}")

            # --- Inter-batch delay (skip on the last cycle or when nothing was requested) ---
            if cycle < num_cycles and apis_actually_called_this_cycle > 0:
                batch_delay = random.uniform(BATCH_DELAY_MIN, BATCH_DELAY_MAX)
                print(f"\n处理完一批并发任务,随机暂停 {batch_delay:.2f} 秒...")
                await asyncio.sleep(batch_delay)

            # --- Cycle summary and next-cycle preparation ---
            print(f"\n--- 第 {cycle}/{num_cycles} 轮结束 ---")
            print(f"本轮实际请求API的源关键词数: {apis_actually_called_this_cycle}")
            print(f"本轮共写入建议条数: {suggestions_written_this_cycle}")
            print(f"失败或未找到建议的源关键词数: {len(failed_source_keywords_this_cycle)}")
            print(f"收集到下一轮潜在源关键词数量: {len(next_cycle_keywords_collector)}")

            current_keywords_for_cycle = next_cycle_keywords_collector - all_processed_source_keywords  # only brand-new keywords advance
            print(f"准备用于下一轮的新源关键词数: {len(current_keywords_for_cycle)}")
            if CONCURRENCY_LIMIT > 1:
                await asyncio.sleep(0.1)  # yield briefly between cycles

    print(f"\n--- 所有 {num_cycles} 轮循环处理完毕 (异步) ---")

# --- 脚本入口 ---
if __name__ == "__main__":
    print("--- 初始化和登录检查 (同步执行) ---")
    # Mutable global state shared with run_async_tasks / fetch_suggestions.
    current_keywords_for_cycle = set(initial_keywords)  # keywords to process in the current cycle
    next_cycle_keywords_collector = set()               # candidates collected for the next cycle
    all_processed_source_keywords = set()               # every keyword already used as a source
    cookie_seems_invalid = False                        # set by fetch_suggestions on HTTP 403
    cookie_string = None                                # Cookie header value used for API requests

    loaded_cookies = load_cookies()  # try the cached cookie file first
    needs_login = True
    playwright_successful = False

    try:
        # Async wrapper so the Playwright cookie validation/login can be run synchronously here.
        async def sync_wrapper_for_playwright():
            """Validate cached cookies or run an interactive login; sets cookie_string."""
            global cookie_string, needs_login
            async with async_playwright() as p:
                if loaded_cookies:
                    is_valid = await async_check_login_status(p, loaded_cookies)
                    if is_valid:
                        cookie_string = format_cookies_for_requests(loaded_cookies)
                        needs_login = False
                        print("使用文件中有效的 Cookie 继续。")
                    else:
                        print("文件中的 Cookie 无效或已过期。")
                        if os.path.exists(cookie_filename):
                            try:
                                os.remove(cookie_filename)
                                print(f"已删除无效 Cookie 文件: {cookie_filename}")
                            except OSError as e:
                                print(f"删除无效 Cookie 文件 {cookie_filename} 时出错: {e}")
                if needs_login:  # no cached cookies, or they failed validation
                    print("需要登录以获取新的 Cookie。")
                    new_cookies = await async_get_cookies(p)
                    if new_cookies:
                        save_cookies(new_cookies)
                        cookie_string = format_cookies_for_requests(new_cookies)
                        print("已获取并保存新的 Cookie。")
                    else:
                        print("错误:无法获取有效的 Cookie,脚本无法继续执行。")
                        return False
                if not cookie_string:
                    print("错误:未能准备好有效的 Cookie 字符串,脚本终止。")
                    return False
                return True
        playwright_successful = asyncio.run(sync_wrapper_for_playwright())
    except Exception as e_pw_init:
        print(f"初始化或执行 Playwright 操作时发生错误: {e_pw_init}")
        playwright_successful = False

    if playwright_successful:  # cookies are ready, run the scraper
        print("\n--- 开始执行异步关键词处理 ---")
        try:
            asyncio.run(run_async_tasks())
        except KeyboardInterrupt:
            print("\n捕获到中断信号,正在尝试停止...")
        except Exception as e_async_main:
            print(f"\n异步任务执行过程中发生错误: {e_async_main}")
            import traceback
            traceback.print_exc()
        finally:
            # Always dedupe/sort the output file, even after an error or Ctrl-C.
            deduplicate_output_file(output_filename)

        print(f"\n脚本执行完毕。")
        print(f"总共处理过的独立源关键词数量 (发起过API请求的): {len(all_processed_source_keywords)}")
        print(f"请检查最终结果文件 (已去重并优化分数): {output_filename}")
        print(f"使用的 Cookie 保存在: {cookie_filename}")
    else:
        print("\n由于未能成功获取或验证 Cookie,脚本已终止。")

 

 
# 本文由 asdfasd 发表于 2026-01-25 17:50:58
# 转载请务必保留本文链接:http://wp.fangfa.me/other-note/%e6%8a%96%e9%9f%b3%e4%b8%8b%e6%8b%89%e8%af%8d%e9%87%87%e9%9b%86_%e5%bc%82%e6%ad%a5%e5%a4%9a%e7%ba%bf%e7%a8%8b.html