# NOTE(review): reconstructed from a whitespace-collapsed git patch. This
# segment covers two new files of that patch:
#   DHIS2/python_skeleton/.gitignore  (3 lines, no trailing newline):
#       .env
#       input/
#       output/
#   DHIS2/python_skeleton/create_missing_values_use_case.py  (below)

# create_missing_values_use_case.py

from typing import Optional, Tuple

import requests

from dhis_utils import dhis_get
from file_utils import read_csv, escape_sql_literal, write_text


# Numeric ID of the attribute in trackedentityattributevalue table
TRACKED_ENTITY_ATTRIBUTE_ID = 11364749
# Optional: UID of the attribute, for documentation/reference
TRACKED_ENTITY_ATTRIBUTE_UID = "Nf2VUgxqhmi"


class CreateMissingValuesUseCase:
    """
    Use case to generate INSERT statements for missing attribute values
    based on existing TEI metadata (created, lastUpdated, storedBy).
    """

    def __init__(
        self,
        base_url: str,
        jsessionid: str,
        input_path: str,
        output_path: str,
    ):
        """
        Args:
            base_url: DHIS2 base URL (without trailing slash).
            jsessionid: JSESSIONID cookie value.
            input_path: CSV file name (relative to 'input/' folder).
            output_path: Output SQL file name (relative to 'output/' folder).
        """
        self.base_url = base_url
        self.jsessionid = jsessionid
        self.input_path = input_path
        self.output_path = output_path

    @staticmethod
    def normalize_timestamp(raw_timestamp: Optional[str]) -> Optional[str]:
        """
        Normalize a DHIS2 timestamp into a format that Postgres accepts.

        Example:
            '2025-07-18T13:48:12.502' -> '2025-07-18 13:48:12.502'

        Returns None for empty/None input.
        """
        if not raw_timestamp:
            return None

        # removesuffix drops at most ONE trailing 'Z' (the UTC designator);
        # the previous rstrip("Z") would have stripped any run of Z's.
        timestamp = raw_timestamp.removesuffix("Z")
        return timestamp.replace("T", " ")

    @classmethod
    def _get_attribute_template_from_tei_level(
        cls,
        tei_data: dict,
    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        """
        Try to obtain (created, lastUpdated, storedBy) from the first
        attribute at TEI level. Returns (None, None, None) when the TEI
        has no attributes at that level.
        """
        tei_level_attributes = tei_data.get("attributes") or []
        if not tei_level_attributes:
            return None, None, None

        first_attribute = tei_level_attributes[0]
        created = cls.normalize_timestamp(first_attribute.get("created"))
        last_updated = cls.normalize_timestamp(first_attribute.get("lastUpdated"))
        stored_by = first_attribute.get("storedBy")

        return created, last_updated, stored_by

    @classmethod
    def _get_attribute_template_from_first_enrollment(
        cls,
        tei_data: dict,
    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        """
        Fallback: try to obtain (created, lastUpdated, storedBy) from the
        first attribute of the first enrollment. Returns (None, None, None)
        when there are no enrollments or the enrollment has no attributes.
        """
        enrollments = tei_data.get("enrollments") or []
        if not enrollments:
            return None, None, None

        first_enrollment = enrollments[0]
        enrollment_attributes = first_enrollment.get("attributes") or []
        if not enrollment_attributes:
            return None, None, None

        first_enrollment_attribute = enrollment_attributes[0]
        created = cls.normalize_timestamp(first_enrollment_attribute.get("created"))
        last_updated = cls.normalize_timestamp(first_enrollment_attribute.get("lastUpdated"))
        stored_by = first_enrollment_attribute.get("storedBy")

        return created, last_updated, stored_by

    def _get_attribute_template(
        self,
        tei_uid: str,
    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        """
        Get (created, lastUpdated, storedBy) to reuse as a template in the
        new attribute value.

        Strategy:
        1) Try TEI-level attributes.
        2) If none, try the first enrollment's attributes.
        3) If nothing found, return (None, None, None).

        Raises:
            requests.HTTPError: propagated from dhis_get on non-2xx responses.
        """
        tei_data = dhis_get(
            path=f"/api/trackedEntityInstances/{tei_uid}",
            base_url=self.base_url,
            jsessionid=self.jsessionid,
            params={"fields": "*"},
        )

        created, last_updated, stored_by = self._get_attribute_template_from_tei_level(tei_data)
        if created and last_updated and stored_by:
            return created, last_updated, stored_by

        return self._get_attribute_template_from_first_enrollment(tei_data)

    @staticmethod
    def _build_insert_statement(
        tracked_entity_id: str,
        created_timestamp: str,
        last_updated_timestamp: str,
        full_name: str,
        stored_by: str,
    ) -> str:
        """
        Build an INSERT statement for trackedentityattributevalue.

        Note: tracked_entity_id is interpolated UNQUOTED — callers must
        ensure it is strictly numeric (execute() validates this).
        """
        full_name_sql = escape_sql_literal(full_name)
        stored_by_sql = escape_sql_literal(stored_by)

        return f"""
INSERT INTO trackedentityattributevalue (
    trackedentityid,
    trackedentityattributeid,
    created,
    lastupdated,
    value,
    storedby
)
VALUES (
    {tracked_entity_id},
    {TRACKED_ENTITY_ATTRIBUTE_ID},
    '{created_timestamp}'::timestamp,
    '{last_updated_timestamp}'::timestamp,
    '{full_name_sql}',
    '{stored_by_sql}'
);
""".strip()

    def execute(self):
        """
        Use case entry point.

        It expects a CSV with at least:
        - trackedentityid
        - tei_uid
        - firstname / surname (combined into full_name)

        For each row:
        - Obtain a template for timestamps (created, lastUpdated, storedBy)
          based on existing attributes of the TEI.
        - Generate INSERT statements in trackedentityattributevalue to
          store full_name.
        """
        inserts: list[str] = []
        skipped: list[tuple[str, str]] = []

        rows = read_csv(self.input_path)
        print(f"Read {len(rows)} rows from input/{self.input_path}")

        for row in rows:
            tracked_entity_id = (row.get("trackedentityid") or "").strip()
            tei_uid = (row.get("tei_uid") or "").strip()
            firstname = (row.get("firstname") or "").strip()
            surname = (row.get("surname") or "").strip()

            # Build full_name as: firstname + " " + surname,
            # dropping whichever part is empty.
            full_name = " ".join(part for part in (firstname, surname) if part)

            if not tracked_entity_id or not tei_uid or not full_name:
                print(f"[SKIP] Missing required data in CSV row: {row}")
                skipped.append((tei_uid, "incomplete_csv_data"))
                continue

            # tracked_entity_id is spliced unquoted into the SQL; a
            # non-numeric value would produce broken (or injectable) SQL.
            if not tracked_entity_id.isdigit():
                print(f"[SKIP] Non-numeric trackedentityid in CSV row: {row}")
                skipped.append((tei_uid, "non_numeric_trackedentityid"))
                continue

            print(f"Processing TEI {tei_uid} (trackedentityid={tracked_entity_id})...")

            try:
                created_timestamp, last_updated_timestamp, stored_by = self._get_attribute_template(
                    tei_uid=tei_uid,
                )
            except requests.HTTPError as http_error:
                status_code = (
                    http_error.response.status_code
                    if http_error.response is not None
                    else "?"
                )
                print(f"[ERROR] TEI {tei_uid}: HTTP {status_code}")
                skipped.append((tei_uid, f"http_{status_code}"))
                continue
            except Exception as unexpected_error:
                print(f"[ERROR] TEI {tei_uid}: {unexpected_error}")
                skipped.append((tei_uid, "unexpected_error"))
                continue

            if not created_timestamp or not last_updated_timestamp or not stored_by:
                print(
                    f"[WARN] TEI {tei_uid}: no template "
                    "(created/lastUpdated/storedBy), skipping"
                )
                skipped.append((tei_uid, "no_attribute_template"))
                continue

            insert_sql = self._build_insert_statement(
                tracked_entity_id=tracked_entity_id,
                created_timestamp=created_timestamp,
                last_updated_timestamp=last_updated_timestamp,
                full_name=full_name,
                stored_by=stored_by,
            )

            inserts.append(insert_sql)
            print(insert_sql)

        if not inserts:
            print("No INSERT statements generated. Check CSV / connection.")
            return

        # Wrap everything in one transaction so the script applies atomically.
        sql_script = "BEGIN;\n\n" + "\n\n".join(inserts) + "\n\nCOMMIT;\n"
        final_path = write_text(self.output_path, sql_script)

        print(f"\nSQL written to: {final_path}")
        if skipped:
            print("\nSkipped TEIs:")
            for tei_uid, reason in skipped:
                print(f" - {tei_uid}: {reason}")


# --- Next file in the patch: DHIS2/python_skeleton/dhis_utils.py ---
# (its header, `import sys` / `import requests`, and the signature and
#  docstring of dhis_get() are collapsed into this region; the function is
#  reconstructed in full in the following segment)
+ """ + url = f"{base_url.rstrip('/')}/{path.lstrip('/')}" + cookies = {"JSESSIONID": jsessionid} + response = requests.get(url, params=params or {}, cookies=cookies, timeout=timeout) + response.raise_for_status() + return response.json() + + +def test_connection( + base_url: str, + jsessionid: str, + timeout: int = 10, +) -> dict: + """ + Test connection against /api/system/info. + If it fails, print an error and abort the script. + + Returns: + system/info JSON dict if everything is OK. + + Exits: + Calls sys.exit(1) on any error. + """ + try: + system_info = dhis_get( + path="/api/system/info", + base_url=base_url, + jsessionid=jsessionid, + params=None, + timeout=timeout, + ) + except requests.HTTPError as http_error: + status = http_error.response.status_code if http_error.response is not None else "?" + print(f"[FATAL] HTTP error {status} while calling /api/system/info at {base_url}") + sys.exit(1) + except requests.RequestException as req_error: + print(f"[FATAL] Could not connect to {base_url} (/api/system/info): {req_error}") + sys.exit(1) + except Exception as unexpected: + print(f"[FATAL] Unexpected error while testing connection to {base_url}: {unexpected}") + sys.exit(1) + + print( + f"[OK] Connected to DHIS2 at {base_url} " + f"(version={system_info.get('version', 'unknown')})" + ) + return system_info diff --git a/DHIS2/python_skeleton/file_utils.py b/DHIS2/python_skeleton/file_utils.py new file mode 100644 index 00000000..1a27fe1a --- /dev/null +++ b/DHIS2/python_skeleton/file_utils.py @@ -0,0 +1,207 @@ +# file_utils.py + +import json +import csv +from pathlib import Path +from datetime import datetime + + +def _to_input_path(path: str | Path) -> Path: + """ + Map a relative path to the 'input' folder. + + - If the path is absolute, it is returned as-is. + - If the path is relative, it is resolved as 'input/'. 
+ """ + p = Path(path) + if p.is_absolute(): + return p + return Path("input") / p + + +def _to_output_path(path: str | Path) -> Path: + """ + Map a relative path to the 'output' folder. + + - If the path is absolute, it is returned as-is. + - If the path is relative, it is resolved as 'output/'. + """ + p = Path(path) + if p.is_absolute(): + return p + return Path("output") / p + + +def _with_timestamp_if_exists(path: Path) -> Path: + """ + If the given path already exists, append a human-readable timestamp + to the filename (before the extension). + + Example: + output/result.sql -> exists + output/result_2025-11-20_15-42-10.sql (new path) + """ + if not path.exists(): + return path + + timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + return path.with_name(f"{path.stem}_{timestamp}{path.suffix}") + + +def read_json(path: str | Path): + """ + Read a JSON file from disk and return the parsed content. + The file is read from the 'input' folder unless an absolute path is provided. + """ + path = _to_input_path(path) + with path.open(encoding="utf-8") as f: + return json.load(f) + + +def write_json(path: str | Path, data, indent: int = 2) -> Path: + """ + Write a Python object as JSON to disk. + The file is written to the 'output' folder unless an absolute path is provided. + If the target file already exists, a human-readable timestamp is appended + to the filename. + + Returns: + The final Path used to write the file. + """ + path = _to_output_path(path) + path = _with_timestamp_if_exists(path) + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=indent) + return path + + +def read_csv(path: str | Path) -> list[dict]: + """ + Read a CSV file and return a list of dictionaries (one per row). + The file is read from the 'input' folder unless an absolute path is provided. 
+ """ + path = _to_input_path(path) + with path.open(newline="", encoding="utf-8") as f: + return list(csv.DictReader(f)) + + +def write_csv(path: str | Path, rows: list[dict], fieldnames: list[str]) -> Path: + """ + Write a list of dictionaries to a CSV file. + The file is written to the 'output' folder unless an absolute path is provided. + If the target file already exists, a human-readable timestamp is appended + to the filename. + + Returns: + The final Path used to write the file. + """ + path = _to_output_path(path) + path = _with_timestamp_if_exists(path) + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + return path + + +def write_text(path: str | Path, content: str) -> Path: + """ + Write plain text to a file. + The file is written to the 'output' folder unless an absolute path is provided. + If the target file already exists, a human-readable timestamp is appended + to the filename. + + Returns: + The final Path used to write the file. + """ + path = _to_output_path(path) + path = _with_timestamp_if_exists(path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content, encoding="utf-8") + return path + + +def escape_sql_literal(text: str) -> str: + """ + Escape single quotes in a string so it can be safely used + as a SQL literal value. + """ + return text.replace("'", "''") + + +def load_dhis_env_config( + default_base_url: str, + default_jsessionid: str, + env_path: str | Path | None = None, +) -> tuple[str, str]: + """ + Look for a .env file and try to read DHIS2 configuration from it. + If a valid BASE_URL and JSESSIONID are found, show them to the user + (masking the JSESSIONID) and ask for confirmation. + + If the user presses ENTER, the values from .env are used. + If the user types 'n' or 'N' and presses ENTER, the defaults are kept. 
+ + Args: + default_base_url: Fallback base URL if .env is not used or not found. + default_jsessionid: Fallback JSESSIONID if .env is not used or not found. + env_path: Optional explicit path to the .env file. If None, "./.env" is used. + + Returns: + (base_url, jsessionid) either from .env (if confirmed) or the defaults. + """ + if env_path is None: + env_path = Path(".") / ".env" + else: + env_path = Path(env_path) + + if not env_path.is_file(): + return default_base_url, default_jsessionid + + base_url_env: str | None = None + jsessionid_env: str | None = None + + try: + with env_path.open(encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip() + value = value.strip() + + key_upper = key.upper() + if key_upper == "BASE_URL": + base_url_env = value + elif key_upper == "JSESSIONID": + jsessionid_env = value + except Exception as e: + print(f"[WARN] Failed to read .env file at {env_path}: {e}") + return default_base_url, default_jsessionid + + if not base_url_env or not jsessionid_env: + return default_base_url, default_jsessionid + + masked_jsessionid = ( + jsessionid_env[:6] + "..." 
if len(jsessionid_env) > 6 else jsessionid_env + ) + + print("Found .env configuration:") + print(f" Base URL : {base_url_env}") + print(f" JSESSIONID : {masked_jsessionid}") + print() + answer = input( + "Press ENTER to use this configuration, or type 'n' and press ENTER to ignore it: " + ).strip() + + if answer.lower() == "n": + print("Using default configuration (ignoring .env).") + return default_base_url, default_jsessionid + + print("Using configuration from .env.") + return base_url_env, jsessionid_env diff --git a/DHIS2/python_skeleton/get_files_util.py b/DHIS2/python_skeleton/get_files_util.py new file mode 100644 index 00000000..77024c4e --- /dev/null +++ b/DHIS2/python_skeleton/get_files_util.py @@ -0,0 +1,47 @@ +import os +from pathlib import Path + +# === CONFIGURATION === +# Used to get all the files in one file to check in chatgpt +BASE_DIR = Path(__file__).resolve().parent +MAX_BYTES = 200_000 # maximum file size to read + + +def main(): + for root, dirs, files in os.walk(BASE_DIR): + for name in files: + path = os.path.join(root, name) + + # Skip hidden files and folders (e.g. .git, .idea, .venv, etc.) 
# (continuation of DHIS2/python_skeleton/get_files_util.py — main()
#  reconstructed in full; its first lines are collapsed into the previous
#  region)
def main():
    """Dump every readable text file under BASE_DIR to stdout for review."""
    for root, dirs, files in os.walk(BASE_DIR):
        # Prune hidden directories in place so os.walk never descends into
        # .git/.venv/etc. (previously they were walked and filtered per file).
        dirs[:] = [d for d in dirs if not d.startswith(".")]

        for name in files:
            path = os.path.join(root, name)
            rel_path = os.path.relpath(path, BASE_DIR)

            # Skip hidden files and folders (e.g. .git, .idea, .venv, etc.).
            # Check the path RELATIVE to BASE_DIR: checking the absolute path
            # would skip everything when BASE_DIR itself lives under a hidden
            # directory (e.g. ~/.projects/...).
            if any(part.startswith(".") for part in rel_path.split(os.sep)):
                continue

            # Skip common binary / large formats
            if path.endswith((".png", ".jpg", ".jpeg", ".gif", ".pdf", ".zip", ".pyc", ".sql", ".csv")):
                continue

            try:
                # Skip very large files
                if os.path.getsize(path) > MAX_BYTES:
                    continue

                with open(path, "r", encoding="utf-8") as f:
                    content = f.read()
            except (UnicodeDecodeError, OSError):
                # Binary or unreadable files -> skip
                continue

            print("\n" + "=" * 80)
            print(f"FILE: {rel_path}")
            print("=" * 80 + "\n")
            print(content)
            print("\n")  # extra separation


if __name__ == "__main__":
    main()


# --- Next file in the patch: DHIS2/python_skeleton/main_skeleton.py ---
# main.py

import argparse

from file_utils import load_dhis_env_config
from create_missing_values_use_case import CreateMissingValuesUseCase
from dhis_utils import test_connection

# Default configuration (can be overridden by .env and CLI)
DEFAULT_BASE_URL = ""
DEFAULT_JSESSIONID = ""

DEFAULT_INPUT_FILE = "teis_without_storedby.csv"  # read from input/
DEFAULT_OUTPUT_FILE = "insert_attr_fullname.sql"  # write to output/


def parse_args():
    """Parse CLI arguments; defaults come from the module constants above."""
    parser = argparse.ArgumentParser(
        description="Create missing attribute values SQL for DHIS2 tracked entities."
    )
    parser.add_argument(
        "--base-url",
        default=DEFAULT_BASE_URL,
        help="DHIS2 base URL (default from code or .env).",
    )
    parser.add_argument(
        "--jsessionid",
        default=DEFAULT_JSESSIONID,
        help="JSESSIONID cookie value (default from code or .env).",
    )
    parser.add_argument(
        "--input-file",
        default=DEFAULT_INPUT_FILE,
        help="Input CSV file name (relative to 'input/' folder).",
    )
    parser.add_argument(
        "--output-file",
        default=DEFAULT_OUTPUT_FILE,
        help="Output SQL file name (relative to 'output/' folder).",
    )
    return parser.parse_args()


def main():
    """Entry point: resolve config, verify connectivity, run the use case."""
    args = parse_args()

    # Merge CLI defaults with .env (with confirmation)
    base_url, jsessionid = load_dhis_env_config(
        default_base_url=args.base_url,
        default_jsessionid=args.jsessionid,
    )

    # Aborts the script (sys.exit) if the instance is unreachable.
    test_connection(base_url=base_url, jsessionid=jsessionid)

    use_case = CreateMissingValuesUseCase(
        base_url=base_url,
        jsessionid=jsessionid,
        input_path=args.input_file,
        output_path=args.output_file,
    )

    use_case.execute()


if __name__ == "__main__":
    main()