import requests
import csv
import uuid
import random
from datetime import datetime
# Ping Proxies API settings. The two key values below are placeholders and
# must be replaced with real credentials before the script is run.
API_PRIVATE_KEY = "your_private_key"
API_PUBLIC_KEY = "your_public_key"

# Root of the user-scoped public API; endpoint paths are appended to it.
BASE_URL = "https://api.pingproxies.com/1.0/public/user"

# Authentication and content-type headers sent with every API request.
headers = {
    "X-API-Public-Key": API_PUBLIC_KEY,
    "X-API-Private-Key": API_PRIVATE_KEY,
    "Content-Type": "application/json",
}
def read_proxies_from_file(file_path):
    """
    Read a proxy list from a text file.

    Every non-blank line is returned with its surrounding whitespace removed;
    blank and whitespace-only lines are skipped.

    Args:
        file_path: Path to the text file containing proxies, one per line
    Returns:
        List of proxy strings, or an empty list if the file could not be read
    """
    try:
        # "utf-8-sig" reads plain UTF-8 and additionally drops a leading
        # byte-order mark, so a BOM written by some editors never ends up
        # glued to the first proxy string. Naming the encoding also keeps the
        # result independent of the platform's default locale encoding.
        with open(file_path, "r", encoding="utf-8-sig") as file:
            stripped_lines = (line.strip() for line in file)
            proxies = [entry for entry in stripped_lines if entry]
    except FileNotFoundError:
        print(f"Error: File '{file_path}' not found")
        return []
    except (OSError, ValueError) as e:
        # OSError covers permission problems, directories and I/O failures;
        # ValueError covers undecodable bytes (UnicodeDecodeError) and
        # invalid path values.
        print(f"Error reading file: {e}")
        return []

    print(f"Successfully loaded {len(proxies)} proxies from {file_path}")
    return proxies
def get_available_test_servers(timeout=30):
    """
    Get the list of active proxy test servers.

    Args:
        timeout: Seconds to wait for the API before giving up. Without a
            timeout, requests would wait indefinitely on a stalled connection.
    Returns:
        The "data" entry of the JSON response, or None if the request failed,
        the status code was not 200, or the body was not the expected JSON
    """
    try:
        response = requests.get(
            f"{BASE_URL}/proxy_test_server/search?proxy_test_server_active=true",
            headers=headers,
            timeout=timeout,
        )
    except requests.RequestException as e:
        # Connection errors, DNS failures and timeouts all end up here.
        print(f"Error getting test servers: {e}")
        return None

    if response.status_code != 200:
        print(f"Error getting test servers: {response.status_code}")
        print(response.text)
        return None

    try:
        data = response.json()
    except ValueError as e:
        # A 200 response whose body is not valid JSON.
        print(f"Error getting test servers: invalid JSON response ({e})")
        return None

    if not isinstance(data, dict) or "data" not in data:
        print("Error getting test servers: response has no 'data' field")
        return None
    return data["data"]
def create_bulk_proxy_test(
    proxy_strings, test_server_id, target_url="https://pingproxies.com", timeout=120
):
    """
    Create a proxy test for multiple proxies against a target URL.

    A fresh UUID is generated for the test run and for every proxy in it.

    Args:
        proxy_strings: List of proxy strings in format "host:port:username:password"
        test_server_id: ID of the proxy test server to use
        target_url: URL to test the proxies against
        timeout: Seconds to wait for the API before giving up; raise it if
            large batches legitimately take longer to answer
    Returns:
        The parsed JSON response of the create call, or None if the list was
        empty, the request failed, the status code was not 200, or the body
        was not valid JSON
    """
    if not proxy_strings:
        # Nothing to test; avoid sending an empty test run to the API.
        print("Error: no proxies supplied for the test run")
        return None

    test_run_id = str(uuid.uuid4())

    # Pair every proxy string with its own unique ID
    proxies = [
        {"proxy_id": str(uuid.uuid4()), "proxy_string": proxy_string}
        for proxy_string in proxy_strings
    ]

    # Create the request payload
    payload = {
        "proxies": proxies,
        "proxy_tester_server_id": [test_server_id],
        "proxy_test_run_id": test_run_id,
        "urls": [target_url],
    }

    # Make request to create the proxy test
    try:
        response = requests.post(
            f"{BASE_URL}/proxy_test_run/create",
            json=payload,
            headers=headers,
            timeout=timeout,
        )
    except requests.RequestException as e:
        # Connection errors, DNS failures and timeouts all end up here.
        print(f"Error: {e}")
        return None

    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None

    try:
        return response.json()
    except ValueError as e:
        # A 200 response whose body is not valid JSON.
        print(f"Error: invalid JSON response ({e})")
        return None
