diff --git a/code/gateway/dataaccess.q b/code/gateway/dataaccess.q index c414358d6..59e4445c3 100644 --- a/code/gateway/dataaccess.q +++ b/code/gateway/dataaccess.q @@ -7,27 +7,18 @@ go:{if[`asc=x[0];:(xasc;x[1])];:(xdesc;x[1])}; // Full generality dataaccess function in the gateway getdata:{[o] - // Input checking in the gateway - reqno:.requests.initlogger[o]; - o:@[.checkinputs.checkinputs;o;.requests.error[reqno]]; - // Get the Procs - if[not `procs in key o;o[`procs]:attributesrouting[o;partdict[o]]]; - // Get Default process behavior - default:`join`timeout`postback`sublist`getquery`queryoptimisation`postprocessing!(multiprocjoin[o];0Wn;();0W;0b;1b;{:x;}); - // Use upserting logic to determine behaviour - options:default,o; - if[`ordering in key o;options[`ordering]: go each options`ordering]; - o:adjustqueries[o;partdict o]; + // Format query for multiprocess querying + query:preprocessing[o]; // Execute the queries - if[options`getquery; + if[query[`options][`getquery]; $[.gw.call .z.w; - :.gw.syncexec[(`.dataaccess.buildquery;o);options[`procs]]; - :.gw.asyncexec[(`.dataaccess.buildquery;o);options[`procs]]]]; + :.gw.syncexec[(`.dataaccess.buildquery;query[`o]);query[`options][`procs]]; + :.gw.asyncexec[(`.dataaccess.buildquery;query[`o]);query[`options][`procs]]]]; $[.gw.call .z.w; //if sync - :.gw.syncexecjt[(`getdata;o);options[`procs];returntab[options;;reqno];options[`timeout]]; + :.gw.syncexecjt[(`getdata;query[`o]);query[`options][`procs];returntab[query[`options];;query[`reqno]];query[`options][`timeout]]; // if async - :.gw.asyncexecjpt[(`getdata;o);options[`procs];returntab[options;;reqno];options[`postback];options[`timeout]]]; + :.gw.asyncexecjpt[(`getdata;query[`o]);query[`options][`procs];returntab[query[`options];;query`[reqno]];query[`options][`postback];query[`options][`timeout]]]; }; // Dynamic routing finds all processes with relevant data @@ -133,3 +124,19 @@ colstm:{[input]: raze ((count') input[`aggregations]) #' key input[`aggregations crossprocmerge:{[input;A](^/)colmerge[;A;]'[colstm[input];$[A[0]~0!A[0];cols A[0];((cols A[0]) where not (cols A[0]) in cols key A[0])]]}; updategwtabprop:{[]:.gw.syncexec[".checkinputs.tablepropertiesconfig";exec servertype from .gw.servers];} + + +preprocessing:{[o] + // Input checking in the gateway + reqno:.requests.initlogger[o]; + o:@[.checkinputs.checkinputs;o;.requests.error[reqno]]; + // Get the Procs + if[not `procs in key o;o[`procs]:attributesrouting[o;partdict[o]]]; + // Get Default process behavior + default:`join`timeout`postback`sublist`getquery`queryoptimisation`postprocessing!(multiprocjoin[o];0Wn;();0W;0b;1b;{:x;}); + // Use upserting logic to determine behaviour + options:default,o; + if[`ordering in key o;options[`ordering]:: go each options`ordering]; + o:adjustqueries[o;partdict o]; + `o`options`reqno!(o;options;reqno) + }; \ No newline at end of file diff --git a/code/gateway/kxdash.q b/code/gateway/kxdash.q index d978aa509..fb9844cd1 100644 --- a/code/gateway/kxdash.q +++ b/code/gateway/kxdash.q @@ -1,6 +1,4 @@ \d .kxdash -enabled:@[value;`enabled;{0b}]; - // use this to store the additional params that the kx dashboards seem to send in dashparams:`o`w`r`limit!(0;0i;0i;0W) @@ -9,6 +7,17 @@ dashexec:{[q;s;j] .gw.asyncexecjpt[(dashremote;q;dashparams);(),s;dashjoin[j];();0Wn] } +// Full generality dataaccess function for kx dashboards +getdata:{[o] + // Format query for multiprocess querying + query:.dataaccess.preprocessing[o]; + if[query[`options][`getquery]; + //Allow functionality of building a query + :.gw.asyncexec[(dashremote;(`getdata,query[`o]);dashparams);query[`options][`procs]];]; + //Execute the query + :.gw.asyncexecjpt[(dashremote;(`getdata;query[`o]);dashparams);query[`options][`procs];dashjoin[.dataaccess.returntab[query[`options];;query[`reqno]]];query[`options][`postback];query[`options][`timeout]]; + } + // execute the request // return a dict of status and result, along with the params // add a flag to the start of the list to stop dictionaries collapsing @@ -25,19 +34,17 @@ dashjoin:{[joinfunc;r] } dashps:{ - // check the query coming in meets the format - $[@[{`f`w`r`x`u~first 1_ value first x};x;0b]; - // pull out the values we need to return to the dashboards - [dashparams::`o`w`r`limit!(last value x 1;x 2;x 3;x[4;0]); + // check the query coming in meets the format + $[@[{`f`w`r`x`u~first 1_ value first x};x;0b]; + // pull out the values we need to return to the dashboards + [dashparams::`o`w`r`limit!(last value x 1;x 2;x 3;x[4;0]); // execute the query part, which must look something like // .kxdash.dashexec["select from t";`rdb`hdb;raze] - value x[4;1]; - ]; - // - value x] + .gw.ps x[4;1];]; + // if incoming message is not in kxdash format, treat as normal + .gw.ps x] } - // need this to handle queries that only hit one backend process // reformat those responses to look the same formatresponse:{[status;sync;result] @@ -46,16 +53,18 @@ formatresponse:{[status;sync;result] :$[res`status; (`.dash.rcv_msg;res`w;res`o;res`r;res`result); (`.dash.snd_err;res`w;res`r;res`result)]]; - $[not[status]and sync;'result;result]} - + $[not[status]and sync;'result;result] + } init:{ - // KX dashboards are expecting getFunctions to be defined on the process - .api.getFunctions:@[value;`.api.getFunctions;{{:()}}]; - // Reset format response - .gw.formatresponse:formatresponse; - // incorporate dashps into the .z.ps definition - .z.ps:{x@y;.kxdash.dashps y}@[value;`.z.ps;{{value x}}]; + //Define the current .z.ps as a new function to be called in dashps + .gw.ps:@[value;`.z.ps;{value}]; + // KX dashboards are expecting getFunctions to be defined on the process + .api.getFunctions:@[value;`.api.getFunctions;{{:()}}]; + // Reset format response + .gw.formatresponse:formatresponse; + // Redefine .z.ps as dashps to handle incoming queries + .z.ps:dashps; }; -if[enabled;init[]]; +if[enabled;init[]]; \ No newline at end of file