Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version
    def splitter(self, inputlist, chunk=100):
    """ Breaks inputlist into a list of lists with size chunk."""
    chunks = [inputlist[x:x + chunk] for x in range(0, len(inputlist), chunk)]
    return chunks

def RunASCCstudy(self, options): asccresults = []

    # create subsystem of areas in user input
    self.createSubsystemOfAreas(options['Areas'])

    # find all buses in study areas
    study_buses = self.returnBusesInSubsystem(sid=0, in_service_only=False, bus_property='Number')

    # make list of buses manageable size for PSS/E report window
    self.log('Segmenting %d study buses.' % len(study_buses))
    study_buses = self.splitter(study_buses, chunk=100)

    # loop thru lists of buses, create subsystems, and perform ASCC on them
    self.log('Created %d segments of size %d.\nPerforming ASCC portion of study.' % (
             len(study_buses), len(study_buses[0])))
    for i, segment_of_study_buses in enumerate(study_buses, 1):
        self.createSubsystemOfBuses(segment_of_study_buses, sid=0)
        self.log('Running ASCC for segment %d' % i)
        self.__ASCC(i, self.report_directory, 0)

def __ASCC(self, out_file_identifier, out_file_path, subsystem, gen_scfiles=False):
    """
    :param out_file_identifier: increments file names
    :param out_file_path: gives folder to place reports in
    :param subsystem: specifies subsystem id for psspy.ascc api
    :param gen_scfiles: option flag for scfile output of psspy.ascc api
    :return:
    """
    ierr = self.psspy.report_output(2, os.path.join(out_file_path, '%d_ASCC_rawData.txt' % out_file_identifier))
    if ierr: self.log('Error on ASCC segment %s ier %s' % (out_file_identifier, ierr), 'error')
    # class fault analysis conditions
    # ierr = self.psspy.flat_2([1, 1, 1, 2, 1, 2, 1, 3], [1, 1])

    if gen_scfiles:
        if not os.path.exists(os.path.join(out_file_path, 'scfiles')):
            os.makedirs(os.path.join(out_file_path, 'scfiles'))
        scfile_path = os.path.join(out_file_path, 'scfiles', '%d_scfile.sc' % out_file_identifier)
    else:
        scfile_path = ""

    ierr = self.psspy.ascc_3(subsystem, 0, [1, 1, 0, 1, 0, 2, 0, 1, 0, 1, 1, 0, 0, 2, 2, 1, 1], [1],
                                 '', '', scfile_path)
    if ierr: self.log('Error on ASCC segment %s' % out_file_identifier, 'error')

Approximation of how I do this:

Running Short circuit and building lots of text files:

 def splitter(self, inputlist, chunk=100):
     """ Breaks inputlist into a list of lists with size chunk."""
     chunks = [inputlist[x:x + chunk] for x in range(0, len(inputlist), chunk)]
     return chunks
def RunASCC_study(self, options): ascc_results = []

def RunASCCstudy(self, options): asccresults = []

    # create subsystem of areas in user input
    self.createSubsystemOfAreas(options['Areas'])

    # find all buses in study areas
    study_buses = self.returnBusesInSubsystem(sid=0, in_service_only=False, bus_property='Number')

    # make list of buses manageable size for PSS/E report window
    self.log('Segmenting %d study buses.' % len(study_buses))
    study_buses = self.splitter(study_buses, chunk=100)

    # loop thru lists of buses, create subsystems, and perform ASCC on them
    self.log('Created %d segments of size %d.\nPerforming ASCC portion of study.' % (
             len(study_buses), len(study_buses[0])))
    for i, segment_of_study_buses in enumerate(study_buses, 1):
        self.createSubsystemOfBuses(segment_of_study_buses, sid=0)
        self.log('Running ASCC for segment %d' % i)
        self.__ASCC(i, self.report_directory, 0)

def __ASCC(self, out_file_identifier, out_file_path, subsystem, gen_scfiles=False):
    """
    :param out_file_identifier: increments file names
    :param out_file_path: gives folder to place reports in
    :param subsystem: specifies subsystem id for psspy.ascc api
    :param gen_scfiles: option flag for scfile output of psspy.ascc api
    :return:
    """
    ierr = self.psspy.report_output(2, os.path.join(out_file_path, '%d_ASCC_rawData.txt' % out_file_identifier))
    if ierr: self.log('Error on ASCC segment %s ier %s' % (out_file_identifier, ierr), 'error')
    # class fault analysis conditions
    # ierr = self.psspy.flat_2([1, 1, 1, 2, 1, 2, 1, 3], [1, 1])

    if gen_scfiles:
        if not os.path.exists(os.path.join(out_file_path, 'scfiles')):
            os.makedirs(os.path.join(out_file_path, 'scfiles'))
        scfile_path = os.path.join(out_file_path, 'scfiles', '%d_scfile.sc' % out_file_identifier)
    else:
        scfile_path = ""

    ierr = self.psspy.ascc_3(subsystem, 0, [1, 1, 0, 1, 0, 2, 0, 1, 0, 1, 1, 0, 0, 2, 2, 1, 1], [1],
                                 '', '', scfile_path)
    if ierr: self.log('Error on ASCC segment %s' % out_file_identifier, 'error')

Parsing text files:

