I benchmarked this code on an RTX 2060, and the difference is dramatic. It seems I really do need to get a better graphics card.

Results:

============================== Performance Comparison Summary ==============================
GPU total time: 13.07 seconds
CPU total time: 89.01 seconds
CPU time was 6.81x the GPU time.
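Before trusting numbers like these, it is worth confirming which device PyTorch actually sees. A minimal sanity check (my own sketch, assuming a CUDA-enabled PyTorch build):

import torch

if torch.cuda.is_available():
    # Should report the RTX 2060 when the driver and the CUDA build of PyTorch match
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")
    vram_gib = torch.cuda.get_device_properties(0).total_memory / 1024**3
    print(f"Total VRAM: {vram_gib:.1f} GiB")
else:
    print("CUDA not available; the benchmark will fall back to CPU only.")

The full script: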
import pandas as pd
from sentence_transformers import SentenceTransformer, util
import numpy as np
import torch
import time
import os
from collections import defaultdict, Counter  # Counter is used for phrase counting
import jieba  # Chinese word segmentation
import jieba.posseg as pseg  # jieba part-of-speech tagging
import re  # regular expressions, used for filtering
# --- Configuration (unchanged from the previous version) ---
CSV_FILE_PATH = r'Sheet1.csv'
KEYWORD_COLUMN = '关键词'
HEAT_COLUMN = '热度'
LENGTH_COLUMN = '长度'
STOPWORDS_FILE_PATH = r'chinese_stopwords.txt'
TOP_N_PHRASES = 20
NGRAM_RANGE = (2, 3)
ALLOWED_POS = {'n', 'v', 'j', 'vn', 'a', 'nz', 'nr', 'ns', 'nt', 'eng'}
MIN_PHRASE_LENGTH = 2
MODEL_NAME = 'paraphrase-multilingual-mpnet-base-v2'
SIMILARITY_THRESHOLD = 0.80
MIN_CLUSTER_SIZE = 2
ENCODE_BATCH_SIZE = 128
CLUSTER_BATCH_SIZE = 256
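# Notes on the key parameters (rough guidance, not hard rules):
# - SIMILARITY_THRESHOLD: cosine-similarity cutoff for community detection;
#   higher values produce tighter but smaller clusters.
# - ENCODE_BATCH_SIZE / CLUSTER_BATCH_SIZE: larger batches run faster on GPU
#   but need more VRAM; on a 6 GB card like the RTX 2060, reduce these first
#   if you hit out-of-memory errors.
# - ALLOWED_POS: jieba POS tags to keep (nouns, verbs, adjectives, proper
#   nouns, and 'eng' for Latin-script tokens).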
# --- Helper functions (unchanged) ---
# load_stopwords, clean_and_segment, get_ngrams, extract_high_freq_phrases
# load_data, generate_embeddings, find_clusters_and_map_data
# (These definitions are identical to the previous version and are included below in full so the script runs standalone.)
def load_stopwords(filepath):
    """Load the stopword list from a file."""
    stopwords = set()
    if filepath and os.path.exists(filepath):
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                for line in f:
                    word = line.strip()
                    if word:
                        stopwords.add(word)
            print(f"Loaded {len(stopwords)} stopwords from '{filepath}'.")
        except Exception as e:
            print(f"Error loading stopword file '{filepath}': {e}")
    else:
        print(f"Warning: stopword file '{filepath}' not specified or not found; skipping stopword filtering.")
    return stopwords
stopwords_set = load_stopwords(STOPWORDS_FILE_PATH)
def clean_and_segment(text, stopwords, allowed_pos_tags):
    """Segment text with jieba, then filter by stopwords and POS tags."""
    # Make sure the input is a string
    text = str(text) if not isinstance(text, str) else text
    seg_list = pseg.cut(text)
    filtered_words = []
    for word, flag in seg_list:
        main_pos = flag.split('.')[0]  # take the primary POS tag
        # Also guard against empty strings
        word_stripped = word.strip()
        if word_stripped and word_stripped not in stopwords and main_pos in allowed_pos_tags:
            # Improved filtering: drop pure numbers (incl. decimals/percentages),
            # keep only tokens made of Chinese characters and/or Latin letters
            if not re.fullmatch(r'\d+(\.\d+)?%?', word_stripped) and re.match(r'^[\u4e00-\u9fa5a-zA-Z]+$', word_stripped):
                filtered_words.append(word_stripped)
    return filtered_words
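# Quick illustrative check (hypothetical output; the actual segmentation depends
# on the jieba dictionary and the stopword list in use):
#   clean_and_segment("2024年显卡性能测试", set(), ALLOWED_POS)
#   might yield something like ['显卡', '性能', '测试'] after POS and regex filtering.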
def get_ngrams(words, n_range=(2, 3)):
    """Extract N-gram phrases from a list of words."""
    ngrams = []
    min_n, max_n = n_range
    for n in range(min_n, max_n + 1):
        if len(words) >= n:  # make sure there are enough words
            for i in range(len(words) - n + 1):
                ngram_tuple = tuple(words[i:i+n])
                ngram_str = "".join(ngram_tuple)  # concatenate directly (no separator, suits Chinese)
                ngrams.append(ngram_str)
    return ngrams
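# Example (follows directly from the loop above):
#   get_ngrams(['深度', '学习', '模型'], (2, 3))
#   -> ['深度学习', '学习模型', '深度学习模型']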
def extract_high_freq_phrases(dataframe, keyword_col, stopwords, allowed_pos_tags, ngram_r, min_phrase_len, top_n):
    """Extract the most frequent core phrases from the data."""
    print(f"\nExtracting high-frequency core phrases (N-grams: {ngram_r})...")
    phrase_counter = Counter()
    total_keywords = len(dataframe)
    processed_count = 0
    for keyword_text in dataframe[keyword_col]:
        if pd.isna(keyword_text):
            continue
        filtered_words = clean_and_segment(str(keyword_text), stopwords, allowed_pos_tags)
        if len(filtered_words) >= ngram_r[0]:
            phrases = get_ngrams(filtered_words, ngram_r)
            valid_phrases = [p for p in phrases if len(p) >= min_phrase_len]
            phrase_counter.update(valid_phrases)
        processed_count += 1
        if processed_count % 1000 == 0 or processed_count == total_keywords:
            print(f"Processed {processed_count}/{total_keywords} keywords for phrase extraction...")
    print("Core phrase extraction and counting finished.")
    print(f"Found {len(phrase_counter)} distinct core phrases (meeting the minimum length).")
    top_phrases_list = phrase_counter.most_common(top_n)
    print(f"Extracted the top {len(top_phrases_list)} high-frequency core phrases.")
    return top_phrases_list
def load_data(file_path, keyword_col, heat_col, length_col):
    """Load the required columns from the CSV file and do basic cleaning."""
    print(f"\nLoading data from '{file_path}'...")
    if not os.path.exists(file_path):
        print(f"Error: CSV file '{file_path}' not found.")
        raise FileNotFoundError(f"Error: CSV file {file_path} not found")
    try:
        df = pd.read_csv(file_path, header=0, encoding='utf-8-sig',
                         usecols=[keyword_col, heat_col, length_col],
                         on_bad_lines='warn', engine='python')
        print(f"Raw data loaded: {len(df)} rows.")
        df[keyword_col] = df[keyword_col].astype(str).str.strip()
        df.dropna(subset=[keyword_col], inplace=True)
        df = df[df[keyword_col] != '']
        print(f"After cleaning (non-empty keywords): {len(df)} rows.")
        df[heat_col] = pd.to_numeric(df[heat_col], errors='coerce')
        df[length_col] = pd.to_numeric(df[length_col], errors='coerce')
        original_count_before_numeric_dropna = len(df)
        df.dropna(subset=[heat_col, length_col], inplace=True)
        dropped_for_numeric = original_count_before_numeric_dropna - len(df)
        if dropped_for_numeric > 0:
            print(f"Dropped {dropped_for_numeric} rows whose '{heat_col}' or '{length_col}' values could not be converted to numbers.")
        df[heat_col] = df[heat_col].astype(int)
        df[length_col] = df[length_col].astype(int)
        print(f"After cleaning (valid heat/length values): {len(df)} rows.")
        if df.empty:
            print("Error: DataFrame is empty after cleaning.")
            return None, None
        unique_keywords = df[keyword_col].unique().tolist()
        print(f"Extracted {len(unique_keywords)} unique keywords for embedding generation.")
        return df, unique_keywords
    except Exception as e:
        print(f"Error loading or processing data: {e}")
        raise
def generate_embeddings(unique_keywords, model_name, device_to_use, batch_size):
    """Generate sentence embeddings for the list of unique keywords."""
    print(f"\nLoading sentence-transformer model: {model_name}...")
    try:
        model = SentenceTransformer(model_name, device=device_to_use)
        print(f"Model loaded on: {device_to_use}")
    except Exception as e:
        print(f"Error loading model '{model_name}': {e}")
        raise
    print(f"Generating embeddings for {len(unique_keywords)} unique keywords (device={device_to_use}, batch_size={batch_size})...")
    start_time = time.time()
    try:
        embeddings = model.encode(
            unique_keywords,
            convert_to_tensor=True,
            show_progress_bar=True,
            device=device_to_use,
            batch_size=batch_size
        )
    except torch.cuda.OutOfMemoryError:
        print("\nError: ran out of GPU memory! Try reducing ENCODE_BATCH_SIZE in the configuration.")
        raise
    except Exception as e:
        print(f"Error generating embeddings: {e}")
        raise
    end_time = time.time()
    time_taken = end_time - start_time
    print(f"Embedding generation finished in {time_taken:.2f} seconds.")
    # Return the embeddings on CPU along with the time taken
    return embeddings.cpu(), time_taken
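# For paraphrase-multilingual-mpnet-base-v2 each embedding is 768-dimensional,
# so N unique keywords take roughly N * 768 * 4 bytes as float32; moving the
# tensor to CPU above frees that VRAM before the clustering step.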
def find_clusters_and_map_data(original_df, unique_keywords, embeddings_cpu, threshold, min_size, cluster_batch_size, device_for_clustering, keyword_col, heat_col, length_col):
    """Run clustering and map the results back to the original rows (including heat and length)."""
    print(f"\nClustering with similarity threshold >= {threshold}, min size={min_size}, cluster batch size={cluster_batch_size}...")
    start_time = time.time()
    try:
        embeddings_on_device = embeddings_cpu.to(device_for_clustering)
        clusters_indices = util.community_detection(
            embeddings_on_device,
            min_community_size=min_size,
            threshold=threshold,
            batch_size=cluster_batch_size
        )
    except torch.cuda.OutOfMemoryError:
        print("\nError: ran out of GPU memory during clustering! Try reducing CLUSTER_BATCH_SIZE in the configuration.")
        raise
    except Exception as e:
        print(f"Error during clustering: {e}")
        raise
    end_time = time.time()
    time_taken = end_time - start_time
    print(f"Clustering finished in {time_taken:.2f} seconds.")
    print(f"Found {len(clusters_indices)} preliminary clusters. Mapping back to the original data...")
    # --- Data-mapping logic (same as before) ---
    final_clusters_with_data = []
    keyword_to_original_records_map = defaultdict(list)
    for record in original_df.to_dict('records'):
        keyword_to_original_records_map[record[keyword_col]].append(record)
    processed_unique_keyword_indices_count = 0
    total_unique_keywords_in_detected_clusters = sum(len(c_indices) for c_indices in clusters_indices)
    for i, cluster_of_unique_keyword_indices in enumerate(clusters_indices):
        current_cluster_all_original_data = []
        for unique_kw_idx in cluster_of_unique_keyword_indices:
            processed_unique_keyword_indices_count += 1
            if unique_kw_idx < len(unique_keywords):
                unique_keyword_text = unique_keywords[unique_kw_idx]
                original_records_for_this_keyword = keyword_to_original_records_map.get(unique_keyword_text, [])
                current_cluster_all_original_data.extend(original_records_for_this_keyword)
            else:
                print(f"Warning: cluster index {unique_kw_idx} is out of range for the unique-keyword list ({len(unique_keywords)}); skipped.")
        if current_cluster_all_original_data:
            current_cluster_all_original_data.sort(key=lambda x: (-x[heat_col], x[keyword_col]))
            final_clusters_with_data.append(current_cluster_all_original_data)
    print(f"Mapping done. {processed_unique_keyword_indices_count}/{total_unique_keywords_in_detected_clusters} unique-keyword indices were placed into final clusters.")
    print(f"Ended up with {len(final_clusters_with_data)} clusters with full row data.")
    # Return the clusters and the time spent on clustering/mapping
    return final_clusters_with_data, time_taken
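# A note on util.community_detection (per the sentence-transformers docs): it
# returns a list of communities, each a list of indices into the embedding
# tensor, with the central element of each community listed first. Comparisons
# are done in batches of cosine-similarity computations, which is why
# CLUSTER_BATCH_SIZE trades speed against GPU memory.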
# --- New: a function wrapping the whole pipeline ---
def run_pipeline(target_device):
    """
    Run the full pipeline (data loading, high-frequency phrase extraction, clustering)
    and return the total elapsed time.
    :param target_device: 'cuda' or 'cpu'
    :return: total execution time in seconds on that device, or None on error
    """
    run_start_time = time.time()
    print(f"\n{'='*20} Starting run on {target_device.upper()} {'='*20}")
    try:
        # === Part 1: load the data ===
        original_dataframe, unique_keywords_list = load_data(CSV_FILE_PATH, KEYWORD_COLUMN, HEAT_COLUMN, LENGTH_COLUMN)
        if original_dataframe is None or original_dataframe.empty:
            print("No valid data loaded; aborting this run.")
            return None
        # === Part 2: extract high-frequency core phrases (always runs on CPU) ===
        # Note: phrase extraction counts toward the total time but is unaffected by target_device
        start_hf_time = time.time()
        top_phrases_results = extract_high_freq_phrases(
            original_dataframe, KEYWORD_COLUMN, stopwords_set, ALLOWED_POS,
            NGRAM_RANGE, MIN_PHRASE_LENGTH, TOP_N_PHRASES
        )
        end_hf_time = time.time()
        print(f"High-frequency phrase extraction took {end_hf_time - start_hf_time:.2f} seconds.")
        # --- Print the top phrases ---
        print(f"\n--- Top {len(top_phrases_results)} high-frequency core phrases ---")
        if not top_phrases_results:
            print("No core phrases could be extracted.")
        else:
            for phrase, count in top_phrases_results:
                print(f'- "{phrase}" (count: {count})')
        # === Part 3: semantic clustering (on the requested target_device) ===
        if not unique_keywords_list:
            print("\nNo unique keywords extracted; clustering is not possible.")
            embeddings_time = 0
            clustering_time = 0
            found_clusters_with_data = []
        else:
            # Generate embeddings
            # generate_embeddings returns (embeddings_cpu, time_taken)
            unique_keyword_embeddings_cpu, embeddings_time = generate_embeddings(
                unique_keywords_list, MODEL_NAME, target_device, ENCODE_BATCH_SIZE
            )
            # Run clustering and map the results back
            # find_clusters_and_map_data returns (clusters_data, time_taken)
            found_clusters_with_data, clustering_time = find_clusters_and_map_data(
                original_dataframe, unique_keywords_list, unique_keyword_embeddings_cpu,
                SIMILARITY_THRESHOLD, MIN_CLUSTER_SIZE, CLUSTER_BATCH_SIZE,
                target_device,  # clustering also runs on the target device
                KEYWORD_COLUMN, HEAT_COLUMN, LENGTH_COLUMN
            )
        # --- Print the clustering results ---
        print("\n--- Keyword semantic clustering results ---")
        if not found_clusters_with_data:
            print("No clusters satisfying the criteria were found.")
        else:
            found_clusters_with_data.sort(key=len, reverse=True)
            # To keep the output short, only print the number of clusters
            print(f"Found {len(found_clusters_with_data)} cluster groups.")
            # Uncomment below for the detailed results
            # for i, cluster_data_list in enumerate(found_clusters_with_data):
            #     unique_keywords_in_this_cluster = set(item[KEYWORD_COLUMN] for item in cluster_data_list)
            #     print(f"\nGroup {i + 1} ({len(cluster_data_list)} records, {len(unique_keywords_in_this_cluster)} unique keywords):")
            #     for item_dict in cluster_data_list:
            #         print(f"  - {item_dict[KEYWORD_COLUMN]} (heat: {item_dict[HEAT_COLUMN]}, length: {item_dict[LENGTH_COLUMN]})")
        run_end_time = time.time()
        total_run_time = run_end_time - run_start_time
        print(f"\n--- Performance on {target_device.upper()} ---")
        print(f"Embedding generation: {embeddings_time:.2f} seconds")
        print(f"Clustering and mapping: {clustering_time:.2f} seconds")
        print(f"High-frequency phrase extraction: {end_hf_time - start_hf_time:.2f} seconds")
        print(f"Total time on {target_device.upper()}: {total_run_time:.2f} seconds")
        print(f"{'='*20} Finished run on {target_device.upper()} {'='*20}\n")
        return total_run_time
    except Exception as e:
        run_end_time = time.time()
        print(f"\nError while running on {target_device.upper()}: {e}")
        import traceback
        traceback.print_exc()
        print(f"{'='*20} Run on {target_device.upper()} aborted due to an error ({run_end_time - run_start_time:.2f} seconds) {'='*20}\n")
        return None  # None signals that the run failed
# --- Main execution logic (revised) ---
if __name__ == "__main__":
    overall_start_time = time.time()
    gpu_total_time = None
    cpu_total_time = None
    # === GPU run ===
    if torch.cuda.is_available():
        try:
            # Clear the GPU cache (optional, but sometimes helps)
            torch.cuda.empty_cache()
            print("\n>>> Starting GPU run...")
            gpu_total_time = run_pipeline(target_device='cuda')
        except Exception as e:
            print(f"\nTop-level error during the GPU run: {e}")
            gpu_total_time = None  # mark the GPU run as failed
    else:
        print("\nCUDA not available; skipping the GPU run.")
    # === CPU run ===
    try:
        print("\n>>> Starting CPU run...")
        # The earlier GPU run does not tie up CPU memory, so no cleanup is needed here (it would be harmless anyway)
        cpu_total_time = run_pipeline(target_device='cpu')
    except Exception as e:
        print(f"\nTop-level error during the CPU run: {e}")
        cpu_total_time = None  # mark the CPU run as failed
    # === Compare the results ===
    print("\n" + "="*30 + " Performance Comparison Summary " + "="*30)
    if gpu_total_time is not None:
        print(f"GPU total time: {gpu_total_time:.2f} seconds")
    else:
        print("GPU run did not complete successfully or was skipped.")
    if cpu_total_time is not None:
        print(f"CPU total time: {cpu_total_time:.2f} seconds")
    else:
        print("CPU run did not complete successfully.")
    if gpu_total_time is not None and cpu_total_time is not None and gpu_total_time > 0:
        ratio = cpu_total_time / gpu_total_time
        print(f"\nCPU time was {ratio:.2f}x the GPU time.")
    elif gpu_total_time == 0 and cpu_total_time is not None:
        print("\nGPU time is close to 0; a meaningful ratio cannot be computed.")
    else:
        print("\nOne or both runs failed, so no time comparison is possible.")
    overall_end_time = time.time()
    print(f"\nTotal script time (both runs): {overall_end_time - overall_start_time:.2f} seconds.")
    print("Script finished.")
