# PolyculeNetwork
# Copyright (C) 2024 PolyculeConnect
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import os
from ipaddress import IPv4Address, IPv4Network
import tomllib

import jinja2

# Flip to True to print every shell command instead of executing it.
dry_run = False

if dry_run:
    run = print
else:
    run = os.system


class Config:
    """Parsed TOML configuration, viewed from the local network.

    The local network is the entry of the ``[network]`` table named by the
    ``LOCAL_NETWORK`` environment variable; every other entry is a remote.

    Attributes set by ``__init__``:
        data: the raw parsed TOML document.
        networks: key view over the ``[network]`` table.
        local_network: name read from ``LOCAL_NETWORK``.
        remote_networks: list of all network names except the local one.
        local_range / local_translated_range: local ranges as strings.
        remote_ranges: ``local_translated_range`` of each remote network.
        dns_servers: list of ``{"ip": ..., "domain": ...}`` dicts.
    """

    def __init__(self, path):
        """Load the TOML file at *path* and derive the attributes above.

        Raises:
            KeyError: if a required key is missing, or if ``LOCAL_NETWORK``
                is unset or does not name a configured network.
            ValueError: if a configured range is not a valid IPv4 network
                (raised by ``IPv4Network``).
        """
        with open(path, "rb") as f:
            data = tomllib.load(f)
        networks = data["network"]

        self.data = data
        self.networks = networks.keys()
        self.local_network = os.environ.get("LOCAL_NETWORK")

        # Fail with an actionable message instead of a bare `KeyError: None`
        # from the lookups below; the exception type is unchanged.
        if self.local_network not in networks:
            raise KeyError(
                f"LOCAL_NETWORK={self.local_network!r} does not match any "
                f"network defined in {path} "
                f"(known networks: {', '.join(networks)})"
            )

        self.remote_networks = [
            name for name in networks if name != self.local_network
        ]

        # Round-trip through IPv4Network to validate and normalise the text.
        local = networks[self.local_network]
        self.local_range = str(IPv4Network(local["local_range"]))
        self.local_translated_range = str(
            IPv4Network(local["local_translated_range"])
        )
        self.remote_ranges = [
            str(IPv4Network(networks[name]["local_translated_range"]))
            for name in self.remote_networks
        ]

        # One entry per (network, domain) pair, local network included.
        self.dns_servers = []
        for net in networks.values():
            for domain, ip in net["dns"].items():
                self.dns_servers.append({"ip": ip, "domain": domain})


def load_firewall(config):
    """Load the nftables ruleset and populate its sets and maps.

    Loads ``templates/rules.nft``, then adds the local, translated and
    remote ranges from *config* and a one-to-one SNAT/DNAT mapping between
    the local range and the local translated range.
    """
    run("nft -f templates/rules.nft")
    run(f"nft add element ip filter local_range {{ {config.local_range} }}")
    run(
        "nft add element ip filter local_translated_range "
        f"{{ {config.local_translated_range} }}"
    )
    for remote in config.remote_ranges:
        run(f"nft add element ip filter remote_range {{ {remote} }}")

    # Pair addresses positionally. zip() stops at the shorter range, so the
    # two ranges are expected to have the same size.
    address_pairs = zip(
        IPv4Network(config.local_range),
        IPv4Network(config.local_translated_range),
    )
    for loc, trans in address_pairs:
        run(f"nft add element ip filter ip_map_snat {{ {loc} : {trans} }}")
        run(f"nft add element ip filter ip_map_dnat {{ {trans} : {loc} }}")


def load_wireguard(config):
    """Render ``templates/wg-pn.conf.j2`` into ``./wg-pn.conf``.

    One peer is emitted per remote network. The private key and listen
    port are read from the ``PRIVATE_KEY`` and ``LISTEN_PORT`` environment
    variables (the port defaults to ``"51820"``).
    """
    with open("templates/wg-pn.conf.j2", "r") as f:
        template = jinja2.Environment().from_string(f.read())

    networks = config.data["network"]
    peers = []
    for name in config.remote_networks:
        net = networks[name]
        peer = {"public_key": net["public_key"]}

        endpoint = net.get("endpoint", "")
        if endpoint != "":
            peer["endpoint"] = endpoint

        allowed_ips = (
            net["local_translated_range"] + ", " + net["wireguard_address"]
        )
        untranslated_networks = net.get("untranslated_networks", "")
        if untranslated_networks != "":
            allowed_ips += ", " + untranslated_networks
        peer["allowed_ips"] = allowed_ips

        peers.append(peer)

    # Render before touching the output file so a template error does not
    # leave a truncated configuration behind.
    rendered = template.render(
        private_key=os.environ.get("PRIVATE_KEY"),
        listen_port=os.environ.get("LISTEN_PORT", "51820"),
        wireguard_address=networks[config.local_network]["wireguard_address"],
        peers=peers,
    )

    # The file contains the private key: create it owner-only (0600).
    fd = os.open("wg-pn.conf", os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        # The mode passed to os.open() only applies when the file is
        # created; tighten a pre-existing file as well.
        os.fchmod(fd, 0o600)
        f.write(rendered)


config = Config("/config/config.toml")
load_firewall(config)
load_wireguard(config)
run("wg-quick up ./wg-pn.conf")

# Imported only after the firewall and tunnel are set up, as in the
# original ordering. NOTE(review): presumably deliberate - confirm before
# moving this import to the top of the file.
import dns
dns.run(config, port=5353)