polycule-network/dns.py

55 lines
1.7 KiB
Python
Raw Permalink Normal View History

2024-07-30 07:50:47 +00:00
import time
2024-07-30 12:55:06 +00:00
import socket
2024-07-30 07:50:47 +00:00
from ipaddress import IPv4Address, IPv4Network
from dnslib import DNSRecord,RCODE,QTYPE
from dnslib.server import DNSServer,DNSHandler,BaseResolver,DNSLogger
from dnslib.label import DNSLabel
from dnslib.ranges import IP4
class ProxyResolver(BaseResolver):
    """Resolver that forwards each query to an upstream server picked by config.

    For every request the upstream is looked up with
    ``config.dns_server(<query name>)``; the returned object is used for its
    ``address``, ``port`` and ``network`` attributes. When the upstream
    belongs to a network other than ``config.local_network``, the IPv4
    addresses in the answer section are rewritten through
    ``config.translate``.
    """

    def __init__(self, config):
        """Keep a reference to *config* and cache the upstream timeout.

        Args:
            config: Project configuration object. This class reads
                ``dns_timeout`` and ``local_network`` from it and calls
                ``dns_server(name)`` and ``translate(ip, network)`` on it.
        """
        self.timeout = config.dns_timeout
        self.config = config

    def resolve(self, request, handler):
        """Forward *request* upstream and return the (possibly rewritten) reply.

        Args:
            request: The parsed incoming ``DNSRecord``.
            handler: The dnslib handler serving the request (unused here).

        Returns:
            The upstream reply as a ``DNSRecord``, or an NXDOMAIN reply built
            from *request* when the upstream does not answer within
            ``self.timeout``.
        """
        qname = DNSLabel(request.q.qname)
        dns = self.config.dns_server(str(qname))

        try:
            proxy_r = request.send(dns.address, dns.port, timeout=self.timeout)
            reply = DNSRecord.parse(proxy_r)
        except socket.timeout:
            # NOTE(review): a timeout is reported as NXDOMAIN, which clients
            # may cache as "name does not exist"; SERVFAIL might be a better
            # fit. Kept as-is to preserve existing behaviour.
            reply = request.reply()
            reply.header.rcode = RCODE.NXDOMAIN
            return reply

        # Use self.config / dns.network here: the bare names `config` and
        # `network` are not defined in this scope.
        if dns.network is not None and dns.network != self.config.local_network:
            for rr in reply.rr:
                # Only A records carry an IPv4 address; other types (CNAME,
                # TXT, ...) would make IPv4Address() raise.
                if rr.rtype != QTYPE.A:
                    continue
                translated = self.config.translate(str(rr.rdata), dns.network)
                # Stored as a tuple of four ints rather than raw bytes.
                # NOTE(review): dnslib's A rdata appears to format/pack its
                # address from a 4-int tuple - confirm against the installed
                # dnslib version.
                rr.rdata.data = tuple(IPv4Address(translated).packed)
            reply.set_header_qa()
        return reply
def translate(ip, untranslated_range, translated_range):
    """Map *ip* from one IPv4 network onto the same offset in another.

    Args:
        ip: Dotted-quad IPv4 address string to translate.
        untranslated_range: Source network in a form accepted by
            ``IPv4Network`` (e.g. ``"10.0.0.0/24"``).
        translated_range: Destination network, same form.

    Returns:
        The address at the same position inside *translated_range* as a
        string, or *ip* unchanged when it is not a canonical IPv4 string,
        lies outside *untranslated_range*, or its offset does not exist in a
        smaller *translated_range*.

    Raises:
        ValueError: If either range is not a valid IPv4 network.
    """
    source = IPv4Network(untranslated_range)
    target = IPv4Network(translated_range)

    try:
        address = IPv4Address(ip)
    except ValueError:
        # Not an IPv4 address at all (e.g. a hostname): pass through.
        return ip

    # Only the canonical dotted-quad spelling is translated; anything else
    # (non-string input, alternative spellings) is returned untouched.
    if str(address) != ip or address not in source:
        return ip

    # Offset arithmetic instead of walking both networks address by address,
    # which is prohibitively slow for large prefixes such as a /8.
    offset = int(address) - int(source.network_address)
    if offset >= target.num_addresses:
        return ip
    return str(target[offset])
2024-08-01 20:16:29 +00:00
def run(config):
    """Start the proxying DNS server and block while it is alive.

    Builds a ``ProxyResolver`` from *config*, serves it through a dnslib
    ``DNSServer`` bound to ``config.listen_address``:``config.listen_port``
    on a background thread, then polls once per second until that server
    reports it is no longer alive.

    Args:
        config: Project configuration object; ``listen_port`` and
            ``listen_address`` are read here, and the object is handed to
            ``ProxyResolver``.
    """
    proxy = ProxyResolver(config)
    request_handler = DNSHandler
    # Log requests, replies, truncations and errors; suppress raw
    # recv/send/data dumps.
    dns_logger = DNSLogger("+request,+reply,+truncated,+error,-recv,-send,-data")

    server = DNSServer(
        proxy,
        port=config.listen_port,
        address=config.listen_address,
        logger=dns_logger,
        handler=request_handler,
    )
    server.start_thread()

    # Keep the calling thread parked while the server thread does the work.
    while True:
        if not server.isAlive():
            break
        time.sleep(1)