Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Tweaked example testAll script with style (PEP8) fixes and bug fixes.

...

View file
nametestAll_KPFmockup_v2

Code Block
breakoutModewide
languagepy
#! /kroot/rel/default/bin/kpython3
import argparse
import os
import ktl
import subprocess
import re


description = """
testAll -- check KPF systems health

purpose:
      Test all of the KPF systems to determine whether they are
      working properly. The script checks:
      - Check dispatchers health
      - Check aux rack temperature
      - Check 

...

vaccum

      

...

Systems to check:
      - green instect dispatcher
      - vaccum pump dispatcher
      - green instect coldhead temperature
      - green ln2 dewar weight
      - vaccum pump bearing temperature
      

...

- vaccum pump frequency
      

...

- 

...

vaccum chamber pressure

      

...

Keywords to check:
      

...

- kpfgreen.d1
      - kpfvac.d2
      - kpfgreen.currtemp
      - kpflab.greenweight
      - kpfvac.pump_temp
      - kpfvac.pump_freq
      - kpfvac.vch_hivac

      GUIs to check:
      - FIU status GUI
      - Tip Tilt system GUI
      - KPF Status GUI

      

...

With no input argument, all systems are checked.

Exit values:
       0 = normal completion
      <0 = warnings but no errors
      >0 = error

example:
      1) To check instrument status:
         

...

testAll

"""
epilog="""
history:

Jan-27-2022     SY   

...

Adapted from DEIMOS testAll for KPF

...

Feb-01-2022  JoshW   Style (i.e. PEP8) tweaks and bugfixes.

"""

######################
## Parse Arguments  ##
######################
# The help text is taken verbatim from the module-level ``description`` and
# ``epilog`` strings; RawTextHelpFormatter keeps their line breaks intact.
parser = argparse.ArgumentParser(
    description=description,
    epilog=epilog,
    formatter_class=argparse.RawTextHelpFormatter,
)

# -s/--systems accepts one or more subsystem names.  When the option is
# omitted the value is the plain string 'all' rather than a list.
parser.add_argument(
    '-s', '--systems',
    nargs='+',
    default='all',
    choices=['computers', 'keywords', 'guis'],
    help='Systems which should be tested',
)

args = parser.parse_args()


######################
## Define constants ##
######################
SYSTEMS = args.systems
# Without -s/--systems the parsed value is the string 'all'; expand it to the
# full list of subsystems so the rest of the script only deals with lists.
if args.systems == 'all':
    systems = ['computers', 'keywords', 'guis']
else:
    systems = args.systems

# Labels printed next to each check result.
STATUS = {'ok': 'OK', 'warning': 'WARNING!', 'error': 'ERROR!'}

# Host name -> description printed after the ping result.
COMPUTERS = {
    'kpfserver': 'Instrument target host',
    'kpf': 'Linux virtual host',
}

# Process name to look for -> (description, host whose process list is searched).
GUIS = {
    'FIU Status': ('FIU Status Display', 'kpfserver'),
    'Tip Tilt Status': ('Tip Tilt Status Display', 'kpfserver'),
    'KPF Status': ('KPF Status Display', 'kpfserver'),
    'tklogger': ('Events logger', 'kpfserver'),
    'pig_main': ('Program Identification GUI', 'kpf'),
}

# Label -> (KTL service, [keyword names that must be readable]).
KEYWORDS = {
    'green instect dispatcher': ('kpfgreen', ['D1']),
    'vaccum pump dispatcher': ('kpfvac', ['D2']),
}

# Label -> (KTL service, keyword, limits, severity key into STATUS).
# ``limits`` holds either one expected value or a [low, high] range.  The
# ranges are written as two numeric items: a single 'low high' string has
# length 1 and would be treated as an exact-match value instead of a range.
# NOTE(review): the single-valued entries ('-145', '3.0e-5') are compared for
# exact string equality by check_settings -- confirm these are not meant to
# be thresholds.
HEALTH = {
    'green instect coldhead temperature': ('kpfgreen', 'CURRTEMP', ['-145'], 'warning'),
    'green ln2 dewar weight': ('kpflab', 'GREENWEIGHT', [52.0, 54.0], 'warning'),
    'vaccum pump bearing temperature': ('kpfvac', 'PUMP_TEMP', [55.0, 60.0], 'warning'),
    'vaccum pump frequency': ('kpfvac', 'PUMP_FREQ', [990.0, 1100.0], 'warning'),
    'vaccum chamber pressure': ('kpfvac', 'VCH_HIVAC', ['3.0e-5'], 'warning'),
}

# telescope wrap
ROTCCWLM = -188
ROTCWLM = 188

