Python批量验证ollama服务器上可用的deepseek模型

其他杂项32字数 2651阅读8分50秒阅读模式

先用后羿采集器去zoomeyefofa,等空间网络搜索引擎上找正在运行的ollama服务器,整理好了IP地址以后存到TXT里,然后运行下面的PY代码来批量自动验证。

以下是源码:

import requests
import json
import time
import os
import concurrent.futures
import threading

# 全局变量
MODEL_TO_TEST = "deepseek-r1:70b"
TIMEOUT = 120

def fetch_models(server_address):
    server_url = f"http://{server_address}/api/tags"
    
    try:
        response = requests.get(server_url, timeout=TIMEOUT)
        response.raise_for_status()
        tags = response.json()
        
        if not isinstance(tags, dict) or not tags.get("models"):
            return []
        models = [tag["model"] for tag in tags.get("models", [])]
        return models
    except requests.exceptions.RequestException:
        return []

def test_model(server_address, model_name):
    server_url = f"http://{server_address}/api/generate"
    payload = {
        "model": model_name,
        "prompt": "1+2+3+4一直加到100等于多少,直接给答案,不要思考过程,不要废话,说人话"
    }
    
    start_time = time.time()
    
    try:
        response = requests.post(server_url, data=json.dumps(payload), headers={'Content-Type': 'application/json'}, timeout=TIMEOUT)
        response.raise_for_status()
        
        end_time = time.time()
        duration = end_time - start_time
        
        return duration
    except requests.exceptions.RequestException:
        return None

def test_server(server_address):
    server_address = server_address.strip()
    if server_address:
        # 获取模型列表
        models = fetch_models(server_address)
        
        # 检查是否包含指定模型并测试
        if MODEL_TO_TEST in models:
            duration = test_model(server_address, MODEL_TO_TEST)
            if duration is not None and duration <= TIMEOUT:  # 将总耗时阈值改为120秒
                return f"{server_address}    {duration:.2f}秒"
            
    return None

def main():
    # 创建一个集合来存储已验证的服务器,避免重复
    verified_servers = set()

    # 检查文件是否存在,如果不存在则创建一个空文件
    if not os.path.exists("ollama_server_list.txt"):
        with open("ollama_server_list.txt", "w", encoding="utf-8") as file:
            file.write("")
    
    # 从文件中读取已验证的服务器
    if os.path.exists("good_ollama_server.txt"):
        with open("good_ollama_server.txt", "r", encoding="utf-8") as file:
            for line in file:
                verified_servers.add(line.strip())

    # 从文件中读取服务器列表
    with open("ollama_server_list.txt", "r", encoding="utf-8") as file:
        server_list = file.read().strip().split("\n")

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_server = {executor.submit(test_server_with_timeout, server_address): server_address for server_address in server_list}
        
        for future in concurrent.futures.as_completed(future_to_server):
            server_address = future_to_server[future]
            try:
                result = future.result()
                if result and result not in verified_servers:
                    verified_servers.add(result)
                    with open("good_ollama_server.txt", "a", encoding="utf-8") as file:
                        file.write(result + "\n")
                    print(result)
            except Exception:
                pass

def test_server_with_timeout(server_address):
    def worker():
        nonlocal result
        result = test_server(server_address)
    
    result = None
    thread = threading.Thread(target=worker)
    thread.start()
    thread.join(timeout=TIMEOUT)
    
    if thread.is_alive():
        return None
    return result

if __name__ == "__main__":
    print("开始测试...\n")
    main()
    print("服务器测试完成,结果已保存到 good_ollama_server.txt")

 最后更新:2025-2-20
  • 本文由 asdfasd 发表于 2025-02-1820:42:19
  • 转载请务必保留本文链接:http://wp.fangfa.me/other-note/python%e6%89%b9%e9%87%8f%e9%aa%8c%e8%af%81ollama%e6%9c%8d%e5%8a%a1%e5%99%a8%e4%b8%8a%e5%8f%af%e7%94%a8%e7%9a%84deepseek%e6%a8%a1%e5%9e%8b.html