First time here? We are a friendly community of Power Systems Engineers. Check out the FAQ!
1 | initial version |
def splitter(self, inputlist, chunk=100):
""" Breaks inputlist into a list of lists with size chunk."""
chunks = [inputlist[x:x + chunk] for x in range(0, len(inputlist), chunk)]
return chunks
def RunASCCstudy(self, options): asccresults = []
# create subsystem of areas in user input
self.createSubsystemOfAreas(options['Areas'])
# find all buses in study areas
study_buses = self.returnBusesInSubsystem(sid=0, in_service_only=False, bus_property='Number')
# make list of buses manageable size for PSS/E report window
self.log('Segmenting %d study buses.' % len(study_buses))
study_buses = self.splitter(study_buses, chunk=100)
# loop thru lists of buses, create subsystems, and perform ASCC on them
self.log('Created %d segments of size %d.\nPerforming ASCC portion of study.' % (
len(study_buses), len(study_buses[0])))
for i, segment_of_study_buses in enumerate(study_buses, 1):
self.createSubsystemOfBuses(segment_of_study_buses, sid=0)
self.log('Running ASCC for segment %d' % i)
self.__ASCC(i, self.report_directory, 0)
def __ASCC(self, out_file_identifier, out_file_path, subsystem, gen_scfiles=False):
"""
:param out_file_identifier: increments file names
:param out_file_path: gives folder to place reports in
:param subsystem: specifies subsystem id for psspy.ascc api
:param gen_scfiles: option flag for scfile output of psspy.ascc api
:return:
"""
ierr = self.psspy.report_output(2, os.path.join(out_file_path, '%d_ASCC_rawData.txt' % out_file_identifier))
if ierr: self.log('Error on ASCC segment %s ier %s' % (out_file_identifier, ierr), 'error')
# class fault analysis conditions
# ierr = self.psspy.flat_2([1, 1, 1, 2, 1, 2, 1, 3], [1, 1])
if gen_scfiles:
if not os.path.exists(os.path.join(out_file_path, 'scfiles')):
os.makedirs(os.path.join(out_file_path, 'scfiles'))
scfile_path = os.path.join(out_file_path, 'scfiles', '%d_scfile.sc' % out_file_identifier)
else:
scfile_path = ""
ierr = self.psspy.ascc_3(subsystem, 0, [1, 1, 0, 1, 0, 2, 0, 1, 0, 1, 1, 0, 0, 2, 2, 1, 1], [1],
'', '', scfile_path)
if ierr: self.log('Error on ASCC segment %s' % out_file_identifier, 'error')
2 | No.2 Revision |
Approximation of how I do this:
Running Short circuit and building lots of text files:
def splitter(self, inputlist, chunk=100):
""" Breaks inputlist into a list of lists with size chunk."""
chunks = [inputlist[x:x + chunk] for x in range(0, len(inputlist), chunk)]
return chunks
def RunASCC_study(self, options):
ascc_results = []
def RunASCCstudy(self, options): asccresults = []
Parsing text files:
def __compileASCCresults(self, folder):
"""
Parses a folder and build ASCC results from PSSE ASCC report txt files.
:param folder: Directory of containing psse reports.
:return: List of result objects.
"""
# get list of report filepaths
list_of_ascc_reports = []
for txtfile in os.listdir(folder):
if txtfile.endswith("ASCC_rawData.txt"):
list_of_ascc_reports.append(os.path.join(folder, txtfile))
# file in memory
data = []
for report in list_of_ascc_reports:
data_file = open(report, 'rU')
data.extend(data_file.readlines())
# parse file
list_of_result_objects = []
options = []
while data:
line = data.pop(0)
line = line.rstrip().rstrip('\r\n')
# build options
if line.upper().startswith('OPTIONS USED'):
# only need options once because its the same
if not options:
options.append(line)
line = data.pop(0).rstrip().rstrip('\r\n')
while line:
options.append(line.split('-', 1)[1])
line = data.pop(0).rstrip().rstrip('\r\n')
else:
pass
# build fault result objects
temp_list = []
if line.startswith('AT BUS'):
while line != "-----------------------------------------------------------------------------------------------------------------------------------------------":
temp_list.append(line)
line = data.pop(0).rstrip().rstrip('\r\n')
list_of_result_objects.append(self.ASCCfaultResult(temp_list, options, self.log))
self.log('Read in %d ASCC data files.' % len(list_of_ascc_reports))
return list_of_result_objects
class ASCCfaultResult():
""" Creates an object from a list of text lines parsed from an ASCC result file
"""
# todo need to update each space index to incorporate full line for rounding issues
def __init__(self, report_lines, options, log):
self.options = options
# default to non-lineout faults
self.lineout = False
self.line_outaged = None
self.contributions = {'number': [], 'name': [], 'kv': [], 'area': [], 'i_ary': [], 'ckt': []}
self.log = log
while report_lines:
line = report_lines.pop(0).strip()
# assign bus information and fault information
if line.startswith('AT BUS'):
space_index = [26, 85] # handles max length bus name overrun into kv value
line = ''.join(x + ' ' if index in space_index else x for index, x in enumerate(line))
# bus info
self.bus_number = re.findall(r'AT BUS (\d{6})', line)[0]
self.bus_name = re.findall(r'(?<=\[).+?(?=\])', line)[1].rsplit(' ', 1)[0].strip()
self.bus_kv = re.findall(r'(?<=\[).+?(?=\])', line)[1].rsplit(' ', 1)[1].strip()
self.area = re.findall(r'AREA (\d{1,3})', line)[0]
# not needed now but someone may want in the future for i*n level analysis
self.faulted_bus_number = re.findall(r'FAULTED BUS IS: (\d{6})', line)[0]
self.faulted_bus_name = re.findall(r'(?<=\[).+?(?=\])', line)[1][:11].strip()
self.faulted_bus_kv = re.findall(r'(?<=\[).+?(?=\])', line)[1][11:].strip()
self.fault_levels = re.findall(r'(\d{1,2}) LEVELS AWAY', line)
continue
# assign line out properties
elif line.startswith('***'):
self.lineout = 'lineout'
self.line_outaged = re.findall(r'(?<= LINE OUT: ).+?(?=\*\*\*)', line)[0]
continue
# find fault type
elif line.startswith('X--------------------'):
if 'THREE PHASE FAULT' in line:
self.fault_type = '3PH'
elif 'LINE TO GROUND (LG) FAULT' in line:
self.fault_type = 'SLG'
else:
self.log('ERROR: INVALID FAULT TYPE %s' % line)
continue
# lg faults list sequence currents and phase, this clears the seq so phase is read same code as 3ph
elif line.startswith('X--------- FROM ----------X'):
self.contributions = {'name': [], 'i_ary': [], 'ckt': []}
# find current totals
elif line.startswith('INITIAL SYM. S.C.') and self.fault_type == '3PH':
# 3 phase /I+/ AN(I+) /IA/ AN(IA) /IB/ AN(IB) /IC/ AN(IC)
# lg /I+/ AN(I+) /I-/ AN(I-) /I0/ AN(I0) /3I0/ AN(I0)
space_index = [63, 81, 99] # eliminates unrounded sc current values
line = ''.join(' ' if index in space_index else x for index, x in enumerate(line))
line = line.split('AMP')[1].split()
self.fault_totals = {'I+': line[0], 'AN(I+)': line[1],
'IA': line[2], 'AN(IA)': line[3],
'IB': line[4], 'AN(IB)': line[5],
'IC': line[6], 'AN(IC)': line[7]}
continue
elif line.startswith('INITIAL SYM. S.C.') and self.fault_type == 'SLG':
line = line.split('AMP')[1].split()
self.fault_totals = {'IA': line[0], 'AN(IA)': line[1]}
continue
# find contributions
elif line.startswith('(kV L-G'):
continue
elif line.startswith('THEVENIN'):
continue
else:
# 3 phase /I+/ AN(I+) /IA/ AN(IA) /IB/ AN(IB) /IC/ AN(IC)
# lg /I+/ AN(I+) /I-/ AN(I-) /I0/ AN(I0) /3I0/ AN(I0)
space_index = [28, 34, 38, 45, 54, 63, 72, 81, 90, 99, 108, 117, 126, 135]
line = [x.strip() for x in ''.join(x + ',' if index+2 in space_index else x for index, x in enumerate(line)).split(',')]
self.contributions['name'].append(line[0])
self.contributions['ckt'].append(line[2])
self.contributions['i_ary'].append([line[4], line[5]])
continue