Objective:
Each job is described by a string, one per line, in a file called job.queue.
Each string corresponds to a job itself.
We have a lot of jobs in the file job.queue.
If we run them in sequential mode, it will take a lot of time.
We would like to parallelise them using linux shell script here to reduce the overall job run time.
Each string corresponds to a job itself.
We have a lot of jobs in the file job.queue.
If we run them in sequential mode, it will take a lot of time.
We would like to parallelise them using linux shell script here to reduce the overall job run time.
Lab setup:
OS: CentOS 7.9.2009
Directory needed: ~/ptest
Files: job.queue, the actual script file, job.parallel
Directory needed: ~/ptest
Files: job.queue, the actual script file, job.parallel
Code to create job.queue:
# Generate a 50,000-line synthetic job queue (job1..job50000), one job name
# per line, and set the default global parallelism to 10.
mkdir -p ~/ptest && cd ~/ptest || exit 1
# printf repeats its format once per argument, so a single invocation writes
# all 50,000 lines -- far faster than 50,000 individual '>> job.queue' appends,
# each of which reopens the file.
printf 'job%d\n' {1..50000} > job.queue
echo 10 > job.parallel
# Generate a 50,000-line synthetic job queue (job1..job50000) in the current
# directory, one job name per line, and set the default parallelism to 10.
# A single printf (format repeated per argument) replaces 50,000 individual
# '>> job.queue' appends; '>' truncates, so the cat /dev/null reset is folded in.
printf 'job%d\n' {1..50000} > job.queue
echo 10 > job.parallel
Code to launch the jobs in parallel:
#!/bin/ksh
#code for parallel processing - model 2
# Reset every operational file for a fresh run with ${ldcnt} loader threads.
# Guard the cd: without it a failed cd would scatter files into $PWD.
mkdir -p ~/ptest && cd ~/ptest || exit 1
: > job.processed
: > job.tracker
: > job.lock
# Number of loader (queue-splitter) threads.
ldcnt=5
export ldcnt
# Truncate per-thread log/lock files and remove stale per-thread queue and
# budget files left over from a previous run (rm -f: absence is not an error).
for (( i=1; i<=ldcnt; i++ ))
do
  : > "job.processed.th${i}"
  : > "job.tracker.th${i}"
  : > "job.lock.th${i}"
  rm -f "job.parallel.th${i}" "job.queue.th${i}"
done
# Default the global parallelism budget to 1 if the operator did not set one.
if ! [ -f job.parallel ]; then
  echo 1 > job.parallel
