先用后羿采集器去、,等空间网络搜索引擎上找正在运行的ollama服务器,整理好了IP地址以后存到TXT里,然后运行下面的PY代码来批量自动验证。
以下是源码:
import requests
import json
import time
import os
import concurrent.futures
import threading
# 全局变量
MODEL_TO_TEST = "deepseek-r1:70b"
TIMEOUT = 120
def fetch_models(server_address):
server_url = f"http://{server_address}/api/tags"
try:
response = requests.get(server_url, timeout=TIMEOUT)
response.raise_for_status()
tags = response.json()
if not isinstance(tags, dict) or not tags.get("models"):
return []
models = [tag["model"] for tag in tags.get("models", [])]
return models
except requests.exceptions.RequestException:
return []
def test_model(server_address, model_name):
server_url = f"http://{server_address}/api/generate"
payload = {
"model": model_name,
"prompt": "1+2+3+4一直加到100等于多少,直接给答案,不要思考过程,不要废话,说人话"
}
start_time = time.time()
try:
response = requests.post(server_url, data=json.dumps(payload), headers={'Content-Type': 'application/json'}, timeout=TIMEOUT)
response.raise_for_status()
end_time = time.time()
duration = end_time - start_time
return duration
except requests.exceptions.RequestException:
return None
def test_server(server_address):
server_address = server_address.strip()
if server_address:
# 获取模型列表
models = fetch_models(server_address)
# 检查是否包含指定模型并测试
if MODEL_TO_TEST in models:
duration = test_model(server_address, MODEL_TO_TEST)
if duration is not None and duration <= TIMEOUT: # 将总耗时阈值改为120秒
return f"{server_address} {duration:.2f}秒"
return None
def main():
# 创建一个集合来存储已验证的服务器,避免重复
verified_servers = set()
# 检查文件是否存在,如果不存在则创建一个空文件
if not os.path.exists("ollama_server_list.txt"):
with open("ollama_server_list.txt", "w", encoding="utf-8") as file:
file.write("")
# 从文件中读取已验证的服务器
if os.path.exists("good_ollama_server.txt"):
with open("good_ollama_server.txt", "r", encoding="utf-8") as file:
for line in file:
verified_servers.add(line.strip())
# 从文件中读取服务器列表
with open("ollama_server_list.txt", "r", encoding="utf-8") as file:
server_list = file.read().strip().split("\n")
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_server = {executor.submit(test_server_with_timeout, server_address): server_address for server_address in server_list}
for future in concurrent.futures.as_completed(future_to_server):
server_address = future_to_server[future]
try:
result = future.result()
if result and result not in verified_servers:
verified_servers.add(result)
with open("good_ollama_server.txt", "a", encoding="utf-8") as file:
file.write(result + "\n")
print(result)
except Exception:
pass
def test_server_with_timeout(server_address):
def worker():
nonlocal result
result = test_server(server_address)
result = None
thread = threading.Thread(target=worker)
thread.start()
thread.join(timeout=TIMEOUT)
if thread.is_alive():
return None
return result
if __name__ == "__main__":
print("开始测试...\n")
main()
print("服务器测试完成,结果已保存到 good_ollama_server.txt")

评论