# -*- coding: utf-8 -*-
import requests
import json
import urllib3
from urllib3.exceptions import InsecureRequestWarning
import urllib.parse
import time
import os
import sys
import asyncio
import httpx
import random
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError, Error as PlaywrightError
# --- Core configuration ---
initial_keywords = ["银行", "存款", "利息", "利率", "存钱", "提前还房贷", "房贷", "手机银行", "理财", "退税", "活期", "定期", "攒钱", "存单", "银行卡", "房产契税", "银行贷款", "自动转存", "存单", "存折", "大额存单", "阶梯式存钱"] # seed keyword list used for the first expansion cycle
output_filename = "douyin_xiala.txt" # file the results are appended to
cookie_filename = "douyin_cookie.txt" # file where session cookies are cached
start_score = 20 # score of the top-ranked suggestion (rank i gets start_score - i)
num_cycles = 10 # total number of keyword-expansion cycles
# --- Tuning parameters ---
# 1. Cap how many source keywords are processed per cycle
MAX_KEYWORDS_PER_CYCLE = None # max source keywords per cycle; None = no limit
KEYWORD_PROCESSING_STRATEGY = 'first' # selection strategy when over the cap: 'first' (head of the list) or 'random' (random sample)
# 2. Cap the number/quality of suggestions seeded into the next cycle
MAX_SUGGESTIONS_TO_ADD_NEXT_CYCLE = 10 # per source keyword, at most this many qualifying suggestions join the next cycle's candidates
MIN_SCORE_TO_ADD_NEXT_CYCLE = 1 # only suggestions whose score (start_score - index) >= this value are considered for the next cycle
# --- Request & retry configuration ---
max_retries = 3 # max retries after a failed API request for one keyword
retry_delay_time = 5.0 # base retry interval (seconds); grows exponentially, e.g. for 429 responses
request_timeout = 25 # timeout for a single HTTP request (seconds)
# --- Concurrency & pacing ---
CONCURRENCY_LIMIT = 5 # max concurrent requests; tune to network and server tolerance
ENABLE_MICRO_DELAY = True # add a small random delay inside each task (before releasing the semaphore)
MICRO_DELAY_MIN = 0.5 # minimum per-task micro delay (seconds)
MICRO_DELAY_MAX = 1.5 # maximum per-task micro delay (seconds)
BATCH_DELAY_MIN = 5.0 # minimum pause after each batch of concurrent tasks (seconds)
BATCH_DELAY_MAX = 10.0 # maximum pause after each batch of concurrent tasks (seconds)
# --- Login configuration ---
login_url = "https://www.douyin.com/search/%E9%93%B6%E8%A1%8C%E9%87%91%E6%9D%A1" # initial page used to trigger the login dialog
login_button_xpath = '//*[@id="island_b69f5"]/div/div[5]' # XPath of the login button — MUST be updated to match the live page
login_check_url = "https://www.douyin.com/user/self?from_tab_name=main" # URL used to verify the login state
login_check_text = "观看历史" # page text that marks a successful login
login_wait_timeout = 30 # total time to wait for the user to scan the QR code (seconds)
login_reminder_interval = 10 # countdown reminder interval during QR login (seconds)
navigation_timeout_check = 45000 # Playwright navigation timeout for the verification page (ms)
text_wait_timeout_check = 15000 # Playwright timeout waiting for the verification text (ms)
# --- API URL configuration ---
# NOTE: replace these with your own current, valid URL and keyword.
original_encoded_keyword = "%E9%93%B6%E8%A1%8C%E6%8E%92" # sample value; must exactly match the keyword= value inside base_url
base_url = 'https://www.douyin.com/aweme/v1/web/search/sug/?device_platform=webapp&aid=6383&channel=channel_pc_web&keyword=%E9%93%B6%E8%A1%8C%E6%8E%92&source=aweme_video_web&from_group_id=SOME_GROUP_ID&update_version_code=170400&pc_client_type=1&SOME_OTHER_PARAMS&msToken=YOUR_NEW_MSTOKEN&a_bogus=YOUR_NEW_A_BOGUS_SIGNATURE' # template URL; replace with a complete, currently valid URL
# --- Synchronous helpers (file I/O and cookie formatting) ---
def save_cookies(cookies, filename=cookie_filename):
    """Write the Playwright cookie list to *filename* as pretty-printed JSON."""
    # BUG FIX: the success/error messages interpolated a literal "(unknown)"
    # instead of the actual filename.
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(cookies, f, ensure_ascii=False, indent=4)
        print(f"Cookies 已成功保存到 {filename}")
    except Exception as e:
        print(f"保存 Cookies 到 {filename} 时出错: {e}")
