First time here? We are a friendly community of Power Systems Engineers. Check out the FAQ!
1 | initial version |
Here is some code I use, hopefully you can leverage portions of it to save yourself some time.
def __ANSI(self, out_file_identifier, out_file_path, subsystem):
ierr, busarray = self.psspy.abusint(subsystem, 1, 'NUMBER')
if ierr: return ierr
busarray = busarray[0]
# skip rerun
if not os.path.isfile(os.path.join(out_file_path, '%d_ANSI_rawData_tot.txt' % out_file_identifier)):
ierr = self.psspy.report_output(2, os.path.join(out_file_path,
'%d_ANSI_rawData_tot.txt' % out_file_identifier))
if ierr: return ierr
vltary = [1.05 for x in busarray if x]
cptary = [0.033 for x in busarray if x]
# total current
ierr = self.psspy.ansi_2([0, 0, 0, 5, 0], [40, 80, 40, 80], len(busarray), busarray, vltary, cptary, '')
if ierr > 0:
self.log('ERROR PERFORMING ANSI %s - Error code %s' % (out_file_identifier, ierr), 'error')
ierr = self.psspy.report_output(2, os.path.join(out_file_path,
'%d_ANSI_rawData_sym.txt' % out_file_identifier))
if ierr: return ierr
vltary = [1.05 for x in busarray if x]
cptary = [0.033 for x in busarray if x]
# symmetric current
ierr = self.psspy.ansi_2([0, 0, 0, 5, 1], [40, 80, 40, 80], len(busarray), busarray, vltary, cptary, '')
if ierr > 0:
self.log('ERROR PERFORMING ANSI %s - Error code %s' % (out_file_identifier, ierr), 'error')
else:
self.log('skipping %d_ANSI_rawData_tot.txt' % out_file_identifier)
def RunANSI_study(self):
self.log('Beginning ANSI portion of study...')
# trim tp specific to those in study
TP_specific_areas = list(set(int(x.area) for x in self.ansi_param))
# create a default subsystem
self.createSubsystemOfAreas(self.areas, sid=0)
# find all buses in default subsystem
study_buses = self.returnBusesInSubsystem(sid=0, in_service_only=False, bus_property='Number')
# make list of buses manageable size for PSS/E report window
self.log(' Segmenting %d study buses.' % len(study_buses))
study_buses = self.splitter(study_buses, chunk=100)
# loop thru lists of buses, create subsystems, and perform ANSI on them
self.log(' Created %d segments of size %d.' % (len(study_buses), len(study_buses[0])))
for i, segment_of_study_buses in enumerate(study_buses, 1):
self.createSubsystemOfBuses(segment_of_study_buses, 0)
self.log(' Running default ANSI for segment %d' % i)
self.__ANSI(i, self.report_dir, 0)
self.log('ANSI study complete.')
def processANSIresults(self, workbook, folder):
"""
:param workbook: Workbook to add results.
:param folder: Location of psse reports.
:return: Workbook with results included.
"""
if not hasattr(self, 'psspy'):
#self.init_psse()
self.init_psse()
#self.PSSEService.init_psse()
self.openCase(self.model_path)
ansi_results_sym, ansi_results_tot, ansi_tpspec = self.__compileANSIresults(folder)
workbook = self.__writeANSIresults(workbook, ansi_results_sym, 'Default-ANSI-Symm Curr Basis')
workbook = self.__writeANSIresults(workbook, ansi_results_tot, 'Default-ANSI-Total Curr Basis')
if ansi_tpspec:
workbook = self.__writeANSIresults(workbook, ansi_tpspec, 'TP-Criteria ANSI')
return workbook
def __compileANSIresults(self, folder):
""" Parses a folder and build ANSI results from PSSE ANSI report txt files."""
# get list of report filepaths
list_of_ansi_reports = glob.glob(os.path.join(folder, '*ANSI_rawData*.txt'))
self.log('Read in %d ANSI report files.' % len(list_of_ansi_reports))
list_of_ansi_reports_sym = [x for x in list_of_ansi_reports if 'sym' in os.path.basename(x)]
list_of_ansi_reports_tot = [x for x in list_of_ansi_reports if 'tot' in os.path.basename(x)]
list_of_ansi_reports_tp = [x for x in list_of_ansi_reports if 'TPSPEC' in os.path.basename(x)]
# file in memory
data = []
for report in list_of_ansi_reports_sym:
data_file = open(report, 'rU')
data.extend(data_file.readlines())
# parse file
list_of_fault_objects_sym = []
options = []
while data:
line = data.pop(0)
line = line.rstrip().rstrip('\r\n')
# build options
if line.upper().startswith('OPTIONS USED'):
# reread options because tpspefic changes ansi
while not line.startswith(' BRKER'):
options.append(line)
line = data.pop(0).rstrip().rstrip('\r\n')
# build fault result objects
temp_list = []
if line[:5].isdigit():
temp_list.append(line)
list_of_fault_objects_sym.append(ANSIfaultResult(temp_list, options, self.log))
for item in list_of_fault_objects_sym:
ierr, area = self.psspy.busint(ibus=int(item.bus_properties['bus_number']), string='AREA')
item.area = area
# file in memory
data = []
for report in list_of_ansi_reports_tot:
data_file = open(report, 'rU')
data.extend(data_file.readlines())
list_of_fault_objects_tot = []
options = []
while data:
line = data.pop(0)
line = line.rstrip().rstrip('\r\n')
# build options
if line.upper().startswith('OPTIONS USED'):
# reread options because tpspefic changes ansi
while not line.startswith(' BRKER'):
options.append(line)
line = data.pop(0).rstrip().rstrip('\r\n')
# build fault result objects
temp_list = []
if line[:5].isdigit():
temp_list.append(line)
list_of_fault_objects_tot.append(ANSIfaultResult(temp_list, options, self.log))
for item in list_of_fault_objects_tot:
ierr, area = self.psspy.busint(ibus=int(item.bus_properties['bus_number']), string='AREA')
item.area = area
# file in memory
data = []
for report in list_of_ansi_reports_tp:
data_file = open(report, 'rU')
data.extend(data_file.readlines())
# parse file
list_of_fault_objects_tpspec = []
options = []
while data:
line = data.pop(0)
line = line.rstrip().rstrip('\r\n')
# build options
if line.upper().startswith('OPTIONS USED'):
# reread options because tpspefic changes ansi
while not line.startswith(' BRKER'):
options.append(line)
line = data.pop(0).rstrip().rstrip('\r\n')
# build fault result objects
temp_list = []
if line[:5].isdigit():
temp_list.append(line)
list_of_fault_objects_tpspec.append(ANSIfaultResult(temp_list, options, self.log))
for item in list_of_fault_objects_tpspec:
ierr, area = self.psspy.busint(ibus=int(item.bus_properties['bus_number']), string='AREA')
item.area = area
return list_of_fault_objects_sym, list_of_fault_objects_tot, list_of_fault_objects_tpspec
class ANSIfaultResult:
""" Creates an object from a list of text lines parsed from an ANSI result file
"""
def __init__(self, report_lines, options, log):
self.log = log
self.options = options
self.parameters = {'xfr cor': 'Do not apply to zero sequence',
'decrement': 'Account for DC decrement only',
'pos_branch_div': '', 'pos_mach_div': '', 'zero_branch_div': '', 'zero_mach_div': ''}
self.bus_properties = {'bus_number': '', 'bus_name': '', 'bus_kv': '', 'maxvolt': '', 'time': ''}
self.three_phase = {'symmetric_mva': '', 'symmetric_kA': '', 'asymmetric_kA': '',
'x/r ratio': '', 'multiplying factor': ''}
self.asymmetric = {'symmetric_kA': '', 'asymmetric_kA': '', 'x/r ratio': '', 'multiplying factor': '',
'llg symmetric phase': '', 'llg 3 x seq': '', 'pos thev r': '', 'pos thev x': '',
'neg thev r': '', 'neg thev x': ''}
for line in self.options:
if 'FOR BRANCHES WITH R=0, RESISTANCE SCALING FACTOR (X/R RATIO) IN POSITIVE SEQUENCE NETWORK' in line:
self.parameters['pos_branch_div'] = line.split()[-1]
elif 'FOR MACHINES WITH R=0, RESISTANCE SCALING FACTOR (X/R RATIO) IN POSITIVE SEQUENCE NETWORK' in line:
self.parameters['pos_mach_div'] = line.split()[-1]
elif 'FOR BRANCHES WITH R=0, RESISTANCE SCALING FACTOR (X/R RATIO) IN ZERO SEQUENCE NETWORK' in line:
self.parameters['zero_branch_div'] = line.split()[-1]
elif 'FOR MACHINES WITH R=0, RESISTANCE SCALING FACTOR (X/R RATIO) IN ZERO SEQUENCE NETWORK' in line:
self.parameters['zero_mach_div'] = line.split()[-1]
if '' in self.parameters:
self.log('ANSI SCALING FACTORS NOT FOUND IN OPTIONS: %s' '\n'.join(x for x in options), 'error')
while report_lines:
line = report_lines.pop(0).strip('\r').strip('\n').strip()
# assign bus information and fault information
self.bus_properties['bus_number'], self.bus_properties['bus_name'] = line[:6], line[7:18]
line = [x.strip() for x in line[19:].split(',')]
self.bus_properties['bus_kv'] = line[0]
self.bus_properties['maxvolt'] = line[1]
self.bus_properties['time'] = line[2]
# assign symmetric fault info
self.three_phase['symmetric_mva'] = line[3]
self.three_phase['symmetric_kA'] = line[4]
self.three_phase['asymmetric_kA'] = line[5]
self.three_phase['x/r ratio'] = line[6]
self.three_phase['multiplying factor'] = line[7]
# assign asymmetric fault info
self.asymmetric['symmetric_kA'] = line[8]
self.asymmetric['asymmetric_kA'] = line[9]
self.asymmetric['x/r ratio'] = line[10]
self.asymmetric['multiplying factor'] = line[11]
self.asymmetric['llg symmetric phase'] = line[12]
self.asymmetric['llg 3 x seq'] = line[13]
self.asymmetric['pos thev r'] = line[14]
self.asymmetric['pos thev x'] = line[15]
self.asymmetric['neg thev r'] = line[16]
self.asymmetric['neg thev x'] = line[17]
self.area = ''