diff --git a/code/processes/gateway.q b/code/processes/gateway.q
index e0fbbd566..bc1a1ef83 100644
--- a/code/processes/gateway.q
+++ b/code/processes/gateway.q
@@ -535,6 +535,12 @@ addserversfromconnectiontable[.servers.CONNECTIONS]
 // Join active .gw.servers to .servers.SERVERS table
 activeservers:{lj[select from .gw.servers where active;`handle xcol `w xkey .servers.SERVERS]}
 
+/function to tell orchestrator to scale up or down
+scale:{[procname;dir]
+  handle:.servers.gethandlebytype[`orchestrator;`any];
+  neg[handle](`.orch.scale;procname;dir);
+  }
+
 \d .
 
 // functions called by end-of-day processes
diff --git a/code/processes/orchestrator.q b/code/processes/orchestrator.q
new file mode 100644
index 000000000..115abb412
--- /dev/null
+++ b/code/processes/orchestrator.q
@@ -0,0 +1,47 @@
+/TorQ Orchestrator Process
+
+\d .orch
+
+/default parameters
+
+scalingdetails:([] time:`timestamp$(); procname:`$(); dir:`$(); instancecreated:`$(); instanceremoved:`$(); totalnumofinstances:`int$(); lowerlimit:`int$(); upperlimit:`int$()); /table for tracking scaling actions
+
+inputcsv:hsym first .proc.getconfigfile"processlimits.csv"; /location of csv file
+limits:@[{.lg.o[`scale;"Opening ",x];`procname xkey ("SII";enlist ",") 0:`$x};string inputcsv;{.lg.e[`scale;"failed to open ",x," : ",y];'y}[string inputcsv]]; /table of scalable processes and the lower and upper limits on the number of instances of each
+
+scaleprocslist:exec procname from limits; /list of scalable processes
+
+/initialises connection to discovery process and creates keyed table containing the number of instances of each scalable process
+getscaleprocsinstances:{[]
+  .servers.startup[];
+  `.orch.procs set exec procname from .servers.procstab where proctype=`hdb;
+  scaleprocsinstances:count each group first each ` vs/:procs;
+  `.orch.scaleprocsinstances set ([procname:key scaleprocsinstances] instances:value scaleprocsinstances);
+  }
+
+getscaleprocsinstances[];
+
+/function to scale a process up or down
+scale:{[procname;dir]
+  if[dir=`up;op:"-u ";limitcheck:{x>=y};limit:`upper;parentproc:procname];
+
+  if[dir=`down;op:"-d ";limitcheck:{x<=y};limit:`lower;parentproc:first ` vs procname];
+
+  if[limitcheck[scaleprocsinstances[parentproc;`instances];limits[parentproc;limit]];.lg.o[`scale;string[limit]," limit hit for ",string parentproc];:()];
+
+  system "bash ${TORQHOME}/scale.sh ",op,string procname;
+  /update number of process instances
+  getscaleprocsinstances[];
+  /update table with record of scaling action
+  `.orch.scalingdetails upsert (.z.p;parentproc;dir;$[dir=`up;` sv parentproc,`$string .orch.scaleprocsinstances[parentproc;`instances];`];$[dir=`down;procname;`];scaleprocsinstances[parentproc;`instances];limits[parentproc;`lower];limits[parentproc;`upper]);
+  }
+
+/function to ensure all processes have been scaled up to meet lower limit
+initialscaling:{[procname]
+  if[scaleprocsinstances[procname;`instances]<limits[procname;`lower];
+    scale[procname;`up];
+    initialscaling procname];
+  }
+
+/on startup, scale each scalable process up to its lower limit
+initialscaling each scaleprocslist;
diff --git a/scale.sh b/scale.sh
new file mode 100644
--- /dev/null
+++ b/scale.sh
+> ${KDBAPPCONFIG}/customprocess.csv
+        echo "$inputprocname replicated as $newprocname"
+        bash ${TORQHOME}/torq.sh start $newprocname -csv ${KDBAPPCONFIG}/customprocess.csv
+        bash ${TORQHOME}/torq.sh summary -csv ${KDBAPPCONFIG}/customprocess.csv
+      fi
+      ;;
+    d)
+      if [ "$(echo $CUSTOMPROCNAMES | grep -w $inputprocname)" ] ; then
+        echo -e 'Shutting down process now'
+        bash ${TORQHOME}/torq.sh stop $inputprocname -csv ${KDBAPPCONFIG}/customprocess.csv
+        cp ${KDBAPPCONFIG}/customprocess.csv ${KDBAPPCONFIG}/customprocessCopy.csv
+        grep -v "$inputprocname" ${KDBAPPCONFIG}/customprocessCopy.csv >| ${KDBAPPCONFIG}/customprocess.csv
+      else
+        echo 'Process name does not match, try again'
+      fi
+      ;;
+    *)
+      usage
+      ;;
+    esac
+  done
+else
+  usage
+fi
diff --git a/tests/orchestrator/customprocess.csv b/tests/orchestrator/customprocess.csv
new file mode 100644
index 000000000..74ea50f33
--- /dev/null
+++ b/tests/orchestrator/customprocess.csv
@@ -0,0 +1,4 @@
+host,port,proctype,procname,U,localtime,g,T,w,load,startwithall,extras,qcmd
+localhost,{KDBBASEPORT}+1,discovery,discovery1,,1,0,,,${KDBCODE}/processes/discovery.q,1,
+localhost,{KDBBASEPORT},segmentedtickerplant,stp1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/segmentedtickerplant.q,1,-schemafile ${TORQAPPHOME}/database.q -tplogdir ${KDBTPLOG},
+localhost,{KDBBASEPORT}+2,hdb,hdb1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,60,4000,${KDBHDB},1,,
diff --git a/tests/orchestrator/process.csv b/tests/orchestrator/process.csv
new file mode 100644
index 000000000..74ea50f33
--- /dev/null
+++ b/tests/orchestrator/process.csv
@@ -0,0 +1,4 @@
+host,port,proctype,procname,U,localtime,g,T,w,load,startwithall,extras,qcmd
+localhost,{KDBBASEPORT}+1,discovery,discovery1,,1,0,,,${KDBCODE}/processes/discovery.q,1,
+localhost,{KDBBASEPORT},segmentedtickerplant,stp1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/segmentedtickerplant.q,1,-schemafile ${TORQAPPHOME}/database.q -tplogdir ${KDBTPLOG},
+localhost,{KDBBASEPORT}+2,hdb,hdb1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,60,4000,${KDBHDB},1,,
diff --git a/tests/orchestrator/processlimits.csv b/tests/orchestrator/processlimits.csv
new file mode 100644
index 000000000..be16bea67
--- /dev/null
+++ b/tests/orchestrator/processlimits.csv
@@ -0,0 +1,2 @@
+procname,lower,upper
+hdb1,2,3
diff --git a/tests/orchestrator/run.sh b/tests/orchestrator/run.sh
new file mode 100644
index 000000000..8a89b55c2
--- /dev/null
+++ b/tests/orchestrator/run.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+#path to test directory
+testpath=${KDBTESTS}/orchestrator
+
+OLDKDBAPPCONFIG=${KDBAPPCONFIG}
+export KDBAPPCONFIG=${testpath}
+
+#start procs
+${TORQHOME}/torq.sh start all -csv ${testpath}/customprocess.csv
+
+#start test proc
+/usr/bin/rlwrap $QCMD ${TORQHOME}/torq.q \
+  -proctype orchestrator -procname orchestrator1 \
+  -test ${testpath} \
+  -load ${TORQHOME}/code/processes/orchestrator.q \
+  -procfile ${testpath}/customprocess.csv -debug
+
+#shut down procs
+${TORQHOME}/torq.sh stop all -csv ${testpath}/customprocess.csv
+
+export KDBAPPCONFIG=${OLDKDBAPPCONFIG}
diff --git a/tests/orchestrator/test.csv b/tests/orchestrator/test.csv
new file mode 100644
index 000000000..cdc270c5f
--- /dev/null
+++ b/tests/orchestrator/test.csv
@@ -0,0 +1,31 @@
+action,ms,bytes,lang,code,repeat,minver,comment
+
+run,0,0,q,\c 300 300,1,,""
+
+/test initial scaling
+true,0,0,q,.orch.scaleprocsinstances[`hdb1;`instances]=.orch.limits[`hdb1;`lower],1,,"check hdb1 scaled to lower limit upon startup"
+
+/test for scaling up func
+before,0,0,q,instancesbefore:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
+run,0,0,q,.orch.scale[`hdb1;`up],1,,"scale up hdb1"
+run,0,0,q,instancesafter:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
+true,0,0,q,instancesbefore<instancesafter,1,,"check instances now bigger for hdb1"
+
+/test for scaling down func
+before,0,0,q,instancesbefore:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
+run,0,0,q,.orch.scale[`hdb1.3;`down],1,,"scale down hdb1"
+run,0,0,q,instancesafter:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
+true,0,0,q,instancesbefore>instancesafter,1,,"check instances now smaller for hdb1"
+
+/testing limits: upper then lower
+run,0,0,q,.orch.scale[`hdb1;`up],1,,"scale up"
+run,0,0,q,before:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
+run,0,0,q,.orch.scale[`hdb1;`up],1,,"scaling up once more after limit is hit, to show no. instances doesn't change once limit hit"
+run,0,0,q,after:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
+true,0,0,q,before=after,1,,"checking no. instances hasn't changed, so limit prevents further scaling up"
+
+run,0,0,q,.orch.scale[`hdb1.3;`down],1,,"scale down"
+run,0,0,q,before:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
+run,0,0,q,.orch.scale[`hdb1.2;`down],1,,"scale down once more to show no. instances stays same, indicating can't scale lower once limit hit"
+run,0,0,q,after:.orch.scaleprocsinstances[`hdb1;`instances],1,,"get current instances of hdb1"
+true,0,0,q,before=after,1,,"checking no. instances hasn't changed, so limit prevents further scaling down"
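
Usage sketch (not part of the patch): how the new hook is driven end to end. It assumes the gateway-side scale wrapper lands in the .gw namespace (it is defined just before the trailing \d .), and the ports are hypothetical.

/ connect to the gateway (port hypothetical)
gw:hopen`::41000
/ request one more instance of the hdb1 parent process; the gateway relays
/ (`.orch.scale;`hdb1;`up) asynchronously to the orchestrator, which shells
/ out to scale.sh -u and records the action in .orch.scalingdetails
gw(`.gw.scale;`hdb1;`up)
/ scaling down names a specific instance; .orch.scale derives the parent
/ (hdb1) from the dotted prefix and enforces the lower limit from processlimits.csv
gw(`.gw.scale;`hdb1.3;`down)
/ inspect the audit trail on the orchestrator (port hypothetical)
orch:hopen`::41001
orch"select time,procname,dir,totalnumofinstances from .orch.scalingdetails"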