Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions code/gateway/dataaccess.q
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,18 @@ go:{if[`asc=x[0];:(xasc;x[1])];:(xdesc;x[1])};

// Full generality dataaccess function in the gateway
getdata:{[o]
// Input checking in the gateway
reqno:.requests.initlogger[o];
o:@[.checkinputs.checkinputs;o;.requests.error[reqno]];
// Get the Procs
if[not `procs in key o;o[`procs]:attributesrouting[o;partdict[o]]];
// Get Default process behavior
default:`join`timeout`postback`sublist`getquery`queryoptimisation`postprocessing!(multiprocjoin[o];0Wn;();0W;0b;1b;{:x;});
// Use upserting logic to determine behaviour
options:default,o;
if[`ordering in key o;options[`ordering]: go each options`ordering];
o:adjustqueries[o;partdict o];
// Format query for multiprocess querying
query:preprocessing[o];
// Execute the queries
if[options`getquery;
if[query[`options][`getquery];
$[.gw.call .z.w;
:.gw.syncexec[(`.dataaccess.buildquery;o);options[`procs]];
:.gw.asyncexec[(`.dataaccess.buildquery;o);options[`procs]]]];
:.gw.syncexec[(`.dataaccess.buildquery;query[`o]);query[`options][`procs]];
:.gw.asyncexec[(`.dataaccess.buildquery;query[`o]);query[`options][`procs]]]];
$[.gw.call .z.w;
//if sync
:.gw.syncexecjt[(`getdata;o);options[`procs];returntab[options;;reqno];options[`timeout]];
:.gw.syncexecjt[(`getdata;query[`o]);query[`options][`procs];returntab[query[`options];;query[`reqno]];query[`options][`timeout]];
// if async
:.gw.asyncexecjpt[(`getdata;o);options[`procs];returntab[options;;reqno];options[`postback];options[`timeout]]];
:.gw.asyncexecjpt[(`getdata;query[`o]);query[`options][`procs];returntab[query[`options];;query`[reqno]];query[`options][`postback];query[`options][`timeout]]];
};

// Dynamic routing finds all processes with relevant data
Expand Down Expand Up @@ -133,3 +124,19 @@ colstm:{[input]: raze ((count') input[`aggregations]) #' key input[`aggregations
crossprocmerge:{[input;A](^/)colmerge[;A;]'[colstm[input];$[A[0]~0!A[0];cols A[0];((cols A[0]) where not (cols A[0]) in cols key A[0])]]};

updategwtabprop:{[]:.gw.syncexec[".checkinputs.tablepropertiesconfig";exec servertype from .gw.servers];}


preprocessing:{[o]
// Input checking in the gateway
reqno:.requests.initlogger[o];
o:@[.checkinputs.checkinputs;o;.requests.error[reqno]];
// Get the Procs
if[not `procs in key o;o[`procs]:attributesrouting[o;partdict[o]]];
// Get Default process behavior
default:`join`timeout`postback`sublist`getquery`queryoptimisation`postprocessing!(multiprocjoin[o];0Wn;();0W;0b;1b;{:x;});
// Use upserting logic to determine behaviour
options:default,o;
if[`ordering in key o;options[`ordering]:: go each options`ordering];
o:adjustqueries[o;partdict o];
`o`options`reqno!(o;options;reqno)
};
49 changes: 29 additions & 20 deletions code/gateway/kxdash.q
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
\d .kxdash
enabled:@[value;`enabled;{0b}];

// use this to store the additional params that the kx dashboards seem to send in
dashparams:`o`w`r`limit!(0;0i;0i;0W)

Expand All @@ -9,6 +7,17 @@ dashexec:{[q;s;j]
.gw.asyncexecjpt[(dashremote;q;dashparams);(),s;dashjoin[j];();0Wn]
}

// Full generality dataaccess function for kx dashboards
getdata:{[o]
// Format query for multiprocess querying
query:.dataaccess.preprocessing[o];
if[query[`options][`getquery];
//Allow functionality of building a query
:.gw.asyncexec[(dashremote;(`getdata,query[`o]);dashparams);query[`options][`procs]];];
//Execute the query
:.gw.asyncexecjpt[(dashremote;(`getdata;query[`o]);dashparams);query[`options][`procs];dashjoin[.dataaccess.returntab[query[`options];;query[`reqno]]];query[`options][`postback];query[`options][`timeout]];
}

// execute the request
// return a dict of status and result, along with the params
// add a flag to the start of the list to stop dictionaries collapsing
Expand All @@ -25,19 +34,17 @@ dashjoin:{[joinfunc;r]
}

dashps:{
// check the query coming in meets the format
$[@[{`f`w`r`x`u~first 1_ value first x};x;0b];
// pull out the values we need to return to the dashboards
[dashparams::`o`w`r`limit!(last value x 1;x 2;x 3;x[4;0]);
// check the query coming in meets the format
$[@[{`f`w`r`x`u~first 1_ value first x};x;0b];
// pull out the values we need to return to the dashboards
[dashparams::`o`w`r`limit!(last value x 1;x 2;x 3;x[4;0]);
// execute the query part, which must look something like
// .kxdash.dashexec["select from t";`rdb`hdb;raze]
value x[4;1];
];
//
value x]
.gw.ps x[4;1];];
// if incoming message is not in kxdash format, treat as normal
.gw.ps x]
}


// need this to handle queries that only hit one backend process
// reformat those responses to look the same
formatresponse:{[status;sync;result]
Expand All @@ -46,16 +53,18 @@ formatresponse:{[status;sync;result]
:$[res`status;
(`.dash.rcv_msg;res`w;res`o;res`r;res`result);
(`.dash.snd_err;res`w;res`r;res`result)]];
$[not[status]and sync;'result;result]}

$[not[status]and sync;'result;result]
}

init:{
// KX dashboards are expecting getFunctions to be defined on the process
.api.getFunctions:@[value;`.api.getFunctions;{{:()}}];
// Reset format response
.gw.formatresponse:formatresponse;
// incorporate dashps into the .z.ps definition
.z.ps:{x@y;.kxdash.dashps y}@[value;`.z.ps;{{value x}}];
//Define the current .z.ps as a new function to be called in dashps
.gw.ps:@[value;`.z.ps;{value}];
// KX dashboards are expecting getFunctions to be defined on the process
.api.getFunctions:@[value;`.api.getFunctions;{{:()}}];
// Reset format response
.gw.formatresponse:formatresponse;
// Redefine .z.ps as dashps to handle incoming queries
.z.ps:dashps;
};

if[enabled;init[]];
if[enabled;init[]];