Add gitea-translator server and module

This commit is contained in:
2026-03-13 16:09:12 +01:00
parent 8cd2eafaa5
commit c80628a1a9
2 changed files with 185 additions and 0 deletions

View File

@@ -0,0 +1,78 @@
{ config, self, lib, inputs, pkgs, ... }:
with lib;
let
cfg = config.services.malobeo.gitea-translator;
in
{
options = {
services.malobeo.gitea-translator = {
enable = mkOption {
default = false;
type = types.bool;
description = lib.mdDoc "Start a webserver for hydra to use the gitea pull request api.";
};
baseurl = mkOption {
type = types.str;
default = "git.dynamicdiscord.de";
description = lib.mdDoc "Base URL of the Gitea instance.";
};
owner = mkOption {
type = types.str;
default = "malobeo";
description = lib.mdDoc "Repository owner on the Gitea instance.";
};
repo = mkOption {
type = types.str;
default = "infrastructure";
description = lib.mdDoc "Repository name on the Gitea instance.";
};
host = mkOption {
type = types.str;
default = "127.0.0.1";
description = lib.mdDoc "Address the server binds to.";
};
port = mkOption {
type = types.port;
default = 27364;
description = lib.mdDoc "Port the server listens on.";
};
};
};
config = mkIf cfg.enable {
systemd.services.gitea-translator = {
description = "Gitea Pull Request Translator for Hydra";
after = [ "network-online.target" ];
wants = [ "network-online.target" ];
wantedBy = [ "multi-user.target" ];
serviceConfig = {
ExecStart = ''
${pkgs.python3}/bin/python3 ${inputs.self + /scripts/gitea_hydra_server.py} \
--baseurl ${cfg.baseurl} \
--owner ${cfg.owner} \
--repo ${cfg.repo} \
--host ${cfg.host} \
--port ${toString cfg.port}
'';
Restart = "on-failure";
RestartSec = 5;
# Hardening because why not
DynamicUser = true;
NoNewPrivileges = true;
ProtectSystem = "strict";
ProtectHome = true;
PrivateTmp = true;
PrivateDevices = true;
};
};
};
}

View File

@@ -0,0 +1,107 @@
#!/usr/bin/env python3
#imports
import os
import json
import argparse
from http.server import BaseHTTPRequestHandler, HTTPServer
import urllib.request
def _get_api_response(baseurl, owner, repo):
###https://docs.gitea.com/api/1.21/#tag/repository/operation/repoListPullRequests
###GET /api/v1/repos/{owner}/{repo}/pulls
url=(f"https://{baseurl}/api/v1/repos/{owner}/{repo}/pulls?state=open")
headers={"Accept": "application/json"}
req=urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req) as resp:
return json.loads(resp.read().decode("utf-8"))
def _parse_response(baseurl, owner, repo):
target_repo_url=f"https://{baseurl}/{owner}/{repo}.git"
pulls: dict={}
response=_get_api_response(baseurl, owner, repo)
for pr in response:
pr["target_repo_url"]=target_repo_url
pulls[str(pr["number"])]=pr
return pulls
class PullsHandler(BaseHTTPRequestHandler):
_VALID_PATHS={"/", "/gitea-pulls-sorted.json"}
# Class variables to store configuration
baseurl = None
owner = None
repo = None
def do_GET(self):
if self.path not in self._VALID_PATHS:
self.send_error(404, "Not Found")
return
answer=dict(_parse_response(self.baseurl, self.owner, self.repo))
body=json.dumps(answer, indent=2, sort_keys=True).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, fmt, *args):
print(
f"[gitea-translator] {self.address_string()} - {fmt % args}",
flush=True,
)
def main():
parser = argparse.ArgumentParser(
description="Hydra Server to Gitea-pull-request translator"
)
parser.add_argument(
"--baseurl",
default="git.dynamicdiscord.de",
help="Base URL of Gitea instance (default: git.dynamicdiscord.de)"
)
parser.add_argument(
"--owner",
default="malobeo",
help="Repository owner (default: malobeo)"
)
parser.add_argument(
"--repo",
default="infrastructure",
help="Repository name (default: infrastructure)"
)
parser.add_argument(
"--host",
default="127.0.0.1",
help="Host to bind to (default: 127.0.0.1)"
)
parser.add_argument(
"--port",
type=int,
default=27364,
help="Port to bind to (default: 27364)"
)
args = parser.parse_args()
# Set class variables so they're accessible in request handlers
PullsHandler.baseurl = args.baseurl
PullsHandler.owner = args.owner
PullsHandler.repo = args.repo
print(
f"[gitea-translator] Starting, pulling from {args.baseurl}/{args.owner}/{args.repo}",
flush=True,
)
server=HTTPServer((args.host, args.port), PullsHandler)
print(
f"[gitea-translator] online @ {args.host}:{args.port}",
flush=True,
)
server.serve_forever()
if __name__ == "__main__":
main()