Torq Query Logging Management #573
base: master
`@@ -1 +1,2 @@`

```diff
 .DS_Store
+config/filealerterprocessed/
```
`@@ -0,0 +1,49 @@`

```q
// queryfeed proc script - subs to .usage.usage tables and publishes to query tickerplant

// add connections to all procs for query tracking to be enabled
.servers.CONNECTIONS:.servers.CONNECTIONS,exec distinct proctype from (" SS ";enlist csv) 0: hsym `$getenv `TORQPROCESSES where procname in subprocs;
```
> **Member:** I think …
```q
us:@[value;`us;([]querytime:`timestamp$();id:`long$();runtime:`timespan$();zcmd:`symbol$();proctype:`symbol$();procname:`symbol$();status:`char$();ip:`int$();user:`symbol$();handle:`int$();cmd:();mem:();sz:`long$();error:())];

upd:{[t;x] x[2]:`timespan$x[2]; if[t in `.usage.usage; `us insert x]};

.servers.startup[];
start_sub:{[subprocs]
  hds:(),exec w from .servers.SERVERS where procname in subprocs;
  {
    .lg.o[`startsub;"subscribing to ",string first exec procname from .servers.SERVERS where w=x];
    x(`.usage.querysub;`.usage.usage;`);
    .lg.o[`completesub;"subscribed"];
  }each hds;
 };

start_sub[subprocs];

readlog:{[file]
  // Remove leading backtick from symbol columns, convert a and w columns back to integers
  update zcmd:`$1 _' string zcmd, procname:`$1 _' string procname, proctype:`$1 _' string proctype, u:`$1 _' string u,
    a:"I"$-1 _' a, w:"I"$-1 _' w from
    // Read in file
    @[{update "J"$'" " vs' mem from flip (cols `us)!("PJJSSSC*S***JS";"|")0:x};hsym`$file;{'"failed to read log file : ",x}]};

queryfeed:{
  // normalise cmd data for gateway users
  usnorm:update cmd:-2#'";" vs' cmd from us where user=`gateway;
  usnorm:update cmd:first each cmd from usnorm where (first each cmd)~'(last each cmd);

  h(".u.upd";`usage;value flip select from usnorm);
```
> **Contributor:** should we stick to the same coding convention everywhere?
```q
  us::0#us;
 };

flushreload:{
  .lg.o[`flushreload1;"fr1"];
  procnames:exec distinct procname from .servers.SERVERS where proctype in subprocs;
  {h(".u.upd";`usage;value flip update timer:`timespan$1000*timer from readlog[raze string (getenv `KDBLOG),"/usage_",(raze x),"_",.z.d,".log"])} each string each procnames;
 };

.servers.startupdepcycles[`querytp;10;0W];
h:.servers.gethandlebytype[`querytp;`any];

if[reloadenabled;.timer.once[.proc.cp[]+0D00:00:10.000;(`flushreload;`);"Flush reload"]];
.timer.repeat[.proc.cp[];0Wp;0D00:00:00.200;(`queryfeed;`);"Publish Query Feed"];
```
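For context, the `cmd` normalisation in `queryfeed` keeps the last two `";"`-separated parts of a gateway query string and collapses them when they match. A minimal sketch with a made-up cmd string:

```q
c:"`.gw.call;select from trade;select from trade"   / hypothetical raw gateway cmd
-2#";" vs c           / keep the last two ";"-separated parts
first -2#";" vs c     / collapsed when first and last parts match
```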
*Large diffs are not rendered by default.*
`@@ -0,0 +1,42 @@`

```q
\d .dataaccess

// .gw.formatresponse:{[status;sync;result]$[not[status]and sync;'result;result]};

// Gets the JSON and converts to an input dict before executing .dataaccess.getdata on the input
qrest:{
  // Set the response type
  .gw.formatresponse:{[status;sync;result] $[sync and not status; 'result; `status`result!(status;result)]};
  // Run the function
  :getdata jsontodict x};
// Converts the JSON payload to a .dataaccess input dictionary
jsontodict:{
  // convert the input to a dictionary
  dict:.j.k x;
  k:key dict;
  // Cast the `tablename`instruments`grouping`columns values to symbols
  dict:@[dict;`tablename`instruments`grouping`columns inter k;{`$x}];
```
> **Contributor:** Can be refactored
>
> Suggested change: …
```q
  // Change the type of `starttime`endtime to timestamps (altering T -> D and - -> . if applicable)
  dict:@[dict;`starttime`endtime inter k;{x:ssr[x;"T";"D"];x:ssr[x;"-";"."];value x}];
```
> **Contributor:** You can potentially use the following parsing (but please check with a concrete example of what you get from the JSON file): …
```q
  // retrieve aggregations
  if[`aggregations in k;dict[`aggregations]:value dict[`aggregations]];
  // Convert timebar
  if[`timebar in k;dict[`timebar]:@[value dict[`timebar];1+til 2;{:`$x}]];
```
> **Contributor:** Can be refactored
>
> Suggested change: …
```q
  // Convert the filters key
  if[`filters in k;dict[`filters]:filterskey dict`filters];
  // output
  :dict};

quotefinder:{y[2#where y>x]}

filterskey:{[filtersstrings]
  likelist:ss[filtersstrings;"like"];
  // no like clauses present: evaluate the filter string directly
  if[0=count likelist;:value filtersstrings];
```
> **Contributor:** I don't understand what we are trying to achieve here. Can you please elaborate?
```q
  // Get the location of all the apostrophes
  apostlist:ss[filtersstrings;"'"];
  // Get the location of all the likes
  swaplist:raze {y[2#where y>x]}[;apostlist] each likelist;
  // Swap the ' to "
  filtersstrings:@[filtersstrings;swaplist;:;"\""];
  // Convert the string to a dict
  :value filtersstrings
  };
```
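A hedged usage sketch of `jsontodict` (the payload values are invented; only keys handled above are shown):

```q
q).dataaccess.jsontodict "{\"tablename\":\"trade\",\"starttime\":\"2014-01-01T00:00:00\",\"endtime\":\"2014-01-02T00:00:00\"}"
tablename| `trade
starttime| 2014.01.01D00:00:00.000000000
endtime  | 2014.01.02D00:00:00.000000000
```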
`@@ -0,0 +1,211 @@`

```q
\d .dataaccess

forceservers:0b;

// dictionary containing aggregate functions needed to calculate map-reducable
// values over multiple processes
aggadjust:(!). flip(
  (`avg;     {flip(`sum`count;2#x)});
  (`cor;     {flip(`wsum`count`sum`sum`sumsq`sumsq;@[x;(enlist(0;1);0;0;1;0;1)])});
  (`count;   `);
  (`cov;     {flip(`wsum`count`sum`sum;@[x;(enlist(0;1);0;0;1)])});
  (`dev;     {flip(`sumsq`count`sum;3#x)});
  (`distinct;`);
  (`first;   `);
  (`last;    `);
  (`max;     `);
  (`min;     `);
  (`prd;     `);
  (`sum;     `);
  (`var;     {flip(`sumsq`count`sum;3#x)});
  (`wavg;    {flip(`wsum`sum;(enlist(x 0;x 1);x 0))});
  (`wsum;    {enlist(`wsum;enlist(x 0;x 1))}));
```
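To illustrate the rewrite (a sketch; `` `price `` is an invented column name): a non-trivially map-reducable aggregation such as `avg` is replaced on each process by its `sum` and `count` components:

```q
q).dataaccess.aggadjust[`avg]`price
sum   price
count price
```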
```q
// function to make symbols strings with an upper case first letter
camel:{$[11h~type x;@[;0;upper]each string x;@[string x;0;upper]]};
// function that creates aggregation where X(X1,X2)=X(X(X1),X(X2)) where X is
// the aggregation and X1 and X2 are non overlapping subsets of a list
absagg:{enlist[`$x,y]!enlist(value x;`$x,y)};

// functions to calculate avg, cov and var in mapaggregate dictionary
avgf:{(%;(sum;`$"sum",x);scx y)};
covf:{(-;(%;swsum[x;y];scx x);(*;avgf[x;x];avgf[y;x]))};
varf:{(-;(%;(sum;`$"sumsq",y);scx x);(xexp;avgf[y;x];2))};
// functions to sum counts and wsums in mapaggregate dictionary
scx:{(sum;`$"count",x)};
swsum:{(sum;`$"wsum",x,y)}

// dictionary containing the functions needed to aggregate results together for
// map reducable aggregations
mapaggregate:(!). flip(
  (`avg;   {enlist[`$"avg",x]!enlist(%;(sum;`$"sum",x);scx x)});
  (`cor;   {enlist[`$"cor",x,w]!enlist(%;covf[x;w];(*;(sqrt;varf[x;x]);(sqrt;varf[(x:x 0);w:x 1])))});
  (`count; {enlist[`$"count",x]!enlist scx x});
  (`cov;   {enlist[`$"cov",x,w]!enlist covf[x:x 0;w:x 1]});
  (`dev;   {enlist[`$"dev",x]!enlist(sqrt;varf[x;x])});
  (`first; {enlist[`$"first",x]!enlist(*:;`$"first",x)});
  (`last;  {absagg["last";x]});
  (`max;   {absagg["max";x]});
  (`min;   {absagg["min";x]});
  (`prd;   {absagg["prd";x]});
  (`sum;   {absagg["sum";x]});
  (`var;   {enlist[`$"var",x]!enlist varf[x;x]});
  (`wavg;  {enlist[`$"wavg",x,w]!enlist(%;swsum[x:x 0;w:x 1];(sum;`$"sum",x))});
  (`wsum;  {enlist[`$"wsum",x,w]!enlist swsum[x:x 0;w:x 1]}));
```
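These entries build parse-tree fragments for the functional select used later in `mapreduceres`; for example (column name invented):

```q
/ key `avgPrice maps to the parse tree (%;(sum;`sumPrice);(sum;`countPrice)),
/ i.e. the total sum divided by the total count across processes
.dataaccess.mapaggregate[`avg]"Price"
```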
```q
// function to convert sorting
go:{if[`asc=x[0];:(xasc;x[1])];:(xdesc;x[1])};

// Full generality dataaccess function in the gateway
getdata:{[o]
  // Input checking in the gateway
  reqno:.requests.initlogger[o];
  o:@[.checkinputs.checkinputs;o;.requests.error[reqno]];
  // Get the procs
  if[not `procs in key o;o[`procs]:attributesrouting[o;partdict[o]]];
  // Get default process behaviour
  default:`timeout`postback`sublist`getquery`queryoptimisation`postprocessing!(0Wn;();0W;0b;1b;{:x;});
  // Use upserting logic to determine behaviour
  options:default,o;
  if[`ordering in key o;options[`ordering]:go each options`ordering];
  o:adjustqueries[o;partdict o];
  options[`mapreduce]:0b;
  gr:$[`grouping in key options;options`grouping;`];
  if[`aggregations in key options;
    if[all key[options`aggregations]in key aggadjust;
      options[`mapreduce]:not`date in gr]];
  // Execute the queries
  if[options`getquery;
    $[.gw.call .z.w;
      :.gw.syncexec[(`.dataaccess.buildquery;o);options[`procs]];
      :.gw.asyncexec[(`.dataaccess.buildquery;o);options[`procs]]]];
  :$[.gw.call .z.w;
    // if sync
    .gw.syncexecjt[(`getdata;o);options[`procs];autojoin[options];options[`timeout]];
    // if async
    .gw.asyncexecjpt[(`getdata;o);options[`procs];autojoin[options];options[`postback];options[`timeout]]];
 };
```
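A hedged call sketch (table, column, and times are invented; the accepted keys are governed by `.checkinputs.checkinputs`):

```q
/ hypothetical gateway call: average bid on a quote table over one day
.dataaccess.getdata `tablename`starttime`endtime`aggregations!
  (`quote;2014.01.01D00:00:00;2014.01.02D00:00:00;enlist[`avg]!enlist enlist`bid)
```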
```q
// join results together if from multiple processes
autojoin:{[options]
  // if there is only one proc queried, output the table
  if[1=count options`procs;:first];
  // if there is no need for map-reducable adjustment, return razed results
  :$[not options`mapreduce;razeresults[options;];mapreduceres[options;]];
 };

// raze results and call processres to apply postprocessing and sublist
razeresults:{[options;res]
  res:raze res;
  processres[options;res]
 };

// apply sublist and postprocessing to joined results
processres:{[options;res]
  res:(options`postprocessing)res;
  :$[(options`sublist)<>0W;(options`sublist) sublist res;res];
 };

// function to correctly reduce two tables to one
mapreduceres:{[options;res]
  // raze the result sets together
  res:$[all 99h=type each res;
    (){x,0!y}/res;
    (),/res];
  aggs:options`aggregations;
  aggs:flip(key[aggs]where count each value aggs;raze aggs);
  // distinct may be present as the only agg, so apply distinct again
  if[all`distinct=first each aggs;:?[res;();1b;()]];
  // collecting the appropriate grouping argument for map-reduce aggs
  gr:$[all`grouping`timebar in key options;
    a!a:options[`timebar;2],options`grouping;
    `grouping in key options;
    a!a:(),options`grouping;
    `timebar in key options;
    a!a:(),options[`timebar;2];
    0b];
  // select aggs by gr from res
  res:?[res;();gr;raze{mapaggregate[x 0;camel x 1]}'[aggs]];
  // apply sublist and postprocessing to map-reduced results
  processres[options;res]
 };
```
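As a sketch of the recombination step, with invented per-process partials for an average price:

```q
/ two processes each return sumPrice/countPrice partials
r:(([]sumPrice:enlist 10f;countPrice:enlist 4);([]sumPrice:enlist 20f;countPrice:enlist 6))
/ functional select over the razed partials, as built from mapaggregate`avg
?[raze r;();0b;enlist[`avgPrice]!enlist(%;(sum;`sumPrice);(sum;`countPrice))]   / avgPrice = 30%10 = 3f
```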
```q
// Dynamic routing finds all processes with relevant data
attributesrouting:{[options;procdict]
  // Get the tablename and timespan
  timespan:`date$options[`starttime`endtime];
  // See if any of the provided partitions are within the requested ones
  procdict:{[x;timespan](all x within timespan) or any timespan within x}[;timespan] each procdict;
  // Only return appropriate dates
  types:(key procdict) where value procdict;
  // If the dates are out of scope of processes then error
  if[0=count types;
    '`$"gateway error - no info found for that table name and time range. Either the table does not exist, attributes are incorrect in .gw.servers on the gateway, or the date range is outside the ones present"
  ];
  :types;
 };
```
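A sketch of the routing predicate with invented partition ranges:

```q
pd:`hdb`rdb!((2014.01.01;2014.06.30);(2014.07.01;2014.07.05))  / hypothetical proctype!(min;max) dates
ts:2014.06.28 2014.07.02                                       / requested date range
key[pd] where value {(all x within y)or any y within x}[;ts] each pd   / `hdb`rdb
```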
```q
// Generates a dictionary of servertype!(mindate;maxdate) for the requested table
partdict:{[input]
  tabname:input[`tablename];
  // Remove duplicate servertypes from .gw.servers
  servers:select from .gw.servers where i=(first;i)fby servertype;
  // extract the procs which have the table defined
  servers:select from servers where {[x;tabname]tabname in @[x;`tables]}[;tabname] each attributes;
  // Create a dictionary of the attributes against servertypes
  procdict:(exec servertype from servers)!(exec attributes from servers)@'(key each exec attributes from servers)[;0];
  // If the response is a dictionary, index into the tablename
  procdict:@[procdict;key procdict;{[x;tabname]if[99h=type x;:x[tabname]];:x}[;tabname]];
  // return the dictionary as (min date;max date)
  procdict:asc @[procdict;key procdict;{:(min x;max x)}];
  // prevent overlap if more than one process contains a specified date
  if[1<count procdict;
    procdict:{:$[y~`date$();x;$[within[x 0;(min y;max y)];(1+max[y];x 1);x]]}':[procdict]];
  :procdict;
 };
```
```q
// function to adjust the queries being sent to processes to prevent overlap of
// time clause and data being queried on more than one process
adjustqueries:{[options;part]
  // if only one process then no need to adjust
  if[2>count p:options`procs;:options];
  // get the date casting where relevant
  st:$[a:-14h~tp:type start:options`starttime;start;`date$start];
  et:$[a;options`endtime;`date$options`endtime];
  // get the dates that are required by each process
  dates:group key[part]where each{within[y;]each value x}[part]'[l:st+til 1+et-st];
  dates:l{(min x;max x)}'[dates];
  // if start/end time is not a date, then adjust the dates parameter to the
  // correct types
  if[not a;
    // convert the dates dictionary to timestamps/datetimes
    dates:$[-15h~tp;{"z"$x};::]{(0D+x 0;x[1]+1D-1)}'[dates];
    // convert first and last timestamp to start and end time
    dates:@[dates;f;:;(start;dates[f:first key dates;1])];
    dates:@[dates;l;:;(dates[l:last key dates;0];options`endtime)]];
  // adjust map-reducable aggregations to get correct components
  if[(1<count dates)&`aggregations in key options;
    if[all key[o:options`aggregations]in key aggadjust;
      aggs:mapreduce[o;$[`grouping in key options;options`grouping;`]];
      options:@[options;`aggregations;:;aggs]]];
  // create a dictionary of procs and different queries
  :{@[@[x;`starttime;:;y 0];`endtime;:;y 1]}[options]'[dates];
 };
```
```q
// function to grab the correct aggregations needed for aggregating over
// multiple processes
mapreduce:{[aggs;gr]
  // if there is a date grouping any aggregation is allowed
  if[`date in gr;:aggs];
  // format aggregations into a paired list
  aggs:flip(key[aggs]where count each value aggs;raze aggs);
  // if aggregations are not map-reducable and there is no date grouping, then error
  if[not all aggs[;0]in key aggadjust;
    '`$"to perform non-map reducable aggregations automatically over multiple processes there must be a date grouping"];
  // aggregations are map-reducable (with potential non-date groupings)
  aggs:distinct raze{$[`~a:.dataaccess.aggadjust x 0;enlist x;a x 1]}'[aggs];
  :first'[aggs]!last'[aggs];
 };
```
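For example (a sketch; `` `price `` is an invented column), an `avg` with a non-date grouping is rewritten into its components before being sent to each process:

```q
q).dataaccess.mapreduce[enlist[`avg]!enlist enlist`price;`sym]
sum  | price
count| price
```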
> everything in this script is defined in the root namespace; usually in TorQ we put most code in namespaces and keep the root namespace reasonably clean
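A minimal sketch of that convention (names invented):

```q
\d .qf                  / enter a dedicated namespace
upd:{[t;x] x}           / defined as .qf.upd rather than a root-level upd
\d .                    / back to the root namespace
.qf.upd[`usage;1 2 3]   / 1 2 3
```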