def _discard_cookie_file(filename, reason):
    """Best-effort removal of a corrupt cookie file; *reason* describes why (used in log messages)."""
    if os.path.exists(filename):
        try:
            os.remove(filename)
            print(f"警告:已删除{reason}的 Cookie 文件: {filename}")
        except OSError as e_del:
            print(f"警告:删除{reason}的 Cookie 文件 {filename} 时出错: {e_del}")

def load_cookies(filename=cookie_filename):
    """Load cookies from *filename*.

    Returns the cookie list when the file holds a JSON list of dicts;
    returns None (deleting the corrupt file) when it is missing, malformed,
    or cannot be parsed.
    """
    if not os.path.exists(filename):
        return None
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            cookies = json.load(f)
        if isinstance(cookies, list) and all(isinstance(c, dict) for c in cookies):
            print(f"从 {filename} 加载 Cookies 成功。")
            return cookies
        # BUG FIX: log messages printed a literal "(unknown)" instead of the filename.
        print(f"警告:{filename} 格式错误,忽略。")
        _discard_cookie_file(filename, "格式错误")
        return None
    except json.JSONDecodeError:
        print(f"警告:无法解析 {filename} JSON 数据,忽略。")
        _discard_cookie_file(filename, "无法解析")
        return None
    except Exception as e:
        print(f"加载 Cookies 从 {filename} 时出错: {e}")
        return None
def format_cookies_for_requests(cookies):
    """Build a ``Cookie`` header value ("name=value; ...") from Playwright cookie dicts."""
    if not cookies:
        return ""
    pairs = (f"{c['name']}={c['value']}" for c in cookies)
    return "; ".join(pairs)
def deduplicate_output_file(filename):
    """Deduplicate "keyword,score" lines in *filename*, keeping the highest score
    per keyword, and rewrite the file sorted by score descending."""
    # BUG FIX: "\n" escapes had been collapsed to a literal "n" in this function,
    # which corrupted the rewritten file; the filename placeholders were also lost.
    print(f"\n--- 开始对文件 '{filename}' 进行最终去重和分数优化 ---")
    if not os.path.exists(filename):
        print(f"文件 '{filename}' 不存在,无需去重。")
        return
    highest_scores = {}
    lines_read = 0
    malformed_lines = 0
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                lines_read += 1
                line = line.strip()
                if not line:
                    continue
                # rsplit once so keywords containing commas are preserved intact
                parts = line.rsplit(',', 1)
                if len(parts) != 2:
                    malformed_lines += 1
                    continue
                keyword = parts[0].strip()
                score_str = parts[1].strip()
                if not keyword:
                    malformed_lines += 1
                    continue
                try:
                    score = int(score_str)
                except ValueError:
                    malformed_lines += 1
                    continue
                if keyword not in highest_scores or score > highest_scores[keyword]:
                    highest_scores[keyword] = score
        print(f"读取 {lines_read} 行, 格式错误 {malformed_lines} 行。最终去重后 {len(highest_scores)} 个。")
        sorted_items = sorted(highest_scores.items(), key=lambda item: item[1], reverse=True)
        with open(filename, 'w', encoding='utf-8') as f:
            for keyword, score in sorted_items:
                f.write(f"{keyword},{score}\n")
        print(f"--- 文件 '{filename}' 最终去重和分数优化完成 ---")
    except FileNotFoundError:
        print(f"错误:文件 '{filename}' 未找到。")
    except Exception as e:
        print(f"处理文件 '{filename}' 时发生错误:{e}")
