From 64fe1328d505eefb640405f8e1bec1ff25b1ca45 Mon Sep 17 00:00:00 2001 From: "C. Weaver" Date: Fri, 24 Feb 2023 17:20:55 -0500 Subject: [PATCH 01/23] A possible implementation for managing non-python agents --- ocs/agents/host_manager/agent.py | 121 +++++++++++++++++++---------- ocs/agents/host_manager/drivers.py | 11 ++- ocs/ocsbow.py | 38 ++++++--- ocs/site_config.py | 43 ++++++++-- 4 files changed, 152 insertions(+), 61 deletions(-) diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index 36507b9a..539141b7 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -41,8 +41,8 @@ def _get_local_instances(self): Returns: agent_dict (dict): Maps instance-id to a dict describing the agent config. The config is as contained in - HostConfig.instances but where 'instance-id', - 'agent-class', and 'manage' are all guaranteed populated + HostConfig.instances but where 'instance-id', 'agent-class' + or 'agent-exe', and 'manage' are all guaranteed populated (and manage is one of ['yes', 'no', 'docker']). warnings: A list of strings, each of which corresponds to some problem found in the config. @@ -54,6 +54,10 @@ def _get_local_instances(self): self.site_config_file = site.source_file self.host_name = hc.name self.working_dir = hc.working_dir + self.wamp_url = site.hub.data['wamp_server'] + self.wamp_realm = site.hub.data['wamp_realm'] + self.address_root = site.hub.data['address_root'] + self.log_dir = hc.log_dir # Scan for agent scripts in (deprecated) script registry for p in hc.agent_paths: @@ -72,7 +76,7 @@ def _get_local_instances(self): continue # Make sure 'manage' is set, and valid. 
default_manage = 'no' \ - if inst['agent-class'] == 'HostManager' else 'yes' + if 'agent-class' in inst and inst['agent-class'] == 'HostManager' else 'yes' inst['manage'] = inst.get('manage', default_manage) if inst['manage'] not in ['yes', 'no', 'docker']: warnings.append( @@ -80,6 +84,17 @@ def _get_local_instances(self): f'for instance_id={inst["instance-id"]}.') continue instances[inst['instance-id']] = inst + # Make sure either 'agent-class' or 'agent-exe' is set, but not both + if 'agent-class' not in inst and 'agent-exe' not in inst: + warnings.append( + f'Configuration problem, neither agent-class nor agent-exe is set' + f'for instance_id={inst["instance-id"]}.') + continue + if inst.get('agent-class') is not None and inst.get('agent-exe') is not None: + warnings.append( + f'Configuration problem, both agent-class and agent-exe are set' + f'for instance_id={inst["instance-id"]}.') + continue returnValue((instances, warnings)) yield @@ -169,13 +184,13 @@ def retire(db_key): docker_nonagents = list(self.docker_services.keys()) for iid, hinst in agent_dict.items(): - srv = self.docker_service_prefix + iid - cls = hinst['agent-class'] - mgmt = 'host' - if srv in docker_nonagents: - docker_nonagents.remove(srv) - cls += '[d]' - mgmt = 'docker' + record = dict(hinst) + record['srv'] = self.docker_service_prefix + iid + record['mgmt'] = 'host' + if record['srv'] in docker_nonagents: + docker_nonagents.remove(record['srv']) + record['agent-class'] += '[d]' + record['mgmt'] = 'docker' if hinst['manage'] != 'docker': session.add_message( f'The agent config for instance-id=' @@ -185,7 +200,7 @@ def retire(db_key): retire(iid) continue else: - srv = None + record['srv'] = None if hinst['manage'] == 'no': continue if hinst['manage'] == 'docker': @@ -195,36 +210,48 @@ def retire(db_key): f'in config. 
Dropping.') retire(iid) continue - new_managed.append((iid, iid, srv, cls, mgmt)) + record['db_key'] = iid + new_managed.append(record) for srv in docker_nonagents: - new_managed.append((srv, srv, srv, '[docker]', 'docker')) + new_managed.append({'db_key': srv, 'instance-id': srv, 'srv': srv, + 'agent-class': '[docker]', 'mgmt': 'docker'}) # Compare new managed items to stuff already in our database. - for db_key, iid, srv, cls, mgmt in new_managed: + for record in new_managed: + db_key = record['db_key'] instance = self.database.get(db_key, None) if instance is not None and \ instance['management'] == 'retired': instance = None if instance is not None: # So instance is some kind of actively managed container. - if (instance['agent_class'] != cls - or instance['management'] != mgmt): + if (instance['agent_class'] != record.get('agent-class') + or instance['agent_exe'] != record.get('agent-exe') + or instance['management'] != record.get('mgmt')): session.add_message( f'Managed agent "{db_key}" changed agent_class ' - f'({instance["agent_class"]} -> {cls}) or management ' - f'({instance["management"]} -> {mgmt}) and is being ' + f'({instance["agent_class"]} -> {record.get("agent-class")}) or agent_exe ' + f'({instance["agent_exe"]} -> {record.get("agent-exe")}) or management ' + f'({instance["management"]} -> {record.get("mgmt")}) and is being ' f'reset!') instance = None if instance is None: + if record.get("agent-class") is not None: + full_name=(f'{record["agent-class"]}:{record["db_key"]}') + else: + full_name=(f'{record["agent-exe"]}:{record["db_key"]}') instance = hm_utils.ManagedInstance.init( - management=mgmt, - instance_id=iid, - agent_class=cls, - full_name=(f'{cls}:{db_key}'), + management=record.get("mgmt"), + instance_id=record.get("instance-id"), + agent_class=record.get("agent-class"), + agent_exe=record.get("agent-exe"), + full_name=full_name, + agent_arguments=record.get("arguments"), + write_logs=record.get("write-logs", False) ) - if mgmt == 
'docker': - instance['agent_script'] = srv + if record['mgmt'] == 'docker': + instance['agent_script'] = record['srv'] instance['prot'] = self._get_docker_helper(instance) if instance['prot'].status[0] is None: session.add_message( @@ -237,15 +264,17 @@ def retire(db_key): else: # Check for the agent class in the plugin system; # then check the (deprecated) agent script registry. - if cls in agent_plugins: - session.add_message(f'Found plugin for "{cls}"') + if record.get("agent-exe") is not None: + pass + elif record.get("agent-class") in agent_plugins: + session.add_message(f'Found plugin for "{record.get("agent-class")}"') instance['agent_script'] = '__plugin__' - elif cls in site_config.agent_script_reg: - session.add_message(f'Found launcher script for "{cls}"') - instance['agent_script'] = site_config.agent_script_reg[cls] + elif record.get("agent-class") in site_config.agent_script_reg: + session.add_message(f'Found launcher script for "{record.get("agent-class")}"') + instance['agent_script'] = site_config.agent_script_reg[record.get("agent-class")] else: session.add_message(f'No plugin (nor launcher script) ' - f'found for agent_class "{cls}"!') + f'found for agent_class "{record.get("agent-class")}"!') session.add_message(f'Tracking {instance["full_name"]}') self.database[db_key] = instance yield warnings @@ -303,18 +332,28 @@ def _launch_instance(self, instance): prot = self._get_docker_helper(instance) else: iid = instance['instance_id'] - pyth = sys.executable - script = instance['agent_script'] - if script == '__plugin__': - cmd = [pyth, '-m', 'ocs.agent_cli'] + if instance.get('agent_script') is not None: + pyth = sys.executable + script = instance['agent_script'] + if script == '__plugin__': + cmd = [pyth, '-m', 'ocs.agent_cli'] + else: + cmd = [pyth, script] + cmd.extend([ + '--instance-id', iid, + '--site-file', self.site_config_file, + '--site-host', self.host_name, + '--working-dir', self.working_dir]) + elif instance.get('agent_exe') is not 
None: + cmd = [instance['agent_exe'], '--instance-id', self.address_root+'.'+iid, + '--wamp-url', self.wamp_url, '--wamp-realm', self.wamp_realm] + if "agent_arguments" in instance: + cmd.extend(instance["agent_arguments"]) + if instance['write_logs']: + log_file_path = self.log_dir+'/'+self.address_root+'.'+iid+".log" else: - cmd = [pyth, script] - cmd.extend([ - '--instance-id', iid, - '--site-file', self.site_config_file, - '--site-host', self.host_name, - '--working-dir', self.working_dir]) - prot = hm_utils.AgentProcessHelper(iid, cmd) + log_file_path = None + prot = hm_utils.AgentProcessHelper(iid, cmd, log_file=log_file_path) prot.up() instance['prot'] = prot diff --git a/ocs/agents/host_manager/drivers.py b/ocs/agents/host_manager/drivers.py index bc9d0ad1..1ec7d697 100644 --- a/ocs/agents/host_manager/drivers.py +++ b/ocs/agents/host_manager/drivers.py @@ -17,6 +17,8 @@ class ManagedInstance(dict): - 'agent_class' (str): The agent class. This will have special value 'docker' if the instance corresponds to a docker-compose service that has not been matched to a site_config entry. + - 'agent_exe' (str): The agent executable. This setting is mutually + exclusive with 'agent_class'. - 'instance_id' (str): The agent instance-id, or the docker service name if the instance is an unmatched docker-compose service. @@ -214,7 +216,7 @@ def stability_factor(times, window=120): class AgentProcessHelper(protocol.ProcessProtocol): - def __init__(self, instance_id, cmd): + def __init__(self, instance_id, cmd, log_file=None): super().__init__() self.status = None, None self.killed = False @@ -222,6 +224,7 @@ def __init__(self, instance_id, cmd): self.cmd = cmd self.lines = {'stderr': [], 'stdout': []} + self.log_file = open(log_file,"ab") if log_file is not None else None def up(self): reactor.spawnProcess(self, self.cmd[0], self.cmd[:], env=os.environ) @@ -231,6 +234,8 @@ def down(self): # race condition, but it could be worse. 
if self.status[0] is None: reactor.callFromThread(self.transport.signalProcess, 'INT') + if self.log_file is not None: + self.log_file.flush() # See https://twistedmatrix.com/documents/current/core/howto/process.html # @@ -262,11 +267,15 @@ def processExited(self, status): self.status = status, time.time() def outReceived(self, data): + if self.log_file is not None: + self.log_file.write(data) self.lines['stdout'].extend(data.decode('utf8').split('\n')) if len(self.lines['stdout']) > 100: self.lines['stdout'] = self.lines['stdout'][-100:] def errReceived(self, data): + if self.log_file is not None: + self.log_file.write(data) self.lines['stderr'].extend(data.decode('utf8').split('\n')) if len(self.lines['stderr']) > 100: self.lines['stderr'] = self.lines['stderr'][-100:] diff --git a/ocs/ocsbow.py b/ocs/ocsbow.py index 2915099f..bf4728dd 100644 --- a/ocs/ocsbow.py +++ b/ocs/ocsbow.py @@ -200,7 +200,7 @@ def get_status(args, site_config, restrict_hosts=None): if inst.get('manage') is None: inst['manage'] = 'yes' inst.update(blank_state) - if inst['agent-class'] == HOSTMANAGER_CLASS: + if 'agent-class' in inst and inst['agent-class'] == HOSTMANAGER_CLASS: sort_order = 0 hms.append(HostManagerManager( args, site_config, instance_id=inst['instance-id'])) @@ -235,7 +235,7 @@ def get_status(args, site_config, restrict_hosts=None): for cinfo in info['child_states']: this_id = cinfo['instance_id'] # Watch for [d] suffix, and steal it. 
- if cinfo['agent_class'].endswith('[d]'): + if cinfo.get('agent_class') is not None and cinfo['agent_class'].endswith('[d]'): agent_info[this_id]['agent-class'] = cinfo['agent_class'] if this_id in found: output['warnings'].append( @@ -282,23 +282,37 @@ def print_status(args, site_config): for hstat in status['hosts']: header = {'instance-id': '[instance-id]', - 'agent-class': '[agent-class]', + 'agent': '[agent]', 'current': '[state]', 'target': '[target]'} - field_widths = {'instance-id': 30, - 'agent-class': 20} + field_widths = {'instance-id': 20, + 'agent-class': 20, + 'agent-exe': 20} if len(hstat['agent_info']): - field_widths = {k: max(v0, max([len(v[k]) + field_widths = {k: max(v0, max([len(v[k]) if k in v and v[k] is not None else 0 for v in hstat['agent_info'].values()])) for k, v0 in field_widths.items()} - fmt = ' {instance-id:%i} {agent-class:%i} {current:>10} {target:>10}' % ( - field_widths['instance-id'], field_widths['agent-class']) + field_widths['agent'] = max(field_widths['agent-class'], field_widths['agent-exe']) + del field_widths['agent-class'] + del field_widths['agent-exe'] + fmt = ' {instance-id:%i} {agent:%i} {current:>10} {target:>10}' % ( + field_widths['instance-id'], field_widths['agent']) header = fmt.format(**header) print('-' * len(header)) print(f'Host: {hstat["host_name"]}\n') print(header) - for v in hstat['agent_info'].values(): - print(fmt.format(**v)) + class FindAgentType(dict): + def __missing__(self, key): + if key == 'agent': + if 'agent-class' in self: + return self['agent-class'] + if 'agent-exe' in self: + return self['agent-exe'] + raise KeyError + hdata = sorted([FindAgentType(v) for v in hstat['agent_info'].values()], + key=lambda i: i['instance-id']) + for v in hdata: + print(fmt.format_map(v)) print() if len(status['warnings']): @@ -814,9 +828,9 @@ def main(args=None): status = get_status(args, site_config) for host_data in status['hosts']: active_hms = [v for v in host_data['agent_info'].values() - if 
v['agent-class'] == HOSTMANAGER_CLASS] + if v.get('agent-class') == HOSTMANAGER_CLASS] others = [v for v in host_data['agent_info'].values() - if v['agent-class'] != HOSTMANAGER_CLASS] + if v.get('agent-class') != HOSTMANAGER_CLASS] for inst in active_hms: if args.all or inst['instance-id'] in args.instance: hms.append(inst) diff --git a/ocs/site_config.py b/ocs/site_config.py index 0315c207..6d93c62c 100644 --- a/ocs/site_config.py +++ b/ocs/site_config.py @@ -229,6 +229,12 @@ def from_dict(cls, data, parent=None): the Agent instance, as a way of finding the right InstanceConfig. + ``agent-exe`` (str, optional) + The Agent executable. This + may be matched against the agent_exe name provided by + the Agent instance, as a way of finding the right + InstanceConfig. + ``arguments`` (list, optional): A list of arguments that should be passed back to the agent. Historically the arguments have been grouped into @@ -250,6 +256,10 @@ def from_dict(cls, data, parent=None): self.manage = self.data.get('manage') if self.manage is None: self.manage = "yes" + if 'agent-class' not in self.data: + self.data['agent-class'] = None + if 'agent-exe' not in self.data: + self.data['agent-exe'] = None return self @@ -391,7 +401,7 @@ def add_arguments(parser=None): return parser -def get_config(args, agent_class=None): +def get_config(args, agent_class=None, agent_exe=None): """ Args: args: The argument object returned by @@ -400,13 +410,15 @@ def get_config(args, agent_class=None): in this object. agent_class: Class name passed in to match against the list of device classes in each host's list. + agent_exe: Executable path or name passed in to match against the list of + agent executables in each host's list. Special values accepted for agent_class: - '*control*': do not insist on matching host or device. - '*host*': do not insist on matching device (but do match host). Returns: - The tuple (site_config, host_config, device_config). 
+ The tuple (site_config, host_config, instance_config). """ if args.site == 'none': return (None, None, None) @@ -480,16 +492,26 @@ def get_config(args, agent_class=None): instance_config = InstanceConfig.from_dict( dev, parent=host_config) break - else: + elif agent_class is not None: # Use the agent_class to figure it out... for dev in host_config.instances: - if dev['agent-class'] == agent_class: + if 'agent-class' in dev and dev['agent-class'] == agent_class: if instance_config is not None: raise RuntimeError( f"Multiple matches found for agent-class={agent_class}" " ... you probably need to pass --instance-id=") instance_config = InstanceConfig.from_dict( dev, parent=host_config) + elif agent_exe is not None: + # Use the agent_exe to figure it out... + for dev in host_config.instances: + if 'agent-exe' in dev and dev['agent-exe'] == agent_exe: + if instance_config is not None: + raise RuntimeError( + f"Multiple matches found for agent-exe={agent_exe}" + " ... you probably need to pass --instance-id=") + instance_config = InstanceConfig.from_dict( + dev, parent=host_config) if instance_config is None and not no_dev_match: raise RuntimeError("Could not find matching device description.") return collections.namedtuple('SiteConfig', ['site', 'host', 'instance'])(site_config, host_config, instance_config) @@ -657,7 +679,7 @@ def scan_for_agents(do_registration=True): return items -def parse_args(agent_class=None, parser=None, args=None): +def parse_args(agent_class=None, agent_exe=None, parser=None, args=None): """ Function to parse site-config and agent arguments. This function takes site, host, and instance arguments into account by making sure the instance @@ -672,6 +694,11 @@ def parse_args(agent_class=None, parser=None, args=None): may be matched against the agent_class name provided by the Agent instance, as a way of finding the right InstanceConfig. + agent_exe (str, optional): + Name of the Agent executable. 
This + may be matched against the agent_exe name provided by + the Agent instance, as a way of finding the right + InstanceConfig. parser (argparse.ArgumentParser, optional): Argument parser containing agent-specific arguments. If None, an empty parser will be created. @@ -705,7 +732,7 @@ def parse_args(agent_class=None, parser=None, args=None): pre_args, _ = pre_parser.parse_known_args(args=args) - site, host, instance = get_config(pre_args, agent_class=agent_class) + site, host, instance = get_config(pre_args, agent_class=agent_class, agent_exe=agent_exe) if instance is not None: # When the user omits instance_id, it can still be matched, @@ -743,7 +770,9 @@ def flatten(container): add_site_attributes(args, site, host=host) # Add agent_class attribute. - if not hasattr(args, 'agent_class'): + if not hasattr(args, 'agent_class') and agent_class is not None: setattr(args, 'agent_class', agent_class) + if not hasattr(args, 'agent_exe') and agent_exe is not None: + setattr(args, 'agent_exe', agent_exe) return args From 3bb78423fdeb9c48e3962f52d612656d2dcd0d54 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 24 Feb 2023 22:30:10 +0000 Subject: [PATCH 02/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ocs/agents/host_manager/agent.py | 8 ++++---- ocs/agents/host_manager/drivers.py | 2 +- ocs/ocsbow.py | 1 + 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index 539141b7..f2d745c0 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -238,9 +238,9 @@ def retire(db_key): instance = None if instance is None: if record.get("agent-class") is not None: - full_name=(f'{record["agent-class"]}:{record["db_key"]}') + full_name = (f'{record["agent-class"]}:{record["db_key"]}') else: - full_name=(f'{record["agent-exe"]}:{record["db_key"]}') 
+ full_name = (f'{record["agent-exe"]}:{record["db_key"]}') instance = hm_utils.ManagedInstance.init( management=record.get("mgmt"), instance_id=record.get("instance-id"), @@ -345,12 +345,12 @@ def _launch_instance(self, instance): '--site-host', self.host_name, '--working-dir', self.working_dir]) elif instance.get('agent_exe') is not None: - cmd = [instance['agent_exe'], '--instance-id', self.address_root+'.'+iid, + cmd = [instance['agent_exe'], '--instance-id', self.address_root + '.' + iid, '--wamp-url', self.wamp_url, '--wamp-realm', self.wamp_realm] if "agent_arguments" in instance: cmd.extend(instance["agent_arguments"]) if instance['write_logs']: - log_file_path = self.log_dir+'/'+self.address_root+'.'+iid+".log" + log_file_path = self.log_dir + '/' + self.address_root + '.' + iid + ".log" else: log_file_path = None prot = hm_utils.AgentProcessHelper(iid, cmd, log_file=log_file_path) diff --git a/ocs/agents/host_manager/drivers.py b/ocs/agents/host_manager/drivers.py index 1ec7d697..6c68b809 100644 --- a/ocs/agents/host_manager/drivers.py +++ b/ocs/agents/host_manager/drivers.py @@ -224,7 +224,7 @@ def __init__(self, instance_id, cmd, log_file=None): self.cmd = cmd self.lines = {'stderr': [], 'stdout': []} - self.log_file = open(log_file,"ab") if log_file is not None else None + self.log_file = open(log_file, "ab") if log_file is not None else None def up(self): reactor.spawnProcess(self, self.cmd[0], self.cmd[:], env=os.environ) diff --git a/ocs/ocsbow.py b/ocs/ocsbow.py index bf4728dd..81a5a66d 100644 --- a/ocs/ocsbow.py +++ b/ocs/ocsbow.py @@ -301,6 +301,7 @@ def print_status(args, site_config): print('-' * len(header)) print(f'Host: {hstat["host_name"]}\n') print(header) + class FindAgentType(dict): def __missing__(self, key): if key == 'agent': From feab97fd66a1eeb204cb1554e951316e7656fb24 Mon Sep 17 00:00:00 2001 From: "C. 
Weaver" Date: Fri, 24 Feb 2023 17:45:32 -0500 Subject: [PATCH 03/23] Fix reference to removed variable --- ocs/agents/host_manager/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index f2d745c0..bddd9868 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -195,7 +195,7 @@ def retire(db_key): session.add_message( f'The agent config for instance-id=' f'{iid} was matched to docker service ' - f'{srv}, but config does not specify ' + f'{record["srv"]}, but config does not specify ' f'manage:docker! Dropping both.') retire(iid) continue From e1fbd11712c64506b59f4102e0960ca62739cc57 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 6 Mar 2023 19:25:56 +0000 Subject: [PATCH 04/23] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/pre-commit/mirrors-autopep8: v2.0.1 → v2.0.2](https://github.com/pre-commit/mirrors-autopep8/compare/v2.0.1...v2.0.2) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b0b8c5c2..a5bb52b3 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,7 +7,7 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/pre-commit/mirrors-autopep8 - rev: v2.0.1 + rev: v2.0.2 hooks: - id: autopep8 - repo: https://github.com/pycqa/flake8 From 92371ffd799e610b383a783404232fc6b5a5ae56 Mon Sep 17 00:00:00 2001 From: Jack Lashner Date: Wed, 8 Mar 2023 10:52:25 -0500 Subject: [PATCH 05/23] Separate blocks for each registered operation (#312) * Separate blocks for each registered operation * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * tests: Remove check for exception in 
changed agent ops test * Switch dict representation --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Brian Koopman --- ocs/agents/registry/agent.py | 8 +++----- tests/agents/test_registry_agent.py | 4 +--- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/ocs/agents/registry/agent.py b/ocs/agents/registry/agent.py index fc409c76..7970ccb2 100644 --- a/ocs/agents/registry/agent.py +++ b/ocs/agents/registry/agent.py @@ -132,9 +132,6 @@ def _publish_agent_ops(self, reg_agent): """ addr = reg_agent.agent_address - msg = {'block_name': addr, - 'timestamp': time.time(), - 'data': {}} self.log.debug(addr) for op_name, op_code in reg_agent.op_codes.items(): field = f'{addr}_{op_name}' @@ -146,8 +143,9 @@ def _publish_agent_ops(self, reg_agent): except ValueError as e: self.log.warn(f"Improper field name: {field}\n{e}") continue - msg['data'][field] = op_code - if msg['data']: + msg = {'block_name': field, + 'timestamp': time.time(), + 'data': {field: op_code}} self.agent.publish_to_feed('agent_operations', msg) @ocs_agent.param('test_mode', default=False, type=bool) diff --git a/tests/agents/test_registry_agent.py b/tests/agents/test_registry_agent.py index 06c7f01c..20a15edb 100644 --- a/tests/agents/test_registry_agent.py +++ b/tests/agents/test_registry_agent.py @@ -1,5 +1,4 @@ import time -import pytest import pytest_twisted from agents.util import create_session, create_agent_fixture @@ -120,5 +119,4 @@ def test_registry_handles_changed_agent_ops(agent): # Add a new op to the registered agent reg_agent.op_codes = {'op_name': 1, 'new_op': 1} - with pytest.raises(Exception): - agent._publish_agent_ops(reg_agent) + agent._publish_agent_ops(reg_agent) From 6bfd15451eeed42e6dc3d5cf09f206b53931be9d Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Wed, 15 Mar 2023 12:53:48 -0400 Subject: [PATCH 06/23] Add importlib_metadata dependency for ocs-agent-cli --- setup.py | 1 + 1 file changed, 1 
insertion(+) diff --git a/setup.py b/setup.py index b1b553ac..88803ffd 100644 --- a/setup.py +++ b/setup.py @@ -48,6 +48,7 @@ 'PyYAML', 'influxdb', 'numpy', + 'importlib_metadata;python_version<"3.10"', ], extras_require={ "so3g": ["so3g"], From 919b9669525233c654f9740c3752badbaafe782e Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Wed, 12 Apr 2023 20:45:22 +0000 Subject: [PATCH 07/23] Use setproctitle to have ocs-agent-cli rename itself Instead of showing up in top as "ocs-agent-cli", it shows up as "ocs-agent:{instance_id}". --- ocs/agent_cli.py | 5 +++++ requirements.txt | 1 + setup.py | 1 + 3 files changed, 7 insertions(+) diff --git a/ocs/agent_cli.py b/ocs/agent_cli.py index e3e2b786..bcbe6816 100755 --- a/ocs/agent_cli.py +++ b/ocs/agent_cli.py @@ -1,6 +1,7 @@ import argparse import importlib import os +import setproctitle import sys import warnings @@ -182,6 +183,10 @@ def main(args=None): mod = importlib.import_module(_module) + title = f'ocs-agent:{instance.data["instance-id"]}' + print(f'Renaming this process to: "{title}"') + setproctitle.setproctitle(title) + start = getattr(mod, entrypoint) # This is the start function. 
start(args=post_args) diff --git a/requirements.txt b/requirements.txt index aae6bca3..09aecc20 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ twisted deprecation PyYAML importlib_metadata +setproctitle # InfluxDB Publisher influxdb diff --git a/setup.py b/setup.py index 88803ffd..788d6af7 100644 --- a/setup.py +++ b/setup.py @@ -49,6 +49,7 @@ 'influxdb', 'numpy', 'importlib_metadata;python_version<"3.10"', + 'setproctitle', ], extras_require={ "so3g": ["so3g"], From ff41bb8f7d95eea69558331d4fc838c4c295b0a9 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Wed, 10 May 2023 16:54:55 -0400 Subject: [PATCH 08/23] Fix readthedocs builds (#328) * Update os+python version on readthedocs * Add rtd config to ignore paths for workflows --- .github/workflows/develop.yml | 1 + .github/workflows/pytest.yml | 1 + .github/workflows/skipped-pytest.yml | 1 + .readthedocs.yaml | 6 +++++- 4 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 9f07095a..9e54eadc 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -9,6 +9,7 @@ on: - '**.md' - '.flake8' - '.pre-commit-config.yaml' + - '.readthedocs.yaml' jobs: test: diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index f4cdc15b..c833815f 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -8,6 +8,7 @@ on: - '**.md' - '.flake8' - '.pre-commit-config.yaml' + - '.readthedocs.yaml' workflow_call: jobs: diff --git a/.github/workflows/skipped-pytest.yml b/.github/workflows/skipped-pytest.yml index 400ca2d9..bd98b2db 100644 --- a/.github/workflows/skipped-pytest.yml +++ b/.github/workflows/skipped-pytest.yml @@ -8,6 +8,7 @@ on: - '**.md' - '.flake8' - '.pre-commit-config.yaml' + - '.readthedocs.yaml' jobs: test: diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 9a17f94d..85c668b8 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -1,10 +1,14 @@ 
version: 2 +build: + os: ubuntu-22.04 + tools: + python: "3.10" + sphinx: configuration: docs/conf.py python: - version: 3.7 install: - requirements: docs/requirements.txt - requirements: requirements.txt From 0a0404de8de128580296570d310dbffba757ece1 Mon Sep 17 00:00:00 2001 From: "C. Weaver" Date: Thu, 11 May 2023 17:20:45 -0400 Subject: [PATCH 09/23] Improve test/development portability (#319) * Add extra dependencies for development * Qualify image names with repository * Fix test failures from calling `os.path.join` on mock objects * Fix mistakes in dev dependency list * Revert "Qualify image names with repository" This change was unsafe, as it would run tests over previously released code, instead of possibly modified, local code. This reverts commit f357ad3612f2c309a2f281ec3f2a7d62f0c64736. --------- Co-authored-by: C. Weaver --- setup.py | 7 ++++++- tests/test_aggregator.py | 8 ++++---- tests/test_ocs_agent.py | 1 + 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/setup.py b/setup.py index 788d6af7..95eea52b 100644 --- a/setup.py +++ b/setup.py @@ -5,6 +5,10 @@ with open("README.rst", "r", encoding="utf-8") as fh: long_description = fh.read() +so3g_etxras = ["so3g"] +dev_extras = ["pytest", "pytest-twisted", "pytest-docker-compose", "pytest-cov", "coverage", "docker"] +dev_extras.extend(so3g_etxras) + setup(name='ocs', description='Observatory Control System', long_description=long_description, @@ -52,6 +56,7 @@ 'setproctitle', ], extras_require={ - "so3g": ["so3g"], + "so3g": so3g_etxras, + "dev": dev_extras, }, ) diff --git a/tests/test_aggregator.py b/tests/test_aggregator.py index f874f58d..8fe52fa9 100644 --- a/tests/test_aggregator.py +++ b/tests/test_aggregator.py @@ -424,7 +424,6 @@ def test_make_filename_directory_creation_no_subdirs(tmpdir): make_filename(test_dir, make_subdirs=False) -@patch('os.makedirs', side_effect=PermissionError('mocked permission error')) def test_make_filename_directory_creation_permissions(tmpdir): 
"""make_filename() should raise a PermissionError if it runs into one when making the directories. @@ -433,6 +432,7 @@ def test_make_filename_directory_creation_permissions(tmpdir): """ test_dir = os.path.join(tmpdir, 'data') - with pytest.raises(PermissionError) as e_info: - make_filename(test_dir) - assert str(e_info.value) == 'mocked permission error' + with patch('os.makedirs', side_effect=PermissionError('mocked permission error')): + with pytest.raises(PermissionError) as e_info: + make_filename(test_dir) + assert str(e_info.value) == 'mocked permission error' diff --git a/tests/test_ocs_agent.py b/tests/test_ocs_agent.py index d300276c..e6c1a83f 100644 --- a/tests/test_ocs_agent.py +++ b/tests/test_ocs_agent.py @@ -50,6 +50,7 @@ def mock_agent(): """ mock_config = MagicMock() mock_site_args = MagicMock() + mock_site_args.working_dir = "./" mock_site_args.log_dir = "./" a = OCSAgent(mock_config, mock_site_args, address='test.address') return a From 93cc8b2e4f174112f4db1a942dc9d0e3a1ca002e Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Tue, 16 May 2023 13:38:27 -0400 Subject: [PATCH 10/23] Revist address_root != "observatory" (#327) * ocs-local-support: warn if crossbar/scf disagree on realm/address_root * registry subscribes to {address_root}.* not observatory.* Also changed OCSAgent to fail if any startup subscriptions fail. * aggregator subscribes to {address_root}.* not observatory.* * influxdb_publisher subscribes to {address_root}.* not observatory.* * InfluxDBAgent: exit cleanly on ctrl-c ... even when looping on failed connections to influxdb. * Tweak a few references to "observatory" in docstrings * docs: clarify "observatory" as default address_root * Remove all references to "registry_address" This SCF param actually wasn't used anywhere, except optionally in the ocs-agent-cli (which now simply defaults to {address_root}.registry and can still be overridden). 
The command-line argument remains, but is doc'ed as deprecated and has no effect on anything. * Fix registry tests (needs args.address_root) * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * ocs-local-support: better error message if crossbar not managed ... in response to requests for "generate_crossbar_config" and "start crossbar". Also update erroneous docs for the former. * ocs-crossbar image can be configured more, with env Now the crossbar config can be bind-mounted in, or else some envvars are used to generate one from a template before crossbar is started. * ocs_agent: more informative error message on realm mismatch Also the SCF docs warn about updating the crossbar config. * Update documentation related to crossbar configuration --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Brian Koopman --- docker/crossbar/Dockerfile | 9 +- .../{config.json => config.json.template} | 10 +- docker/crossbar/run-crossbar.sh | 28 +++ docs/agents/aggregator.rst | 4 +- docs/developer/writing_an_agent/docker.rst | 1 - docs/user/crossbar_config.rst | 167 ++++++++++-------- docs/user/docker_config.rst | 4 +- docs/user/quickstart.rst | 1 - docs/user/site_config.rst | 13 +- example/miniobs/default.yaml | 1 - ocs/agents/aggregator/agent.py | 2 +- ocs/agents/host_manager/agent.py | 3 - ocs/agents/influxdb_publisher/agent.py | 3 +- ocs/agents/influxdb_publisher/drivers.py | 9 +- ocs/agents/registry/agent.py | 2 +- ocs/checkdata.py | 4 +- ocs/client_cli.py | 7 +- ocs/ocs_agent.py | 22 ++- ocs/ocsbow.py | 42 ++++- ocs/site_config.py | 15 +- tests/agents/test_registry_agent.py | 2 + tests/default.yaml | 1 - tests/test_site_config.py | 6 +- 23 files changed, 228 insertions(+), 128 deletions(-) rename docker/crossbar/{config.json => config.json.template} (92%) create mode 100644 docker/crossbar/run-crossbar.sh diff --git a/docker/crossbar/Dockerfile 
b/docker/crossbar/Dockerfile index b479604e..2adb8885 100644 --- a/docker/crossbar/Dockerfile +++ b/docker/crossbar/Dockerfile @@ -8,8 +8,11 @@ FROM crossbario/crossbar:cpy3-20.8.1 # Run as root to put config in place and chown USER root -# Copy in config and requirements files -COPY config.json /ocs/.crossbar/config.json +# Copy in config template and wrapper script +COPY run-crossbar.sh /ocs/run-crossbar.sh +RUN chmod a+x /ocs/run-crossbar.sh + +COPY config.json.template /ocs/.crossbar/config-with-address.json.template RUN chown -R crossbar:crossbar /ocs # Run image as crossbar user during normal operation @@ -20,4 +23,4 @@ EXPOSE 8001 # Run crossbar when the container launches # User made config.json should be mounted to /ocs/.crossbar/config.json -ENTRYPOINT ["crossbar", "start", "--cbdir", "/ocs/.crossbar"] +ENTRYPOINT ["/ocs/run-crossbar.sh"] diff --git a/docker/crossbar/config.json b/docker/crossbar/config.json.template similarity index 92% rename from docker/crossbar/config.json rename to docker/crossbar/config.json.template index 5f1519a0..c632c48c 100644 --- a/docker/crossbar/config.json +++ b/docker/crossbar/config.json.template @@ -10,13 +10,13 @@ }, "realms": [ { - "name": "test_realm", + "name": "{realm}", "roles": [ { "name": "iocs_agent", "permissions": [ { - "uri": "observatory.", + "uri": "{address_root}.", "match": "prefix", "allow": { "call": true, @@ -36,7 +36,7 @@ "name": "iocs_controller", "permissions": [ { - "uri": "observatory.", + "uri": "{address_root}.", "match": "prefix", "allow": { "call": true, @@ -60,7 +60,7 @@ "type": "web", "endpoint": { "type": "tcp", - "port": 8001 + "port": {port} }, "paths": { "ws": { @@ -81,7 +81,7 @@ }, "call": { "type": "caller", - "realm": "test_realm", + "realm": "{realm}", "role": "iocs_controller", "options": { } diff --git a/docker/crossbar/run-crossbar.sh b/docker/crossbar/run-crossbar.sh new file mode 100644 index 00000000..f074afc5 --- /dev/null +++ b/docker/crossbar/run-crossbar.sh @@ -0,0 +1,28 @@ 
+#!/bin/bash + +CONFIG_DIR=/ocs/.crossbar +OCS_ADDRESS_ROOT=${OCS_ADDRESS_ROOT:-observatory} +OCS_CROSSBAR_REALM=${OCS_CROSSBAR_REALM:-${OCS_REALM:-test_realm}} +OCS_CROSSBAR_PORT=${OCS_CROSSBAR_PORT:-${OCS_PORT:-8001}} + +# Did user mount in a config.json? +if [ -e $CONFIG_DIR/config.json ]; then + echo Launching user-provided config.json + CONFIG_FILE=$CONFIG_DIR/config.json +else + pattern=" + s/{address_root}/${OCS_ADDRESS_ROOT}/g + s/{realm}/${OCS_CROSSBAR_REALM}/g + s/{port}/${OCS_CROSSBAR_PORT}/g + " + echo "Processing template with replacements:" + echo "$pattern" + echo + + CONFIG_FILE=$CONFIG_DIR/config-with-address.json + sed "$pattern" \ + $CONFIG_DIR/config-with-address.json.template \ + > $CONFIG_FILE +fi + +crossbar start --cbdir $CONFIG_DIR --config $CONFIG_FILE diff --git a/docs/agents/aggregator.rst b/docs/agents/aggregator.rst index 31e624b2..2e3c63d8 100644 --- a/docs/agents/aggregator.rst +++ b/docs/agents/aggregator.rst @@ -90,7 +90,9 @@ to register a feed so that it will be recorded by the aggregator. Unregistered providers will automatically be added when they send data, and stale providers will be removed if no data is received in a specified time period. -To do this, the aggregator monitors all feeds in the ``observatory`` namespace to find + +To do this, the aggregator monitors all feeds in the namespace defined +by the `{address_root}` prefix to find feeds that should be recorded.
If the aggregator receives data from a feed registered with ``record=True``, it will automatically add that feed as a Provider, and will start putting incoming data into frames every ``frame_length`` diff --git a/docs/developer/writing_an_agent/docker.rst b/docs/developer/writing_an_agent/docker.rst index 2e4a7eea..e8d97656 100644 --- a/docs/developer/writing_an_agent/docker.rst +++ b/docs/developer/writing_an_agent/docker.rst @@ -84,7 +84,6 @@ the BarbonesAgent config to the ``ocs-docker`` host.: wamp_http: http://localhost:8001/call wamp_realm: test_realm address_root: observatory - registry_address: observatory.registry hosts: diff --git a/docs/user/crossbar_config.rst b/docs/user/crossbar_config.rst index dd1e5ac2..0a02a07a 100644 --- a/docs/user/crossbar_config.rst +++ b/docs/user/crossbar_config.rst @@ -12,48 +12,118 @@ the interface to the crossbar server. .. note:: - For most test deployments of OCS, you should not need to modify this file - and can use the one that comes with the ``simonsobs/ocs-crossbar`` Docker - Image. + For most simple lab deployments of OCS, you should not need to modify this + file and can use the one that comes with the ``simonsobs/ocs-crossbar`` + Docker Image. -Example Config --------------- -An example of the default OCS crossbar config that is bundled into -``simonsobs/ocs-crossbar`` can be found in the repository at -`ocs/docker/crossbar/config.json`_. This is based on the template in -`ocs/ocs/support/crossbar_config.json`_. +Configuration File Template +--------------------------- +The template that the default OCS crossbar config is built with is shown here: + +.. literalinclude:: ../../docker/crossbar/config.json.template -The unique parts of this to OCS are the realm name, "test_realm", defined -roles of "iocs_agent" and "iocs_controller, and "address_root" of -"observatory.". Additionally, we run on port 8001. 
+The variables `realm`, `address_root`, and `port` are all configurable and must +match configuration options set in your SCF. Keep reading this page to see how +to configure these variables. + +.. note:: + Changing the `address_root` has implications for how your data is + stored and accessed in tools such as Grafana. It is recommended you pick + something reasonable when you first configure your system and do not change it + later. For further details on crossbar server configuration, see the crossbar `Router Configuration`_ page. -.. _`ocs/docker/crossbar/config.json`: https://github.com/simonsobs/ocs/blob/main/docker/crossbar/config.json -.. _`ocs/ocs/support/crossbar_config.json`: https://github.com/simonsobs/ocs/blob/main/ocs/support/crossbar_config.json .. _`Router Configuration`: https://crossbar.io/docs/Router-Configuration/ +Running with Docker +=================== + +We recommend running crossbar within a Docker container. We build the +``simonsobs/ocs-crossbar`` container from the official `crossbar.io Docker +image`_, specifically the cpy3 version. Bundled within the container is a +simple crossbar configuration file template with defaults that are +compatible with examples in this documentation. + +To adjust the crossbar configuration in the container, you can either: + +- Use environment variables to alter the most basic settings +- Generate and mount a new configuration file over + ``/ocs/.crossbar/config.json`` with the proper permissions + +.. _`crossbar.io Docker image`: https://hub.docker.com/r/crossbario/crossbar + +Environment variables in ocs-crossbar +------------------------------------- +The following environment variables can be set, to affect the +generation of the crossbar configuration file when the container +starts up: + +- OCS_ADDRESS_ROOT (default "observatory"): the base URI for OCS + entities (this needs to match the `address_root` set in the SCF).
+- OCS_CROSSBAR_REALM (default "test_realm"): the WAMP realm to + configure for OCS. +- OCS_CROSSBAR_PORT (default 8001): the port on which crossbar will + accept requests. + +Here is an example of a docker-compose entry that overrides the +OCS_ADDRESS_ROOT:: + + crossbar: + image: simonsobs/ocs-crossbar:latest + ports: + - "127.0.0.1:8001:8001" # expose for OCS + environment: + - PYTHONUNBUFFERED=1 + - OCS_ADDRESS_ROOT=laboratory + +Bind Mounting the Configuration +------------------------------- +To instead mount a new configuration into the pre-built image, first chown +your file to be owned by user and group 242 (the default crossbar UID/GID), +then mount it appropriately in your docker-compose file. Here we assume you +put the configuration in the directory ``./dot_crossbar/``:: + + $ chown -R 242:242 dot_crossbar/ + +.. note:: + If you do not already have a configuration file to modify and use, see the + next section on generating one. + +Your docker-compose service should then be configured like:: + + crossbar: + image: simonsobs/ocs-crossbar + ports: + - "8001:8001" # expose for OCS + volumes: + - ./dot_crossbar:/ocs/.crossbar + environment: + - PYTHONUNBUFFERED=1 + Generating a New Config File ---------------------------- -``ocsbow`` can be used to generate a default configuration file, based on -options in your OSC file, which can then be modified if needed. +``ocs-local-support`` can be used to generate a default configuration +file, based on options in your SCF, which can then be modified if +needed. 
First, we make sure our ``OCS_CONFIG_DIR`` environment variable is set:: $ cd ocs-site-configs/ $ export OCS_CONFIG_DIR=`pwd` -We should make a directory for the crossbar config, let's call it -``dot_crossbar/`` (typically a dot directory, but for visibility we'll avoid -that):: +We should make a directory for the crossbar config, following along above let's +call it ``dot_crossbar/`` (typically a dot directory, but for visibility we'll +avoid that):: $ mkdir -p ocs-site-configs/dot_crossbar/ This directory needs to be configured as your crossbar 'config-dir' in your -ocs-site-config file. Now we can generate the config:: +ocs-site-config file. (See example in :ref:`site_config_user`.) Now we can +generate the config:: - $ ocsbow crossbar generate_config + $ ocs-local-support generate_crossbar_config The crossbar config-dir is set to: ./dot_crossbar/ Using @@ -68,54 +138,7 @@ modifications needed for your deployment. .. note:: - The crossbar 'config-dir' block and the 'agent-instance' block defining the - 'HostManager' Agent are both required for the system you are running ocsbow - on. Be sure to add these to your SCF if they do not exist. - -Running with Docker -=================== - -We recommend running crossbar within a Docker container. We build the -``simonsobs/ocs-crossbar`` container from the official `crossbar.io Docker -image`_, specifically the cpy3 version. Bundled within the container is a -simple OCS configuration that should work with the configuration -recommendations in this documentation. - -If changes need to be made, then you will need to generate your own -configuration file as described above. To use a modified configuration in the -container you can either: - -- Edit the default configuration file and rebuild the Docker image -- Mount the new configuration file over ``/ocs/.crossbar/config.json`` with the - proper permissions - -.. 
_`crossbar.io Docker image`: https://hub.docker.com/r/crossbario/crossbar - -Rebuilding the Docker Image ---------------------------- -To rebuild the Docker image after modifying ``ocs/docker/config.json`` run:: - - $ docker build -t ocs-crossbar . - -You should then update your configuration to use the new, local, -``ocs-crossbar`` image. - -Bind Mounting the Configuration -------------------------------- -To instead mount the new configuration into the pre-built image, first chown -your file to be owned by user and group 242 (the default crossbar UID/GID), -then mount it appropriately in your docker-compose file. Here we assume you -put the configuration in the directory ``./dot_crossbar/``:: - - $ chown -R 242:242 dot_crossbar/ - -Your docker-compose service should then be configured like:: - - crossbar: - image: simonsobs/ocs-crossbar - ports: - - "8001:8001" # expose for OCS - volumes: - - ./dot_crossbar:/ocs/.crossbar - environment: - - PYTHONUNBUFFERED=1 + The crossbar 'config-dir' block and the 'agent-instance' block + defining the 'HostManager' Agent are both required for the system + you are running `ocs-local-support` on. Be sure to add these to + your SCF if they do not exist. diff --git a/docs/user/docker_config.rst b/docs/user/docker_config.rst index 90d27853..9d0bb626 100644 --- a/docs/user/docker_config.rst +++ b/docs/user/docker_config.rst @@ -61,7 +61,7 @@ components):: ports: - "127.0.0.1:8001:8001" # expose for OCS environment: - - PYTHONUNBUFFERED=1 + - PYTHONUNBUFFERED=1 # -------------------------------------------------------------------------- # OCS Agents @@ -242,7 +242,7 @@ Where the separate compose files would look something like:: ports: - "127.0.0.1:8001:8001" # expose for OCS environment: - - PYTHONUNBUFFERED=1 + - PYTHONUNBUFFERED=1 :: diff --git a/docs/user/quickstart.rst b/docs/user/quickstart.rst index bf42ab5d..3886a0a9 100644 --- a/docs/user/quickstart.rst +++ b/docs/user/quickstart.rst @@ -48,7 +48,6 @@ structure. 
wamp_http: http://localhost:8001/call wamp_realm: test_realm address_root: observatory - registry_address: observatory.registry hosts: diff --git a/docs/user/site_config.rst b/docs/user/site_config.rst index b6e0760c..a5023c8a 100644 --- a/docs/user/site_config.rst +++ b/docs/user/site_config.rst @@ -35,7 +35,6 @@ instances (running two different classes of agent): wamp_http: http://10.10.10.3:8001/call wamp_realm: test_realm address_root: observatory - registry_agent: observatory.registry hosts: @@ -116,6 +115,18 @@ The `hub` section defines the connection parameters for the crossbar server. This entire section will likely remain unchanged, except for the ``wamp_server`` and ``wamp_http`` IP addresses. +The `address_root` setting determines the leading token in all agent +and feed addresses on the crossbar network. While "observatory" is +the default, it can be changed as long as the crossbar configuration +is also updated to permit operations on the `{address_root}.` uri. + +.. warning:: + The hub settings must match the crossbar configuration. If you + change `wamp_realm` or `address_root`, especially, be sure to + update your crossbar configuration accordingly. (If using the + ocs-crossbar docker image, this can be done through environment + variables in the ``docker-compose.yaml`` file.) + Under `hosts` we have defined a three hosts, `host-1`, `host-1-docker`, and `host-2`. This configuration example shows a mix of Agents running directly on hosts and running within Docker containers. 
diff --git a/example/miniobs/default.yaml b/example/miniobs/default.yaml index 3b3d7a44..43710ee8 100644 --- a/example/miniobs/default.yaml +++ b/example/miniobs/default.yaml @@ -5,7 +5,6 @@ hub: wamp_http: http://localhost:8001/call wamp_realm: test_realm address_root: observatory - registry_address: observatory.registry hosts: diff --git a/ocs/agents/aggregator/agent.py b/ocs/agents/aggregator/agent.py index 06a1bb34..3f16cb45 100644 --- a/ocs/agents/aggregator/agent.py +++ b/ocs/agents/aggregator/agent.py @@ -57,7 +57,7 @@ def __init__(self, agent, args): # If this ends up being too much data, we can add a tag '.record' # at the end of the address of recorded feeds, and filter by that. self.agent.subscribe_on_start(self._enqueue_incoming_data, - 'observatory..feeds.', + f'{args.address_root}..feeds.', options={'match': 'wildcard'}) record_on_start = (args.initial_state == 'record') diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index 36507b9a..dc5bb650 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -601,9 +601,6 @@ def main(args=None): os.dup2(null, stream.fileno()) os.close(null) - # To reduce "try again" noise, don't tell Registry about HostManager. 
- args.registry_address = 'none' - agent, runner = ocs_agent.init_site_agent(args) docker_composes = [] diff --git a/ocs/agents/influxdb_publisher/agent.py b/ocs/agents/influxdb_publisher/agent.py index aa274f3b..e35ef00a 100644 --- a/ocs/agents/influxdb_publisher/agent.py +++ b/ocs/agents/influxdb_publisher/agent.py @@ -50,7 +50,7 @@ def __init__(self, agent, args): self.loop_time = 1 self.agent.subscribe_on_start(self._enqueue_incoming_data, - 'observatory..feeds.', + f'{args.address_root}..feeds.', options={'match': 'wildcard'}) record_on_start = (args.initial_state == 'record') @@ -97,6 +97,7 @@ def record(self, session: ocs_agent.OpSession, params): port=self.args.port, protocol=self.args.protocol, gzip=self.args.gzip, + operate_callback=lambda: self.aggregate, ) session.set_status('running') diff --git a/ocs/agents/influxdb_publisher/drivers.py b/ocs/agents/influxdb_publisher/drivers.py index 7f09cba8..23fdc889 100644 --- a/ocs/agents/influxdb_publisher/drivers.py +++ b/ocs/agents/influxdb_publisher/drivers.py @@ -51,6 +51,9 @@ class Publisher: Protocol for writing data. Either 'line' or 'json'. gzip (bool, optional): compress influxdb requsts with gzip + operate_callback (callable, optional): + Function to call to see if failed connections should be + retried (to prevent a thread from locking). 
Attributes: host (str): @@ -66,7 +69,8 @@ class Publisher: """ - def __init__(self, host, database, incoming_data, port=8086, protocol='line', gzip=False): + def __init__(self, host, database, incoming_data, port=8086, protocol='line', + gzip=False, operate_callback=None): self.host = host self.port = port self.db = database @@ -88,6 +92,9 @@ def __init__(self, host, database, incoming_data, port=8086, protocol='line', gz LOG.error("Connection error, attempting to reconnect to DB.") self.client = InfluxDBClient(host=self.host, port=self.port, gzip=gzip) time.sleep(1) + if operate_callback and not operate_callback(): + break + db_names = [x['name'] for x in db_list] if self.db not in db_names: diff --git a/ocs/agents/registry/agent.py b/ocs/agents/registry/agent.py index 7970ccb2..f53d644d 100644 --- a/ocs/agents/registry/agent.py +++ b/ocs/agents/registry/agent.py @@ -98,7 +98,7 @@ def __init__(self, agent, args): self.agent_timeout = 5.0 # Removes agent after 5 seconds of no heartbeat. 
self.agent.subscribe_on_start( - self._register_heartbeat, 'observatory..feeds.heartbeat', + self._register_heartbeat, f'{args.address_root}..feeds.heartbeat', options={'match': 'wildcard'} ) diff --git a/ocs/checkdata.py b/ocs/checkdata.py index f3d21f03..b589f284 100644 --- a/ocs/checkdata.py +++ b/ocs/checkdata.py @@ -108,11 +108,11 @@ def _populate_instances(self): {FEED1: {"fields": {FIELD1: - {'full_name': 'observatory.INSTANCE_ID.feeds.FEED.FIELD', + {'full_name': 'ADDRESS_ROOT.INSTANCE_ID.feeds.FEED.FIELD', 't_last': float, 'v_last': float}, FIELD2: - {'full_name': 'observatory.INSTANCE_ID.feeds.FEED.FIELD', + {'full_name': 'ADDRESS_ROOT.INSTANCE_ID.feeds.FEED.FIELD', 't_last': float, 'v_last': float} }, diff --git a/ocs/client_cli.py b/ocs/client_cli.py index 7065f622..35b73724 100755 --- a/ocs/client_cli.py +++ b/ocs/client_cli.py @@ -62,7 +62,8 @@ def get_parser(): # scan p = client_sp.add_parser('scan', help="Gather and print list of Agents.") p.add_argument('--details', action='store_true', help="List all Operations with their current status OpCode.") - p.add_argument('--use-registry', action='store_true', help="Query the registry (faster than listening for heartbeats).") + p.add_argument('--use-registry', nargs='?', const='registry', help="Query the registry (faster than listening for heartbeats). 
" + "Pass the registry instance_id as an argument (default to 'registry').") # scan p = client_sp.add_parser('listen', help="Subscribe to feed(s) and dump to stdout.") @@ -122,9 +123,7 @@ def scan(parser, args): parser.error('Unable to find the OCS config; set OCS_CONFIG_DIR?') if args.use_registry: - reg_addr = args.registry_address - if reg_addr is None: - reg_addr = 'registry' + reg_addr = f'{args.address_root}.{args.use_registry}' try: c = OCSClient(get_instance_id(reg_addr, args), args=args) except RuntimeError as e: diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index 97aa97d0..fc06a6e8 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -4,7 +4,7 @@ txaio.use_twisted() from twisted.internet import reactor, task, threads -from twisted.internet.defer import inlineCallbacks, Deferred, DeferredList, FirstError +from twisted.internet.defer import inlineCallbacks, Deferred, DeferredList, FirstError, maybeDeferred from twisted.internet.error import ReactorNotRunning from twisted.python import log @@ -171,9 +171,16 @@ def onJoin(self, details): try: yield self.register(self._ops_handler, self.agent_address + '.ops') yield self.register(self._management_handler, self.agent_address) - except ApplicationError: - self.log.error('Failed to register basic handlers @ %s; ' - 'agent probably running already.' % self.agent_address) + except ApplicationError as e: + self.log.error('Failed to register basic handlers! ' + 'Error: {error}', error=e) + if e.error == 'wamp.error.not_authorized': + self.log.error('Are the WAMP realm and OCS address_root consistent ' + 'in OCS site config and crossbar config.json?') + elif e.error == 'wamp.error.procedure_already_exists': + self.log.error('Is this agent already running? 
' + 'agent_address="{agent_address}"', + agent_address=self.agent_address) self.leave() return @@ -196,8 +203,13 @@ def heartbeat(): self.heartbeat_call.start(1.0) # Calls the hearbeat every second # Subscribe to startup_subs + def _subscribe_fail(*args, **kwargs): + self.log.error('Failed to subscribe to a feed or feed pattern; possible configuration problem.') + self.log.error(str(args) + str(kwargs)) + self.leave() + for sub in self.startup_subs: - self.subscribe(**sub) + maybeDeferred(self.subscribe, **sub).addErrback(_subscribe_fail) # Now do the startup activities, only the first time we join if self.first_time_startup: diff --git a/ocs/ocsbow.py b/ocs/ocsbow.py index 2915099f..e48192b6 100644 --- a/ocs/ocsbow.py +++ b/ocs/ocsbow.py @@ -156,16 +156,32 @@ def crossbar_test(args, site_config): '%s._crossbar_check_' % site.hub.data['address_root'], url=site.hub.data['wamp_http'], realm=site.hub.data['wamp_realm']) try: + # This is not expected to succeed, but the different errors + # tell us different things... client.call(client.agent_addr) except client_http.ControlClientError as ccex: suberr = ccex.args[0][4] - if suberr == 'client_http.error.connection_error': - ok, msg = False, 'http bridge not found at {wamp_http}.' - elif suberr == 'wamp.error.no_such_procedure': - ok, msg = True, 'http bridge reached at {wamp_http}.' + if suberr == 'wamp.error.no_such_procedure': + # This indicates we got through to the bridge, it liked + # the realm and our address_root. Return True. + ok, msg = True, 'http bridge reached at {wamp_http}.'.format(**site.hub.data) + elif suberr == 'client_http.error.connection_error': + # Possibly crossbar server is not running. + ok, msg = False, 'http bridge not found at {wamp_http}.'.format(**site.hub.data) + elif suberr == 'wamp.error.not_authorized': + # This is likely a configuration issue, print a banner and reraise it. 
+ print('***** crossbar / ocs configuration mismatch *****') + print('The exception here indicates a likely configuration mismatch issue') + print('with the crossbar server and OCS. Specifically, the WAMP realm and') + print('the OCS address_root must match between the site config file') + print('and the crossbar config.') + print('*****\n') + raise ccex else: - ok, msg = True, 'unexpected bridge connection problem; raised %s' % (str(ccex)) - return ok, msg.format(**site.hub.data) + # I think this case hasn't been encountered much. + print('***** unhandled error case *****\n') + raise ccex + return ok, msg def get_status(args, site_config, restrict_hosts=None): @@ -399,9 +415,11 @@ def generate_crossbar_config(cm, site_config): print(line, end='') print('\n') print('To adopt the new config, remove %s and re-run me.' % cb_filename) + print() else: open(cb_filename, 'w').write(config_text) print('Wrote %s' % cb_filename) + print() class CrossbarManager: @@ -789,6 +807,15 @@ def update(self): 'running, but should start if you run "ocs-local-support start".')) self.analysis = solutions + def fail_on_missing_crossbar_config(self): + """Check if crossbar is managed by this config. If not, print + error message and exit(1).""" + if not self.crossbar['manage']: + print('Error! 
Crossbar config file not set.\n\n' + 'To start crossbar or to generate a config file, the site ' + 'config file must have a "crossbar" entry; see docs.\n') + sys.exit(1) + def main(args=None): args, site_config = get_args_and_site_config(args) @@ -953,8 +980,10 @@ def eligible(subsys): print('Trouble!') for text in fatals: print(_term_format(text, ' ', 4)) + sys.exit(1) if any([soln == 'crossbar' for soln, text in supports.analysis]): + supports.fail_on_missing_crossbar_config() print('Trying to start crossbar...') supports.crossbar['ctrl'].action('start', foreground=args.foreground) supports.update() # refresh .analysis @@ -995,5 +1024,6 @@ def eligible(subsys): print('No running crossbar detected, system is already "down".') elif action == 'generate_crossbar_config': + supports.fail_on_missing_crossbar_config() cm = supports.crossbar['ctrl'] generate_crossbar_config(cm, site_config) diff --git a/ocs/site_config.py b/ocs/site_config.py index 0315c207..f25fa375 100644 --- a/ocs/site_config.py +++ b/ocs/site_config.py @@ -187,12 +187,8 @@ def from_dict(cls, data, parent=None): ``address_root`` (required): The base address to be used by all OCS Agents. This is normally something simple like - ``observatory`` or ``detlab.system1``. (Command line - override: ``--address-root``.) - - ``registry_address`` (optional): The address of the OCS Registry - Agent. See :ref:`registry`. (Command line override: - ``--registry-address``.) + ``observatory`` or ``detlab``. (Command line override: + ``--address-root``.) 
""" self = cls() @@ -385,7 +381,7 @@ def add_arguments(parser=None): group.add_argument('--instance-id', help="""Look in the SCF for Agent-instance specific configuration options, and use those to launch the Agent.""") group.add_argument('--address-root', help="""Override the site default address root.""") - group.add_argument('--registry-address', help="""Override the site default registry address.""") + group.add_argument('--registry-address', help="""Deprecated.""") group.add_argument('--log-dir', help="""Set the logging directory.""") group.add_argument('--working-dir', help="""Propagate the working directory.""") return parser @@ -466,9 +462,6 @@ def get_config(args, agent_class=None): if args.site_realm is not None: site_config.hub.data['wamp_realm'] = args.site_realm - if args.registry_address is not None: - site_config.hub.data['registry_address'] = args.registry_address - # Identify our agent-instance. instance_config = None if no_dev_match: @@ -515,8 +508,6 @@ def add_site_attributes(args, site, host=None): args.site_realm = site.hub.data['wamp_realm'] if args.address_root is None: args.address_root = site.hub.data['address_root'] - if args.registry_address is None: - args.registry_address = site.hub.data.get('registry_address') if (args.log_dir is None) and (host is not None): args.log_dir = host.log_dir diff --git a/tests/agents/test_registry_agent.py b/tests/agents/test_registry_agent.py index 20a15edb..6eb0339f 100644 --- a/tests/agents/test_registry_agent.py +++ b/tests/agents/test_registry_agent.py @@ -4,8 +4,10 @@ from agents.util import create_session, create_agent_fixture from ocs.agents.registry.agent import RegisteredAgent, Registry, make_parser +from ocs import site_config parser = make_parser() +site_config.add_arguments(parser) args = parser.parse_args(['--wait-time', '0.1']) agent = create_agent_fixture(Registry, agent_kwargs=dict(args=args)) diff --git a/tests/default.yaml b/tests/default.yaml index 4207c592..449ac234 100644 --- 
a/tests/default.yaml +++ b/tests/default.yaml @@ -5,7 +5,6 @@ hub: wamp_http: http://127.0.0.1:18001/call wamp_realm: test_realm address_root: observatory - registry_address: observatory.registry hosts: diff --git a/tests/test_site_config.py b/tests/test_site_config.py index 0100374e..a2ed359d 100644 --- a/tests/test_site_config.py +++ b/tests/test_site_config.py @@ -28,8 +28,7 @@ def test_none_client_type_w_wamp_http_site(self): mock_site = MagicMock() mock_site.hub.data = {'wamp_http': 'http://127.0.0.1:8001', 'wamp_realm': 'test_realm', - 'address_root': 'observatory', - 'registry_address': 'observatory.registry'} + 'address_root': 'observatory'} get_control_client('test', site=mock_site, client_type=None) @@ -40,8 +39,7 @@ def test_none_client_type_wo_wamp_http_site(self): """ mock_site = MagicMock() mock_site.hub.data = {'wamp_realm': 'test_realm', - 'address_root': 'observatory', - 'registry_address': 'observatory.registry'} + 'address_root': 'observatory'} with pytest.raises(ValueError): get_control_client('test', site=mock_site, client_type=None) From 5edbde16e074a78fa969299d0c0fec96aa2b1ef0 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 22 May 2023 16:34:23 -0400 Subject: [PATCH 11/23] Cast tuple param before type check Fixes #274. --- ocs/ocs_agent.py | 3 +++ tests/test_ocs_agent.py | 2 ++ 2 files changed, 5 insertions(+) diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index fc06a6e8..8238a4f1 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -1389,6 +1389,9 @@ def get(self, key, default=ParamError(''), check=None, cast=None, type=None, # Free cast from int to float. 
if type is float and isinstance(value, int): value = float(value) + # Fix type after json conversion + if type is tuple and isinstance(value, list) and cast in [tuple, None]: + value = tuple(value) if not isinstance(value, type): raise ParamError(f"Param '{key}'={value} is not of required type ({type})") if choices is not None: diff --git a/tests/test_ocs_agent.py b/tests/test_ocs_agent.py index e6c1a83f..be213ce0 100644 --- a/tests/test_ocs_agent.py +++ b/tests/test_ocs_agent.py @@ -503,12 +503,14 @@ def test_params_get(): 'float_param': 1e8, 'numerical_string_param': '145.12', 'none_param': None, + 'tuple_param': [20., 120.], # gets cast to list by json conversion }) # Basic successes params.get('int_param', type=int) params.get('string_param', type=str) params.get('float_param', type=float) + params.get('tuple_param', type=tuple) # Tricky successes params.get('int_param', type=float) From e7d331f712ea51d0eb2537b9460c0559697c56e4 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Wed, 24 May 2023 10:55:56 -0400 Subject: [PATCH 12/23] Support direct start calls with params to param decorated Tasks/Processes (#333) * Log exceptions caught within start() * Create failing test for start task with None params * Handle ParamHandler getting None params * Simplify None params handling * Fix log formatting syntax --- ocs/ocs_agent.py | 4 ++++ tests/test_ocs_agent.py | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index 8238a4f1..1efc9db1 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -694,8 +694,10 @@ def start(self, op_name, params=None): handler = ParamHandler(params) params = handler.batch(op.launcher._ocs_prescreen) except ParamError as err: + self.log.error("Caught ParamError during start call: {err}", err=err) return (ocs.ERROR, err.msg, {}) except Exception as err: + self.log.error("Caught Exception during start call: {err}", err=err) return (ocs.ERROR, f'CRASH: during param pre-processing: 
{str(err)}', {}) # Mark as started. @@ -1302,6 +1304,8 @@ def my_task(self, session, params): """ def __init__(self, params): + if params is None: + params = {} self._params = params self._checked = set() diff --git a/tests/test_ocs_agent.py b/tests/test_ocs_agent.py index be213ce0..53cc3d21 100644 --- a/tests/test_ocs_agent.py +++ b/tests/test_ocs_agent.py @@ -43,6 +43,12 @@ def tfunc_raise(session, a): return tfunc(session, a) +@param('test', default=1) +def tfunc_param_dec(session, a): + """tfunc but decorated with @ocs_agent.param.""" + return True, 'Task completed successfully' + + @pytest.fixture def mock_agent(): """Test fixture to setup a mocked OCSAgent. @@ -261,6 +267,19 @@ def test_start_unregistered_task(mock_agent): assert res[2] == {} +def test_start_task_none_params(mock_agent): + """Test passing params=None to task decorated with @param that has set + defaults. + + See issue: https://github.com/simonsobs/ocs/issues/251 + + """ + mock_agent.register_task('test_task', tfunc_param_dec) + res = mock_agent.start('test_task', params=None) + print(res) + assert res[0] == ocs.OK + + # Wait @pytest_twisted.inlineCallbacks def test_wait(mock_agent): From 003260ae91c76ff7172272b675c1f919ac9370e9 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Wed, 9 Aug 2023 12:16:48 -0400 Subject: [PATCH 13/23] Replace pytest-docker-compose with pytest-docker plugin --- requirements/testing.txt | 2 +- .../test_aggregator_agent_integration.py | 3 +-- .../integration/test_crossbar_integration.py | 7 ++++--- .../test_fake_data_agent_integration.py | 3 +-- .../test_host_manager_agent_integration.py | 4 +--- .../test_influxdb_publisher_integration.py | 3 +-- .../test_registry_agent_integration.py | 3 +-- tests/integration/util.py | 20 ++++++++++++------- 8 files changed, 23 insertions(+), 22 deletions(-) diff --git a/requirements/testing.txt b/requirements/testing.txt index 4aa6e848..62b3147f 100644 --- a/requirements/testing.txt +++ b/requirements/testing.txt @@ -1,6 +1,6 @@ 
pytest pytest-cov docker -pytest-docker-compose +pytest-docker pytest-twisted so3g diff --git a/tests/integration/test_aggregator_agent_integration.py b/tests/integration/test_aggregator_agent_integration.py index 02dcb9bb..b0044943 100644 --- a/tests/integration/test_aggregator_agent_integration.py +++ b/tests/integration/test_aggregator_agent_integration.py @@ -11,8 +11,7 @@ from integration.util import ( create_crossbar_fixture ) - -pytest_plugins = ("docker_compose") +from integration.util import docker_compose_file # noqa: F401 wait_for_crossbar = create_crossbar_fixture() run_agent = create_agent_runner_fixture( diff --git a/tests/integration/test_crossbar_integration.py b/tests/integration/test_crossbar_integration.py index 7f2f044a..382c0cbe 100644 --- a/tests/integration/test_crossbar_integration.py +++ b/tests/integration/test_crossbar_integration.py @@ -8,9 +8,7 @@ from so3g import hk from integration.util import create_crossbar_fixture, restart_crossbar - -pytest_plugins = ("docker_compose",) - +from integration.util import docker_compose_file # noqa: F401 wait_for_crossbar = create_crossbar_fixture() @@ -177,3 +175,6 @@ def test_proper_agent_shutdown_on_lost_transport(wait_for_crossbar): fake_data_container = client.containers.get('ocs-tests-fake-data-agent') assert fake_data_container.status == "exited" + + # Restart crossbar, else docker plugin loses track of it + crossbar_container.start() diff --git a/tests/integration/test_fake_data_agent_integration.py b/tests/integration/test_fake_data_agent_integration.py index 70aec7c4..aab999ac 100644 --- a/tests/integration/test_fake_data_agent_integration.py +++ b/tests/integration/test_fake_data_agent_integration.py @@ -11,11 +11,10 @@ from integration.util import ( create_crossbar_fixture, ) +from integration.util import docker_compose_file # noqa: F401 AGENT_PATH = '../ocs/agents/fake_data/agent.py' -pytest_plugins = ("docker_compose") - wait_for_crossbar = create_crossbar_fixture() run_agent = 
create_agent_runner_fixture( AGENT_PATH, 'fake_data') diff --git a/tests/integration/test_host_manager_agent_integration.py b/tests/integration/test_host_manager_agent_integration.py index c500decd..983181e2 100644 --- a/tests/integration/test_host_manager_agent_integration.py +++ b/tests/integration/test_host_manager_agent_integration.py @@ -14,9 +14,7 @@ from integration.util import ( create_crossbar_fixture ) - -pytest_plugins = ("docker_compose") - +from integration.util import docker_compose_file # noqa: F401 wait_for_crossbar = create_crossbar_fixture() run_agent = create_agent_runner_fixture('../ocs/agents/host_manager/agent.py', diff --git a/tests/integration/test_influxdb_publisher_integration.py b/tests/integration/test_influxdb_publisher_integration.py index faeeee05..a10af963 100644 --- a/tests/integration/test_influxdb_publisher_integration.py +++ b/tests/integration/test_influxdb_publisher_integration.py @@ -11,8 +11,7 @@ from integration.util import ( create_crossbar_fixture ) - -pytest_plugins = ("docker_compose") +from integration.util import docker_compose_file # noqa: F401 wait_for_crossbar = create_crossbar_fixture() run_agent = create_agent_runner_fixture( diff --git a/tests/integration/test_registry_agent_integration.py b/tests/integration/test_registry_agent_integration.py index ccb9f2d4..ccab10ef 100644 --- a/tests/integration/test_registry_agent_integration.py +++ b/tests/integration/test_registry_agent_integration.py @@ -9,11 +9,10 @@ from integration.util import ( create_crossbar_fixture ) +from integration.util import docker_compose_file # noqa: F401 from ocs.base import OpCode -pytest_plugins = ("docker_compose") - wait_for_crossbar = create_crossbar_fixture() run_agent = create_agent_runner_fixture('../ocs/agents/registry/agent.py', 'registry', diff --git a/tests/integration/util.py b/tests/integration/util.py index 07b853ce..fc847158 100644 --- a/tests/integration/util.py +++ b/tests/integration/util.py @@ -1,3 +1,5 @@ +import os + 
import pytest import docker @@ -7,13 +9,10 @@ def create_crossbar_fixture(): # Fixture to wait for crossbar server to be available. # Speeds up tests a bit to have this session scoped - # If tests start interfering with one another this should be changed to - # "function" scoped and session_scoped_container_getter should be changed - # to function_scoped_container_getter - # @pytest.fixture(scope="session") - # def wait_for_crossbar(session_scoped_container_getter): - @pytest.fixture(scope="function") - def wait_for_crossbar(function_scoped_container_getter): + # If tests interfere with each other change to "function" scoped + # @pytest.fixture(scope="function") + @pytest.fixture(scope="session") + def wait_for_crossbar(docker_services): """Wait for the crossbar server from docker-compose to become responsive. @@ -29,3 +28,10 @@ def restart_crossbar(): crossbar_container = client.containers.get('ocs-tests-crossbar') crossbar_container.restart() check_crossbar_connection() + + +# Overrides the default location that pytest-docker looks for the compose file. 
+# https://pypi.org/project/pytest-docker/ +@pytest.fixture(scope="session") +def docker_compose_file(pytestconfig): + return os.path.join(str(pytestconfig.rootdir), "docker-compose.yml") From 4222f53b6d1e44099a052e3f9a0f938b2af920f5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 31 Jul 2023 20:06:13 +0000 Subject: [PATCH 14/23] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/pycqa/flake8: 6.0.0 → 6.1.0](https://github.com/pycqa/flake8/compare/6.0.0...6.1.0) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a5bb52b3..78ac22e2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,6 +11,6 @@ repos: hooks: - id: autopep8 - repo: https://github.com/pycqa/flake8 - rev: 6.0.0 + rev: 6.1.0 hooks: - id: flake8 From cf39d9c7e302a24c1146b68d401a3f14d2c4832f Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Fri, 18 Aug 2023 15:55:40 -0400 Subject: [PATCH 15/23] systemd service file: add RestartSec=10s HostManager can crash and restart really quickly, so this rate-limiting is necessary on some systems to prevent systemd from giving up on the service. 
--- docs/user/centralized_management.rst | 1 + ocs/ocs_systemd.py | 1 + 2 files changed, 2 insertions(+) diff --git a/docs/user/centralized_management.rst b/docs/user/centralized_management.rst index 97bf6a15..ad74fe66 100644 --- a/docs/user/centralized_management.rst +++ b/docs/user/centralized_management.rst @@ -476,6 +476,7 @@ should look something like this:: ExecStart=/home/ocs/git/ocs-site-configs/my-lab/launcher-hm-server5.sh User=ocs Restart=always + RestartSec=10s [Install] WantedBy=multi-user.target diff --git a/ocs/ocs_systemd.py b/ocs/ocs_systemd.py index dbfc58c9..b22b50d3 100755 --- a/ocs/ocs_systemd.py +++ b/ocs/ocs_systemd.py @@ -28,6 +28,7 @@ ExecStart={cmd} User={service_user} Restart=always +RestartSec=10s {environment_lines} [Install] From 28bc23d84f98a8c26bf3d1f1e00c902d3f577f24 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 28 Aug 2023 19:46:47 +0000 Subject: [PATCH 16/23] [pre-commit.ci] pre-commit autoupdate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit updates: - [github.com/pre-commit/mirrors-autopep8: v2.0.2 → v2.0.4](https://github.com/pre-commit/mirrors-autopep8/compare/v2.0.2...v2.0.4) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 78ac22e2..686b85f1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,7 +7,7 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/pre-commit/mirrors-autopep8 - rev: v2.0.2 + rev: v2.0.4 hooks: - id: autopep8 - repo: https://github.com/pycqa/flake8 From e1cea6810e4f536f2053abb591fab1ffb6d00ca1 Mon Sep 17 00:00:00 2001 From: "C. 
Weaver" Date: Fri, 24 Feb 2023 17:20:55 -0500 Subject: [PATCH 17/23] A possible implementation for managing non-python agents --- ocs/agents/host_manager/agent.py | 121 +++++++++++++++++++---------- ocs/agents/host_manager/drivers.py | 11 ++- ocs/ocsbow.py | 38 ++++++--- ocs/site_config.py | 43 ++++++++-- 4 files changed, 152 insertions(+), 61 deletions(-) diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index dc5bb650..2b037cf8 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -41,8 +41,8 @@ def _get_local_instances(self): Returns: agent_dict (dict): Maps instance-id to a dict describing the agent config. The config is as contained in - HostConfig.instances but where 'instance-id', - 'agent-class', and 'manage' are all guaranteed populated + HostConfig.instances but where 'instance-id', 'agent-class' + or 'agent-exe', and 'manage' are all guaranteed populated (and manage is one of ['yes', 'no', 'docker']). warnings: A list of strings, each of which corresponds to some problem found in the config. @@ -54,6 +54,10 @@ def _get_local_instances(self): self.site_config_file = site.source_file self.host_name = hc.name self.working_dir = hc.working_dir + self.wamp_url = site.hub.data['wamp_server'] + self.wamp_realm = site.hub.data['wamp_realm'] + self.address_root = site.hub.data['address_root'] + self.log_dir = hc.log_dir # Scan for agent scripts in (deprecated) script registry for p in hc.agent_paths: @@ -72,7 +76,7 @@ def _get_local_instances(self): continue # Make sure 'manage' is set, and valid. 
default_manage = 'no' \ - if inst['agent-class'] == 'HostManager' else 'yes' + if 'agent-class' in inst and inst['agent-class'] == 'HostManager' else 'yes' inst['manage'] = inst.get('manage', default_manage) if inst['manage'] not in ['yes', 'no', 'docker']: warnings.append( @@ -80,6 +84,17 @@ def _get_local_instances(self): f'for instance_id={inst["instance-id"]}.') continue instances[inst['instance-id']] = inst + # Make sure either 'agent-class' or 'agent-exe' is set, but not both + if 'agent-class' not in inst and 'agent-exe' not in inst: + warnings.append( + f'Configuration problem, neither agent-class nor agent-exe is set' + f'for instance_id={inst["instance-id"]}.') + continue + if inst.get('agent-class') is not None and inst.get('agent-exe') is not None: + warnings.append( + f'Configuration problem, both agent-class and agent-exe are set' + f'for instance_id={inst["instance-id"]}.') + continue returnValue((instances, warnings)) yield @@ -169,13 +184,13 @@ def retire(db_key): docker_nonagents = list(self.docker_services.keys()) for iid, hinst in agent_dict.items(): - srv = self.docker_service_prefix + iid - cls = hinst['agent-class'] - mgmt = 'host' - if srv in docker_nonagents: - docker_nonagents.remove(srv) - cls += '[d]' - mgmt = 'docker' + record = dict(hinst) + record['srv'] = self.docker_service_prefix + iid + record['mgmt'] = 'host' + if record['srv'] in docker_nonagents: + docker_nonagents.remove(record['srv']) + record['agent-class'] += '[d]' + record['mgmt'] = 'docker' if hinst['manage'] != 'docker': session.add_message( f'The agent config for instance-id=' @@ -185,7 +200,7 @@ def retire(db_key): retire(iid) continue else: - srv = None + record['srv'] = None if hinst['manage'] == 'no': continue if hinst['manage'] == 'docker': @@ -195,36 +210,48 @@ def retire(db_key): f'in config. 
Dropping.') retire(iid) continue - new_managed.append((iid, iid, srv, cls, mgmt)) + record['db_key'] = iid + new_managed.append(record) for srv in docker_nonagents: - new_managed.append((srv, srv, srv, '[docker]', 'docker')) + new_managed.append({'db_key': srv, 'instance-id': srv, 'srv': srv, + 'agent-class': '[docker]', 'mgmt': 'docker'}) # Compare new managed items to stuff already in our database. - for db_key, iid, srv, cls, mgmt in new_managed: + for record in new_managed: + db_key = record['db_key'] instance = self.database.get(db_key, None) if instance is not None and \ instance['management'] == 'retired': instance = None if instance is not None: # So instance is some kind of actively managed container. - if (instance['agent_class'] != cls - or instance['management'] != mgmt): + if (instance['agent_class'] != record.get('agent-class') + or instance['agent_exe'] != record.get('agent-exe') + or instance['management'] != record.get('mgmt')): session.add_message( f'Managed agent "{db_key}" changed agent_class ' - f'({instance["agent_class"]} -> {cls}) or management ' - f'({instance["management"]} -> {mgmt}) and is being ' + f'({instance["agent_class"]} -> {record.get("agent-class")}) or agent_exe ' + f'({instance["agent_exe"]} -> {record.get("agent-exe")}) or management ' + f'({instance["management"]} -> {record.get("mgmt")}) and is being ' f'reset!') instance = None if instance is None: + if record.get("agent-class") is not None: + full_name=(f'{record["agent-class"]}:{record["db_key"]}') + else: + full_name=(f'{record["agent-exe"]}:{record["db_key"]}') instance = hm_utils.ManagedInstance.init( - management=mgmt, - instance_id=iid, - agent_class=cls, - full_name=(f'{cls}:{db_key}'), + management=record.get("mgmt"), + instance_id=record.get("instance-id"), + agent_class=record.get("agent-class"), + agent_exe=record.get("agent-exe"), + full_name=full_name, + agent_arguments=record.get("arguments"), + write_logs=record.get("write-logs", False) ) - if mgmt == 
'docker': - instance['agent_script'] = srv + if record['mgmt'] == 'docker': + instance['agent_script'] = record['srv'] instance['prot'] = self._get_docker_helper(instance) if instance['prot'].status[0] is None: session.add_message( @@ -237,15 +264,17 @@ def retire(db_key): else: # Check for the agent class in the plugin system; # then check the (deprecated) agent script registry. - if cls in agent_plugins: - session.add_message(f'Found plugin for "{cls}"') + if record.get("agent-exe") is not None: + pass + elif record.get("agent-class") in agent_plugins: + session.add_message(f'Found plugin for "{record.get("agent-class")}"') instance['agent_script'] = '__plugin__' - elif cls in site_config.agent_script_reg: - session.add_message(f'Found launcher script for "{cls}"') - instance['agent_script'] = site_config.agent_script_reg[cls] + elif record.get("agent-class") in site_config.agent_script_reg: + session.add_message(f'Found launcher script for "{record.get("agent-class")}"') + instance['agent_script'] = site_config.agent_script_reg[record.get("agent-class")] else: session.add_message(f'No plugin (nor launcher script) ' - f'found for agent_class "{cls}"!') + f'found for agent_class "{record.get("agent-class")}"!') session.add_message(f'Tracking {instance["full_name"]}') self.database[db_key] = instance yield warnings @@ -303,18 +332,28 @@ def _launch_instance(self, instance): prot = self._get_docker_helper(instance) else: iid = instance['instance_id'] - pyth = sys.executable - script = instance['agent_script'] - if script == '__plugin__': - cmd = [pyth, '-m', 'ocs.agent_cli'] + if instance.get('agent_script') is not None: + pyth = sys.executable + script = instance['agent_script'] + if script == '__plugin__': + cmd = [pyth, '-m', 'ocs.agent_cli'] + else: + cmd = [pyth, script] + cmd.extend([ + '--instance-id', iid, + '--site-file', self.site_config_file, + '--site-host', self.host_name, + '--working-dir', self.working_dir]) + elif instance.get('agent_exe') is not 
None: + cmd = [instance['agent_exe'], '--instance-id', self.address_root+'.'+iid, + '--wamp-url', self.wamp_url, '--wamp-realm', self.wamp_realm] + if "agent_arguments" in instance: + cmd.extend(instance["agent_arguments"]) + if instance['write_logs']: + log_file_path = self.log_dir+'/'+self.address_root+'.'+iid+".log" else: - cmd = [pyth, script] - cmd.extend([ - '--instance-id', iid, - '--site-file', self.site_config_file, - '--site-host', self.host_name, - '--working-dir', self.working_dir]) - prot = hm_utils.AgentProcessHelper(iid, cmd) + log_file_path = None + prot = hm_utils.AgentProcessHelper(iid, cmd, log_file=log_file_path) prot.up() instance['prot'] = prot diff --git a/ocs/agents/host_manager/drivers.py b/ocs/agents/host_manager/drivers.py index bc9d0ad1..1ec7d697 100644 --- a/ocs/agents/host_manager/drivers.py +++ b/ocs/agents/host_manager/drivers.py @@ -17,6 +17,8 @@ class ManagedInstance(dict): - 'agent_class' (str): The agent class. This will have special value 'docker' if the instance corresponds to a docker-compose service that has not been matched to a site_config entry. + - 'agent_exe' (str): The agent executable. This setting is mutually + exclusive with 'agent_class'. - 'instance_id' (str): The agent instance-id, or the docker service name if the instance is an unmatched docker-compose service. @@ -214,7 +216,7 @@ def stability_factor(times, window=120): class AgentProcessHelper(protocol.ProcessProtocol): - def __init__(self, instance_id, cmd): + def __init__(self, instance_id, cmd, log_file=None): super().__init__() self.status = None, None self.killed = False @@ -222,6 +224,7 @@ def __init__(self, instance_id, cmd): self.cmd = cmd self.lines = {'stderr': [], 'stdout': []} + self.log_file = open(log_file,"ab") if log_file is not None else None def up(self): reactor.spawnProcess(self, self.cmd[0], self.cmd[:], env=os.environ) @@ -231,6 +234,8 @@ def down(self): # race condition, but it could be worse. 
if self.status[0] is None: reactor.callFromThread(self.transport.signalProcess, 'INT') + if self.log_file is not None: + self.log_file.flush() # See https://twistedmatrix.com/documents/current/core/howto/process.html # @@ -262,11 +267,15 @@ def processExited(self, status): self.status = status, time.time() def outReceived(self, data): + if self.log_file is not None: + self.log_file.write(data) self.lines['stdout'].extend(data.decode('utf8').split('\n')) if len(self.lines['stdout']) > 100: self.lines['stdout'] = self.lines['stdout'][-100:] def errReceived(self, data): + if self.log_file is not None: + self.log_file.write(data) self.lines['stderr'].extend(data.decode('utf8').split('\n')) if len(self.lines['stderr']) > 100: self.lines['stderr'] = self.lines['stderr'][-100:] diff --git a/ocs/ocsbow.py b/ocs/ocsbow.py index e48192b6..9824bb3d 100644 --- a/ocs/ocsbow.py +++ b/ocs/ocsbow.py @@ -216,7 +216,7 @@ def get_status(args, site_config, restrict_hosts=None): if inst.get('manage') is None: inst['manage'] = 'yes' inst.update(blank_state) - if inst['agent-class'] == HOSTMANAGER_CLASS: + if 'agent-class' in inst and inst['agent-class'] == HOSTMANAGER_CLASS: sort_order = 0 hms.append(HostManagerManager( args, site_config, instance_id=inst['instance-id'])) @@ -251,7 +251,7 @@ def get_status(args, site_config, restrict_hosts=None): for cinfo in info['child_states']: this_id = cinfo['instance_id'] # Watch for [d] suffix, and steal it. 
- if cinfo['agent_class'].endswith('[d]'): + if cinfo.get('agent_class') is not None and cinfo['agent_class'].endswith('[d]'): agent_info[this_id]['agent-class'] = cinfo['agent_class'] if this_id in found: output['warnings'].append( @@ -298,23 +298,37 @@ def print_status(args, site_config): for hstat in status['hosts']: header = {'instance-id': '[instance-id]', - 'agent-class': '[agent-class]', + 'agent': '[agent]', 'current': '[state]', 'target': '[target]'} - field_widths = {'instance-id': 30, - 'agent-class': 20} + field_widths = {'instance-id': 20, + 'agent-class': 20, + 'agent-exe': 20} if len(hstat['agent_info']): - field_widths = {k: max(v0, max([len(v[k]) + field_widths = {k: max(v0, max([len(v[k]) if k in v and v[k] is not None else 0 for v in hstat['agent_info'].values()])) for k, v0 in field_widths.items()} - fmt = ' {instance-id:%i} {agent-class:%i} {current:>10} {target:>10}' % ( - field_widths['instance-id'], field_widths['agent-class']) + field_widths['agent'] = max(field_widths['agent-class'], field_widths['agent-exe']) + del field_widths['agent-class'] + del field_widths['agent-exe'] + fmt = ' {instance-id:%i} {agent:%i} {current:>10} {target:>10}' % ( + field_widths['instance-id'], field_widths['agent']) header = fmt.format(**header) print('-' * len(header)) print(f'Host: {hstat["host_name"]}\n') print(header) - for v in hstat['agent_info'].values(): - print(fmt.format(**v)) + class FindAgentType(dict): + def __missing__(self, key): + if key == 'agent': + if 'agent-class' in self: + return self['agent-class'] + if 'agent-exe' in self: + return self['agent-exe'] + raise KeyError + hdata = sorted([FindAgentType(v) for v in hstat['agent_info'].values()], + key=lambda i: i['instance-id']) + for v in hdata: + print(fmt.format_map(v)) print() if len(status['warnings']): @@ -841,9 +855,9 @@ def main(args=None): status = get_status(args, site_config) for host_data in status['hosts']: active_hms = [v for v in host_data['agent_info'].values() - if 
v['agent-class'] == HOSTMANAGER_CLASS] + if v.get('agent-class') == HOSTMANAGER_CLASS] others = [v for v in host_data['agent_info'].values() - if v['agent-class'] != HOSTMANAGER_CLASS] + if v.get('agent-class') != HOSTMANAGER_CLASS] for inst in active_hms: if args.all or inst['instance-id'] in args.instance: hms.append(inst) diff --git a/ocs/site_config.py b/ocs/site_config.py index f25fa375..ab53354a 100644 --- a/ocs/site_config.py +++ b/ocs/site_config.py @@ -225,6 +225,12 @@ def from_dict(cls, data, parent=None): the Agent instance, as a way of finding the right InstanceConfig. + ``agent-exe`` (str, optional) + The Agent executable. This + may be matched against the agent_exe name provided by + the Agent instance, as a way of finding the right + InstanceConfig. + ``arguments`` (list, optional): A list of arguments that should be passed back to the agent. Historically the arguments have been grouped into @@ -246,6 +252,10 @@ def from_dict(cls, data, parent=None): self.manage = self.data.get('manage') if self.manage is None: self.manage = "yes" + if 'agent-class' not in self.data: + self.data['agent-class'] = None + if 'agent-exe' not in self.data: + self.data['agent-exe'] = None return self @@ -387,7 +397,7 @@ def add_arguments(parser=None): return parser -def get_config(args, agent_class=None): +def get_config(args, agent_class=None, agent_exe=None): """ Args: args: The argument object returned by @@ -396,13 +406,15 @@ def get_config(args, agent_class=None): in this object. agent_class: Class name passed in to match against the list of device classes in each host's list. + agent_exe: Executable path or name passed in to match against the list of + agent executables in each host's list. Special values accepted for agent_class: - '*control*': do not insist on matching host or device. - '*host*': do not insist on matching device (but do match host). Returns: - The tuple (site_config, host_config, device_config). 
+ The tuple (site_config, host_config, instance_config). """ if args.site == 'none': return (None, None, None) @@ -473,16 +485,26 @@ def get_config(args, agent_class=None): instance_config = InstanceConfig.from_dict( dev, parent=host_config) break - else: + elif agent_class is not None: # Use the agent_class to figure it out... for dev in host_config.instances: - if dev['agent-class'] == agent_class: + if 'agent-class' in dev and dev['agent-class'] == agent_class: if instance_config is not None: raise RuntimeError( f"Multiple matches found for agent-class={agent_class}" " ... you probably need to pass --instance-id=") instance_config = InstanceConfig.from_dict( dev, parent=host_config) + elif agent_exe is not None: + # Use the agent_exe to figure it out... + for dev in host_config.instances: + if 'agent-exe' in dev and dev['agent-exe'] == agent_exe: + if instance_config is not None: + raise RuntimeError( + f"Multiple matches found for agent-exe={agent_exe}" + " ... you probably need to pass --instance-id=") + instance_config = InstanceConfig.from_dict( + dev, parent=host_config) if instance_config is None and not no_dev_match: raise RuntimeError("Could not find matching device description.") return collections.namedtuple('SiteConfig', ['site', 'host', 'instance'])(site_config, host_config, instance_config) @@ -648,7 +670,7 @@ def scan_for_agents(do_registration=True): return items -def parse_args(agent_class=None, parser=None, args=None): +def parse_args(agent_class=None, agent_exe=None, parser=None, args=None): """ Function to parse site-config and agent arguments. This function takes site, host, and instance arguments into account by making sure the instance @@ -663,6 +685,11 @@ def parse_args(agent_class=None, parser=None, args=None): may be matched against the agent_class name provided by the Agent instance, as a way of finding the right InstanceConfig. + agent_exe (str, optional): + Name of the Agent executable. 
This + may be matched against the agent_exe name provided by + the Agent instance, as a way of finding the right + InstanceConfig. parser (argparse.ArgumentParser, optional): Argument parser containing agent-specific arguments. If None, an empty parser will be created. @@ -696,7 +723,7 @@ def parse_args(agent_class=None, parser=None, args=None): pre_args, _ = pre_parser.parse_known_args(args=args) - site, host, instance = get_config(pre_args, agent_class=agent_class) + site, host, instance = get_config(pre_args, agent_class=agent_class, agent_exe=agent_exe) if instance is not None: # When the user omits instance_id, it can still be matched, @@ -734,7 +761,9 @@ def flatten(container): add_site_attributes(args, site, host=host) # Add agent_class attribute. - if not hasattr(args, 'agent_class'): + if not hasattr(args, 'agent_class') and agent_class is not None: setattr(args, 'agent_class', agent_class) + if not hasattr(args, 'agent_exe') and agent_exe is not None: + setattr(args, 'agent_exe', agent_exe) return args From 80bb45e56e54f60ebd56688659f25f3caf0cfebc Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 24 Feb 2023 22:30:10 +0000 Subject: [PATCH 18/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ocs/agents/host_manager/agent.py | 8 ++++---- ocs/agents/host_manager/drivers.py | 2 +- ocs/ocsbow.py | 1 + 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index 2b037cf8..6dc888f1 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -238,9 +238,9 @@ def retire(db_key): instance = None if instance is None: if record.get("agent-class") is not None: - full_name=(f'{record["agent-class"]}:{record["db_key"]}') + full_name = (f'{record["agent-class"]}:{record["db_key"]}') else: - full_name=(f'{record["agent-exe"]}:{record["db_key"]}') 
+ full_name = (f'{record["agent-exe"]}:{record["db_key"]}') instance = hm_utils.ManagedInstance.init( management=record.get("mgmt"), instance_id=record.get("instance-id"), @@ -345,12 +345,12 @@ def _launch_instance(self, instance): '--site-host', self.host_name, '--working-dir', self.working_dir]) elif instance.get('agent_exe') is not None: - cmd = [instance['agent_exe'], '--instance-id', self.address_root+'.'+iid, + cmd = [instance['agent_exe'], '--instance-id', self.address_root + '.' + iid, '--wamp-url', self.wamp_url, '--wamp-realm', self.wamp_realm] if "agent_arguments" in instance: cmd.extend(instance["agent_arguments"]) if instance['write_logs']: - log_file_path = self.log_dir+'/'+self.address_root+'.'+iid+".log" + log_file_path = self.log_dir + '/' + self.address_root + '.' + iid + ".log" else: log_file_path = None prot = hm_utils.AgentProcessHelper(iid, cmd, log_file=log_file_path) diff --git a/ocs/agents/host_manager/drivers.py b/ocs/agents/host_manager/drivers.py index 1ec7d697..6c68b809 100644 --- a/ocs/agents/host_manager/drivers.py +++ b/ocs/agents/host_manager/drivers.py @@ -224,7 +224,7 @@ def __init__(self, instance_id, cmd, log_file=None): self.cmd = cmd self.lines = {'stderr': [], 'stdout': []} - self.log_file = open(log_file,"ab") if log_file is not None else None + self.log_file = open(log_file, "ab") if log_file is not None else None def up(self): reactor.spawnProcess(self, self.cmd[0], self.cmd[:], env=os.environ) diff --git a/ocs/ocsbow.py b/ocs/ocsbow.py index 9824bb3d..a3c82933 100644 --- a/ocs/ocsbow.py +++ b/ocs/ocsbow.py @@ -317,6 +317,7 @@ def print_status(args, site_config): print('-' * len(header)) print(f'Host: {hstat["host_name"]}\n') print(header) + class FindAgentType(dict): def __missing__(self, key): if key == 'agent': From 87b6dce6f3d0705be2c774f5994feed1ecc8eb44 Mon Sep 17 00:00:00 2001 From: "C. 
Weaver" Date: Fri, 24 Feb 2023 17:45:32 -0500 Subject: [PATCH 19/23] Fix reference to removed variable --- ocs/agents/host_manager/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index 6dc888f1..009a8485 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -195,7 +195,7 @@ def retire(db_key): session.add_message( f'The agent config for instance-id=' f'{iid} was matched to docker service ' - f'{srv}, but config does not specify ' + f'{record["srv"]}, but config does not specify ' f'manage:docker! Dropping both.') retire(iid) continue From e11331b06779738ee5f1f5a8639d543b190aaa3f Mon Sep 17 00:00:00 2001 From: "C. Weaver" Date: Thu, 14 Sep 2023 16:46:12 -0400 Subject: [PATCH 20/23] Call the combined address root and instance ID the address --- ocs/agents/host_manager/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index 009a8485..fca1e7ee 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -345,7 +345,7 @@ def _launch_instance(self, instance): '--site-host', self.host_name, '--working-dir', self.working_dir]) elif instance.get('agent_exe') is not None: - cmd = [instance['agent_exe'], '--instance-id', self.address_root + '.' + iid, + cmd = [instance['agent_exe'], '--address', self.address_root + '.' + iid, '--wamp-url', self.wamp_url, '--wamp-realm', self.wamp_realm] if "agent_arguments" in instance: cmd.extend(instance["agent_arguments"]) From 0f71bb5f4c49f1ff664394e95d1db636e4a2736c Mon Sep 17 00:00:00 2001 From: "C. 
Weaver" Date: Thu, 14 Sep 2023 16:55:04 -0400 Subject: [PATCH 21/23] Turn off logging instead of crashing if log_dir is not set --- ocs/agents/host_manager/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index fca1e7ee..6f763d79 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -349,7 +349,7 @@ def _launch_instance(self, instance): '--wamp-url', self.wamp_url, '--wamp-realm', self.wamp_realm] if "agent_arguments" in instance: cmd.extend(instance["agent_arguments"]) - if instance['write_logs']: + if instance['write_logs'] and self.log_dir is not None: log_file_path = self.log_dir + '/' + self.address_root + '.' + iid + ".log" else: log_file_path = None From 4c7549fd3a94bacc52b9e2dd2295cc3392098255 Mon Sep 17 00:00:00 2001 From: "C. Weaver" Date: Fri, 15 Sep 2023 16:48:18 -0400 Subject: [PATCH 22/23] Flush log file later to avoid confusing log delays --- ocs/agents/host_manager/drivers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ocs/agents/host_manager/drivers.py b/ocs/agents/host_manager/drivers.py index 6c68b809..47470e08 100644 --- a/ocs/agents/host_manager/drivers.py +++ b/ocs/agents/host_manager/drivers.py @@ -234,8 +234,6 @@ def down(self): # race condition, but it could be worse. if self.status[0] is None: reactor.callFromThread(self.transport.signalProcess, 'INT') - if self.log_file is not None: - self.log_file.flush() # See https://twistedmatrix.com/documents/current/core/howto/process.html # @@ -265,6 +263,8 @@ def inConnectionLost(self): def processExited(self, status): # print('%s.status:' % self.instance_id, status) self.status = status, time.time() + if self.log_file is not None: + self.log_file.flush() def outReceived(self, data): if self.log_file is not None: From 9d313623824bdd2aa6e75f50092c89d5302ef545 Mon Sep 17 00:00:00 2001 From: "C. 
Weaver" Date: Fri, 15 Sep 2023 17:43:58 -0400 Subject: [PATCH 23/23] Log executable agent output by default --- ocs/agents/host_manager/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index 6f763d79..f5922e7e 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -248,7 +248,7 @@ def retire(db_key): agent_exe=record.get("agent-exe"), full_name=full_name, agent_arguments=record.get("arguments"), - write_logs=record.get("write-logs", False) + write_logs=record.get("write-logs", True) ) if record['mgmt'] == 'docker': instance['agent_script'] = record['srv']