diff --git a/socs/agents/galil_axis/agent.py b/socs/agents/galil_axis/agent.py index 1b146dd43..2cd4ef10c 100644 --- a/socs/agents/galil_axis/agent.py +++ b/socs/agents/galil_axis/agent.py @@ -1085,6 +1085,62 @@ def get_limitswitch_polarity(self, session, params): return True, f'Limit switch polarity is {status}' + @ocs_agent.param('axis', type=str) + def get_forward_limitswitch(self, session, params): + """get_forward_limitswitch(axis) + + **Task** - Returns forward limit switch state for a given axis. 1 means not triggered, 0 means triggered. + + Parameters: + axis (str): Axis to query (e.g. 'A') + """ + axis = params['axis'] + + with self.lock.acquire_timeout(timeout=5, job='get_forward_limitswitch') as acquired: + if not acquired: + self.log.warn(f"Could not start Task because {self.lock.job} is already running.") + return False, "Could not acquire lock." + + state, human_state = self.stage.get_forward_limitswitch(axis) + + # store data + session.data = { + 'axis': axis, + 'raw_state': state, + 'human_state': human_state, + 'timestamp': time.time(), + } + + return True, f"Axis {axis} forward limit: {human_state} (raw={state})" + + @ocs_agent.param('axis', type=str) + def get_reverse_limitswitch(self, sesion, params): + """get_reverse_limitswitch(axis) + + **Task** - Returns reverse limit switch state for a given axis. 1 means not triggered, 0 means triggered. + + Parameters: + axis (str): Axis to query (e.g. 'A') + """ + axis = params['axis'] + + with self.lock.acquire_timeout(timeout=5, job='get_reverse_limitswitch') as acquired: + if not acquired: + self.log.warn(f"Could not start Task because {self.lock.job} is already running.") + return False, "Could not acquire lock." + + state, human_state = self.stage.get_reverse_limitswitch(axis) + + # store data + session.data = { + 'axis': axis, + 'raw_state': state, + 'human_state': human_state, + 'timestamp': time.time(), + } + + return True, f"Axis {axis} reverse limit: {human_state} (raw={state})" + @ocs_agent.param('axis', type=str) def stop_axis_motion(self, session, params): """stop_axis_motion(axis) @@ -1374,6 +1430,8 @@ def main(args=None): agent.register_task('get_limitswitch_mode', galilaxis_agent.get_limitswitch_mode) agent.register_task('set_limitswitch_polarity', galilaxis_agent.set_limitswitch_polarity) agent.register_task('get_limitswitch_polarity', galilaxis_agent.get_limitswitch_polarity) + agent.register_task('get_forward_limitswitch', galilaxis_agent.get_forward_limitswitch) + agent.register_task('get_reverse_limitswitch', galilaxis_agent.get_reverse_limitswitch) agent.register_task('set_gearing', galilaxis_agent.set_gearing) agent.register_task('set_gearing_ratio', galilaxis_agent.set_gearing_ratio) agent.register_task('get_gearing_ratio', galilaxis_agent.get_gearing_ratio) diff --git a/socs/agents/galil_axis/drivers.py b/socs/agents/galil_axis/drivers.py index 49ac7260d..c3c26b3d0 100644 --- a/socs/agents/galil_axis/drivers.py +++ b/socs/agents/galil_axis/drivers.py @@ -475,6 +475,38 @@ def get_limitswitch_polarity(self): return state, status + def get_forward_limitswitch(self, axis): + """ + Return forward limit switch state for a given axis. 1 means not triggered, 0 means triggered (no motion allowed). + """ + resp = self.galil_command("MG _LF", axis=axis, expect_response=True) + try: + state = int(float(resp)) + except Exception: + print(f"Unexpected response from MG _LF{axis}: {resp}") + return None, "unknown" + + # --- Interpret raw + human_state = "not triggered" if state == 1 else "triggered" if state == 0 else f"unknown ({state})" + + return state, human_state + + def get_reverse_limitswitch(self, axis): + """ + Return reverse limit switch state for a given axis. 1 means not triggered, 0 means triggered. + """ + resp = self.galil_command("MG _LR", axis=axis, expect_response=True) + try: + state = int(float(resp)) + except Exception: + print(f"Unexpected response from MG _LR{axis}: {resp}") + return None, "unknown" + + # --- Interpret raw + human_state = "not triggered" if state == 1 else "triggered" if state == 0 else f"unknown ({state})" + + return state, human_state + def stop_motion(self, axis): """ Stop motion.