Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

Here is some code I use, hopefully you can leverage portions of it to save yourself some time.

 def __ANSI(self, out_file_identifier, out_file_path, subsystem):
        ierr, busarray = self.psspy.abusint(subsystem, 1, 'NUMBER')
        if ierr: return ierr
        busarray = busarray[0]
        # skip rerun
        if not os.path.isfile(os.path.join(out_file_path, '%d_ANSI_rawData_tot.txt' % out_file_identifier)):
            ierr = self.psspy.report_output(2, os.path.join(out_file_path,
                                                            '%d_ANSI_rawData_tot.txt' % out_file_identifier))
            if ierr: return ierr
            vltary = [1.05 for x in busarray if x]
            cptary = [0.033 for x in busarray if x]
            # total current
            ierr = self.psspy.ansi_2([0, 0, 0, 5, 0], [40, 80, 40, 80], len(busarray), busarray, vltary, cptary, '')
            if ierr > 0:
                self.log('ERROR PERFORMING ANSI %s - Error code %s' % (out_file_identifier, ierr), 'error')
            ierr = self.psspy.report_output(2, os.path.join(out_file_path,
                                                            '%d_ANSI_rawData_sym.txt' % out_file_identifier))
            if ierr: return ierr
            vltary = [1.05 for x in busarray if x]
            cptary = [0.033 for x in busarray if x]
            # symmetric current
            ierr = self.psspy.ansi_2([0, 0, 0, 5, 1], [40, 80, 40, 80], len(busarray), busarray, vltary, cptary, '')
            if ierr > 0:
                self.log('ERROR PERFORMING ANSI %s - Error code %s' % (out_file_identifier, ierr), 'error')
        else:
            self.log('skipping %d_ANSI_rawData_tot.txt' % out_file_identifier)



 def RunANSI_study(self):
        self.log('Beginning ANSI portion of study...')
        # trim tp specific to those in study

        TP_specific_areas = list(set(int(x.area) for x in self.ansi_param))

        # create a default subsystem
        self.createSubsystemOfAreas(self.areas, sid=0)

        # find all buses in default subsystem
        study_buses = self.returnBusesInSubsystem(sid=0, in_service_only=False, bus_property='Number')

        # make list of buses manageable size for PSS/E report window
        self.log('  Segmenting %d study buses.' % len(study_buses))
        study_buses = self.splitter(study_buses, chunk=100)

        # loop thru lists of buses, create subsystems, and perform ANSI on them
        self.log('  Created %d segments of size %d.' % (len(study_buses), len(study_buses[0])))
        for i, segment_of_study_buses in enumerate(study_buses, 1):
            self.createSubsystemOfBuses(segment_of_study_buses, 0)
            self.log('    Running default ANSI for segment %d' % i)
            self.__ANSI(i, self.report_dir, 0)
        self.log('ANSI study complete.')

    def processANSIresults(self, workbook, folder):
        """

        :param workbook: Workbook to add results.
        :param folder: Location of psse reports.
        :return: Workbook with results included.
        """
        if not hasattr(self, 'psspy'):
            #self.init_psse()
            self.init_psse()
            #self.PSSEService.init_psse()
            self.openCase(self.model_path)

        ansi_results_sym, ansi_results_tot, ansi_tpspec = self.__compileANSIresults(folder)
        workbook = self.__writeANSIresults(workbook, ansi_results_sym, 'Default-ANSI-Symm Curr Basis')
        workbook = self.__writeANSIresults(workbook, ansi_results_tot, 'Default-ANSI-Total Curr Basis')
        if ansi_tpspec:
            workbook = self.__writeANSIresults(workbook, ansi_tpspec, 'TP-Criteria ANSI')
        return workbook

    def __compileANSIresults(self, folder):
        """ Parses a folder and build ANSI results from PSSE ANSI report txt files."""
        # get list of report filepaths
        list_of_ansi_reports = glob.glob(os.path.join(folder, '*ANSI_rawData*.txt'))
        self.log('Read in %d ANSI report files.' % len(list_of_ansi_reports))
        list_of_ansi_reports_sym = [x for x in list_of_ansi_reports if 'sym' in os.path.basename(x)]
        list_of_ansi_reports_tot = [x for x in list_of_ansi_reports if 'tot' in os.path.basename(x)]
        list_of_ansi_reports_tp = [x for x in list_of_ansi_reports if 'TPSPEC' in os.path.basename(x)]

        # file in memory
        data = []
        for report in list_of_ansi_reports_sym:
            data_file = open(report, 'rU')
            data.extend(data_file.readlines())
        # parse file
        list_of_fault_objects_sym = []
        options = []
        while data:
            line = data.pop(0)
            line = line.rstrip().rstrip('\r\n')
            # build options
            if line.upper().startswith('OPTIONS USED'):
                # reread options because tpspefic changes ansi
                while not line.startswith('                                BRKER'):
                    options.append(line)
                    line = data.pop(0).rstrip().rstrip('\r\n')
            # build fault result objects
            temp_list = []
            if line[:5].isdigit():
                temp_list.append(line)
                list_of_fault_objects_sym.append(ANSIfaultResult(temp_list, options, self.log))
        for item in list_of_fault_objects_sym:
            ierr, area = self.psspy.busint(ibus=int(item.bus_properties['bus_number']), string='AREA')
            item.area = area

        # file in memory
        data = []
        for report in list_of_ansi_reports_tot:
            data_file = open(report, 'rU')
            data.extend(data_file.readlines())
        list_of_fault_objects_tot = []
        options = []
        while data:
            line = data.pop(0)
            line = line.rstrip().rstrip('\r\n')
            # build options
            if line.upper().startswith('OPTIONS USED'):
                # reread options because tpspefic changes ansi
                while not line.startswith('                                BRKER'):
                    options.append(line)
                    line = data.pop(0).rstrip().rstrip('\r\n')
            # build fault result objects
            temp_list = []
            if line[:5].isdigit():
                temp_list.append(line)
                list_of_fault_objects_tot.append(ANSIfaultResult(temp_list, options, self.log))
        for item in list_of_fault_objects_tot:
            ierr, area = self.psspy.busint(ibus=int(item.bus_properties['bus_number']), string='AREA')
            item.area = area

        # file in memory
        data = []
        for report in list_of_ansi_reports_tp:
            data_file = open(report, 'rU')
            data.extend(data_file.readlines())
        # parse file
        list_of_fault_objects_tpspec = []
        options = []
        while data:
            line = data.pop(0)
            line = line.rstrip().rstrip('\r\n')
            # build options
            if line.upper().startswith('OPTIONS USED'):
                # reread options because tpspefic changes ansi
                while not line.startswith('                                BRKER'):
                    options.append(line)
                    line = data.pop(0).rstrip().rstrip('\r\n')
            # build fault result objects
            temp_list = []
            if line[:5].isdigit():
                temp_list.append(line)
                list_of_fault_objects_tpspec.append(ANSIfaultResult(temp_list, options, self.log))
        for item in list_of_fault_objects_tpspec:
            ierr, area = self.psspy.busint(ibus=int(item.bus_properties['bus_number']), string='AREA')
            item.area = area

        return list_of_fault_objects_sym, list_of_fault_objects_tot, list_of_fault_objects_tpspec