# --- Playwright async helpers (login and cookie validation) ---
async def async_check_login_status(p, cookies):
    """Validate saved cookies by loading the profile page in headless Edge.

    p: async Playwright instance; cookies: cookie dicts loaded from disk.
    Returns True when the logged-in marker text is found, False otherwise.
    """
    browser = None
    context = None
    verify_page = None
    try:
        print("正在使用 Playwright Async API (channel: msedge) 验证 Cookie 有效性...")
        browser = await p.chromium.launch(channel="msedge", headless=True)  # headless Edge
        context = await browser.new_context()
        if cookies:
            await context.add_cookies(cookies)  # inject the saved cookies before navigating
        verify_page = await context.new_page()
        print(f" - 正在新标签页导航到验证页面: {login_check_url}")
        await verify_page.goto(login_check_url, timeout=navigation_timeout_check, wait_until='load')
        print(" - 导航完成。")
        print(f" - 正在查找验证文本: '{login_check_text}'...")
        try:
            login_indicator = verify_page.locator(f'*:has-text("{login_check_text}")').first
            await login_indicator.wait_for(state='visible', timeout=text_wait_timeout_check)
            print(f" - 找到文本 '{login_check_text}'。Cookie 验证成功。")
            return True
        except PlaywrightTimeoutError:
            # Marker text never appeared: the cookies are most likely stale.
            print(f" - 未找到文本 '{login_check_text}'。Cookie 可能已失效。")
            return False
    except PlaywrightTimeoutError as nav_timeout_err:
        print(f"验证时页面导航超时: {nav_timeout_err}")
        return False
    except Exception as e:
        print(f"验证 Cookie 时出错: {e}")
        # BUG FIX: the original nested unescaped double quotes inside a
        # double-quoted string literal here, which is a SyntaxError.
        if 'channel "msedge" not found' in str(e):
            print("错误:找不到 Edge (msedge)。请安装或运行 'playwright install'。")
        return False
    finally:
        # Release page/context/browser individually; each close is best-effort.
        if verify_page and not verify_page.is_closed():
            try:
                await verify_page.close()
            except PlaywrightError as e_close_page:
                print(f"关闭验证页出错: {e_close_page}")
        if context:
            try:
                await context.close()
            except Exception as e_close:
                print(f"关闭 Context 出错: {e_close}")
        if browser:
            try:
                await browser.close()
            except Exception as e_close:
                print(f"关闭 Browser 出错: {e_close}")