fi
fsq()
{
  # Worker: record one job as processed, serialized per thread with an
  # exclusive flock so concurrent workers never interleave writes.
  # Globals (exported by loader): thn - thread name (e.g. "th1");
  #                               key1 - the job string to record.
  exec 200>"job.lock.${thn}";
  while true
  do
    # Spin until the exclusive lock is acquired (-n = non-blocking attempt).
    flock -xn 200 && break || sleep .01;
  done
  echo "${key1},processed" >> "job.processed.${thn}";
  # BUG FIX: the original bare '200>&-' is a redirection attached to an empty
  # command and does not close the descriptor in this shell; 'exec' makes the
  # close take effect (and releases the flock).
  exec 200>&-;
}
loader()
{
# Reads this thread's queue slice (job.queue.${thn}) line by line and launches
# one detached fsq worker per job, throttled to at most ${pth} concurrent
# workers. pth is re-read from job.parallel.${thn} on every iteration, so the
# per-thread budget can be tuned while the run is in flight.
# Globals read: thn (thread name, e.g. "th1"). Exports fsq/key1 for children.
typeset -i i;
i=0;
while read line
do
pth=$(cat job.parallel.${thn});
if [ $i -ge ${pth} ];
then
# At the concurrency cap: poll until a running worker finishes.
while true
do
if [ $i -lt ${pth} ];
then
break;
else
sleep .01;
# Re-count live fsq workers for this thread via ps.
# NOTE(review): ps|grep counting is approximate - it can match any process
# whose command line happens to contain "fsq" and this thread name.
i=$(ps -ef|grep -i fsq|grep ${thn}|grep -v grep|wc -l);
fi
done
fi
((i++));
key1=$line;
export -f fsq;
export key1;
# Launch the worker detached from the terminal; 'bash -c fsq' invokes the
# exported function (the trailing ${thn} becomes $0 of the child, not an
# argument - fsq reads thn from the exported environment).
{ nohup bash -c fsq ${thn} & } >/dev/null 2>&1;
jid=$(echo $!);
echo $line - submitted under $jid|tee -a job.tracker.${thn};
done < job.queue.${thn}
}
#Module to launch the loaders
typeset -i j;
j=0;
#Module to break the work unit
# Split job.queue into ${ldcnt} roughly equal slices named job.queue.thN.
# spltcnt = ceil(lcnt/ldcnt): adding ldcnt-1 before the scale=0 (truncating)
# bc division implements ceiling division.
lcnt=$(cat job.queue|wc -l);
spltcnt=$(echo "scale=0; $(bc -l <<< ${lcnt}+${ldcnt}-1)/${ldcnt}"|bc);
# Every spltcnt-th input line switches awk's output to the next slice file.
awk -v s=${spltcnt} 'NR%s==1 { file = FILENAME ".th" sprintf("%d", (NR/s)+1) } { print > file }' job.queue;
#parallel thread distributor
# Distribute the global parallelism budget (job.parallel) across per-thread
# budget files job.parallel.thN, one unit at a time.
pth=$(cat job.parallel);
typeset -i m;
m=0;
if [ ${pth} -ge ${ldcnt} ];
then
# Budget >= thread count: round-robin m over 1..ldcnt, +1 unit per pass.
for l in $(eval echo "{1..${pth}}")
do
((m++));
if [ -f job.parallel.th${m} ];
then
cupval=$(cat job.parallel.th${m});
newval=$(bc -l <<< ${cupval}+1);
echo ${newval} > job.parallel.th${m};
else
echo 1 > job.parallel.th${m};
fi
if [ ${m} -ge ${ldcnt} ];
then
m=0;
fi
done
else
# Budget < thread count: first ${pth} threads get one unit, the rest zero.
for l in $(eval echo "{1..${ldcnt}}")
do
((m++));
if [ ${pth} -gt 0 ];
then
echo 1 > job.parallel.th${m};
else
echo 0 > job.parallel.th${m};
fi
((pth--));
done
fi
#loader launcher
# Start one detached loader per queue slice; thn ("thN") is recovered as the
# third dot-separated field of the slice filename (job.queue.thN).
for k in $(ls -tr job.queue.th*)
do
thn=$(echo $k|cut -d '.' -f 3);
export thn;
export -f loader;
{ nohup bash -c loader ${thn} & } >/dev/null 2>&1;
done
#code for parallel processing - model 2
# Reset every operational file for a fresh run with ${ldcnt} loader threads.
# Guard the cd: without it a failed cd would scatter files into $PWD.
mkdir -p ~/ptest && cd ~/ptest || exit 1
: > job.processed
: > job.tracker
: > job.lock
# Number of loader (queue-splitter) threads.
ldcnt=5
export ldcnt
# Truncate per-thread log/lock files and remove stale per-thread queue and
# budget files left over from a previous run (rm -f: absence is not an error).
for (( i=1; i<=ldcnt; i++ ))
do
  : > "job.processed.th${i}"
  : > "job.tracker.th${i}"
  : > "job.lock.th${i}"
  rm -f "job.parallel.th${i}" "job.queue.th${i}"
done
# Default the global parallelism budget to 1 if the operator did not set one.
if ! [ -f job.parallel ]; then
  echo 1 > job.parallel
