diff --git a/DHIS2/notify_expiration/README.md b/DHIS2/notify_expiration/README.md new file mode 100644 index 0000000..ae76964 --- /dev/null +++ b/DHIS2/notify_expiration/README.md @@ -0,0 +1,34 @@ +# Notify expiration +This script searches for users that will have their password expired, log it and cand send a simplified notification to a webhook URL. + +# Usage +Run the script with the following parameters: + +python3 notify_expiration.py --config (--debug) +Parameters: +--config : Mandatory configuration file where the URL, credentials and other parameters are configured +--debug: Optional parameter to ask for debugging logs, to ensure proper load of the configuration file and all details regarding data gathered and filtered. + +# Config file +It accepts the following attributes as a json: +- `server`: Mandatory. The url of the DHIS2 instance. Should end with a slash `/`, but it will append it if not present. +- `user`: Mandatory. The user needed to authenticate to the instance. +- `password`: Mandatory. The password for the authentication. +- `filterGroup`: Optional. Defaults to `None`. An array of userGroup names to be considered as the only ones to evaluate. It allows to restrict the result to those who belong to any of these groups instead of the whole DHIS instance. +- `months_expiry`: Optional. Defaults to `18`. Should reflect the actual password policy of the instance. It does not retrieve the value from the instance as this allows to evaluate instances without an active password expiration policy. +- `how_many_days`: Optional. Defaults to `7`. As this script was conceived to be run periodically notifying which users will have their password expiring in the current week. +- `only_enabled`: Optional. Defaults to `true`. When set to `true`, restricts the evaluation to enabled users. When set to false it will evaluate both enabled and disable users. +- `webhook_url`: Optional. Defaults to `""`. If empty, will not send any notification and will only present the data directly in the console. If set to an URL, it will use it to notify the list of users (with expiration date and if they are disabled). This endpoint should expect to receive a json with 2 parameters: `text` and `dest`. `text` will be compossed with a `title` in bold followed by the content of the notification. `dest` should be used to determine the `destination` of the notification. +- `title`: Optional. Defaults to `"DHIS2 Password Expiration Notification"`. Allows to set a custom title to the notification. +- `destination`: Optional. Defaults to `"sysadmin"`. Allows the webhook URL to select the correct destination of the notification. +- `http_proxy`: Optional. If not present, will try to use an environmental variable of the same name. If present will be used for the webhook notification. +- `https_proxy`: Optional. If not present, will try to use an environmental variable of the same name. If present will be used for the webhook notification. + +# How It Works +Parses config file. +Sends a GET request to retrieve the list of all users and some attributes needed to determine when the user is going to expire +Displays what users are in the expected range of dates and optionally sends a notification via webhook, as well as details of it was successful. + +# Requirements +Python 3 +requests module (pip install requests) diff --git a/DHIS2/notify_expiration/config.json b/DHIS2/notify_expiration/config.json new file mode 100644 index 0000000..8365b46 --- /dev/null +++ b/DHIS2/notify_expiration/config.json @@ -0,0 +1,8 @@ +{ + "server": "https://domain/dhis2-instance/", + "user": "USER", + "password": "PASSWORD", + "months_expiry": 18, + "how_many_days": 7, + "only_enabled": false +} diff --git a/DHIS2/notify_expiration/notify_expiration.py b/DHIS2/notify_expiration/notify_expiration.py new file mode 100644 index 0000000..9d8847d --- /dev/null +++ b/DHIS2/notify_expiration/notify_expiration.py @@ -0,0 +1,157 @@ +import json +import sys +import requests +from requests.auth import HTTPBasicAuth +from datetime import datetime, timedelta +from dateutil.relativedelta import relativedelta +import argparse +import os +import logging + +class DHIS2Monitor: + def __init__(self, config): + self.logger = logging.getLogger(__name__) + + self.server = config.get('server') + if self.server[-1:] != "/": + self.server = self.server + "/" + self.logger.debug(f"Adjusted base server URL to: {self.server}") + + self.auth = HTTPBasicAuth(config.get('user'), config.get('password')) + + self.filterGroup = config.get('filterGroup') + if self.filterGroup: + self.logger.debug(f"Filtering users by groups: {self.filterGroup}") + else: + self.logger.debug("No user group filtering applied.") + months_expiry = config.get('months_expiry', 18) + how_many_days = config.get('how_many_days', 7) + self.expiry_beginning = (datetime.now() - relativedelta(months=months_expiry)).replace(hour=0, minute=0, second=0, microsecond=0) + self.expiry_end = (self.expiry_beginning + relativedelta(days=how_many_days)).replace(hour=23, minute=59, second=59, microsecond=0) + self.logger.debug(f"Selecting dates between {self.expiry_beginning} and {self.expiry_end}") + self.only_enabled = config.get('only_enabled', True) + if self.only_enabled: + self.logger.debug("Filtering only enabled users") + else: + self.logger.debug("All users will be considered (enabled and disabled)") + self.webhook_url = config.get('webhook_url', None) + if self.webhook_url: + self.logger.debug("Webhook URL configured, notifications will be sent.") + self.http_proxy = config.get('http_proxy', os.getenv("http_proxy", "")) + self.https_proxy = config.get('https_proxy', os.getenv("https_proxy", "")) + if self.http_proxy or self.https_proxy: + self.logger.debug(f"Using proxies: HTTP: {self.http_proxy} / HTTPS: {self.https_proxy}") + else: + self.logger.debug("No proxies configured.") + self.title = config.get('title', "DHIS2 Password Expiration Notification") + self.destination = config.get('destination', "sysadmin") + else: + self.logger.debug("No webhook URL configured, notifications will not be sent.") + + def request(self, url): + response = requests.get(url, auth=self.auth) + return response + + def parse_dt(self, value=None): + """Parses a datetime string avoiding milliseconds.""" + if not value: + return None + value = value.split(".")[0] + return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S") + + def get_users_data(self): + """Fetches the list of users and their userGroups.""" + return self.request(f'{self.server}api/users?fields=id,username,userGroups[name],created,passwordLastUpdated,disabled&paging=false') + + def filter_users(self, users): + """Filters out users based on groups, whether the user is disabled and the password expiration time.""" + filtered = [] + for u in users: + if self.only_enabled and u.get("disabled", False): + continue + if self.filterGroup: + group_names = [g["name"] for g in u.get("userGroups", []) if "name" in g] + if not set(group_names) & set(self.filterGroup): + continue + pwdlast_dt = self.parse_dt(u.get("passwordLastUpdated")) + created_dt = self.parse_dt(u.get("created")) + if pwdlast_dt: + if not (self.expiry_beginning <= pwdlast_dt <= self.expiry_end): + continue + last_change = pwdlast_dt + else: + if not (created_dt and self.expiry_beginning <= created_dt <= self.expiry_end): + continue + last_change = created_dt + + days_since = int((datetime.now() - last_change).total_seconds() / 86400) + + filtered.append({ + "id": u["id"], + "username": u["username"], + "days_since_change": days_since, + "last_change": (pwdlast_dt or created_dt).isoformat(), + "disabled": u.get("disabled") + }) + + return sorted(filtered, key=lambda x: x["username"]) + + def handle_api_error(self, response): + self.logger.error(f"API request failed: {response.status_code} - {response.headers} = {response.text[:500]}") + sys.exit(0) + + def send_notification(self, content): + proxies = { + "http": self.http_proxy, + "https": self.https_proxy + } + + payload = {"text": f"**{self.title}**\n{content}", "dest": f"{self.destination}"} + try: + response = requests.post(self.webhook_url, json=payload, headers={"Content-Type": "application/json"}, proxies=proxies, verify=True) + response.raise_for_status() + self.logger.info(f"[OK] Notification sent: {response.status_code}") + except requests.RequestException as e: + self.logger.error(f"Failed to send notification: {e}") + + +def main(config_path, debug=False): + logging.basicConfig( + level=logging.INFO if not debug else logging.DEBUG, + format="%(asctime)s %(levelname)s %(message)s" + ) + + print("Starting at: " + datetime.now().strftime("%Y-%m-%d_%H-%M-%S")) + + with open(config_path, 'r') as conf_file: + config_data = json.load(conf_file) + + monitor = DHIS2Monitor(config_data) + + response = monitor.get_users_data() + + if response.status_code != 200: + monitor.handle_api_error(response) + + monitor.logger.debug(f"Found {len(response.json().get('users', []))} users in total. Checking for expiring passwords...") + + data_filtered = monitor.filter_users(response.json()["users"]) + + if data_filtered: + monitor.logger.info(f"Found {len(data_filtered)} users with expiring passwords during the current week:") + monitor.logger.info(json.dumps(data_filtered, indent=4)) + if monitor.webhook_url: + content = f"The following {len(data_filtered)} users have passwords that will expire during the current week:\n" + for user in data_filtered: + content += f"- {user['username']} ({user['last_change'][:10]})" + (f" [Disabled]" if user['disabled'] else "") + "\n" + monitor.send_notification(content) + else: + monitor.logger.info("No users found with expiring passwords.") + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="DHIS2 User password expiration notification script") + parser.add_argument('--config', required=True, help="Config file path") + parser.add_argument('--debug', action="store_true", help="Enable debug logging") + args = parser.parse_args() + main(args.config, args.debug)