...
View file | ||
---|---|---|
|
Code Block | ||||
---|---|---|---|---|
| ||||
#! /kroot/rel/default/bin/kpython3

import argparse
import os
import ktl
import subprocess
import re

description = """
testAll -- check KPF systems health

purpose:
    Test all of the KPF systems to determine whether they are working
    properly.

    The script checks:
    - Check dispatchers health
    - Check aux rack temperature
    - Check vaccum

    Systems to check:
    - green instect dispatcher
    - vaccum pump dispatcher
    - green instect coldhead temperature
    - green ln2 dewar weight
    - vaccum pump bearing temperature
    - vaccum pump frequency
    - vaccum chamber pressure

    Keywords to check:
    - kpfgreen.d1
    - kpfvac.d2
    - kpfgreen.currtemp
    - kpflab.greenweight
    - kpfvac.pump_temp
    - kpfvac.pump_freq
    - kpfvac.vch_hivac

    GUIs to check:
    - FIU status GUI
    - Tip Tilt system GUI
    - KPF Status GUI

    With no input argument, all systems are checked.

Exit values:
    0 = normal completion
    <0 = warnings but no errors
    >0 = error

example:
    1) To check instrument status:
        testAll
"""
epilog = """
history:
    Jan-27-2022 SY     Adapted from DEIMOS testAll for KPF
    Feb-01-2022 JoshW  Style (i.e. PEP8) tweaks and bugfixes.
"""

######################
## Parse Arguments ##
######################
parser = argparse.ArgumentParser(
    description=description,
    epilog=epilog,
    formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
    '-s', '--systems', nargs='+', default='all',
    help='Systems which should be tested',
    choices=['computers', 'keywords', 'guis'],
)
args = parser.parse_args()

######################
## Define constants ##
######################
SYSTEMS = args.systems
# The default is the bare string 'all'; an explicit -s gives a list of names.
if args.systems == 'all':
    systems = ['computers', 'keywords', 'guis']
else:
    systems = args.systems

# Status labels printed next to each checked item.
STATUS = {'ok': 'OK', 'warning': 'WARNING!', 'error': 'ERROR!'}

# Host name -> description printed next to the ping result.
COMPUTERS = {
    'kpfserver': 'Instrument target host',
    'kpf': 'Linux virtual host',
}

# GUI process name -> (description, host whose process list is searched).
GUIS = {
    'FIU Status': ('FIU Status Display', 'kpfserver'),
    'Tip Tilt Status': ('Tip Tilt Status Display', 'kpfserver'),
    'KPF Status': ('KPF Status Display', 'kpfserver'),
    'tklogger': ('Events logger', 'kpfserver'),
    'pig_main': ('Program Identification GUI', 'kpf'),
}

# Label -> (KTL service, [keywords that must be readable]).
KEYWORDS = {
    'green instect dispatcher': ('kpfgreen', ['D1']),
    'vaccum pump dispatcher': ('kpfvac', ['D2']),
}

# Label -> (KTL service, keyword, expected value(s), severity when not met).
# One expected value is compared for equality; two values are an inclusive
# [low, high] range.  The ranges were previously written as a single string
# ('52.0 54.0'), which could never be treated as a range.
# NOTE(review): HEALTH is not passed to check_settings() anywhere in this
# file, so these checks are currently not run -- confirm whether they should
# be wired into check_systems().
HEALTH = {
    'green instect coldhead temperature':
        ('kpfgreen', 'CURRTEMP', ['-145'], 'warning'),
    'green ln2 dewar weight':
        ('kpflab', 'GREENWEIGHT', ['52.0', '54.0'], 'warning'),
    'vaccum pump bearing temperature':
        ('kpfvac', 'PUMP_TEMP', ['55.0', '60.0'], 'warning'),
    'vaccum pump frequency':
        ('kpfvac', 'PUMP_FREQ', ['990', '1100'], 'warning'),
    'vaccum chamber pressure':
        ('kpfvac', 'VCH_HIVAC', ['3.0e-5'], 'warning'),
}

#telescope wrap
ROTCCWLM = -188
ROTCWLM = 188


