#!/usr/bin/perl
# parallel.pl by Bryan Dongray, btd@dongrays.com
# available at http://www.norbry.net/tools/
# small modifications to run only on STDIN by Dale W. Carder, dwcarder@doit.wisc.edu
#
# parallel [-v] PARALLELISM
#
# Reads commands from STDIN, one per line, and keeps up to PARALLELISM of
# them running at the same time.  Each line is handed to exec() as a single
# string.  When the input is exhausted (or a fork fails) no further commands
# are started and the script waits for the children still running.
#
# Exit status: 0 once every started child has been reaped, 1 on a usage
# error or if wait() fails while children are still expected.

use strict;
use warnings;

my $prog = $0;
$prog =~ s/.*\///;

# look for command line flags
my $vflg = 0;
while (@ARGV && $ARGV[0] =~ /^-/) {
    if ($ARGV[0] eq "-v") {
        $vflg = 1;
        shift @ARGV;
        next;
    }
    print STDERR "$prog: unknown option: $ARGV[0]\n";
    usage();
}

if (@ARGV < 1) {
    my $argc = @ARGV;
    print STDERR "$prog: arg count $argc < 1\n";
    usage();
}

# set parallelism (any arguments after it are ignored)
my $parallelism = shift @ARGV;
if ($parallelism !~ /^[0-9]+$/) {
    print STDERR "$prog: $parallelism non numeric\n";
    usage();
}
if ($parallelism < 1) {
    print STDERR "$prog: $parallelism < 1\n";
    usage();
}

# One slot per concurrent child: $pids[$slot] is the PID running in that
# slot (0 when the slot is idle) and $arg[$slot] is the command it was given.
my @pids = (0) x $parallelism;
my @arg;

# fill up the PID array with processes
my $more_input = 1;
for my $slot (0 .. $parallelism - 1) {
    if (runit($slot) <= 0) {
        # input ended - or there was an error: stop starting commands
        $more_input = 0;
        last;
    }
}

# every slot is busy: each time a child completes, start the next command
# in the slot it frees
while ($more_input) {
    my $slot = reap_child();
    next unless defined $slot;
    $more_input = 0 if runit($slot) <= 0;
}

# nothing more to start - wait for the children that are still running.
# Only call wait() while a PID is outstanding; calling it with no children
# would fail and be misreported as "out of sync".
while (grep { $_ } @pids) {
    reap_child();
}
exit 0;

# Print the usage text to STDERR and exit with status 1.
sub usage {
    print STDERR "Usage: $prog [-v] PARALLELISM\n";
    print STDERR "\t-v\tverbose\n";
    print STDERR "Reads commands to run via STDIN\n";
    exit 1;
}

# Return the slot index whose recorded PID equals $pid, or nothing if the
# PID is not in the table.
sub find_slot {
    my ($pid) = @_;
    for my $slot (0 .. $#pids) {
        return $slot if $pids[$slot] == $pid;
    }
    return;
}

# Block until any child exits.  Marks its slot idle and returns the slot
# index, or returns nothing if the PID does not belong to a known slot.
# Exits the script with status 1 if wait() reports an error.
sub reap_child {
    my $pid = wait;
    if ($pid < 0) {
        print STDERR "$prog: wait - out of sync:", $!, "\n";
        exit 1;
    }
    my $slot = find_slot($pid);
    return unless defined $slot;
    if ($vflg) {
        print STDERR "end pid[$slot]=$pid: $arg[$slot]\n";
    }
    $pids[$slot] = 0;
    return $slot;
}

# run a new parallel process - the next line of standard input is the CMD
#
# Takes the slot index to record the child under.
# Returns the child's PID (> 0) in the parent on success, 0 at end of input,
# and -1 if fork failed.  In the last two cases the slot is marked idle.
# Never returns in the child.
sub runit {
    my ($pidindex) = @_;

    # get the input for the CMD; test definedness so that a final line
    # consisting of "0" is not mistaken for EOF
    my $lastarg = <STDIN>;
    unless (defined $lastarg) {
        if ($vflg) {
            print STDERR "$prog: EOF - wait tidy up\n";
        }
        $pids[$pidindex] = 0;
        return 0;
    }
    chomp $lastarg;
    $arg[$pidindex] = $lastarg;

    my $newpid = fork();

    # an error?  fork returns undef on failure, never a negative number
    unless (defined $newpid) {
        print STDERR "$prog: fork error:", $!, "\n";
        $pids[$pidindex] = 0;
        return -1;
    }

    # the parent?
    if ($newpid > 0) {
        $pids[$pidindex] = $newpid;
        return $newpid;
    }

    # child process
    # make STDIN /dev/null as our STDIN is the input to parallel args;
    # reopening the STDIN handle itself is what replaces descriptor 0
    unless (open(STDIN, '<', '/dev/null')) {
        print STDERR "$prog: cannot open /dev/null: ", $!, "\n";
        exit 1;
    }
    if ($vflg) {
        print STDERR "start pid[$pidindex]=$$: $lastarg\n";
    }

    # the bare block keeps perl from warning that the statement after exec
    # is unlikely to be reached; the failure is reported below instead
    {
        no warnings 'exec';
        exec $lastarg;
    }
    my $exec_error = $!;
    print STDERR "$prog: exec error:", join(" ", $lastarg), "\nperror: ", $exec_error, "\n";
    exit 1;
}