######################
## Define functions ##
######################
def check_systems(systems):
    """Run the requested health checks and print a one-line summary.

    Args:
        systems: collection of subsystem names; 'computers', 'guis' and
            'keywords' are recognised, anything else is ignored.

    Returns:
        int: the number of errors when there is at least one error,
        otherwise minus the number of warnings, otherwise 0.

    Raises:
        Exception: when the HOST environment variable is not 'KPF'
            (including when it is not set at all).
    """
    # Verify host.  ``get`` is used so that an unset HOST produces the same
    # explanatory error as a wrong HOST instead of a bare KeyError.
    if os.environ.get('HOST') != 'KPF':
        raise Exception('This command must be run on vm-KPF')

    # Check systems
    # NOTE(review): the HEALTH table / check_settings() defined in this file
    # are not invoked from this function -- confirm whether that is intended.
    errors_total = 0
    warnings_total = 0
    if 'computers' in systems:
        ecomp, wcomp = check_computers(COMPUTERS, STATUS)
        errors_total += ecomp
        warnings_total += wcomp
    if 'guis' in systems:
        eapps, wapps = check_apps(GUIS, STATUS)
        errors_total += eapps
        warnings_total += wapps
    if 'keywords' in systems:
        ekeyw, wkeyw = check_keywords(KEYWORDS, STATUS)
        errors_total += ekeyw
        warnings_total += wkeyw

    if errors_total == 0 and warnings_total == 0:
        msg = 'All KPF systems and computers are healthy'
    else:
        msg = f'{errors_total:d} errors and {warnings_total:d} warnings were issued'
    # Frame the summary with rules of matching width.
    print('-'*len(msg))
    print(msg)
    print('-'*len(msg))

    # Errors take precedence over warnings in the returned code.
    if errors_total > 0:
        return errors_total
    elif warnings_total > 0:
        return -warnings_total
    else:
        return 0

#-----------------#
# Check computers #
#-----------------#
def check_computers(computer_dict, status):
    """
    Check communications with KPF computers.

    Each host is pinged once; a non-zero ping exit status, or a failure to
    launch ``ping`` at all, is counted as an error.

    Args:
        computer_dict: mapping of host name -> description to print.
        status: mapping providing the 'ok' and 'error' labels.

    Returns:
        tuple: (n_errors, n_warnings); this check never issues warnings, so
        n_warnings is always 0.
    """
    n_errors = 0
    n_warnings = 0
    print('Checking KPF computers:')
    for computer in sorted(computer_dict):
        cmd = ['ping', '-c', '1', computer]
        try:
            reachable = subprocess.run(cmd, capture_output=True).returncode == 0
        except OSError:
            # ping could not be started; report the host as failed rather
            # than aborting the remaining checks.
            reachable = False

        if reachable:
            computer_status = status['ok']
        else:
            computer_status = status['error']
            n_errors += 1
        print(f'  Checking {computer.ljust(24,"."):24}{computer_status:8} ({computer_dict[computer]})')
    return n_errors, n_warnings

...


...


#------------#
# Check GUIs #
#------------#
def _command_output_lines(cmd):
    """Run *cmd* and return its stdout split into lines ([] if it cannot start)."""
    try:
        process = subprocess.run(cmd, capture_output=True)
    except OSError as err:
        # Report the problem and carry on; the affected GUIs then show up
        # as warnings instead of the whole health check crashing.
        print(f'  Unable to run {cmd[0]!r}: {err}')
        return []
    # errors='replace' keeps odd bytes in a process listing from raising.
    return process.stdout.decode('utf-8', errors='replace').split('\n')


def check_apps(app_list, status, user_prefix='nirc'):
    """
    Check user applications.

    An application is considered running when its name occurs in the
    relevant process listing: the output of ``kpf status`` for entries whose
    host is 'kpfserver', the output of ``ps -ef`` for every other host.

    Args:
        app_list: mapping of application name -> (description, host).
        status: mapping providing the 'ok' and 'warning' labels.
        user_prefix: only ``kpf status`` lines whose first field starts with
            this prefix are searched.
            NOTE(review): the default 'nirc' is kept from the original code
            and looks inherited from another instrument -- confirm the
            prefix that ``kpf status`` lines actually carry.

    Returns:
        tuple: (n_errors, n_warnings); a missing application is a warning,
        so n_errors is always 0.
    """
    n_errors = 0
    n_warnings = 0
    print('Checking KPF GUIs:')

    # List all processes on kpfserver.  Splitting first makes blank or
    # whitespace-only lines harmless (they yield no fields).
    all_apps_check = []
    for line in _command_output_lines(['kpf', 'status']):
        fields = line.split()
        if fields and fields[0].startswith(user_prefix):
            all_apps_check.append(line)

    # List all processes on vm-kpf
    all_ps_check = _command_output_lines(['ps', '-ef'])

    for app in sorted(app_list):
        computer = app_list[app][1]
        listing = all_apps_check if computer == 'kpfserver' else all_ps_check
        # Plain substring test: application names are literal text, not
        # regular expressions.
        if any(app in line for line in listing):
            app_status = status['ok']
        else:
            app_status = status['warning'] + ' (ignore if observers not currently using KPF)'
            n_warnings += 1
        print(f'  Checking {app.ljust(24,"."):24}{app_status:8}')
    return n_errors, n_warnings


#----------------#
# Check keywords #
#----------------#

...

