Objective:
Each job is described by a string, and all of these strings are stored in a file called job.queue.
Each string corresponds to one job.
The file job.queue contains a large number of jobs.
If we run them sequentially, it will take a long time.
We would like to run them in parallel using a Linux shell script to reduce the overall run time.
Each string corresponds to one job.
The file job.queue contains a large number of jobs.
If we run them sequentially, it will take a long time.
We would like to run them in parallel using a Linux shell script to reduce the overall run time.
Lab setup:
OS: CentOS 7.9.2009
Directory needed: ~/ptest
Files: job.queue, job.parallel, and the actual script file
Directory needed: ~/ptest
Files: job.queue, job.parallel, and the actual script file
Code to create job.queue:
# Build the sample job queue (50,000 one-line jobs named job1..job50000) and
# set the overall parallelism to 10.
# Nothing is written unless ~/ptest can be entered, so a missing lab directory
# cannot cause the files to be created in whatever directory is current.
if cd ~/ptest; then
  # Brace expansion hands all 50,000 numbers to a single builtin printf, so
  # job.queue is opened (and truncated) once instead of once per job.
  printf 'job%s\n' {1..50000} > job.queue
  echo 10 > job.parallel
else
  echo "cannot cd to ~/ptest; create it first (mkdir -p ~/ptest)" >&2
fi
# Regenerate job.queue with one "jobN" line per job (N = 1..50000), then
# record an overall parallelism of 10 in job.parallel.
: > job.queue
for ((n = 1; n <= 50000; n++)); do
  echo "job${n}"
done >> job.queue
echo 10 > job.parallel
Code to launch the jobs in parallel:
cd ~/ptest;
#!/bin/bash
# Code for parallel processing - model 2.
# bash (not ksh) is required: the functions defined below are handed to child
# processes with `export -f` and run through `bash -c`.

# Work inside the lab directory; stop rather than truncate files elsewhere.
cd ~/ptest || { echo "cannot cd to ~/ptest" >&2; exit 1; }

# Start every run with empty global bookkeeping files.
: > job.processed
: > job.tracker
: > job.lock

# Number of loader processes the queue is split across.
ldcnt=5
export ldcnt

# Module to reset all operational files to the count of loaders.
# The per-loader queue and parallelism files are removed by glob so that
# leftovers from an earlier run with a larger ldcnt are not launched again by
# the job.queue.th* loop at the end of the script.
rm -f job.parallel.th* job.queue.th*
for ((i = 1; i <= ldcnt; i++)); do
  : > "job.processed.th${i}"
  : > "job.tracker.th${i}"
  : > "job.lock.th${i}"
done

# Default to a total parallelism of 1 when no job.parallel file exists.
if ! [ -f job.parallel ]; then
  echo 1 > job.parallel