######################
## Define functions ##
######################
def check_systems(systems):
    """
    Run the requested groups of checks and print a summary.

    systems: iterable of group names; 'computers', 'guis' and 'keywords'
             are recognised, anything else is ignored.

    Returns the number of errors if there were any, otherwise minus the
    number of warnings, otherwise 0.

    Raises Exception when the HOST environment variable is not 'KPF'
    (including when it is not set at all).
    """
    # Verify host.  .get() so that an unset HOST gives the explanatory
    # message below rather than a bare KeyError.
    if os.environ.get('HOST') != 'KPF':
        raise Exception('This command must be run on vm-KPF')

    # Check systems
    errors_total = 0
    warnings_total = 0

    if 'computers' in systems:
        ecomp, wcomp = check_computers(COMPUTERS, STATUS)
        errors_total += ecomp
        warnings_total += wcomp

    if 'guis' in systems:
        eapps, wapps = check_apps(GUIS, STATUS)
        errors_total += eapps
        warnings_total += wapps

    if 'keywords' in systems:
        ekeyw, wkeyw = check_keywords(KEYWORDS, STATUS)
        errors_total += ekeyw
        warnings_total += wkeyw

    if errors_total == 0 and warnings_total == 0:
        msg = 'All KPF systems and computers are healthy'
    else:
        msg = (f'{errors_total:d} errors and {warnings_total:d} '
               f'warnings were issued')

    print('-' * len(msg))
    print(msg)
    print('-' * len(msg))

    # Errors take precedence over warnings in the returned code.
    if errors_total > 0:
        return errors_total
    elif warnings_total > 0:
        return -warnings_total
    else:
        return 0


#-----------------#
# Check computers #
#-----------------#
def check_computers(computer_dict, status):
    """
    Check communications with KPF computers.

    Each key of computer_dict is pinged once; a non-zero ping exit status
    counts as one error.  Returns (n_errors, n_warnings); this check never
    produces warnings.
    """
    n_errors = 0
    n_warnings = 0

    print('Checking KPF computers:')
    for computer in sorted(computer_dict.keys()):
        cmd = ['ping', '-c', '1', computer]
        process = subprocess.run(cmd, capture_output=True)
        if process.returncode == 0:
            computer_status = status['ok']
        else:
            computer_status = status['error']
            n_errors += 1
        print(f' Checking {computer.ljust(24,"."):24}{computer_status:8} ({computer_dict[computer]})')

    return n_errors, n_warnings
...
|
...
#------------#
# Check GUIs #
#------------#
def _command_output_lines(cmd):
    """Run cmd and return its stdout split into lines ([] if it cannot be run)."""
    try:
        completed = subprocess.run(cmd, capture_output=True)
    except OSError as err:
        # A missing executable should degrade to "nothing found" (the GUIs
        # are then reported as warnings) instead of aborting every check.
        print(f' Could not run {" ".join(cmd)}: {err}')
        return []
    return completed.stdout.decode('utf-8', errors='replace').split('\n')


def check_apps(app_list, status):
    """
    Check user applications.

    app_list: dict mapping a GUI process name to (description, host).
              Entries whose host is 'kpfserver' are looked up in the output
              of `kpf status`; all others in the local `ps -ef` output.
    status:   dict with at least the keys 'ok' and 'warning'.

    A GUI that is not found counts as one warning.  Returns
    (n_errors, n_warnings); this check never produces errors.
    """
    n_errors = 0
    n_warnings = 0

    print('Checking KPF GUIs:')

    # List all processes on kpfserver
    # NOTE(review): only lines whose first field starts with 'nirc' are
    # kept.  That prefix looks like a leftover from another instrument's
    # script -- confirm what `kpf status` lines start with for KPF.
    all_apps_check = []
    for line in _command_output_lines(['kpf', 'status']):
        fields = line.split()
        # fields is empty for blank or whitespace-only lines.
        if fields and fields[0][0:4] == 'nirc':
            all_apps_check.append(line)

    # List all processes on vm-kpf
    all_ps_check = _command_output_lines(['ps', '-ef'])

    for app in sorted(app_list):
        computer = app_list[app][1]
        if computer == 'kpfserver':
            candidates = all_apps_check
        else:
            candidates = all_ps_check

        # Plain substring match: the GUI names are literal text, not
        # regular expressions.
        if any(app in line for line in candidates):
            app_status = status['ok']
        else:
            app_status = status['warning'] + ' (ignore if observers not currently using KPF)'
            n_warnings += 1

        print(f' Checking {app.ljust(24,"."):24}{app_status:8}')

    return n_errors, n_warnings