def write_results_to_csv(results, output_file):
    """
    Write proxy test results to a CSV file.

    One row is written per entry in each test run's "results" list. Test runs
    without a "results" list and malformed entries are skipped, and missing
    or null fields become empty cells, so one bad record cannot truncate the
    file.

    Args:
        results: The test results from the API, shaped as
            {"data": {test_run_id: {"results": [result_dict, ...]}}}
        output_file: Path to the output CSV file
    """
    if not results or "data" not in results:
        print("No test results to write")
        return

    test_runs = results["data"]
    if not isinstance(test_runs, dict):
        print("No test results to write")
        return

    # Define CSV headers
    headers_csv = [
        "proxy_ip",
        "proxy_port",
        "status_code",
        "response_time_ms",
        "proxy_city",
        "proxy_country",
    ]

    try:
        with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(headers_csv)

            for test_data in test_runs.values():
                if not isinstance(test_data, dict):
                    continue
                # A missing or null "results" entry is treated as no rows.
                for result in test_data.get("results") or []:
                    if not isinstance(result, dict):
                        continue
                    # The country may be absent or null; `or ""` covers both
                    # cases before upper-casing.
                    country = result.get("proxy_tester_proxy_country_id") or ""
                    writer.writerow(
                        [
                            result.get("proxy_tester_proxy_host", ""),
                            result.get("proxy_tester_proxy_port", ""),
                            result.get("proxy_test_result_status_code", ""),
                            result.get("proxy_test_result_response_time", ""),
                            result.get("proxy_tester_proxy_city_name", ""),
                            str(country).upper(),
                        ]
                    )
        print(f"Results successfully written to {output_file}")
    except (OSError, csv.Error) as e:
        print(f"Error writing to CSV: {e}")
# Example usage: read proxies from a file, test them in batches through the
# API, and write the merged results to a timestamped CSV file.
if __name__ == "__main__":
    # Configuration
    PROXY_FILE = "proxies.txt"  # Text file with one proxy per line in format host:port:username:password
    OUTPUT_CSV = f"proxy_test_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    TARGET_URL = "https://pingproxies.com"
    BATCH_SIZE = 20  # Number of proxies submitted per test run

    # Step 1: Read proxies from file
    print("Reading proxies from file...")
    proxy_list = read_proxies_from_file(PROXY_FILE)
    if not proxy_list:
        print("No proxies found in file. Please check the file exists and contains valid proxy strings.")
        # SystemExit works everywhere; the bare exit() helper is only
        # installed by the `site` module.
        raise SystemExit(1)

    # Step 2: Get available test servers
    print("Getting available test servers...")
    servers = get_available_test_servers()
    if not servers:
        print("Failed to get test servers")
        raise SystemExit(1)

    # Pick one of the active servers at random
    test_server_id = random.choice(servers)["proxy_test_server_id"]
    print(f"Using test server: {test_server_id}")

    # Step 3: Process proxies in batches
    all_results = {"data": {}}
    # Ceiling division; constant for the whole run, so computed once.
    total_batches = (len(proxy_list) + BATCH_SIZE - 1) // BATCH_SIZE
    for batch_num, start in enumerate(range(0, len(proxy_list), BATCH_SIZE), start=1):
        batch = proxy_list[start : start + BATCH_SIZE]
        print(f"\nProcessing batch {batch_num}/{total_batches} ({len(batch)} proxies)...")

        # Create the proxy test for this batch
        batch_results = create_bulk_proxy_test(batch, test_server_id, TARGET_URL)
        if not batch_results:
            print(f"Failed to create proxy test for batch {batch_num}")
            continue

        if "data" in batch_results:
            # Merge this batch's test runs into the combined result set
            all_results["data"].update(batch_results["data"])
            print(f"✅ Batch {batch_num} completed successfully")
        else:
            print(f"❌ Failed to retrieve results for batch {batch_num}")

    # Step 4: Write all results to CSV
    if all_results["data"]:
        print("\nWriting all results to CSV...")
        # NOTE: write_results_to_csv prints its own error and returns nothing,
        # so the completion message below is printed even if the write failed.
        write_results_to_csv(all_results, OUTPUT_CSV)
        print(f"\n✅ Testing complete! Results saved to: {OUTPUT_CSV}")
    else:
        print("No results to write to CSV")