class ANSIfaultResult:
    """ Creates an object from a list of text lines parsed from an ANSI result file
               """

    def __init__(self, report_lines, options, log):
        self.log = log
        self.options = options
        self.parameters = {'xfr cor': 'Do not apply to zero sequence',
                           'decrement': 'Account for DC decrement only',
                           'pos_branch_div': '', 'pos_mach_div': '', 'zero_branch_div': '', 'zero_mach_div': ''}
        self.bus_properties = {'bus_number': '', 'bus_name': '', 'bus_kv': '', 'maxvolt': '', 'time': ''}
        self.three_phase = {'symmetric_mva': '', 'symmetric_kA': '', 'asymmetric_kA': '',
                            'x/r ratio': '', 'multiplying factor': ''}
        self.asymmetric = {'symmetric_kA': '', 'asymmetric_kA': '', 'x/r ratio': '', 'multiplying factor': '',
                           'llg symmetric phase': '', 'llg 3 x seq': '', 'pos thev r': '', 'pos thev x': '',
                           'neg thev r': '', 'neg thev x': ''}
        for line in self.options:
            if 'FOR BRANCHES WITH R=0, RESISTANCE SCALING FACTOR (X/R RATIO) IN POSITIVE SEQUENCE NETWORK' in line:
                self.parameters['pos_branch_div'] = line.split()[-1]
            elif 'FOR MACHINES WITH R=0, RESISTANCE SCALING FACTOR (X/R RATIO) IN POSITIVE SEQUENCE NETWORK' in line:
                self.parameters['pos_mach_div'] = line.split()[-1]
            elif 'FOR BRANCHES WITH R=0, RESISTANCE SCALING FACTOR (X/R RATIO) IN ZERO SEQUENCE NETWORK' in line:
                self.parameters['zero_branch_div'] = line.split()[-1]
            elif 'FOR MACHINES WITH R=0, RESISTANCE SCALING FACTOR (X/R RATIO) IN ZERO SEQUENCE NETWORK' in line:
                self.parameters['zero_mach_div'] = line.split()[-1]
        if '' in self.parameters:
            self.log('ANSI SCALING FACTORS NOT FOUND IN OPTIONS: %s' '\n'.join(x for x in options), 'error')
        while report_lines:
            line = report_lines.pop(0).strip('\r').strip('\n').strip()
            # assign bus information and fault information
            self.bus_properties['bus_number'], self.bus_properties['bus_name'] = line[:6], line[7:18]
            line = [x.strip() for x in line[19:].split(',')]
            self.bus_properties['bus_kv'] = line[0]
            self.bus_properties['maxvolt'] = line[1]
            self.bus_properties['time'] = line[2]
            # assign symmetric fault info
            self.three_phase['symmetric_mva'] = line[3]
            self.three_phase['symmetric_kA'] = line[4]
            self.three_phase['asymmetric_kA'] = line[5]
            self.three_phase['x/r ratio'] = line[6]
            self.three_phase['multiplying factor'] = line[7]
            # assign asymmetric fault info
            self.asymmetric['symmetric_kA'] = line[8]
            self.asymmetric['asymmetric_kA'] = line[9]
            self.asymmetric['x/r ratio'] = line[10]
            self.asymmetric['multiplying factor'] = line[11]
            self.asymmetric['llg symmetric phase'] = line[12]
            self.asymmetric['llg 3 x seq'] = line[13]
            self.asymmetric['pos thev r'] = line[14]
            self.asymmetric['pos thev x'] = line[15]
            self.asymmetric['neg thev r'] = line[16]
            self.asymmetric['neg thev x'] = line[17]

        self.area = ''