# PolyculeNetwork # Copyright (C) 2024 PolyculeConnect # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os import ipaddress import tomllib import jinja2 dry_run = False if dry_run: run = print else: run = os.system def load_config(path): with open(path, "rb") as f: data = tomllib.load(f) return data def load_firewall(): data = load_config("/config/config.toml") run("nft -f templates/rules.nft") networks = data["network"].keys() local_network = os.environ.get('LOCAL_NETWORK') remote_networks = list(filter(lambda k: k != local_network, networks)) local_range = str(ipaddress.IPv4Network(data["network"][local_network]["local_range"])) local_translated_range = str(ipaddress.IPv4Network(data["network"][local_network]["local_translated_range"])) remote_ranges = [str(ipaddress.IPv4Network(data["network"][net]["local_translated_range"])) for net in remote_networks] run(f"nft add element ip filter local_range {{ {local_range} }}") run(f"nft add element ip filter local_translated_range {{ {local_translated_range} }}") for net in remote_ranges: run(f"nft add element ip filter remote_range {{ {net} }}") for (loc, trans) in zip(ipaddress.IPv4Network(local_range), ipaddress.IPv4Network(local_translated_range)): run(f"nft add element ip filter ip_map_snat {{ {loc} : {trans} }}") run(f"nft add element ip filter ip_map_dnat {{ {trans} : {loc} }}") def load_wireguard(): with open("templates/wg-pn.conf.j2", "r") as f: env = jinja2.Environment() template = env.from_string(f.read()) peers = [] data = load_config("/config/config.toml") networks = data["network"].keys() local_network = os.environ.get('LOCAL_NETWORK') remote_networks = list(filter(lambda k: k != local_network, networks)) for net in remote_networks: peer = { "public_key": data["network"][net]["public_key"], } endpoint = data["network"][net].get("endpoint", "") if endpoint != "": peer["endpoint"] = endpoint peer["allowed_ips"] = data["network"][net]["local_translated_range"] untranslated_networks = data["network"][net].get("untranslated_networks", "") if untranslated_networks != "": peer["allowed_ips"] += ", " + untranslated_networks peers.append(peer) with open("wg-pn.conf", "w") as f: f.write(template.render( private_key=os.environ.get('PRIVATE_KEY'), listen_port=os.environ.get('LISTEN_PORT', "51820"), wireguard_address=data["network"][local_network]["wireguard_address"], peers=peers )) def gen_dns(): data = load_config("/config/config.toml") networks = data["network"].keys() local_network = os.environ.get('LOCAL_NETWORK') remote_networks = list(filter(lambda k: k != local_network, networks)) dns_servers = [] for domain in data["network"][local_network]["dns"].keys(): dns_servers.append({ "ip": data["network"][local_network]["dns"][domain], "domain": domain }) for net in remote_networks: for domain in data["network"][net]["dns"].keys(): ip = data["network"][net]["dns"][domain] local_range = ipaddress.IPv4Network(data["network"][net]["local_range"]) if ipaddress.IPv4Address(ip) in local_range: local_translated_range = ipaddress.IPv4Network(data["network"][net]["local_translated_range"]) for (loc, trans) in zip(local_range, local_translated_range): if ipaddress.IPv4Address(ip) == loc: ip = str(trans) break dns_servers.append({ "ip": ip, "domain": domain }) with open("templates/dnsmasq.conf.j2", "r") as f: env = jinja2.Environment() template = env.from_string(f.read()) with open("/config/dnsmasq.conf", "w") as f: f.write(template.render( default_server=os.environ.get('DNS_SERVER', "1.1.1.1"), dns_servers=dns_servers )) load_firewall() load_wireguard() gen_dns()