From d92f277dc94c57f236374f13064ed83779f74656 Mon Sep 17 00:00:00 2001 From: robertcoen Date: Wed, 21 Jul 2021 16:34:29 +0100 Subject: [PATCH 1/2] dataaccess functionality --- code/gateway/dataaccess.q | 28 +++++++++++++--------- code/gateway/kxdash.q | 49 ++++++++++++++++++++++++--------------- 2 files changed, 47 insertions(+), 30 deletions(-) diff --git a/code/gateway/dataaccess.q b/code/gateway/dataaccess.q index c414358d6..3f0701838 100644 --- a/code/gateway/dataaccess.q +++ b/code/gateway/dataaccess.q @@ -7,17 +7,8 @@ go:{if[`asc=x[0];:(xasc;x[1])];:(xdesc;x[1])}; // Full generality dataaccess function in the gateway getdata:{[o] - // Input checking in the gateway - reqno:.requests.initlogger[o]; - o:@[.checkinputs.checkinputs;o;.requests.error[reqno]]; - // Get the Procs - if[not `procs in key o;o[`procs]:attributesrouting[o;partdict[o]]]; - // Get Default process behavior - default:`join`timeout`postback`sublist`getquery`queryoptimisation`postprocessing!(multiprocjoin[o];0Wn;();0W;0b;1b;{:x;}); - // Use upserting logic to determine behaviour - options:default,o; - if[`ordering in key o;options[`ordering]: go each options`ordering]; - o:adjustqueries[o;partdict o]; + // Format query for multiprocess querying + preprocessing[o]; // Execute the queries if[options`getquery; $[.gw.call .z.w; @@ -133,3 +124,18 @@ colstm:{[input]: raze ((count') input[`aggregations]) #' key input[`aggregations crossprocmerge:{[input;A](^/)colmerge[;A;]'[colstm[input];$[A[0]~0!A[0];cols A[0];((cols A[0]) where not (cols A[0]) in cols key A[0])]]}; updategwtabprop:{[]:.gw.syncexec[".checkinputs.tablepropertiesconfig";exec servertype from .gw.servers];} + + +preprocessing:{[o] + // Input checking in the gateway + reqno::.requests.initlogger[o]; + o:@[.checkinputs.checkinputs;o;.requests.error[reqno]]; + // Get the Procs + if[not `procs in key o;o[`procs]:attributesrouting[o;partdict[o]]]; + // Get Default process behavior + default:`join`timeout`postback`sublist`getquery`queryoptimisation`postprocessing!(multiprocjoin[o];0Wn;();0W;0b;1b;{:x;}); + // Use upserting logic to determine behaviour + options::default,o; + if[`ordering in key o;options[`ordering]: go each options`ordering]; + o::adjustqueries[o;partdict o]; + }; \ No newline at end of file diff --git a/code/gateway/kxdash.q b/code/gateway/kxdash.q index d978aa509..f5d8531b0 100644 --- a/code/gateway/kxdash.q +++ b/code/gateway/kxdash.q @@ -1,6 +1,5 @@ +\c 200 2000 \d .kxdash -enabled:@[value;`enabled;{0b}]; - // use this to store the additional params that the kx dashboards seem to send in dashparams:`o`w`r`limit!(0;0i;0i;0W) @@ -9,6 +8,17 @@ dashexec:{[q;s;j] .gw.asyncexecjpt[(dashremote;q;dashparams);(),s;dashjoin[j];();0Wn] } +// Full generality dataaccess function for kx dashboards +getdata:{[o] + // Format query for multiprocess querying + .dataaccess.preprocessing[o]; + if[.dataaccess.options`getquery; + //Allow functionality of building a query + :.gw.asyncexec[(dashremote;(`getdata,o);dashparams);.dataaccess.options[`procs]];]; + //Execute the query + :.gw.asyncexecjpt[(dashremote;(`getdata;o);dashparams);.dataaccess.options[`procs];dashjoin[.dataaccess.returntab[.dataaccess.options;;.dataaccess.reqno]];.dataaccess.options[`postback];.dataaccess.options[`timeout]]; + } + // execute the request // return a dict of status and result, along with the params // add a flag to the start of the list to stop dictionaries collapsing @@ -24,20 +34,21 @@ dashjoin:{[joinfunc;r] (`.dash.snd_err;r[0;1;`w];r[0;1;`r];r[0;1;`result])] } +//Define the current .z.ps as a new function to be called in dashps +.z.ops:@[.:;`.z.ps;{.:}]; + dashps:{ - // check the query coming in meets the format - $[@[{`f`w`r`x`u~first 1_ value first x};x;0b]; - // pull out the values we need to return to the dashboards - [dashparams::`o`w`r`limit!(last value x 1;x 2;x 3;x[4;0]); + // check the query coming in meets the format + $[@[{`f`w`r`x`u~first 1_ value first x};x;0b]; + // pull out the values we need to return to the dashboards + [dashparams::`o`w`r`limit!(last value x 1;x 2;x 3;x[4;0]); // execute the query part, which must look something like // .kxdash.dashexec["select from t";`rdb`hdb;raze] - value x[4;1]; - ]; - // - value x] + .z.ops x[4;1];]; + // if incoming message is not in kxdash format, treat as normal + .z.ops x] } - // need this to handle queries that only hit one backend process // reformat those responses to look the same formatresponse:{[status;sync;result] @@ -46,16 +57,16 @@ formatresponse:{[status;sync;result] :$[res`status; (`.dash.rcv_msg;res`w;res`o;res`r;res`result); (`.dash.snd_err;res`w;res`r;res`result)]]; - $[not[status]and sync;'result;result]} - + $[not[status]and sync;'result;result] + } init:{ - // KX dashboards are expecting getFunctions to be defined on the process - .api.getFunctions:@[value;`.api.getFunctions;{{:()}}]; - // Reset format response - .gw.formatresponse:formatresponse; - // incorporate dashps into the .z.ps definition - .z.ps:{x@y;.kxdash.dashps y}@[value;`.z.ps;{{value x}}]; + // KX dashboards are expecting getFunctions to be defined on the process + .api.getFunctions:@[value;`.api.getFunctions;{{:()}}]; + // Reset format response + .gw.formatresponse:formatresponse; + // Redefine .z.ps as dashps to handle incoming queries + .z.ps:dashps; }; if[enabled;init[]]; From ac8bf91cdca1bd5d58ca46daf592d915c36b46b4 Mon Sep 17 00:00:00 2001 From: robertcoen Date: Tue, 3 Aug 2021 17:43:40 +0100 Subject: [PATCH 2/2] Updated preprocessing and namespce --- code/gateway/dataaccess.q | 21 +++++++++++---------- code/gateway/kxdash.q | 20 +++++++++----------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/code/gateway/dataaccess.q b/code/gateway/dataaccess.q index 3f0701838..59e4445c3 100644 --- a/code/gateway/dataaccess.q +++ b/code/gateway/dataaccess.q @@ -8,17 +8,17 @@ go:{if[`asc=x[0];:(xasc;x[1])];:(xdesc;x[1])}; // Full generality dataaccess function in the gateway getdata:{[o] // Format query for multiprocess querying - preprocessing[o]; + query:preprocessing[o]; // Execute the queries - if[options`getquery; + if[query[`options][`getquery]; $[.gw.call .z.w; - :.gw.syncexec[(`.dataaccess.buildquery;o);options[`procs]]; - :.gw.asyncexec[(`.dataaccess.buildquery;o);options[`procs]]]]; + :.gw.syncexec[(`.dataaccess.buildquery;query[`o]);query[`options][`procs]]; + :.gw.asyncexec[(`.dataaccess.buildquery;query[`o]);query[`options][`procs]]]]; $[.gw.call .z.w; //if sync - :.gw.syncexecjt[(`getdata;o);options[`procs];returntab[options;;reqno];options[`timeout]]; + :.gw.syncexecjt[(`getdata;query[`o]);query[`options][`procs];returntab[query[`options];;query[`reqno]];query[`options][`timeout]]; // if async - :.gw.asyncexecjpt[(`getdata;o);options[`procs];returntab[options;;reqno];options[`postback];options[`timeout]]]; + :.gw.asyncexecjpt[(`getdata;query[`o]);query[`options][`procs];returntab[query[`options];;query`[reqno]];query[`options][`postback];query[`options][`timeout]]]; }; // Dynamic routing finds all processes with relevant data @@ -128,14 +128,15 @@ updategwtabprop:{[]:.gw.syncexec[".checkinputs.tablepropertiesconfig";exec serve preprocessing:{[o] // Input checking in the gateway - reqno::.requests.initlogger[o]; + reqno:.requests.initlogger[o]; o:@[.checkinputs.checkinputs;o;.requests.error[reqno]]; // Get the Procs if[not `procs in key o;o[`procs]:attributesrouting[o;partdict[o]]]; // Get Default process behavior default:`join`timeout`postback`sublist`getquery`queryoptimisation`postprocessing!(multiprocjoin[o];0Wn;();0W;0b;1b;{:x;}); // Use upserting logic to determine behaviour - options::default,o; - if[`ordering in key o;options[`ordering]: go each options`ordering]; - o::adjustqueries[o;partdict o]; + options:default,o; + if[`ordering in key o;options[`ordering]:: go each options`ordering]; + o:adjustqueries[o;partdict o]; + `o`options`reqno!(o;options;reqno) }; \ No newline at end of file diff --git a/code/gateway/kxdash.q b/code/gateway/kxdash.q index f5d8531b0..fb9844cd1 100644 --- a/code/gateway/kxdash.q +++ b/code/gateway/kxdash.q @@ -1,4 +1,3 @@ -\c 200 2000 \d .kxdash // use this to store the additional params that the kx dashboards seem to send in dashparams:`o`w`r`limit!(0;0i;0i;0W) @@ -11,12 +10,12 @@ dashexec:{[q;s;j] // Full generality dataaccess function for kx dashboards getdata:{[o] // Format query for multiprocess querying - .dataaccess.preprocessing[o]; - if[.dataaccess.options`getquery; + query:.dataaccess.preprocessing[o]; + if[query[`options][`getquery]; //Allow functionality of building a query - :.gw.asyncexec[(dashremote;(`getdata,o);dashparams);.dataaccess.options[`procs]];]; + :.gw.asyncexec[(dashremote;(`getdata,query[`o]);dashparams);query[`options][`procs]];]; //Execute the query - :.gw.asyncexecjpt[(dashremote;(`getdata;o);dashparams);.dataaccess.options[`procs];dashjoin[.dataaccess.returntab[.dataaccess.options;;.dataaccess.reqno]];.dataaccess.options[`postback];.dataaccess.options[`timeout]]; + :.gw.asyncexecjpt[(dashremote;(`getdata;query[`o]);dashparams);query[`options][`procs];dashjoin[.dataaccess.returntab[query[`options];;query[`reqno]]];query[`options][`postback];query[`options][`timeout]]; } // execute the request @@ -34,9 +33,6 @@ dashjoin:{[joinfunc;r] (`.dash.snd_err;r[0;1;`w];r[0;1;`r];r[0;1;`result])] } -//Define the current .z.ps as a new function to be called in dashps -.z.ops:@[.:;`.z.ps;{.:}]; - dashps:{ // check the query coming in meets the format $[@[{`f`w`r`x`u~first 1_ value first x};x;0b]; @@ -44,9 +40,9 @@ dashps:{ [dashparams::`o`w`r`limit!(last value x 1;x 2;x 3;x[4;0]); // execute the query part, which must look something like // .kxdash.dashexec["select from t";`rdb`hdb;raze] - .z.ops x[4;1];]; + .gw.ps x[4;1];]; // if incoming message is not in kxdash format, treat as normal - .z.ops x] + .gw.ps x] } // need this to handle queries that only hit one backend process @@ -61,6 +57,8 @@ formatresponse:{[status;sync;result] } init:{ + //Define the current .z.ps as a new function to be called in dashps + .gw.ps:@[value;`.z.ps;{value}]; // KX dashboards are expecting getFunctions to be defined on the process .api.getFunctions:@[value;`.api.getFunctions;{{:()}}]; // Reset format response @@ -69,4 +67,4 @@ init:{ .z.ps:dashps; }; -if[enabled;init[]]; +if[enabled;init[]]; \ No newline at end of file