#----------------#
# Check keywords #
#----------------#
...
def check_keywords(keyword_dict, status):
    """
    Check keyword health

    keyword_dict: dict mapping a label to (KTL service, [keyword names]).
    status:       dict with the keys 'ok', 'warning' and 'error'.

    An entry is OK only when every listed keyword can be cached and
    monitored; any failure makes the entry an error.  An entry with an
    empty keyword list is reported as a warning for the 'acs'/'dcs'
    services and as an error otherwise.

    Returns (n_errors, n_warnings).
    """
    n_errors = 0
    n_warnings = 0

    print('Checking KPF keyword libraries:')
    for key in sorted(keyword_dict.keys()):
        service = keyword_dict[key][0]
        keyword_names = keyword_dict[key][1]

        # Status used when there is nothing to probe.  The service name is
        # element 0; the previous test compared the keyword list (element 1)
        # to a string and could never be true.
        if service == 'acs' or service == 'dcs':
            keyword_status = status['warning']
        else:
            keyword_status = status['error']

        for keyword_check in keyword_names:
            try:
                keyword = ktl.cache(service, keyword_check)
                keyword.monitor()
                keyword_status = status['ok']
            except Exception:
                # Stop at the first failure so that a later keyword which
                # does respond cannot reset the entry to OK.
                keyword_status = status['error']
                break

        if keyword_status == status['warning']:
            n_warnings += 1
        if keyword_status == status['error']:
            n_errors += 1
        print(f' Checking {key.ljust(24,"."):24}{keyword_status:8}')

    return n_errors, n_warnings


#----------------#
# Check health #
#----------------#
...
def check_settings(settings_dict, status):
    """
    Check KPF health

    settings_dict: dict mapping a label to
                   (KTL service, keyword, expected, severity) where
                   expected is a list holding either one value (the keyword's
                   ascii value must equal it) or two values (inclusive
                   numeric range [low, high]), and severity is the key of
                   status to report when the check fails.
    status:        dict with the keys 'ok', 'warning' and 'error'.

    Returns (n_errors, n_warnings).
    """
    n_errors = 0
    n_warnings = 0

    print('Checking KPF health:')
    for key in sorted(settings_dict.keys()):
        entry = settings_dict[key]
        service, keyword_name, expected, severity = (
            entry[0], entry[1], entry[2], entry[3])
        label = key.ljust(55, ".")

        setting_status = status[severity]
        ktl_error = False
        out_of_range = True
        current = None
        try:
            keyword = ktl.cache(service, keyword_name)
            keyword.monitor()
            current = keyword.ascii
            if len(expected) == 2:
                if float(expected[0]) <= float(current) <= float(expected[1]):
                    out_of_range = False
            elif len(expected) == 1:
                if current == expected[0]:
                    out_of_range = False
        except Exception:
            # Any failure while reading or converting the value is reported
            # as "not reachable" with the configured severity.
            ktl_error = True

        if not ktl_error and not out_of_range:
            setting_status = status['ok']

        if setting_status == status['warning']:
            n_warnings += 1
        if setting_status == status['error']:
            n_errors += 1

        if ktl_error:
            print(f' Checking {label:55}{setting_status:8} not reachable')
        elif not out_of_range:
            print(f' Checking {label:55}{setting_status:8}')
        elif len(expected) == 1:
            print(f' Checking {label:55}{setting_status:8} Current value {current} should be {expected[0]}.')
        elif len(expected) == 2:
            # The limits may be given as strings, so convert them before
            # applying a float format.
            low = float(expected[0])
            high = float(expected[1])
            note = ''
            if setting_status == status['warning'] and \
                    keyword_name in ('ROTCCWLM', 'ROTCWLM'):
                note = ' (ignore if KPF is not selected in TCS)'
            print(f' Checking {label:55}{setting_status:8}{note} Current value {float(current):.1f} outside good range [{low:.1f}, {high:.1f}].')
        else:
            # Neither a single value nor a range: still report the entry
            # instead of silently printing nothing.
            print(f' Checking {label:55}{setting_status:8} Current value {current}; expected value(s) {expected} not understood.')

    return n_errors, n_warnings
|
...
##################
## Main program ##
##################
|
...
if __name__ == '__main__':
    # Local import: only the script entry point needs sys.
    import sys

    # Hand the result to the shell so the exit values documented in the
    # help text (0 = normal, <0 = warnings only, >0 = errors) take effect;
    # previously the result was discarded and the script always exited 0.
    # Negative values are reported by the shell modulo 256.
    sys.exit(check_systems(systems))
...
The HIRES testAll is a Perl script; scroll down to see its output text.
...