Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

The way we do it is complicated but bulletproof. In short, use the multiprocessing Module to run the simulations. You will need to put your run script into a function so that you can pass that function to multiprocessing. Please see the code below. The .Process() function starts the simulation by passing the next faultidev to the Function rundynsim(). I have a 28 core machine, so I load up 36 processes and let the multi-threading engine handle the balancing. This is set with the numprocesses Variable. If you want to run sequentially, you can use numprocesses=1.

    while fault_idevs:
    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    #Create a new thread if we haven't used all available to us.  Only create 
    #   one per sleep cycle.
    if( len(threads) < num_processes):    
        fault_being_executed = fault_idevs[0]       # Strings passed by value
        p = multiprocessing.Process(target=run_dynsim, args=[cnvcase, snap, fault_idevs.pop(0), lock, generic_VTG])
        p.start()
        print p, p.is_alive()
        threads.append(p)
        # Set us up a dictionary of running processes and some info so that we can 
        #  check to see if they have stalled
        running_processes[p.pid] = {'Fault_Name': fault_being_executed, 'Log_Size': 0, 'Out_Size': 0, 'Num_Stalls': 0, 'Start_Time': datetime.now()}
    else:
        for thread in threads:                        # Remove any threads that have finished
            if not thread.is_alive():
                del running_processes[thread.pid]
                threads.remove(thread)
                print 'REMOVING A THREAD'
    time.sleep(4)                                    
    # Let's check on our processes every minute
    stalled_process_check(running_processes, threads, lock, 60, len(fault_idevs), True)

To answer the main part of your question, running them in separate processes allows you to check and see if they are still working. If not, terminate them and move on. Some of mine can take more than 30 minutes to complete.

See the function below. The gist of it is to check the length of the log file and out Files; and if they aren't changing, terminate the process. The os.stat Functions make a call to get file sizes. These get stored and compared with the next stalled process check. This gets run every minute, so hung processes will be terminated within about three minutes. (The allowed number of stalls below is set to one, so it will terminate a minute after detection.)

# =============================================================================
# Function stalled_process_check
# Input: running_processes - the list of known running processes
#        threads - the list of threads being run by multiprocessing
#        lock - file lock for writing a status update to file
#        update_seconds - How often to actually run the main routine in this
#                         function
#        no_of_waiting - number of processes to queue or threads that are 
#                        waiting at the end
#        still_queueing - Status of whether there are more threads
#  Operation: This code loop through running_processes and save .log and 
#             .out file sizes.  If they don't change, it will record a stall.
#             If it hasn't changed the next time either, it will terminate the 
#             thread and let other code do the garbage collection. There is a 
#             status update at the end.
#  Output: Operates on *threads* 
# =============================================================================
def stalled_process_check(running_processes, threads, lock, update_seconds, no_of_waiting, still_queueing = True):
        global last_update_time
        # Let's check on our processes every update_seconds seconds
        try:
            elapsedTime = datetime.now() - last_update_time
        except NameError: #If last_update_time does not exist, create it 
            last_update_time = datetime.now()
            elapsedTime = datetime.now() - last_update_time
        if elapsedTime.total_seconds() > update_seconds:
            time_minutes = "%.2f" % (time.clock()/60)
            print('\n\nExecuting stalled process check @ ' + time_minutes + ' minutes runtime.')
            for pid, entry in running_processes.iteritems():
                faultidev = entry['Fault_Name']
                fault_name = faultidev[faultidev.find('FAULT_'):-4]
                log_file = '_Results\\' + fault_name + '.log'
                out_file = '_Results\\' + fault_name + '.out'
                try:
                    log_size = os.stat(log_file).st_size #Get the latest log file size
                except:
                    log_size = 0
                try:
                    out_size = os.stat(out_file).st_size #Get the latest out file size
                except:
                    out_size = 0
                print(' Fault name:  ' + fault_name)
                process_elapsedTime_seconds = (datetime.now() - entry['Start_Time']).total_seconds()/60.0
                print('  Execution Time:  {0:.1f} minutes').format(process_elapsedTime_seconds)
                print('  Log Size: ' + str(log_size) + '  /  Out Size: ' + str(out_size))
                if log_size == entry['Log_Size'] and out_size == entry['Out_Size']:
                    if entry['Num_Stalls'] == 0:
                        running_processes[pid]['Num_Stalls'] = 1
                    else:    # Num_Stalls > 0
                        # Terminate this process
                        for thread in threads:
                            if thread.pid == pid:
                                # If the thread was using the lock when we terminate 
                                # it then we're going to have a really bad day, btw.
                                thread.terminate()         
                                print('  Terminating stalled process with fault name:  ' + fault_name)
                                with lock:
                                    with open('DynamicsRunReport.txt', 'a') as myoutfile:
                                        time_minutes = "%.2f" % (time.clock()/60)
                                        myoutfile.write("Terminating stalled process " + fault_name + "@ " + time_minutes + ' minutes.\n')
                                break
                else:
                    # Update our values with the new ones
                    running_processes[pid]['Log_Size'] = log_size
                    running_processes[pid]['Out_Size'] = out_size
            last_update_time = datetime.now()
            if still_queueing: #added v3.37 - process status update
                status_update = '{} jobs waiting to queue.'.format(no_of_waiting)
            else:
                status_update = 'All jobs are queued. {} still running.'.format(no_of_waiting)
            print(status_update)