def check_keywords(keyword_dict, status):
    """
    Check keyword health

    For every entry, each listed keyword is fetched with ``ktl.cache`` and
    monitored.  The entry is OK only when every keyword succeeds; a single
    failure marks the whole entry as an error.

    Args:
        keyword_dict: mapping of label -> (service name, [keyword names]).
        status: mapping providing the 'ok', 'warning' and 'error' labels.

    Returns:
        tuple: (n_errors, n_warnings).
    """
    n_errors = 0
    n_warnings = 0

    print('Checking KPF keyword libraries:')
    for key in sorted(keyword_dict):
        service, keyword_names = keyword_dict[key][0], keyword_dict[key][1]

        # Status used when no keyword gets checked at all.  The service name
        # (index 0) is what is compared here; index 1 is the keyword list
        # and can never equal a string.
        # NOTE(review): as in the original, a failed read is always reported
        # as an error, so the 'warning' level only applies to acs/dcs
        # entries with an empty keyword list -- confirm whether acs/dcs
        # failures were meant to be warnings.
        if service in ('acs', 'dcs'):
            keyword_status = status['warning']
        else:
            keyword_status = status['error']

        ktl_error = False
        for keyword_check in keyword_names:
            try:
                keyword = ktl.cache(service, keyword_check)
                keyword.monitor()
            except Exception:
                ktl_error = True
        if ktl_error:
            # Sticky failure: a later successful keyword must not hide an
            # earlier one that could not be read.
            keyword_status = status['error']
        elif keyword_names:
            keyword_status = status['ok']

        if keyword_status == status['warning']:
            n_warnings += 1
        if keyword_status == status['error']:
            n_errors += 1
        print(f'  Checking {key.ljust(24,"."):24}{keyword_status:8}')
    return n_errors, n_warnings


#----------------#
# Check health   #
#----------------#

...

def _to_float(text):
    """Return *text* converted to float, or None when it is not numeric."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None


def _split_limits(raw_limits):
    """Return the limits as a list, expanding a lone numeric 'low high' string."""
    limits = list(raw_limits)
    if len(limits) == 1 and isinstance(limits[0], str):
        parts = limits[0].split()
        if len(parts) == 2 and None not in (_to_float(parts[0]), _to_float(parts[1])):
            return parts
    return limits


def _one_decimal(number, raw):
    """Format *number* with one decimal place; fall back to *raw* if it is None."""
    return f'{number:.1f}' if number is not None else str(raw)


def check_settings(settings_dict, status):
    """
    Check KPF health

    Each entry names a keyword and the value(s) it is expected to have.  One
    limit means the keyword's ascii value must equal it exactly; two limits
    are a numeric [low, high] range (inclusive).  A single string holding two
    numbers, e.g. '52.0 54.0', is accepted as a range as well.

    Args:
        settings_dict: mapping of label -> (service name, keyword name,
            limits, severity) where severity is a key of *status* used when
            the check does not pass.
        status: mapping providing the 'ok', 'warning' and 'error' labels.

    Returns:
        tuple: (n_errors, n_warnings).
    """
    n_errors = 0
    n_warnings = 0

    print('Checking KPF health:')
    for key in sorted(settings_dict):
        service, name, raw_limits, severity = settings_dict[key][:4]
        limits = _split_limits(raw_limits)
        # Stays at the configured severity unless the value checks out.
        setting_status = status[severity]
        detail = ''

        try:
            keyword = ktl.cache(service, name)
            keyword.monitor()
            value = keyword.ascii
        except Exception:
            # Only a failure to read the keyword is reported as unreachable;
            # value/limit parsing is handled separately below.
            detail = ' not reachable'
        else:
            if len(limits) == 2:
                current = _to_float(value)
                low = _to_float(limits[0])
                high = _to_float(limits[1])
                if None not in (current, low, high) and low <= current <= high:
                    setting_status = status['ok']
                else:
                    if setting_status == status['warning'] and name in ('ROTCCWLM', 'ROTCWLM'):
                        detail = ' (ignore if KPF is not selected in TCS)'
                    # Limits are formatted from their numeric form: applying
                    # '.1f' directly to a string limit raises ValueError.
                    detail += (f' Current value {_one_decimal(current, value)} outside good range '
                               f'[{_one_decimal(low, limits[0])}, {_one_decimal(high, limits[1])}].')
            elif len(limits) == 1:
                if value == limits[0]:
                    setting_status = status['ok']
                else:
                    detail = f' Current value {value} should be {limits[0]}.'

        if setting_status == status['warning']:
            n_warnings += 1
        if setting_status == status['error']:
            n_errors += 1
        print(f'  Checking {key.ljust(55,"."):55}{setting_status:8}{detail}')
    return n_errors, n_warnings


...

##################
## Main program ##
##################

...

if __name__ == '__main__':
    # Hand the result to the shell as the exit status, as described under
    # "Exit values" in the help text: >0 for errors, <0 for warnings only,
    # 0 when everything is healthy.  SystemExit is a builtin, so no extra
    # import is needed.
    # NOTE(review): exit statuses are reported by the OS as small unsigned
    # numbers, so a negative warning count will not appear as a negative
    # value to the calling shell -- confirm how callers test for warnings.
    raise SystemExit(check_systems(systems))

...

The HIRES testAll is a Perl script; scroll down to see its output text.

...