async def async_get_cookies(p):
    """Interactive login flow: open a visible Edge window, let the user scan the
    QR code, verify the login state, and return the session cookies.

    p: async Playwright instance. Returns the cookie list, or None on failure.
    """
    browser = None
    context = None
    page = None
    verify_page = None
    try:
        print("正在启动 Edge (msedge channel) 浏览器进行登录...")
        browser = await p.chromium.launch(channel="msedge", headless=False)  # visible browser so the user can scan the QR code
        context = await browser.new_context(user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0')
        page = await context.new_page()
        print(f"正在打开页面: {login_url}")
        await page.goto(login_url, wait_until='domcontentloaded', timeout=60000)
        print("页面已打开,尝试点击登录按钮...")
        try:
            login_button = page.locator(login_button_xpath)
            await login_button.wait_for(state="visible", timeout=20000)
            await login_button.click()
            print("登录按钮已点击,请在弹出的 Edge 浏览器中扫描二维码登录...")
        except PlaywrightTimeoutError:
            # Button not found — give the user a chance to click it manually.
            print(f"错误:未找到或无法点击登录按钮 (XPath: {login_button_xpath})。")
            print("请手动点击登录按钮(如果需要)。脚本将继续等待...")
        except Exception as e:
            print(f"点击登录按钮时出错: {e}")
            print("请检查浏览器窗口并手动操作(如果需要)。脚本将继续等待...")
        print(f"请在 {login_wait_timeout} 秒内完成扫码登录。")
        # Countdown while the user scans the QR code.
        remaining_time = login_wait_timeout
        while remaining_time > 0:
            print(f"剩余登录时间: {remaining_time} 秒...")
            wait_chunk = min(remaining_time, login_reminder_interval)
            await asyncio.sleep(wait_chunk)
            remaining_time -= wait_chunk
        print("登录等待时间结束,现在开始验证登录状态...")
        login_successful = False
        try:
            # Verify the login state in a fresh tab.
            verify_page = await context.new_page()
            print(f" - 正在新标签页导航到验证页面: {login_check_url}")
            await verify_page.goto(login_check_url, timeout=navigation_timeout_check, wait_until='load')
            print(f" - 导航完成。")
            print(f" - 正在查找验证文本: '{login_check_text}'...")
            login_indicator = verify_page.locator(f'*:has-text("{login_check_text}")').first
            await login_indicator.wait_for(state='visible', timeout=text_wait_timeout_check)
            print(f" - 找到文本 '{login_check_text}'!")
            login_successful = True
        except PlaywrightTimeoutError as check_timeout_err:
            print(f" - 验证失败(超时): {check_timeout_err}")
        except PlaywrightError as check_general_err:
            print(f" - 验证时发生 Playwright 错误: {check_general_err}")
        except Exception as e:
            print(f" - 验证时发生未知错误: {e}")
        finally:
            if verify_page and not verify_page.is_closed():
                try:
                    await verify_page.close()
                except PlaywrightError as e_close_page:
                    print(f"关闭验证页出错: {e_close_page}")
        if login_successful:
            print("验证成功,登录状态确认。正在获取 Cookies...")
            cookies = await context.cookies()
            print("Cookies 获取成功。")
            return cookies
        print(f"错误:未能验证登录成功状态。")
        return None
    except Exception as e:
        print(f"Playwright 登录过程中发生严重错误: {e}")
        # BUG FIX: the original nested unescaped double quotes inside a
        # double-quoted string literal here, which is a SyntaxError.
        if 'channel "msedge" not found' in str(e):
            print("错误:找不到 Edge (msedge)。请安装或运行 'playwright install'。")
        return None
    finally:
        # Always release browser resources, even after an exception.
        if context:
            try:
                await context.close()
                print("Playwright 上下文已关闭。")
            except Exception as e_close:
                print(f"关闭 Context 出错: {e_close}")
        if browser:
            try:
                await browser.close()
                print("Playwright (msedge) 浏览器已关闭。")
            except Exception as e_close:
                print(f"关闭 Browser 出错: {e_close}")
# --- Async fetch helper (pacing and retry policy) ---
async def fetch_suggestions(client: httpx.AsyncClient, semaphore: asyncio.Semaphore, source_keyword: str, cookies_for_headers: str):
    """Fetch search suggestions for *source_keyword* from the Douyin sug API.

    Returns a list of (content, score) tuples on success (possibly empty), or
    None when retries are exhausted or a fatal error (403, bad JSON) occurs.
    Sets the global ``cookie_seems_invalid`` flag when the API answers 403.
    """
    global base_url, original_encoded_keyword, start_score, max_retries, retry_delay_time, request_timeout, cookie_seems_invalid
    global ENABLE_MICRO_DELAY, MICRO_DELAY_MIN, MICRO_DELAY_MAX
    # These headers should mirror a successful browser request; the token-bearing
    # ones (cookie / msToken / a_bogus in the URL) matter most.
    headers = {
        'accept': 'application/json, text/plain, */*',
        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'cookie': cookies_for_headers,
        'dnt': '1',
        'priority': 'u=1, i',
        'referer': 'https://www.douyin.com/search/',  # a more specific referer may work better
        'sec-ch-ua': '"Microsoft Edge";v="125", "Chromium";v="125", "Not.A/Brand";v="24"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0',
    }
    encoded_keyword = urllib.parse.quote(source_keyword)
    # Swap the placeholder keyword in the template URL for the current one.
    current_url = base_url.replace(original_encoded_keyword, encoded_keyword)
    suggestions_found = []
    response_obj_for_debug = None  # kept so the except blocks can inspect the last response
    async with semaphore:  # bound global request concurrency
        for attempt in range(max_retries):
            try:
                response = await client.get(current_url, headers=headers, timeout=request_timeout)
                response_obj_for_debug = response
                if response.status_code == 403:
                    cookie_seems_invalid = True  # cookie likely expired or blocked
                    print(f" ! HTTP 403 for '{source_keyword}'. Cookie 可能已失效。")
                # Raise HTTPStatusError for any non-2xx status (including the 403 above);
                # the original called raise_for_status() twice, which was redundant.
                response.raise_for_status()
                data = response.json()
                sug_list = data.get('sug_list', [])
                if sug_list:
                    for index, item in enumerate(sug_list):
                        score = start_score - index  # rank-based score
                        if score < 1:
                            break  # remaining entries would score below the floor
                        if isinstance(item, dict) and 'content' in item:
                            content = item.get('content', '').strip()
                            if content:
                                suggestions_found.append((content, score))
                if ENABLE_MICRO_DELAY:
                    # Small random pause before releasing the semaphore to smooth bursts.
                    await asyncio.sleep(random.uniform(MICRO_DELAY_MIN, MICRO_DELAY_MAX))
                return suggestions_found
            except httpx.HTTPStatusError as http_err:
                print(f" ! HTTP Error for '{source_keyword}' (Attempt {attempt + 1}/{max_retries}): {http_err}")
                current_status_code = -1
                # BUG FIX: explicit None check instead of truthiness — response objects
                # that define __bool__ by status would skip this branch on errors.
                if response_obj_for_debug is not None:
                    current_status_code = response_obj_for_debug.status_code
                    print(f" ! Response Status Code (HTTPError): {current_status_code}")
                    try:
                        print(f" ! Response Text (HTTPError, first 500 chars): {response_obj_for_debug.text[:500]}...")
                    except Exception as e_text:
                        print(f" ! Could not get text from HTTPError response: {e_text}")
                if current_status_code == 403:
                    # Cookie problem — retrying will not help.
                    print(f" ! HTTP 403 confirmed for '{source_keyword}'. Stopping retries.")
                    return None
                elif current_status_code == 429:
                    # Rate limited: exponential backoff with heavy random jitter.
                    wait_time = retry_delay_time * (2 ** attempt) * random.uniform(3, 6)
                    print(f" ! Received 429 for '{source_keyword}'. Waiting {wait_time:.2f}s before next attempt...")
                    await asyncio.sleep(wait_time)
                elif attempt >= max_retries - 1:
                    print(f" ! Max retries reached for HTTP error on '{source_keyword}'. Giving up.")
                    return None
                else:
                    await asyncio.sleep(retry_delay_time * (2 ** attempt))  # other HTTP errors: plain exponential backoff
            except (httpx.TimeoutException, httpx.NetworkError, httpx.ConnectError) as req_err:
                print(f" ! Network/Timeout Error for '{source_keyword}' (Attempt {attempt + 1}/{max_retries}): {req_err}")
                if attempt >= max_retries - 1:
                    print(f" ! Max retries reached for '{source_keyword}'. Giving up.")
                    return None
                await asyncio.sleep(retry_delay_time * (2 ** attempt))
            except json.JSONDecodeError as json_err:
                # A malformed body is usually a block/captcha page — do not retry.
                print(f" ! JSON Decode Error for '{source_keyword}': {json_err}")
                if response_obj_for_debug is not None:
                    print(f" ! Response Status Code: {response_obj_for_debug.status_code}")
                    full_response_text = response_obj_for_debug.text
                    print(f" ! Full Response Text (first 1000 chars): {full_response_text[:1000]}...")
                else:
                    print(" ! 'response' object was not available for JSONDecodeError details.")
                print(f" ! Giving up on '{source_keyword}' due to JSON error.")
                return None
            except Exception as e:
                print(f" ! Unknown Error for '{source_keyword}' (Attempt {attempt + 1}/{max_retries}): {e}")
                if attempt >= max_retries - 1:
                    print(f" ! Max retries reached for '{source_keyword}'. Giving up.")
                    return None
                await asyncio.sleep(retry_delay_time * (2 ** attempt))
    return None  # every attempt failed
# --- Async main task (scheduling and result aggregation) ---
async def run_async_tasks():
    """Main async driver: run ``num_cycles`` rounds of concurrent suggestion
    fetches, append results to the output file, and collect the qualifying
    suggestions as source keywords for the next round."""
    # Configuration and shared state live at module level.
    global current_keywords_for_cycle, next_cycle_keywords_collector, all_processed_source_keywords
    global cookie_seems_invalid, cookie_string, output_filename, num_cycles
    global MAX_KEYWORDS_PER_CYCLE, KEYWORD_PROCESSING_STRATEGY
    global MAX_SUGGESTIONS_TO_ADD_NEXT_CYCLE, MIN_SCORE_TO_ADD_NEXT_CYCLE
    global BATCH_DELAY_MIN, BATCH_DELAY_MAX, CONCURRENCY_LIMIT
    # BUG FIX (throughout): the "\n" escapes in the print/format strings had been
    # collapsed to a literal "n"; restored everywhere in this function.
    print(f"\n准备就绪,开始异步处理关键词,共计 {num_cycles} 轮循环。")
    print(f"结果将追加写入到文件: {output_filename}")
    if os.path.exists(output_filename):
        print(f"注意:文件 {output_filename} 已存在,将在其后追加内容。最终会进行去重。")
    urllib3.disable_warnings(InsecureRequestWarning)  # silence SSL warnings (client uses verify=False)
    semaphore = asyncio.Semaphore(CONCURRENCY_LIMIT)
    # One shared httpx client for the whole run (connection pooling, HTTP/2).
    async with httpx.AsyncClient(http2=True, verify=False, timeout=request_timeout) as client:
        for cycle in range(1, num_cycles + 1):
            if cookie_seems_invalid:
                print("\n警告:Cookie 可能已失效,建议停止脚本并重新登录获取新 Cookie。")
                break
            print(f"\n--- 开始第 {cycle}/{num_cycles} 轮 ---")
            print(f"本轮开始时,待处理源关键词总数: {len(current_keywords_for_cycle)}")
            if not current_keywords_for_cycle:
                print("没有新的源关键词需要处理,结束循环。")
                break
            # --- 1. Cap how many source keywords this cycle processes ---
            keywords_this_cycle = list(current_keywords_for_cycle)
            if MAX_KEYWORDS_PER_CYCLE is not None and len(keywords_this_cycle) > MAX_KEYWORDS_PER_CYCLE:
                print(f"待处理关键词数量 ({len(keywords_this_cycle)}) 超过上限 ({MAX_KEYWORDS_PER_CYCLE}),将进行筛选。")
                if KEYWORD_PROCESSING_STRATEGY == 'random':
                    keywords_this_cycle = random.sample(keywords_this_cycle, MAX_KEYWORDS_PER_CYCLE)
                    print(f"已随机选取 {MAX_KEYWORDS_PER_CYCLE} 个关键词进行处理。")
                else:  # default 'first' strategy: take the head of the list
                    keywords_this_cycle = keywords_this_cycle[:MAX_KEYWORDS_PER_CYCLE]
                    print(f"已选取前 {MAX_KEYWORDS_PER_CYCLE} 个关键词进行处理。")
            # --- Per-cycle counters and collectors ---
            total_this_cycle_for_display = len(keywords_this_cycle)
            processed_keywords_display_count = 0
            apis_actually_called_this_cycle = 0
            suggestions_written_this_cycle = 0
            # BUG FIX: defined unconditionally — the original only created this list
            # inside the "tasks exist" branch, so the summary below could NameError.
            failed_source_keywords_this_cycle = []
            next_cycle_keywords_collector.clear()
            tasks_for_gather = []
            keywords_in_current_batch = []  # source keyword at index i matches task i
            # --- Create one fetch task per not-yet-processed keyword ---
            for source_keyword in keywords_this_cycle:
                processed_keywords_display_count += 1
                if source_keyword in all_processed_source_keywords:
                    print(f" 跳过已作为源词处理过的 ({processed_keywords_display_count}/{total_this_cycle_for_display}): {source_keyword}")
                    continue
                task = asyncio.create_task(fetch_suggestions(client, semaphore, source_keyword, cookie_string))
                tasks_for_gather.append(task)
                keywords_in_current_batch.append(source_keyword)
                apis_actually_called_this_cycle += 1
            # --- Run the batch and aggregate results ---
            if not tasks_for_gather:
                print("本轮筛选或过滤后没有需要请求的新关键词,进入下一轮准备。")
            else:
                print(f"本轮共创建 {len(tasks_for_gather)} 个新请求任务,开始并发执行 (并发数上限: {CONCURRENCY_LIMIT})...")
                # return_exceptions=True so one failing task cannot cancel the rest.
                results_from_gather = await asyncio.gather(*tasks_for_gather, return_exceptions=True)
                print(f"本轮 {len(tasks_for_gather)} 个任务执行完毕,开始处理结果...")
                all_suggestions_to_write_this_cycle = []  # collected so the file is written once
                processed_results_display_count = 0
                for i, result_item in enumerate(results_from_gather):
                    processed_results_display_count += 1
                    current_source_keyword = keywords_in_current_batch[i]
                    if isinstance(result_item, Exception):  # task raised
                        print(f" 处理结果出错 ({processed_results_display_count}/{apis_actually_called_this_cycle}) 源关键词 '{current_source_keyword}': {result_item}")
                        failed_source_keywords_this_cycle.append(current_source_keyword)
                    elif result_item is None:  # retries exhausted / 403 / JSON failure
                        print(f" 处理结果失败 ({processed_results_display_count}/{apis_actually_called_this_cycle}) 源关键词 '{current_source_keyword}' (已达最大重试次数或JSON解析/403错误)")
                        failed_source_keywords_this_cycle.append(current_source_keyword)
                    elif result_item:  # non-empty list of (content, score) tuples
                        all_suggestions_to_write_this_cycle.extend(result_item)
                        # --- 2. Cap the number/quality of suggestions seeded into the next cycle ---
                        suggestions_added_to_next_count = 0
                        for content, score in result_item:  # API output is already relevance-sorted
                            if score >= MIN_SCORE_TO_ADD_NEXT_CYCLE and content not in all_processed_source_keywords:
                                next_cycle_keywords_collector.add(content)
                                suggestions_added_to_next_count += 1
                                # BUG FIX: the original split this condition across two lines
                                # without a continuation, which is a SyntaxError.
                                if (MAX_SUGGESTIONS_TO_ADD_NEXT_CYCLE is not None
                                        and suggestions_added_to_next_count >= MAX_SUGGESTIONS_TO_ADD_NEXT_CYCLE):
                                    break
                        if suggestions_added_to_next_count > 0:
                            print(f" 源关键词 '{current_source_keyword}' -> 添加了 {suggestions_added_to_next_count} 个有效建议到下一轮候选。")
                    all_processed_source_keywords.add(current_source_keyword)  # mark as consumed either way
                # --- Append this cycle's suggestions to the output file in one pass ---
                if all_suggestions_to_write_this_cycle:
                    try:
                        with open(output_filename, 'a', encoding='utf-8') as f:
                            for content, score in all_suggestions_to_write_this_cycle:
                                f.write(f"{content},{score}\n")  # BUG FIX: newline was a literal "n"
                        suggestions_written_this_cycle = len(all_suggestions_to_write_this_cycle)
                    except Exception as e:
                        print(f"错误:写入文件 '{output_filename}' 时发生错误: {e}")
            # --- Inter-batch delay (only when requests were made and more cycles remain) ---
            if cycle < num_cycles and apis_actually_called_this_cycle > 0:
                batch_delay = random.uniform(BATCH_DELAY_MIN, BATCH_DELAY_MAX)
                print(f"\n处理完一批并发任务,随机暂停 {batch_delay:.2f} 秒...")
                await asyncio.sleep(batch_delay)
            # --- Prepare the next cycle ---
            print(f"\n--- 第 {cycle}/{num_cycles} 轮结束 ---")
            print(f"本轮实际请求API的源关键词数: {apis_actually_called_this_cycle}")
            print(f"本轮共写入建议条数: {suggestions_written_this_cycle}")
            print(f"失败或未找到建议的源关键词数: {len(failed_source_keywords_this_cycle)}")
            print(f"收集到下一轮潜在源关键词数量: {len(next_cycle_keywords_collector)}")
            # Only genuinely new keywords seed the next cycle.
            current_keywords_for_cycle = next_cycle_keywords_collector - all_processed_source_keywords
            print(f"准备用于下一轮的新源关键词数: {len(current_keywords_for_cycle)}")
            if CONCURRENCY_LIMIT > 1:
                await asyncio.sleep(0.1)  # brief yield to the event loop
    print(f"\n--- 所有 {num_cycles} 轮循环处理完毕 (异步) ---")
# --- Script entry point ---
if __name__ == "__main__":
    print("--- 初始化和登录检查 (同步执行) ---")
    current_keywords_for_cycle = set(initial_keywords)  # source keywords for the current cycle
    next_cycle_keywords_collector = set()               # candidates collected for the next cycle
    all_processed_source_keywords = set()               # every keyword already used as a source
    cookie_seems_invalid = False                        # set when the API starts answering 403
    cookie_string = None                                # "name=value; ..." Cookie header value
    loaded_cookies = load_cookies()                     # try the locally cached cookies first
    needs_login = True
    playwright_successful = False
    try:
        async def sync_wrapper_for_playwright():
            """Run the Playwright cookie check / login flow.

            Returns True when a usable cookie string has been prepared.
            """
            global cookie_string, needs_login
            async with async_playwright() as p:
                if loaded_cookies:
                    is_valid = await async_check_login_status(p, loaded_cookies)
                    if is_valid:
                        cookie_string = format_cookies_for_requests(loaded_cookies)
                        needs_login = False
                        print("使用文件中有效的 Cookie 继续。")
                    else:
                        print("文件中的 Cookie 无效或已过期。")
                        if os.path.exists(cookie_filename):
                            try:
                                os.remove(cookie_filename)
                                print(f"已删除无效 Cookie 文件: {cookie_filename}")
                            except OSError as e:
                                print(f"删除无效 Cookie 文件 {cookie_filename} 时出错: {e}")
                if needs_login:  # no cached cookies, or they failed validation
                    print("需要登录以获取新的 Cookie。")
                    new_cookies = await async_get_cookies(p)
                    if new_cookies:
                        save_cookies(new_cookies)
                        cookie_string = format_cookies_for_requests(new_cookies)
                        print("已获取并保存新的 Cookie。")
                    else:
                        print("错误:无法获取有效的 Cookie,脚本无法继续执行。")
                        return False
                if not cookie_string:
                    print("错误:未能准备好有效的 Cookie 字符串,脚本终止。")
                    return False
                return True
        playwright_successful = asyncio.run(sync_wrapper_for_playwright())
    except Exception as e_pw_init:
        print(f"初始化或执行 Playwright 操作时发生错误: {e_pw_init}")
        playwright_successful = False
    if playwright_successful:
        # BUG FIX (here and below): "\n" escapes had been collapsed to a literal "n".
        print("\n--- 开始执行异步关键词处理 ---")
        try:
            asyncio.run(run_async_tasks())
        except KeyboardInterrupt:
            print("\n捕获到中断信号,正在尝试停止...")
        except Exception as e_async_main:
            print(f"\n异步任务执行过程中发生错误: {e_async_main}")
            import traceback
            traceback.print_exc()
        finally:
            # Always dedupe/sort the output file, even after a crash or Ctrl-C.
            deduplicate_output_file(output_filename)
        print(f"\n脚本执行完毕。")
        print(f"总共处理过的独立源关键词数量 (发起过API请求的): {len(all_processed_source_keywords)}")
        print(f"请检查最终结果文件 (已去重并优化分数): {output_filename}")
        print(f"使用的 Cookie 保存在: {cookie_filename}")
    else:
        print("\n由于未能成功获取或验证 Cookie,脚本已终止。")