6 changes: 6 additions & 0 deletions code/processes/gateway.q
@@ -535,6 +535,12 @@ addserversfromconnectiontable[.servers.CONNECTIONS]
// Join active .gw.servers to .servers.SERVERS table
activeservers:{lj[select from .gw.servers where active;`handle xcol `w xkey .servers.SERVERS]}

/function to tell the orchestrator to scale a process up or down
scale:{[procname;dir]
handle:.servers.gethandlebytype[`orchestrator;`any];
neg[handle](`.orch.scale;procname;dir);
}

\d .

// functions called by end-of-day processes
47 changes: 47 additions & 0 deletions code/processes/orchestrator.q
@@ -0,0 +1,47 @@
/TorQ Orchestrator Process

\d .orch

/default parameters

scalingdetails:([] time:`timestamp$(); procname:`$(); dir:`$(); instancecreated:`$(); instanceremoved:`$(); totalnumofinstances:`int$(); lowerlimit:`int$(); upperlimit:`int$()); /table for tracking scaling

inputcsv:hsym first .proc.getconfigfile"processlimits.csv"; /location of csv file
limits:@[{.lg.o[`scale;"Opening ", x];`procname xkey ("SII"; enlist ",") 0:`$x}; (string inputcsv); {.lg.e[`scale;"failed to open ", (x)," : ",y];'y}[string inputcsv]]; /table of scalable processes and the max number of instances allowed for each

scaleprocslist:exec procname from limits; /list of scalable processes

/initialises connection to discovery process and creates keyed table containing the number of instances of each scalable process
getscaleprocsinstances:{[]
.servers.startup[];
`.orch.procs set exec procname from .servers.procstab where (first each ` vs/: procname) in scaleprocslist;
scaleprocsinstances:count each group first each ` vs/:procs;
`.orch.scaleprocsinstances set ([procname:key scaleprocsinstances] instances:value scaleprocsinstances);
}

getscaleprocsinstances[];

/function to scale up or down a process
scale:{[procname;dir]
if[dir=`up;op:"-u ";limitcheck:{x>=y};limit:`upper;parentproc:procname];

if[dir=`down;op:"-d ";limitcheck:{x<=y};limit:`lower;parentproc:first ` vs procname];

if[limitcheck[scaleprocsinstances[parentproc;`instances];limits[parentproc;limit]]; .lg.o[`scale;string[limit]," limit hit for ",string parentproc]; :()];

system "bash ${TORQHOME}/scale.sh ",op,string procname;
/update number of process instances
getscaleprocsinstances[];
/update table with record for scaling
`.orch.scalingdetails upsert (.z.p;parentproc;dir;$[dir=`up;` sv parentproc,`$string .orch.scaleprocsinstances[parentproc;`instances];`];$[dir=`down;procname;`];scaleprocsinstances[parentproc;`instances];limits[parentproc;`lower];limits[parentproc;`upper]);
}

/function to ensure all processes have been scaled up to meet lower limit
initialscaling:{[procname]
if[scaleprocsinstances[procname;`instances]<limits[procname;`lower];
reqinstances:limits[procname;`lower]-scaleprocsinstances[procname;`instances];
do[reqinstances;scale[procname;`up]];
];
}

initialscaling@/:scaleprocslist;
Contributor

What is this doing?

Author

Calls the initialscaling function for each of the scalable processes to ensure they have been scaled up to their lower limits on startup.
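
For illustration, here is a minimal q sketch of that startup pass (not part of this PR; it assumes the processlimits.csv entry hdb1,2,3 from the test config and a single running hdb1 instance):

/ sketch only: toy stand-ins for .orch.limits and .orch.scaleprocsinstances
limits:([procname:enlist `hdb1] lower:enlist 2i; upper:enlist 3i)
scaleprocsinstances:([procname:enlist `hdb1] instances:enlist 1i)
/ initialscaling[`hdb1] sees instances (1) below the lower limit (2), so one more instance is needed
reqinstances:limits[`hdb1;`lower]-scaleprocsinstances[`hdb1;`instances]  / 1i
/ do[reqinstances;scale[`hdb1;`up]] then fires once, running scale.sh -u hdb1 and bringing up hdb1.2;
/ processes already at or above their lower limit are left untouched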

1 change: 1 addition & 0 deletions config/processlimits.csv
@@ -0,0 +1 @@
procname,lower,upper
2 changes: 1 addition & 1 deletion config/settings/gateway.q
@@ -17,7 +17,7 @@ loadprocesscode:1b // whether to load the process specific code def

// Server connection details
\d .servers
CONNECTIONS:`rdb`hdb // list of connections to make at start up
CONNECTIONS:`rdb`hdb`orchestrator // list of connections to make at start up
RETRY:0D00:01 // period on which to retry dead connections. If 0, no reconnection attempts

\d .aqrest
75 changes: 75 additions & 0 deletions scale.sh
@@ -0,0 +1,75 @@
#!/bin/bash

usage() { echo "Usage: $0 [-u procname] or [-d procname]"; exit 1; }

#input procname from cmd line
inputprocname=$2
#get all procnames from process.csv
PROCNAMES="$(sed '1d' ${KDBAPPCONFIG}/process.csv | awk -F',' '{print $4}')"
#get all procnames from customprocess.csv
CUSTOMPROCNAMES="$(sed '1d' ${KDBAPPCONFIG}/customprocess.csv | awk -F',' '{print $4}')"

if [ $# -gt 2 ] ; then
echo -e '\nToo many arguments.'
elif [ $# -eq 1 ] ; then
if [[ "$1" == "-"* ]]; then
echo -e '\nProcname needed. Use: \n ' $CUSTOMPROCNAMES
else
usage
fi
elif [ $# -eq 2 ] ; then
while getopts "ud" option;do
case ${option} in
u)
#validate the requested procname:
#the argument count has already been checked above,
#so only confirm the input procname matches a
#procname listed in process.csv
if [ "$(echo $PROCNAMES | grep -w $inputprocname)" == "" ] ; then
echo -e '\nUnavailable procname. Use: \n' $PROCNAMES
else
#get the last replica of the input procname
var=$(bash ${TORQHOME}/torq.sh procs -csv ${KDBAPPCONFIG}/customprocess.csv | grep $inputprocname | tail -n 1)
#work out the new procname based on var
if [ `echo $var | grep -P -o '\d' | wc -l` -lt 2 ] ; then
next=2
else
next=$((${var: -1} + 1))
fi
newprocname="$inputprocname"."$next"
#determine the port number increment based on the number of lines in process.csv
#if that port is already in use, increment by 1 until successful
portnum="$(wc -l < ${KDBAPPCONFIG}/process.csv)"
while [ "$(netstat -an | grep "$((${KDBBASEPORT}+$portnum))" | grep -i listen)" != "" ]
do
((portnum++))
done
#use the input procname to determine which process to replicate:
#use the configured port number and new process name
#add the new process to customprocess.csv
#start the replicated process automatically, using customprocess.csv via the -csv flag
#run a summary of all processes in customprocess.csv
grep $inputprocname ${KDBAPPCONFIG}/process.csv | awk -F',' -vOFS=',' '{ $2 = "{KDBBASEPORT}+" '"$portnum"'; $4 ="'$newprocname'" }1' >> ${KDBAPPCONFIG}/customprocess.csv
echo "$inputprocname replicated as $newprocname"
bash ${TORQHOME}/torq.sh start $newprocname -csv ${KDBAPPCONFIG}/customprocess.csv
bash ${TORQHOME}/torq.sh summary -csv ${KDBAPPCONFIG}/customprocess.csv
fi
;;
d)
if [ "$(echo $CUSTOMPROCNAMES | grep -w $inputprocname)" ] ; then
echo -e 'Shutting down process now'
bash ${TORQHOME}/torq.sh stop $inputprocname -csv ${KDBAPPCONFIG}/customprocess.csv
cp ${KDBAPPCONFIG}/customprocess.csv ${KDBAPPCONFIG}/customprocessCopy.csv
grep -v "$inputprocname" ${KDBAPPCONFIG}/customprocessCopy.csv >| ${KDBAPPCONFIG}/customprocess.csv
else
echo 'Process name does not match, try again'
fi
;;
*)
usage
;;
esac
done
else
usage
fi
4 changes: 4 additions & 0 deletions tests/orchestrator/customprocess.csv
@@ -0,0 +1,4 @@
host,port,proctype,procname,U,localtime,g,T,w,load,startwithall,extras,qcmd
localhost,{KDBBASEPORT}+1,discovery,discovery1,,1,0,,,${KDBCODE}/processes/discovery.q,1,
localhost,{KDBBASEPORT},segmentedtickerplant,stp1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/segmentedtickerplant.q,1,-schemafile ${TORQAPPHOME}/database.q -tplogdir ${KDBTPLOG},
localhost,{KDBBASEPORT}+2,hdb,hdb1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,60,4000,${KDBHDB},1,,
4 changes: 4 additions & 0 deletions tests/orchestrator/process.csv
@@ -0,0 +1,4 @@
host,port,proctype,procname,U,localtime,g,T,w,load,startwithall,extras,qcmd
localhost,{KDBBASEPORT}+1,discovery,discovery1,,1,0,,,${KDBCODE}/processes/discovery.q,1,
localhost,{KDBBASEPORT},segmentedtickerplant,stp1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/segmentedtickerplant.q,1,-schemafile ${TORQAPPHOME}/database.q -tplogdir ${KDBTPLOG},
localhost,{KDBBASEPORT}+2,hdb,hdb1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,60,4000,${KDBHDB},1,,
2 changes: 2 additions & 0 deletions tests/orchestrator/processlimits.csv
@@ -0,0 +1,2 @@
procname,lower,upper
hdb1,2,3
22 changes: 22 additions & 0 deletions tests/orchestrator/run.sh
@@ -0,0 +1,22 @@
#!/bin/bash

#path to test directory
testpath=${KDBTESTS}/orchestrator

OLDKDBAPPCONFIG=${KDBAPPCONFIG}
export KDBAPPCONFIG=${testpath}

#start procs
${TORQHOME}/torq.sh start all -csv ${testpath}/customprocess.csv

#Start test proc
/usr/bin/rlwrap $QCMD ${TORQHOME}/torq.q \
-proctype orchestrator -procname orchestrator1 \
-test ${testpath} \
-load ${TORQHOME}/code/processes/orchestrator.q \
-procfile ${testpath}/customprocess.csv -debug

#Shut down procs
${TORQHOME}/torq.sh stop all -csv ${testpath}/customprocess.csv

export KDBAPPCONFIG=${OLDKDBAPPCONFIG}
31 changes: 31 additions & 0 deletions tests/orchestrator/test.csv
@@ -0,0 +1,31 @@
action,ms,bytes,lang,code,repeat,minver,comment

run,0,0,q,\c 300 300,1,,""

/test initial scaling
true,0,0,q,.orch.scaleprocsinstances[`hdb1;`instances]=.orch.limits[`hdb1;`lower],1,,"check hdb1 scaled to lower limit upon startup"

/test for scaling up func
before,0,0,q,instancesbefore:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
run,0,0,q,.orch.scale[`hdb1;`up],1,,"scale up hdb1"
run,0,0,q,instancesafter:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
true,0,0,q,instancesbefore<instancesafter,1,,"check instance count increased for hdb1"

/testing scaling down
run,0,0,q,instancesbefore:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
run,0,0,q,.orch.scale[`hdb1.3;`down],1,,"scale down"
run,0,0,q,instancesafter:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
true,0,0,q,instancesbefore>instancesafter,1,,"check instance count decreased for hdb1"

/testing limits: upper then lower
run,0,0,q,.orch.scale[`hdb1;`up],1,,"scale up"
run,0,0,q,before:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
run,0,0,q,.orch.scale[`hdb1;`up],1,,"scale up once more after the limit is hit, to show the no. of instances doesn't change once the limit is hit"
run,0,0,q,after:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
true,0,0,q,before=after,1,,"checking no. instances hasn't changed, so limit prevents further scaling up"

run,0,0,q,.orch.scale[`hdb1.3;`down],1,,"scale down"
run,0,0,q,before:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
run,0,0,q,.orch.scale[`hdb1.2;`down],1,,"scale down once more to show the no. of instances stays the same, indicating it can't scale lower once the limit is hit"
run,0,0,q,after:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
true,0,0,q,before=after,1,,"checking no. instances hasn't changed, so limit prevents further scaling down"