fi
fsq()
{
  # Worker: record one job as processed, serialized per thread with an
  # exclusive flock so concurrent workers never interleave writes.
  # Globals (exported by loader): thn - thread name (e.g. "th1");
  #                               key1 - the job string to record.
  exec 200>"job.lock.${thn}";
  while true
  do
    # Spin until the exclusive lock is acquired (-n = non-blocking attempt).
    flock -xn 200 && break || sleep .01;
  done
  echo "${key1},processed" >> "job.processed.${thn}";
  # BUG FIX: the original bare '200>&-' is a redirection attached to an empty
  # command and does not close the descriptor in this shell; 'exec' makes the
  # close take effect (and releases the flock).
  exec 200>&-;
}
loader()
{
# Reads this thread's queue slice (job.queue.${thn}) line by line and launches
# one detached fsq worker per job, throttled to at most ${pth} concurrent
# workers. pth is re-read from job.parallel.${thn} on every iteration, so the
# per-thread budget can be tuned while the run is in flight.
# Globals read: thn (thread name, e.g. "th1"). Exports fsq/key1 for children.
typeset -i i;
i=0;
while read line
do
pth=$(cat job.parallel.${thn});
if [ $i -ge ${pth} ];
then
# At the concurrency cap: poll until a running worker finishes.
while true
do
if [ $i -lt ${pth} ];
then
break;
else
sleep .01;
# Re-count live fsq workers for this thread via ps.
# NOTE(review): ps|grep counting is approximate - it can match any process
# whose command line happens to contain "fsq" and this thread name.
i=$(ps -ef|grep -i fsq|grep ${thn}|grep -v grep|wc -l);
fi
done
fi
((i++));
key1=$line;
export -f fsq;
export key1;
# Launch the worker detached from the terminal; 'bash -c fsq' invokes the
# exported function (the trailing ${thn} becomes $0 of the child, not an
# argument - fsq reads thn from the exported environment).
{ nohup bash -c fsq ${thn} & } >/dev/null 2>&1;
jid=$(echo $!);
echo $line - submitted under $jid|tee -a job.tracker.${thn};
done < job.queue.${thn}
}
#Module to launch the loaders
typeset -i j;
j=0;
#Module to break the work unit
# Split job.queue into ${ldcnt} roughly equal slices named job.queue.thN.
# spltcnt = ceil(lcnt/ldcnt): adding ldcnt-1 before the scale=0 (truncating)
# bc division implements ceiling division.
lcnt=$(cat job.queue|wc -l);
spltcnt=$(echo "scale=0; $(bc -l <<< ${lcnt}+${ldcnt}-1)/${ldcnt}"|bc);
# Every spltcnt-th input line switches awk's output to the next slice file.
awk -v s=${spltcnt} 'NR%s==1 { file = FILENAME ".th" sprintf("%d", (NR/s)+1) } { print > file }' job.queue;
#parallel thread distributor
# Distribute the global parallelism budget (job.parallel) across per-thread
# budget files job.parallel.thN, one unit at a time.
pth=$(cat job.parallel);
typeset -i m;
m=0;
if [ ${pth} -ge ${ldcnt} ];
then
# Budget >= thread count: round-robin m over 1..ldcnt, +1 unit per pass.
for l in $(eval echo "{1..${pth}}")
do
((m++));
if [ -f job.parallel.th${m} ];
then
cupval=$(cat job.parallel.th${m});
newval=$(bc -l <<< ${cupval}+1);
echo ${newval} > job.parallel.th${m};
else
echo 1 > job.parallel.th${m};
fi
if [ ${m} -ge ${ldcnt} ];
then
m=0;
fi
done
else
# Budget < thread count: first ${pth} threads get one unit, the rest zero.
for l in $(eval echo "{1..${ldcnt}}")
do
((m++));
if [ ${pth} -gt 0 ];
then
echo 1 > job.parallel.th${m};
else
echo 0 > job.parallel.th${m};
fi
((pth--));
done
fi
#loader launcher
# Start one detached loader per queue slice; thn ("thN") is recovered as the
# third dot-separated field of the slice filename (job.queue.thN).
for k in $(ls -tr job.queue.th*)
do
thn=$(echo $k|cut -d '.' -f 3);
export thn;
export -f loader;
{ nohup bash -c loader ${thn} & } >/dev/null 2>&1;
done
Throughput Chart for Multiple Loader (aka - the above script):
# of jobs/sec is shown below...
We will meet in the next blog.
Wonderful Blog.... Thanks for sharing with us...
ReplyDeleteHadoop Training in Chennai
Hadoop Training in Bangalore
Big Data Online Training
Big Data Training in Coimbatore