fi
#######################################
# Record one job as processed in this loader's result file, serialised by an
# exclusive lock on the loader's lock file.
# Globals:   thn  (read) - loader suffix used in the file names, e.g. th1
#            key1 (read) - job string to record
# Outputs:   appends "<key1>,processed" to job.processed.<thn>
# Returns:   0 on success, non-zero if the lock file cannot be opened or
#            locked, or if the append fails
#######################################
fsq()
{
  local rc=0
  # Hold the lock file on a dedicated descriptor for the duration of the write.
  exec 200> "job.lock.${thn}" || return 1
  # Blocking exclusive lock; replaces polling `flock -n` every 10 ms.
  if ! flock -x 200; then
    exec 200>&-
    return 1
  fi
  # printf with a quoted argument keeps the job string byte-for-byte; an
  # unquoted echo would collapse whitespace and expand glob characters.
  printf '%s,processed\n' "${key1}" >> "job.processed.${thn}" || rc=$?
  # `exec` is needed to close the descriptor in this shell and so release the
  # lock; a bare `200>&-` is a redirection on an empty command and leaves the
  # shell's descriptors unchanged.
  exec 200>&-
  return "${rc}"
}
#######################################
# Submit every job listed in this loader's queue file as a background
# `bash -c fsq` process, never keeping more of them running at once than the
# number stored in the loader's parallelism file.
# Globals:   thn  (read)    - loader suffix used in the file names, e.g. th1
#            key1 (written) - exported job string, read by fsq in the child
#            fsq  (read)    - function exported to the child shells
# Inputs:    job.queue.<thn>    - one job string per line
#            job.parallel.<thn> - maximum concurrent jobs for this loader
# Outputs:   "<job> - submitted under <pid>" on stdout and appended to
#            job.tracker.<thn>
# Returns:   1 if fsq is not defined in this shell, otherwise the status of
#            the read loop
#######################################
loader()
{
  local line limit running jid

  # Without fsq every child would fail with "command not found"; stop early.
  if ! declare -F fsq > /dev/null; then
    echo "loader: function fsq is not defined" >&2
    return 1
  fi
  export -f fsq

  # IFS= and -r keep each job string exactly as written; the `|| [[ -n ]]`
  # clause also picks up a final line that has no trailing newline.
  while IFS= read -r line || [[ -n "${line}" ]]; do
    # Throttle: wait until fewer than `limit` of this loader's jobs run.
    while true; do
      # Re-read the limit on every poll so that a loader parked at 0 resumes
      # once the file is edited; reading it only once per job meant a limit
      # of 0 could never be left.
      limit=$(cat "job.parallel.${thn}" 2> /dev/null)
      # A missing or non-numeric limit falls back to one job at a time.
      [[ "${limit}" =~ ^[0-9]+$ ]] || limit=1
      # Count this shell's own running background jobs rather than grepping
      # `ps` output, where "th1" also matches "th10" and unrelated processes.
      running=$(jobs -pr | wc -l)
      # 10# forces base 10 so a value such as 08 is not parsed as octal.
      if ((running < 10#${limit})); then
        break
      fi
      sleep .01
    done

    key1=${line}
    export key1
    # stdin comes from /dev/null so a job can never consume queue lines.
    { nohup bash -c fsq "${thn}" < /dev/null & } > /dev/null 2>&1
    jid=$!
    printf '%s - submitted under %s\n' "${line}" "${jid}" \
      | tee -a "job.tracker.${thn}"
  done < "job.queue.${thn}"
}
#Module to launch the loaders
# j is not referenced in the visible part of the script; kept unchanged.
typeset -i j
j=0

#Module to break the work unit
# Split job.queue into at most ldcnt contiguous chunks job.queue.th1..thN of
# ceil(lines / ldcnt) lines each.  awk counts the lines so that a final line
# without a trailing newline is included.
lcnt=$(awk 'END { print NR }' job.queue)
spltcnt=$(((lcnt + ldcnt - 1) / ldcnt))
# (NR-1)%s==0 marks the first line of each chunk.  The earlier NR%s==1 test
# never fires when s is 1 (queue no longer than ldcnt), which left the output
# file name empty.  An empty queue is skipped entirely.
if ((spltcnt > 0)); then
  awk -v s="${spltcnt}" '
    (NR - 1) % s == 0 {
      if (file != "") close(file)
      file = FILENAME ".th" (int((NR - 1) / s) + 1)
    }
    { print > file }
  ' job.queue
fi

#parallel thread distributor
# Share the total parallelism stored in job.parallel across the loaders:
# every loader gets pth/ldcnt slots and the first pth%ldcnt loaders get one
# more.  This is the same result as dealing the slots out round-robin, and
# when pth is smaller than ldcnt the remaining loaders are written as 0.
# Absolute values are written, so stale job.parallel.thN files cannot skew it.
pth=$(cat job.parallel)
# Anything that is not a plain non-negative integer falls back to 1.
[[ "${pth}" =~ ^[0-9]+$ ]] || pth=1
pth=$((10#${pth}))
typeset -i m
for ((m = 1; m <= ldcnt; m++)); do
  echo $((pth / ldcnt + (m <= pth % ldcnt ? 1 : 0))) > "job.parallel.th${m}"
done

#loader launcher
# Both functions must be exported: each loader runs in its own `bash -c`
# child, which only sees functions passed through the environment, and the
# loader in turn starts fsq the same way.
export -f fsq loader
# Start one detached loader per chunk file; thn (th1, th2, ...) is the chunk
# file's suffix and tells the loader which per-loader files to use.
for k in job.queue.th*; do
  # An unmatched glob stays literal; skip it instead of launching a loader.
  [[ -e "${k}" ]] || continue
  thn=${k##*.}
  export thn
  { nohup bash -c loader "${thn}" < /dev/null & } > /dev/null 2>&1
done
# Code for parallel processing - model 2.
# bash is required: the functions defined below are handed to child processes
# with `export -f` and run through `bash -c`.

# Work inside the lab directory; stop rather than truncate files elsewhere.
cd ~/ptest || { echo "cannot cd to ~/ptest" >&2; exit 1; }

# Start every run with empty global bookkeeping files.
: > job.processed
: > job.tracker
: > job.lock

# Number of loader processes the queue is split across.
ldcnt=5
export ldcnt

# Module to reset all operational files to the count of loaders.
# The per-loader queue and parallelism files are removed by glob so that
# leftovers from an earlier run with a larger ldcnt are not launched again by
# the job.queue.th* loop at the end of the script.
rm -f job.parallel.th* job.queue.th*
for ((i = 1; i <= ldcnt; i++)); do
  : > "job.processed.th${i}"
  : > "job.tracker.th${i}"
  : > "job.lock.th${i}"
done

# Default to a total parallelism of 1 when no job.parallel file exists.
if ! [ -f job.parallel ]; then
  echo 1 > job.parallel
fi
#######################################
# Record one job as processed in this loader's result file, serialised by an
# exclusive lock on the loader's lock file.
# Globals:   thn  (read) - loader suffix used in the file names, e.g. th1
#            key1 (read) - job string to record
# Outputs:   appends "<key1>,processed" to job.processed.<thn>
# Returns:   0 on success, non-zero if the lock file cannot be opened or
#            locked, or if the append fails
#######################################
fsq()
{
  local rc=0
  # Hold the lock file on a dedicated descriptor for the duration of the write.
  exec 200> "job.lock.${thn}" || return 1
  # Blocking exclusive lock; replaces polling `flock -n` every 10 ms.
  if ! flock -x 200; then
    exec 200>&-
    return 1
  fi
  # printf with a quoted argument keeps the job string byte-for-byte; an
  # unquoted echo would collapse whitespace and expand glob characters.
  printf '%s,processed\n' "${key1}" >> "job.processed.${thn}" || rc=$?
  # `exec` is needed to close the descriptor in this shell and so release the
  # lock; a bare `200>&-` is a redirection on an empty command and leaves the
  # shell's descriptors unchanged.
  exec 200>&-
  return "${rc}"
}
#######################################
# Submit every job listed in this loader's queue file as a background
# `bash -c fsq` process, never keeping more of them running at once than the
# number stored in the loader's parallelism file.
# Globals:   thn  (read)    - loader suffix used in the file names, e.g. th1
#            key1 (written) - exported job string, read by fsq in the child
#            fsq  (read)    - function exported to the child shells
# Inputs:    job.queue.<thn>    - one job string per line
#            job.parallel.<thn> - maximum concurrent jobs for this loader
# Outputs:   "<job> - submitted under <pid>" on stdout and appended to
#            job.tracker.<thn>
# Returns:   1 if fsq is not defined in this shell, otherwise the status of
#            the read loop
#######################################
loader()
{
  local line limit running jid

  # Without fsq every child would fail with "command not found"; stop early.
  if ! declare -F fsq > /dev/null; then
    echo "loader: function fsq is not defined" >&2
    return 1
  fi
  export -f fsq

  # IFS= and -r keep each job string exactly as written; the `|| [[ -n ]]`
  # clause also picks up a final line that has no trailing newline.
  while IFS= read -r line || [[ -n "${line}" ]]; do
    # Throttle: wait until fewer than `limit` of this loader's jobs run.
    while true; do
      # Re-read the limit on every poll so that a loader parked at 0 resumes
      # once the file is edited; reading it only once per job meant a limit
      # of 0 could never be left.
      limit=$(cat "job.parallel.${thn}" 2> /dev/null)
      # A missing or non-numeric limit falls back to one job at a time.
      [[ "${limit}" =~ ^[0-9]+$ ]] || limit=1
      # Count this shell's own running background jobs rather than grepping
      # `ps` output, where "th1" also matches "th10" and unrelated processes.
      running=$(jobs -pr | wc -l)
      # 10# forces base 10 so a value such as 08 is not parsed as octal.
      if ((running < 10#${limit})); then
        break
      fi
      sleep .01
    done

    key1=${line}
    export key1
    # stdin comes from /dev/null so a job can never consume queue lines.
    { nohup bash -c fsq "${thn}" < /dev/null & } > /dev/null 2>&1
    jid=$!
    printf '%s - submitted under %s\n' "${line}" "${jid}" \
      | tee -a "job.tracker.${thn}"
  done < "job.queue.${thn}"
}
#Module to launch the loaders
# j is not referenced in the visible part of the script; kept unchanged.
typeset -i j
j=0

#Module to break the work unit
# Split job.queue into at most ldcnt contiguous chunks job.queue.th1..thN of
# ceil(lines / ldcnt) lines each.  awk counts the lines so that a final line
# without a trailing newline is included.
lcnt=$(awk 'END { print NR }' job.queue)
spltcnt=$(((lcnt + ldcnt - 1) / ldcnt))
# (NR-1)%s==0 marks the first line of each chunk.  The earlier NR%s==1 test
# never fires when s is 1 (queue no longer than ldcnt), which left the output
# file name empty.  An empty queue is skipped entirely.
if ((spltcnt > 0)); then
  awk -v s="${spltcnt}" '
    (NR - 1) % s == 0 {
      if (file != "") close(file)
      file = FILENAME ".th" (int((NR - 1) / s) + 1)
    }
    { print > file }
  ' job.queue
fi

#parallel thread distributor
# Share the total parallelism stored in job.parallel across the loaders:
# every loader gets pth/ldcnt slots and the first pth%ldcnt loaders get one
# more.  This is the same result as dealing the slots out round-robin, and
# when pth is smaller than ldcnt the remaining loaders are written as 0.
# Absolute values are written, so stale job.parallel.thN files cannot skew it.
pth=$(cat job.parallel)
# Anything that is not a plain non-negative integer falls back to 1.
[[ "${pth}" =~ ^[0-9]+$ ]] || pth=1
pth=$((10#${pth}))
typeset -i m
for ((m = 1; m <= ldcnt; m++)); do
  echo $((pth / ldcnt + (m <= pth % ldcnt ? 1 : 0))) > "job.parallel.th${m}"
done

#loader launcher
# Both functions must be exported: each loader runs in its own `bash -c`
# child, which only sees functions passed through the environment, and the
# loader in turn starts fsq the same way.
export -f fsq loader
# Start one detached loader per chunk file; thn (th1, th2, ...) is the chunk
# file's suffix and tells the loader which per-loader files to use.
for k in job.queue.th*; do
  # An unmatched glob stays literal; skip it instead of launching a loader.
  [[ -e "${k}" ]] || continue
  thn=${k##*.}
  export thn
  { nohup bash -c loader "${thn}" < /dev/null & } > /dev/null 2>&1
done
Throughput Chart for Multiple Loader (aka - the above script):
# of jobs/sec is shown below...
We will meet again in the next blog post.