feat: AAAA records and json configuration

This commit is contained in:
Daniel Langbein 2023-03-29 10:51:05 +02:00
parent 8e76f6de1e
commit 27492ba1ef
5 changed files with 128 additions and 69 deletions

View File

@ -1,12 +1,12 @@
# netcup DNS
Update DNS records with your current external IP address using the netcup DNS API.
Update DNS A/AAAA records with your current external IP address using the netcup DNS API.
## Configuration
For each netcup customer, create a `.ini` configuration file.
For each netcup customer, create a `.json` configuration file inside `/etc/netcup-dpns`.
There is an [example configuration](cfg/example.ini).
There is an [example configuration](cfg/example.json).
## TODOs

View File

@ -1,12 +0,0 @@
;[credentials]
;customer = 123456
;api_key = abcdefghijklmnopqrstuvwxyz
;api_password = abcdefghijklmnopqrstuvwxyz
;
;[example.com]
;hostname = @
;type = A
;
;[foo.bar]
;hostname = @
;type = A

25
cfg/example.json Normal file
View File

@ -0,0 +1,25 @@
{
"disabled": true,
"customer": 123456,
"api_key": "abcdefghijklmnopqrstuvwxyz",
"api_password": "abcdefghijklmnopqrstuvwxyz",
"records": [
{
"domain": "example.com",
"hostname": "@",
"type": "A"
},
{
"domain": "example.com",
"hostname": "@",
"type": "AAAA"
},
{
"domain": "foo.bar",
"hostname": "@",
"type": "A"
}
]
}

View File

@ -3,7 +3,7 @@
[metadata]
name = netcup-dns
version = 0.1.0
version = 0.2.0
author = Daniel Langbein
author_email = daniel@systemli.org
description = Update DNS records with your current external IP address using the netcup DNS API.

View File