def __compileASCCresults(self, folder):
    """
    Parses a folder and build ASCC results from PSSE ASCC report txt files.
    :param folder: Directory of containing psse reports.
    :return: List of result objects.
    """

    # get list of report filepaths
    list_of_ascc_reports = []
    for txtfile in os.listdir(folder):
        if txtfile.endswith("ASCC_rawData.txt"):
            list_of_ascc_reports.append(os.path.join(folder, txtfile))

    # file in memory
    data = []
    for report in list_of_ascc_reports:
        data_file = open(report, 'rU')
        data.extend(data_file.readlines())

    # parse file
    list_of_result_objects = []
    options = []
    while data:
        line = data.pop(0)
        line = line.rstrip().rstrip('\r\n')
        # build options
        if line.upper().startswith('OPTIONS USED'):
            # only need options once because its the same
            if not options:
                options.append(line)
                line = data.pop(0).rstrip().rstrip('\r\n')
                while line:
                    options.append(line.split('-', 1)[1])
                    line = data.pop(0).rstrip().rstrip('\r\n')
            else:
                pass
        # build fault result objects
        temp_list = []
        if line.startswith('AT BUS'):
            while line != "-----------------------------------------------------------------------------------------------------------------------------------------------":
                temp_list.append(line)
                line = data.pop(0).rstrip().rstrip('\r\n')
            list_of_result_objects.append(self.ASCCfaultResult(temp_list, options, self.log))
    self.log('Read in %d ASCC data files.' % len(list_of_ascc_reports))
    return list_of_result_objects

class ASCCfaultResult():
    """ Creates an object from a list of text lines parsed from an ASCC result file
       """
    # todo need to update each space index to incorporate full line for rounding issues
    def __init__(self, report_lines, options, log):
        self.options = options
        # default to non-lineout faults
        self.lineout = False
        self.line_outaged = None
        self.contributions = {'number': [], 'name': [], 'kv': [], 'area': [], 'i_ary': [], 'ckt': []}
        self.log = log

        while report_lines:
            line = report_lines.pop(0).strip()
            # assign bus information and fault information
            if line.startswith('AT BUS'):
                space_index = [26, 85]  # handles max length bus name overrun into kv value
                line = ''.join(x + ' ' if index in space_index else x for index, x in enumerate(line))
                # bus info
                self.bus_number = re.findall(r'AT BUS (\d{6})', line)[0]
                self.bus_name = re.findall(r'(?<=\[).+?(?=\])', line)[1].rsplit(' ', 1)[0].strip()
                self.bus_kv = re.findall(r'(?<=\[).+?(?=\])', line)[1].rsplit(' ', 1)[1].strip()
                self.area = re.findall(r'AREA  (\d{1,3})', line)[0]
                # not needed now but someone may want in the future for i*n level analysis
                self.faulted_bus_number = re.findall(r'FAULTED BUS IS: (\d{6})', line)[0]
                self.faulted_bus_name = re.findall(r'(?<=\[).+?(?=\])', line)[1][:11].strip()
                self.faulted_bus_kv = re.findall(r'(?<=\[).+?(?=\])', line)[1][11:].strip()
                self.fault_levels = re.findall(r'(\d{1,2}) LEVELS AWAY', line)
                continue
            # assign line out properties
            elif line.startswith('***'):
                self.lineout = 'lineout'
                self.line_outaged = re.findall(r'(?<= LINE OUT: ).+?(?=\*\*\*)', line)[0]
                continue
            # find fault type
            elif line.startswith('X--------------------'):
                if 'THREE PHASE FAULT' in line:
                    self.fault_type = '3PH'
                elif 'LINE TO GROUND (LG) FAULT' in line:
                    self.fault_type = 'SLG'
                else:
                    self.log('ERROR: INVALID FAULT TYPE %s' % line)
                continue
            # lg faults list sequence currents and phase, this clears the seq so phase is read same code as 3ph
            elif line.startswith('X--------- FROM ----------X'):
                self.contributions = {'name': [], 'i_ary': [], 'ckt': []}
            # find current totals
            elif line.startswith('INITIAL SYM. S.C.') and self.fault_type == '3PH':
                # 3 phase /I+/   AN(I+)     /IA/   AN(IA)     /IB/   AN(IB)     /IC/   AN(IC)
                # lg      /I+/   AN(I+)     /I-/   AN(I-)     /I0/   AN(I0)    /3I0/   AN(I0)
                space_index = [63, 81, 99]  # eliminates unrounded sc current values
                line = ''.join(' ' if index in space_index else x for index, x in enumerate(line))
                line = line.split('AMP')[1].split()
                self.fault_totals = {'I+': line[0], 'AN(I+)': line[1],
                                     'IA': line[2], 'AN(IA)': line[3],
                                     'IB': line[4], 'AN(IB)': line[5],
                                     'IC': line[6], 'AN(IC)': line[7]}
                continue
            elif line.startswith('INITIAL SYM. S.C.') and self.fault_type == 'SLG':
                line = line.split('AMP')[1].split()
                self.fault_totals = {'IA': line[0], 'AN(IA)': line[1]}
                continue
            # find contributions
            elif line.startswith('(kV L-G'):
                continue
            elif line.startswith('THEVENIN'):
                continue
            else:
                # 3 phase /I+/   AN(I+)     /IA/   AN(IA)     /IB/   AN(IB)     /IC/   AN(IC)
                # lg      /I+/   AN(I+)     /I-/   AN(I-)     /I0/   AN(I0)    /3I0/   AN(I0)
                space_index = [28, 34, 38, 45, 54, 63, 72, 81, 90, 99, 108, 117, 126, 135]
                line = [x.strip() for x in ''.join(x + ',' if index+2 in space_index else x for index, x in enumerate(line)).split(',')]
                self.contributions['name'].append(line[0])
                self.contributions['ckt'].append(line[2])
                self.contributions['i_ary'].append([line[4], line[5]])
                continue