process_parallel.py

#!/usr/bin/env python3
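# Usage (illustrative): the positional argument is the nTuple-processing
# executable; './my_analysis' below is a hypothetical stand-in. The
# executable is assumed to accept '-s -F <file list> -o <output>', as
# invoked in run_job below.
#
#   ./process_parallel.py ./my_analysis -j 8 -d ./data -m output.root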
from os import listdir
from os.path import join, isdir
import argparse
import subprocess
import multiprocessing
from multiprocessing.pool import Pool

from merge import merge_files


def run_job(job_number, executable, files):
    """Process one batch of files with the executable; return (job number, output file)."""
    # Write this job's input files to a dedicated file list
    file_list = f'file_list_{job_number:02d}.txt'
    with open(file_list, 'w') as f:
        f.write('\n'.join(files))
    output_filename = f'output_{job_number:02d}.root'
    ret = subprocess.run([executable, '-s', '-F', file_list,
                          '-o', output_filename])
    if ret.returncode != 0:
        raise RuntimeError(f'Job {job_number} encountered errors! '
                           f'(retcode: {ret.returncode}), check log file.')
    return (job_number, output_filename)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    add = parser.add_argument
    add('executable')
    add('--jobs', '-j', type=int, default=multiprocessing.cpu_count())
    add('--dir', '-d', default='./data')
    add('--mergeinto', '-m', default='output.root')
    args = parser.parse_args()

    if not isdir(args.dir):
        raise ValueError(f'Directory {args.dir} does not exist')

    files = sorted(join(args.dir, fname) for fname in listdir(args.dir)
                   if fname.endswith('.root'))
    # Distribute the files round-robin across the jobs; the stride keeps
    # batch sizes within one file of each other
    job_files = [files[i::args.jobs] for i in range(args.jobs)]
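    # Illustrative example (assumed numbers, not part of the script): with
    # 10 input files and --jobs 3, files[i::3] assigns
    #   job 0 -> files 0, 3, 6, 9
    #   job 1 -> files 1, 4, 7
    #   job 2 -> files 2, 5, 8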
    output_files = []

    def job_callback(result):
        # Runs in the parent process when a job succeeds; 'result' is the
        # (job_number, output_filename) tuple returned by run_job
        job_num, job_file = result
        print(f'job {job_num} finished')
        output_files.append(job_file)
    with Pool(args.jobs) as pool:
        print(f'Starting {args.jobs} processes to process {len(files)} files')
        results = []
        for i, batch in enumerate(job_files):
            results.append(pool.apply_async(run_job, (i, args.executable, batch),
                                            callback=job_callback))
        # get() blocks until each job is done and re-raises any worker exception
        for result in results:
            result.get()
        pool.close()
        pool.join()
    print('Finished processing nTuples.')
    print('Begin merging job files')
    merge_files(output_files, args.mergeinto)
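
merge.py (hypothetical sketch)

merge_files is imported from a local merge module that is not shown on this page. Below is a minimal sketch of what it could look like, assuming the job outputs are ROOT files and that ROOT's hadd utility is available on the PATH; the function name and signature come from the import above, everything else is an assumption.

#!/usr/bin/env python3
import subprocess


def merge_files(input_files, output_file):
    """Concatenate ROOT files into output_file using hadd."""
    # 'hadd -f target source1 source2 ...' merges ROOT files,
    # overwriting the target if it already exists
    ret = subprocess.run(['hadd', '-f', output_file] + list(input_files))
    if ret.returncode != 0:
        raise RuntimeError(f'Merging failed (retcode: {ret.returncode})')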