# process_parallel.py
  1. #!/usr/bin/env python3
  2. from os import listdir
  3. from os.path import join, isdir
  4. import argparse
  5. import subprocess
  6. import multiprocessing
  7. from multiprocessing.pool import Pool
  8. from filval_merge import merge_files
  9. def run_job(job_number, executable, files):
  10. file_list = f'file_list_{job_number:02d}.txt'
  11. with open(file_list, 'w') as f:
  12. f.write("\n".join(files))
  13. output_filename = f'output_{job_number:02d}.root'
  14. ret = subprocess.run([executable, '-s', '-F', file_list,
  15. '-o', output_filename])
  16. retcode = ret.returncode
  17. if retcode != 0:
  18. raise RuntimeError(f'Job {job_number} encountered errors!'
  19. f'(retcode: {retcode}), check log file.')
  20. return (job_number, output_filename)
  21. if __name__ == '__main__':
  22. parser = argparse.ArgumentParser()
  23. add = parser.add_argument
  24. add('executable')
  25. add('--jobs', '-j', type=int, default=multiprocessing.cpu_count())
  26. add('--dir', '-d', default='./data')
  27. add('--mergeinto', '-m', default='output.root')
  28. args = parser.parse_args()
  29. if not isdir(args.dir):
  30. raise ValueError(f'Directory {args.dir} does not exist')
  31. files = sorted([join(args.dir, fname) for fname in listdir(args.dir) if fname[-5:] == '.root'])
  32. files_per_job = len(files) // args.jobs
  33. job_files = [files[i::args.jobs] for i in range(args.jobs)]
  34. output_files = []
  35. def job_callback(args):
  36. job_num, job_file = args
  37. print(f'job {job_num} finished')
  38. output_files.append(job_file)
  39. with Pool(args.jobs) as pool:
  40. print(f'Starting {args.jobs} processes to process {len(files)} files')
  41. results = []
  42. for i, files in enumerate(job_files):
  43. results.append(pool.apply_async(run_job, (i, args.executable, files),
  44. callback=job_callback))
  45. for result in results: result.get()
  46. pool.close()
  47. pool.join()
  48. print('Finished processing nTuples.')
  49. print('Begin merging job files')
  50. merge_files(output_files, args.mergeinto)