@ -1,10 +1,11 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import configparser
import ipaddress
import json
import sys
from functools import lru_cache
from pathlib import Path
from typing import Type
import requests
from nc_dnsapi import Client, DNSRecord
@ -22,33 +23,39 @@ def main():
if not cfg_dir.exists():
raise Exception(f'The config directory is missing: {cfg_dir}')
cfg_files = [file for file in cfg_dir.iterdir() if file.name.endswith('.ini') and file.is_file()]
cfg_files = [file for file in cfg_dir.iterdir() if file.name.endswith('.json') and file.is_file()]
for cfg_file in cfg_files:
cfg = configparser.ConfigParser()
cfg.read(cfg_file)
if len(cfg.sections()) == 0:
# Skip completely empty configuration file.
cfg = json.loads(cfg_file.read_text())
if len(cfg) == 0 or cfg['disabled'] is True:
# Skip empty or disabled configuration file.
continue
customer = cfg['credentials']['customer']
api_key = cfg['credentials']['api_key']
api_password = cfg['credentials']['api_password']
domains = [section for section in cfg.sections() if section != 'credentials']
customer = cfg['customer']
api_key = cfg['api_key']
api_password = cfg['api_password']
entries = cfg['records']
with Client(customer, api_key, api_password) as api:
for domain in domains:
hostname = cfg[domain]['hostname']
type_ = cfg[domain]['type'].upper()
for entry in entries:
domain = entry['domain']
hostname = entry['hostname']
type_ = entry['type'].upper()
if type_ == 'A':
# Lazy: Only determine external IPv4 if an A record shall be updated.
destination = external_ipv4()
if update_record_destination(api, domain, hostname, type_, destination=destination):
print(f'Set {hostname}.{domain} {type_} record to {destination}')
else:
print(f'The {hostname}.{domain} {type_} record points already to {destination}')
elif type_ == 'AAAA':
# Lazy: Only determine external IPv6 if an AAAA record shall be updated.
destination = external_ipv6()
else:
raise Exception(f'DNS record type {type_} is not supported.')
if update_record_destination(api, domain, hostname, type_, destination=destination):
print(f'Set {hostname}.{domain} {type_} record to {destination}')
else:
print(f'The {hostname}.{domain} {type_} record points already to {destination}')
def update_record_destination(api: Client, domain: str, hostname: str, type_: str, destination: str) -> bool:
"""
@ -89,51 +96,90 @@ def get_record(api: Client, domain: str, hostname: str, type_: str) -> DNSRecord
return matches[0]
@lru_cache(maxsize=None)
def external_ipv4(timeout=5) -> str:
def ipv4_endpoints() -> list[str]:
"""
:argument timeout: Timeout for each IP detection webservice in seconds.
:return: Public IPv4 address
:return: List of services that return your external IPv4 address.
"""
# IPv4 only.
endpoints = ['https://checkipv4.dedyn.io/', 'https://api.ipify.org', 'https://v4.ident.me/']
# Not sure if they return IPv4 addresses only,
# so we try these endpoints last.
endpoints = ['https://checkipv4.dedyn.io', 'https://api.ipify.org', 'https://v4.ident.me/']
# Not sure if they return IPv4 addresses only.
endpoints += ['https://ipinfo.io/ip', 'https://ifconfig.me/ip']
return endpoints
def ipv6_endpoints() -> list[str]:
"""
:return: List of services that return your external IPv6 address.
"""
# IPv6 only.
endpoints = ['https://checkipv6.dedyn.io', 'https://api6.ipify.org', 'https://v6.ident.me/']
# Returns either IPv4 or IPv6.
endpoints += []
# Not sure if they return IPv6.
endpoints += ['https://ipinfo.io/ip']
return endpoints
backup = None
try:
# Force the usage of IPv4.
# https://stackoverflow.com/a/72440253/6334421
#
# Alternatively, use urllib3: https://stackoverflow.com/a/46972341/6334421
backup = requests.packages.urllib3.util.connection.HAS_IPV6
requests.packages.urllib3.util.connection.HAS_IPV6 = False
for endpoint in endpoints:
def external_ip(version: Type[ipaddress.IPv4Address | ipaddress.IPv6Address], timeout: float = 5,
endpoints: list[str] = None) -> str:
"""
:param version: Weather the public IPv4 or IPv6 address shall be determined.
:param endpoints: List of webservices that return ones public IP IPv4/IPv6 address.
:argument timeout: Timeout for each webservice in seconds.
:return: Public IPv4/IPv6 address.
"""
if endpoints is None:
if version == ipaddress.IPv4Address:
endpoints = ipv4_endpoints()
elif version == ipaddress.IPv6Address:
endpoints = ipv6_endpoints()
else:
raise ValueError(f'Invalid argument: {version}')
if len(endpoints) == 0:
raise ValueError(f'Invalid argument: {endpoints}')
for endpoint in endpoints:
try:
# Timeout after 5 seconds.
try:
ip = requests.get(endpoint, timeout=timeout).text.strip()
except requests.exceptions.RequestException:
continue
ip = requests.get(endpoint, timeout=timeout).text.strip()
except requests.exceptions.RequestException:
continue
# Check if it is actually an IPv4 address.
# Some services, such as e.g. v4.ident.me, sometimes return IPv6.
try:
ipv4 = ipaddress.ip_address(ip)
except ValueError:
continue
if not isinstance(ipv4, ipaddress.IPv4Address):
continue
try:
# Try to parse the IP address.
parsed_ip = ipaddress.ip_address(ip)
except ValueError:
continue
# Return ip address as string.
return ipv4.exploded
finally:
if backup is not None:
requests.packages.urllib3.util.connection.HAS_IPV6 = backup
# Check if it is actually an IPv4/IPv6 address.
if not isinstance(parsed_ip, version):
continue
raise Exception('Could not determine public IPv4 address.')
# Return IP address as string.
return parsed_ip.exploded
raise Exception('Could not determine public IP address.')
@lru_cache(maxsize=None)
def external_ipv4(timeout: float = 5, endpoints: list[str] = None) -> str:
"""
:param endpoints: List of webservices that return ones public IPv4 address.
:argument timeout: Timeout for each webservice in seconds.
:return: Public IPv4 address.
"""
return external_ip(ipaddress.IPv4Address, timeout, endpoints)
@lru_cache(maxsize=None)
def external_ipv6(timeout: float = 5, endpoints: list[str] = None) -> str:
"""
:param endpoints: List of webservices that return ones public IPv6 address.
:argument timeout: Timeout for each webservice in seconds.
:return: Public IPv6 address.
"""
return external_ip(ipaddress.IPv6Address, timeout, endpoints)
if __name__ == '__main__':