該脚本需要安裝 requests、ping3 模塊。
pip3 install requests
pip3 install ping3
这个脚本的功能包括:
- 检查指定的 URL 是否可达,如果全部可达则等待 5 分钟后重新检查,否则执行其他代码。
- 如果有 URL 不可达,则执行其他代码,其中包括对指定 IP 地址进行 ping 测试,并找到延迟最小的 IP。
- 使用 Cloudflare API 更新指定域名的 DNS 记录,将延迟最小的 IP 设置为域名的 A 记录。
假設你有一個大陸VPS,通過該脚本可以測試中轉域名的IP是否可用,若不可用,則更新IP(自動設置為延遲最低的IP)。
import requests
import ping3
import time
api_key = ''
email = ""
domain = ''
prefix = ''
def check_urls(target_urls):
for target_url in target_urls:
try:
response = requests.get(target_url, timeout=2)
if response.status_code == 200:
print(f"Request to {target_url} successful. Status code: {response.status_code}")
else:
print(f"Request to {target_url} failed with status code: {response.status_code}")
return False
except Exception as e:
print(f"Error while requesting {target_url}: {e}")
return False
return True
def get_zone_id(domain):
headers = {
'X-Auth-Email': email,
'X-Auth-Key': api_key,
'Content-Type': 'application/json'
}
url = f'https://api.cloudflare.com/client/v4/zones?name={domain}'
response = requests.get(url, headers=headers)
return response.json()['result'][0]['id']
def get_record_id(domain, prefix):
zone_id = get_zone_id(domain) # 获取区域ID
headers = {
'X-Auth-Email': email,
'X-Auth-Key': api_key,
'Content-Type': 'application/json'
}
url = f'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records'
response = requests.get(url, headers=headers)
records = response.json()['result']
for record in records:
if record['name'] == f"{prefix}.{domain}":
return record['id']
return None
def DNS_Update(domain, record_type, prefix, record_content):
headers = {
'X-Auth-Email': email,
'X-Auth-Key': api_key,
'Content-Type': 'application/json'
}
zone_id = get_zone_id(domain)
config_domain = prefix + '.' + domain
record_id = get_record_id(domain, prefix) # 获取记录ID
url = f'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records'
# 构建数据
data = {
'type': record_type,
'name': config_domain,
'content': record_content,
'ttl': 120,
'proxied': False
}
# 如果记录已存在,则使用PUT请求更新记录
if record_id:
url += f'/{record_id}'
response = requests.put(url, headers=headers, json=data)
else:
# 否则,使用POST请求添加新记录
response = requests.post(url, headers=headers, json=data)
# 检查响应状态码并打印结果
if response.status_code == 200:
print('Successfully updated/added:', config_domain, record_type, record_content)
else:
print('Failed to update/add:', config_domain, record_type, record_content)
def test_ping(target_ip):
try:
# 发送 ICMP ping 请求并获取延迟
delay = ping3.ping(target_ip)
if delay is not None:
print(f"Ping to {target_ip} successful. Delay: {delay} ms")
return delay
else:
print(f"No response from {target_ip}.")
return 999
except Exception as e:
print(f"Error while pinging {target_ip}: {e}")
return 999
if __name__ == "__main__":
target_ips = ["8.8.8.8", '1.1.1.1']
target_urls = ['https://gosogle.com', 'https://yosutube.com']
# 检查 URL 是否正常,如果全部正常则等待 5 分钟后重复运行,否则执行其他代码
while True:
if check_urls(target_urls):
print("All URLs are reachable. Waiting for 5 minutes before rechecking.")
time.sleep(300) # 5 minutes
else:
print("Some URLs are not reachable. Running other code.")
delays = []
min_delay_ip = None # 用于存储当前最小延迟的IP
min_delay = float('inf') # 初始设定为无穷大
for target_ip in target_ips:
# 执行 ping 测试
delay = test_ping(target_ip)
delays.append(delay)
# 更新最小延迟的IP
if delay < min_delay:
min_delay = delay
min_delay_ip = target_ip
print(f"The IP with the minimum delay is: {min_delay_ip} with delay: {min_delay} ms")
DNS_Update(domain, 'A', prefix, min_delay_ip)