diff --git a/.gitignore b/.gitignore
index e43b0f988..267d73112 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
 .DS_Store
+config/filealerterprocessed/
diff --git a/code/handlers/logusage.q b/code/handlers/logusage.q
index ff41a03f7..31d212ad6 100644
--- a/code/handlers/logusage.q
+++ b/code/handlers/logusage.q
@@ -21,12 +21,18 @@ logtodisk:@[value;`logtodisk;1b] // whether to log to disk or not
 logtomemory:@[value;`logtomemory;1b] // write query logs to memory
 ignore:@[value;`ignore;1b] // check the ignore list for functions to ignore
 ignorelist:@[value;`ignorelist;(`upd;"upd")] // the list of functions to ignore
+allowedusers:@[value;`allowedusers;()]
+ignoreusers:@[value;`ignoreusers;()] // clients to ignore for query logging
 flushinterval:@[value;`flushinterval;0D00:30:00] // default value for how often to flush the in-memory logs
 flushtime:@[value;`flushtime;0D03] // default value for how long to persist the in-memory logs
 suppressalias:@[value;`suppressalias;0b] // whether to suppress the log file alias creation
 logtimestamp:@[value;`logtimestamp;{[x] {[].proc.cd[]}}] // function to generate the log file timestamp suffix
 logroll:@[value;`logroll;1b] // whether to automatically roll the log file
 LEVEL:@[value;`LEVEL;3] // Log level
+querytrack:@[value;`querytrack;0b] // whether query tracking is enabled by default
+
+// enable query tracking for this proc if its procname is included in the querytrack config file
+querytrack:$[.proc.procname in "S"$read0 hsym `$(getenv `KDBCONFIG),"/querytrack.txt";1b;0b]
 id:@[value;`id;0j]
 nextid:{:id+::1}
@@ -38,8 +44,12 @@ logh:@[value;`logh;0]
 write:{
  if[logtodisk;@[neg logh;format x;()]];
  if[logtomemory; `.usage.usage upsert x];
+ if[querytrack; .ps.publish[`.usage.usage;x]];
  ext[x]}
+
+// custom sub function to add usage table to subscribable tables if query tracking enabled
+querysub:{if[querytrack; .stpps.t:`.usage.usage,.stpps.t]; .u.sub[x;y]}
+
 // extension function to extend the logging e.g. publish the log message
 ext:{[x]}
diff --git a/code/handlers/order.txt b/code/handlers/order.txt
index b7df23c61..f074c6e23 100644
--- a/code/handlers/order.txt
+++ b/code/handlers/order.txt
@@ -3,6 +3,7 @@ ldap.q
 writeaccess.q
 controlaccess.q
 permissions.q
+trackqueries.q
 trackclients.q
 logusage.q
 apidetails.q
diff --git a/code/processes/queryfeed.q b/code/processes/queryfeed.q
new file mode 100644
index 000000000..523aadeaf
--- /dev/null
+++ b/code/processes/queryfeed.q
@@ -0,0 +1,49 @@
+// queryfeed proc script - subs to .usage.usage tables and publishes to query tickerplant
+
+// add connections to all procs for query tracking to be enabled
+.servers.CONNECTIONS:.servers.CONNECTIONS,exec distinct proctype from (" SS ";enlist csv) 0: hsym `$getenv `TORQPROCESSES where procname in subprocs;
+
+us:@[value;`us;([]querytime:`timestamp$();id:`long$();runtime:`timespan$();zcmd:`symbol$();proctype:`symbol$();procname:`symbol$();status:`char$();ip:`int$();user:`symbol$();handle:`int$();cmd:();mem:();sz:`long$();error:())];
+
+upd:{[t;x] x[2]:`timespan$x[2]; if[t in `.usage.usage; `us insert x]};
+
+.servers.startup[];
+start_sub:{[subprocs]
+  hds:(),exec w from .servers.SERVERS where procname in subprocs;
+  {
+  .lg.o[`startsub;"subscribing to ",string first exec procname from .servers.SERVERS where w=x];
+  x(`.usage.querysub;`.usage.usage;`);
+  .lg.o[`completesub;"subscribed"];
+  }each hds;
+ };
+
+start_sub[subprocs];
+
+readlog:{[file]
+  // Remove leading backtick from symbol columns, convert a and w columns back to integers
+  update zcmd:`$1 _' string zcmd, procname:`$1 _' string procname, proctype:`$1 _' string proctype, u:`$1 _' string u,
+    a:"I"$-1 _' a, w:"I"$-1 _' w from
+    // Read in file
+    @[{update "J"$'" " vs' mem from flip (cols `us)!("PJJSSSC*S***JS";"|")0:x};hsym`$file;{'"failed to read log file : ",x}]};
+
+queryfeed:{
+  // normalise cmd data for gateway users
+  usnorm:update cmd:-2#'";" vs' cmd from us where user=`gateway;
+  usnorm:update cmd:first each cmd from usnorm where (first each cmd)~'(last each cmd);
+
+  h(".u.upd";`usage;value flip select from usnorm);
+  us::0#us;
+ };
+
+flushreload:{
+  .lg.o[`flushreload1;"flushing usage logs from disk to the query tickerplant"];
+  procnames:exec distinct procname from .servers.SERVERS where proctype in subprocs;
+  {h(".u.upd";`usage;value flip update timer:`timespan$1000*timer from readlog[raze string (getenv `KDBLOG),"/usage_",(raze x),"_",.z.d,".log"])} each string each procnames;
+ };
+
+.servers.startupdepcycles[`querytp;10;0W];
+h:.servers.gethandlebytype[`querytp;`any];
+
+if[reloadenabled;.timer.once[.proc.cp[]+0D00:00:10.000;(`flushreload;`);"Flush reload"]];
+.timer.repeat[.proc.cp[];0Wp;0D00:00:00.200;(`queryfeed;`);"Publish Query Feed"];
diff --git a/code/processes/querygateway.q b/code/processes/querygateway.q
new file mode 100644
index 000000000..489944ed2
--- /dev/null
+++ b/code/processes/querygateway.q
@@ -0,0 +1,640 @@
+// This is an asynchronous gateway. For sync calls, use deferred sync e.g. (neg h)"query";result:h[]
+// There is a synchronous method, but it should be avoided unless absolutely required
+// e.g. when using a non-native API which doesn't support deferred sync.
+// Most of the notes and implementation refer to the asynchronous method. The sync methods cause the gateway to block
+// and therefore limit the number of queries that can be serviced at the same time.
+
+// Queries are routed across heterogeneous servers as they become available.
+// Each query can query multiple servers, and the different
+// parts of the query will be executed as the required back end server becomes available.
+// When all the backend results are available, they will be joined together and returned to the client
+// Generally speaking, each back end server will only have one query queued at once. When it returns its result, it will be given
+// the next query.
+// Queries which fail on the back end are not automatically re-run. It is up to the client to re-submit them if required.
+
+// There are a couple of calls to be used
+// .gw.addserver[handle;servertype]
+// is used to add a server of the specified type on the given handle. servertype would be for example `hdb or `rdb
+// When a client queries, it can use a simple method or a more advanced method
+// .gw.asyncexec[query;servertypes(list of symbols)]
+// will execute the supplied query on each of the specified server types, and raze the results
+// .gw.asyncexecjpt[query;servertypes(list of symbols);joinfunction(lambda);postbackfunction(symbol);timeout(timespan)]
+// allows the client to specify how the results are joined, posted back and timed out.
+// The join function should be a monadic lambda - the parameter is the list of results
+// The postback function is the function which will be called back in the client. It should be a dyadic function as both
+// the query that was run and the result are posted back. The postback can be the name of a function or a lambda. The client
+// can also post up functions with > 2 parameters, as long as the extra parameters are also sent e.g.
+// ({[a;b;query;result] ... };`a;`b) is valid
+// To not use a postback function, send up the empty list ()
+// i.e. the same as a deferred sync call
+// The timeout value is used to return an error to the client if the query has been waiting / running too long
+// asyncexec is equivalent to asyncexecjpt with joinfunction=raze, postbackfunction=`, timeout=0Wn
+
+// The synchronous calls are
+// .gw.syncexec[query;servertypes(list of symbols)]
+// which will parallelise the queries across the backend servers, but block at the front end (so only 1 client query can be processed at a time)
+// the results will be razed and returned to the client
+// .gw.syncexecj[query;servertypes(list of symbols);joinfunction(lambda)]
+// allows the client to specify the join function
+
+// Error handling
+// By default, an error from an asynchronous request is sent back as a string. The format of the response can be altered
+// using the formatresponse function.
+// each error will be prefixed with the errorprefix (currently "error: ")
+// Errors for sync requests should remain flagged as errors using ' (the current definition of formatresponse).
+// Errors will be returned when
+// a) the query times out
+// b) a back end server returns an error
+// c) the join function fails
+// d) a back end server fails
+// e) the client requests a query against a server type which currently isn't active (this error is returned immediately)
+// f) the query is executed successfully but the result is too big to serialize and send back ('limit)
+// If postback functions are used, the error string will be posted back within the postback function
+// (i.e. it will be packed the same way as a valid result)
+
+// If the client closes the connection before getting results, the back end servers will still continue to execute
+// Each of the remaining client queries will be unqueued.
+// If a back end server fails, the clients which are waiting for a result will be returned an error.
+
+// The set of available servers is decided using the .gw.availableservers[] function. This returns a dictionary of handle!servertype
+// Currently this will return all registered and active handles. If you wish to modify the routing, change this function. Examples
+// might be to change it so only servers in the same datacentre or country are used, unless there are none available (i.e. only
+// route queries over a WAN if there isn't any other option). To work out where a server is, it needs to report the information
+// in the .proc.getattributes[] call. The data will automatically be populated in the .gw.servers table.
+// When synchronous calls are used, errors are returned to the client rather than strings.
+
+// The next query to execute is decided using the .gw.getnextqueryid[] function. Currently this is a simple FIFO queue (.gw.fifo).
+// For more sophisticated algorithms, e.g. priority queues based on user name, ip address, usage so far, the query that is being run etc.,
+// set .gw.getnextqueryid to be something else. It should return a 1 row table containing all the query details.
+
+// server stats are collected in .gw.servers
+// client info is in .gw.clients
+// query info is in .gw.queryqueue
+
+// To create a simple homogeneous gateway (i.e. all backend servers are the same) you create projections of
+// addserver and asyncexec, with the (servertypes) parameter projected to a single server of (for example) `standardserver
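+
+// Illustrative client-side usage (editor's sketch, not part of the original file; assumes h is
+// an open handle to this gateway and that the callback is defined in the client):
+//   q)(neg h)(`.gw.asyncexec;"select from trade";`rdb`hdb); r:h[]   / deferred sync style
+//   q)callback:{[query;result] show result}                        / dyadic postback
+//   q)(neg h)(`.gw.asyncexecjpt;"select from trade";`rdb`hdb;raze;`callback;0D00:01)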
+
+\d .gw
+
+// if error & sync message, throws an error. Else passes result as normal
+// status - 1b=success, 0b=error. sync - 1b=sync, 0b=async
+formatresponse:@[value;`.gw.formatresponse;{{[status;sync;result]$[not[status]and sync;'result;result]}}];
+synccallsallowed:@[value;`.gw.synccallsallowed; 0b] // whether synchronous calls are allowed
+querykeeptime:@[value;`.gw.querykeeptime; 0D00:30] // the time to keep queries in the query queue
+errorprefix:@[value;`.gw.errorprefix; "error: "] // the prefix for clients to look for in error strings
+permissioned:@[value;`.gw.permissioned; 0b] // should the gateway permission queries before the permissions script does
+clearinactivetime:@[value;`.gw.clearinactivetime; 0D01:00] // the time to store data on inactive handles
+
+readonly:@[value;`.readonly.enabled;0b]
+valp:$[`.pm.valp ~ key `.pm.valp;.pm.valp;.gw.readonly;{value parse x};value]
+val:$[`.pm.val ~ key `.pm.val;.pm.val;.gw.readonly;reval;eval]
+
+eod:0b
+seteod:{[b] .lg.o[`eod;".gw.eod set to ",string b]; eod::b;} // called by wdb.q during EOD
+checkeod:{[IDS].gw.eod&1 returntime+age}
+
+// scheduling function to get the next query to execute. Need to ensure we avoid starvation
+// possibilities:
+// fifo
+// low utilisation (the handles which have been used least),
+// low utilisation last x minutes
+// handle longest wait (the handle which has been waiting the longest without any query being serviced)
+// low query count (the handle with the fewest queries run)
+// low query count last x minutes
+fifo:{1 sublist select from canberun[] where time=min time}
+getnextqueryid:fifo
+getnextquery:{
+  qid:getnextqueryid[];
+  if[0=count qid; :()];
+  update submittime:.proc.cp[]^submittime from `.gw.queryqueue where queryid in qid`queryid;
+  qid}
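+
+// Illustrative alternative scheduler (editor's sketch, not part of the original file): any
+// replacement must keep fifo's contract of returning at most one row of canberun[], e.g. an
+// explicit sort on the same time column used by fifo:
+//   q).gw.getnextqueryid:{1 sublist `time xasc canberun[]}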
+
+// finish a query
+// delete the temp results
+// update the status, set the return time
+// reset the serverhandle
+finishquery:{[qid;err;serverh]
+  deleteresult[qid];
+  update error:err,returntime:.proc.cp[] from `.gw.queryqueue where queryid in qid;
+  setserverstate[serverh;0b];
+ }
+
+// Get a list of pending and running queries
+getqueue:{select queryid,time,clienth,query,servertype,status:?[null submittime;`pending;`running],submittime from .gw.queryqueue where null returntime}
+
+// manage the result set dictionaries
+addemptyresult:{[queryid; clienth; servertypes] results[queryid]:(clienth;servertypes!(count servertypes,:())#enlist(0Ni;(::);0b))}
+addservertoquery:{[queryid;servertype;serverh] .[`.gw.results;(queryid;1);{.[x;(y 0;0);:;y 1]};(servertype;serverh)]}
+deleteresult:{[queryid] .gw.results : (queryid,()) _ .gw.results}
+
+// add a result coming back from a server
+addserverresult:{[queryid;results]
+  serverid:first exec serverid from .gw.servers where active, handle=.z.w;
+  if[queryid in key .gw.results;
+    .[`.gw.results;(queryid;1;.gw.results[queryid;1;;0]?serverid;1);:;results];
+    .[`.gw.results;(queryid;1;.gw.results[queryid;1;;0]?serverid;2);:;1b]
+  ];
+  setserverstate[.z.w;0b];
+  runnextquery[];
+  checkresults[queryid]}
+// handle an error coming back from the server
+addservererror:{[queryid;error]
+  // propagate the error to the client
+  sendclientreply[queryid;.gw.errorprefix,error;0b];
+  setserverstate[.z.w;0b];
+  runnextquery[];
+  // finish the query
+  finishquery[queryid;1b;0Ni];
+ }
+// check if all results are in. If so, send the results to the client
+checkresults:{[queryid]
+  if[all value (r:.gw.results[queryid])[1;;2];
+    // get the rest of the detail from the query table
+    querydetails:queryqueue[queryid];
+    // apply the join function to the results
+    res:@[{(0b;$[10h=type x;value(x;y); x @ y])}[querydetails[`join]];value r[1;;1];{(1b;.gw.errorprefix,"failed to apply join function to result sets: ",x)}];
+    // send the results back to the client.
+    sendclientreply[queryid;last res;not res 0];
+    // finish the query
+    finishquery[queryid;res 0;0Ni]];}
+
+// build and send a response to go to the client
+// if the postback function is defined, then wrap the result in that, and also send back the original query
+sendclientreply:{[queryid;result;status]
+  querydetails:queryqueue[queryid];
+  // if query has already been sent an error, don't send another one
+  if[querydetails`error; :()];
+  tosend:$[()~querydetails[`postback];
+    result;
+    querydetails[`postback],enlist[querydetails`query],enlist result];
+  $[querydetails`sync;
+    // return sync response
+    @[-30!;(querydetails`clienth;not status;$[status;.gw.formatresponse[1b;1b;result];result]);{.lg.o[`syncexec;x]}];
+    @[neg querydetails`clienth;.gw.formatresponse[status;0b;tosend];()]];
+ };
+
+// execute a query on the server. Catch the error, propagate back
+serverexecute:{[queryid;query]
+  res:@[{(0b;value x)};query;{(1b;"failed to run query on server ",(string .z.h),":",(string system"p"),": ",x)}];
+  // send back the result, in an error trap
+  @[neg .z.w; $[res 0; (`.gw.addservererror;queryid;res 1); (`.gw.addserverresult;queryid;res 1)];
+    // if we fail to send the result back it might be something IPC related, e.g. limit error, so try just sending back an error message
+    {@[neg .z.w;(`.gw.addservererror;x;"failed to return query from server ",(string .z.h),":",(string system"p"),": ",y);()]}[queryid]];}
+// send a query to a server
+sendquerytoserver:{[queryid;query;serverh]
+  (neg serverh,:())@\:(serverexecute;queryid;query);
+  setserverstate[serverh;1b];}
+
+// handle is closed by a server
+removeserverhandle:{[serverh]
+  if[null servertype:first exec servertype from .gw.servers where handle=serverh; :()];
+  if[null serverid:first exec serverid from .gw.servers where handle=serverh; :()];
+
+  // get the list of affected query ids
+
+  // 1) queries sent to this server but no reply back yet
+  qids:where {[res;id] any not res[1;where id=res[1;;0];2]}[;serverid] each results;
+  // propagate an error back to each client
+  sendclientreply[;.gw.errorprefix,"backend ",string[servertype]," server handling query closed the connection";0b] each qids;
+  finishquery[qids;1b;serverh];
+
+  // 2) queries partially run + waiting for this server
+  activeServerIDs:exec serverid from .gw.servers where active, handle<>serverh;
+  activeServerTypes:distinct exec servertype from .gw.servers where active, handle<>serverh;
+
+  qids2:where {[res;id;aIDs;aTypes]
+    s:where not res[1;;2];
+    $[11h=type s; not all s in aTypes; not all any each s in\: aIDs]
+    }[;serverid;activeServerIDs;activeServerTypes] each results _ 0Ni;
+  sendclientreply[;.gw.errorprefix,"backend ",string[servertype]," server for running query closed the connection";0b] each qids2;
+  finishquery[qids2;1b;serverh];
+
+  // 3) queries not yet run + waiting for this server
+  qids3:exec queryid from .gw.queryqueue where null submittime, not `boolean${$[11h=type z; all z in x; all any each z in\: y]}[activeServerTypes;activeServerIDs] each servertype;
+  // propagate an error back to each client
+  sendclientreply[;.gw.errorprefix,"backend ",string[servertype]," server for queued query closed the connection";0b] each qids3;
+  finishquery[qids3;1b;serverh];
+
+  // mark the server as inactive
+  update handle:0Ni, active:0b, disconnecttime:.proc.cp[] from `.gw.servers where handle=serverh;
+
+  runnextquery[];
+ }
+
+// clear out long time inactive servers
+removeinactive:{[inactivity]
+  delete from `.gw.servers where not active,disconnecttime<.proc.cp[]-inactivity}
+
+// timeout queries
+checktimeout:{
+  qids:exec queryid from .gw.queryqueue where not timeout=0Wn,.proc.cp[] > time+timeout,null returntime;
+  // propagate a timeout error to each client
+  if[count qids;
+    sendclientreply[;.gw.errorprefix,"query has exceeded specified timeout value";0b] each qids;
+    finishquery[qids;1b;0Ni]];
+ }
+
+
+/- NEED TO FILTER ON PREFERENCES FIRST
+
+/- initial common bit of functionality for filtering on servers
+getserversinitial:{[req;att]
+
+  if[0=count req; :([]serverid:enlist key att)];
+
+  /- check if all servers report all the requirements - drop any that don't
+  att:(where all each (key req) in/: key each att)#att;
+
+  if[not count att;'"getservers: no servers report all requested attributes"];
+
+  /- calculate where each of the requirements is in each of the attribute sets
+  s:update serverid:key att from value req in'/: (key req)#/:att;
+
+  /- calculate how many requirements are satisfied by each server
+  /- the requirements satisfied is the minimum of each
+  /- resort the table by this
+  s:s idesc value min each sum each' `serverid xkey s;
+
+  /- split the servers into groups - those with the same attributes are in the same group
+  /- we only need to query one of these
+  s:`serverid xkey 0!(key req) xgroup s;
+
+  s}
+
+/- given a dictionary of requirements and a list of attribute dictionaries
+/- work out which servers we need to hit to satisfy each requirement
+/- each requirement only has to be satisfied once, i.e. requirements are treated independently
+getserversindependent:{[req;att;besteffort]
+
+  if[0=count req; :([]serverid:enlist key att)];
+
+  s:getserversinitial[req;att];
+
+  /- we want to calculate which rows have not already been fully satisfied
+  /- if the matched value in a row has already been matched, then it is useless
+  filter:(value s)&not -1 _ (0b&(value s) enlist 0),maxs value s;
+
+  /- work out whether the requirement is completely filled
+  alldone:1+first where all each all each' maxs value s;
+
+  if[(null alldone) and not besteffort;
+    '"getserversindependent: cannot satisfy query as not all attributes can be matched"];
+
+  /- use the filter to remove any rows which don't add value
+  s:1!(0!s) w:where any each any each' filter;
+
+  /- map each server id group to each of the attributes that it has available
+  /- if you want overlaps, remove the &filter w from the end of this bit of code
+  (key s)!{(key x)!(value x)@'where each y key x}[req]each value s&filter w}
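+
+/- Illustrative call (editor's sketch with hypothetical data, not from the original file):
+/-   q)req:`syms`dates!(`AAPL`MSFT;2020.01.01 2020.01.02)
+/-   q)att:`s1`s2!(`syms`dates!(enlist `AAPL;enlist 2020.01.01);`syms`dates!(`AAPL`MSFT;enlist 2020.01.02))
+/-   q)getserversindependent[req;att;1b]
+/- returns a dictionary mapping server id groups to the subset of req each group should serve,
+/- with every requirement satisfied at most once across the groups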
+
+// execute an asynchronous query
+asyncexecjpts:{[query;servertype;joinfunction;postback;timeout;sync]
+  // Check correct function called
+  if[sync<>.gw.call .z.w;
+    // if asyncexec used with sync request, signal an error.
+    // if syncexec used with async request, send async response using neg .z.w
+    @[neg .z.w;.gw.formatresponse[0b;not sync;"Incorrect function used: ",$[sync;"syncexec";"asyncexec"]];()];
+    :();
+  ];
+  if[.gw.permissioned;
+    if[not .pm.allowed[.z.u;query];
+      @[neg .z.w;.gw.formatresponse[0b;sync;"User is not permissioned to run this query from the gateway"];()];
+      :();
+    ];
+  ];
+  query:({[u;q]$[`.pm.execas ~ key `.pm.execas;value (`.pm.execas; q; u);`.pm.valp ~ key `.pm.valp; .pm.valp q; value q]}; .z.u; query);
+  /- if sync calls are allowed disable async calls to avoid query conflicts
+  $[.gw.synccallsallowed and .z.K<3.6;
+    errstr:.gw.errorprefix,"only synchronous calls are allowed";
+    [errstr:"";
+    if[99h<>type servertype;
+      // it's a list of servertypes e.g. `rdb`hdb
+      servertype:distinct servertype,();
+      errcheck:@[getserverids;servertype;{.gw.errorprefix,x}];
+      if[10h=type errcheck; errstr:errcheck];
+      queryattributes:()!();
+    ];
+    if[99h=type servertype;
+      // it's a dictionary of attributes
+      queryattributes:servertype;
+      res:@[getserverids;queryattributes;{.gw.errorprefix,"getserverids: failed with error - ",x}];
+      if[10h=type res; errstr:res];
+      if[10h<>type res; if[0=count raze res; errstr:.gw.errorprefix,"no servers match given attributes"]];
+      servertype:res;
+    ];
+    ]
+  ];
+  // error has been hit
+  if[count errstr;
+    @[neg .z.w;.gw.formatresponse[0b;sync;$[()~postback;errstr;$[-11h=type postback;enlist postback;postback],enlist[query],enlist errstr]];()];
+    :()];
+
+  addquerytimeout[query;servertype;queryattributes;joinfunction;postback;timeout;sync];
+  runnextquery[];
+ };
+
+asyncexecjpt:asyncexecjpts[;;;;;0b]
+asyncexec:asyncexecjpt[;;raze;();0Wn]
+
+// execute a synchronous query
+syncexecjpre36:{[query;servertype;joinfunction]
+  // Check correct function called
+  if[(.z.w<>0)&(.z.w in key .gw.call)&not .gw.call .z.w;
+    @[neg .z.w;.gw.formatresponse[0b;0b;"Incorrect function used: asyncexec"];()];
+    :();
+  ];
+  if[not[.gw.synccallsallowed] and .z.K<3.6;.gw.formatresponse[0b;1b;"synchronous calls are not allowed"]];
+  // check if the gateway allows the query to be called
+  if[.gw.permissioned;
+    if[not .pm.allowed [.z.u;query];
+      .gw.formatresponse[0b;1b;"User is not permissioned to run this query from the gateway"];
+    ];
+  ];
+  // check if we have all the servers active
+  serverids:@[getserverids;servertype;{.gw.formatresponse[0b;1b;.gw.errorprefix,x]}];
+  // check if gateway in eod reload phase
+  if[checkeod[serverids];.gw.formatresponse[0b;1b;"unable to query multiple servers during eod reload"]];
+  // get the list of handles
+  tab:availableserverstable[0b];
+  handles:(exec serverid!handle from tab)first each (exec serverid from tab) inter/: serverids;
+  setserverstate[handles;1b];
+  start:.z.p;
+  query:({[u;q]$[`.pm.execas ~ key `.pm.execas; value(`.pm.execas; q; u);`.pm.valp ~ key `.pm.valp; .pm.valp q; value q]}; .z.u; query);
+  // to allow parallel execution, send an async query up each handle, then block and wait for the results
+  (neg handles)@\:({@[neg .z.w;@[{(1b;.z.p;value x)};x;{(0b;.z.p;x)}];{@[neg .z.w;(0b;.z.p;x);()]}]};query);
+  // flush
+  (neg handles)@\:(::);
+  // block and wait for the results
+  res:handles@\:(::);
+  // update the usage data
+  update inuse:0b,usage:usage+(handles!res[;1] - start)[handle] from `.gw.servers where handle in handles;
+  // check if there are any errors in the returned results
+  :$[all res[;0];
+    // no errors - join the results
+    [s:@[{(1b;x y)}joinfunction;res[;2];{(0b;"failed to apply supplied join function to results: ",x)}];
+    .gw.formatresponse[s 0;1b;s 1]];
+    [failed:where not res[;0];
+    .gw.formatresponse[0b;1b;"queries failed on server(s) ",(", " sv string exec servertype from servers where handle in handles failed),". Error(s) were ","; " sv res[failed][;2]]]
+  ];
+ };
+
+syncexecjt:{[query;servertype;joinfunction;timeout]
+  // can only be used on kdb+ 3.6 or later
+
+  // use async call back function, flag it as sync
+  // doesn't make sense to allow specification of a callback for sync requests
+  asyncexecjpts[query;servertype;joinfunction;();timeout;1b];
+  // defer response
+  @[-30!;(::);()];
+ };
+
+$[.z.K < 3.6;
+  [syncexecj:syncexecjpre36;
+  syncexec:syncexecjpre36[;;raze]];
+  [syncexecj:syncexecjt[;;;0Wn];
+  syncexec:syncexecjt[;;raze;0Wn]]];
+
+// run a query
+runquery:{[]
+  // check if there is something to run
+  if[count torun:getnextquery[];
+    if[not checkeod torun`servertype;
+      torun:first torun;
+      // if it isn't already in the result dict, add it
+      if[not torun[`queryid] in key results;
+        addemptyresult[torun`queryid;torun`clienth;torun`servertype];
+      ];
+      // update the results dictionary and send off the queries
+      // get the handles to run on
+      avail:availableserverstable[1b];
+      IDs:$[11h=type torun`available;
+        (exec first serverid by servertype from avail)[torun`available];
+        first each (exec serverid from avail) inter/: torun`available];
+      handles:avail[([]serverid:IDs);`handle];
+      // update the results dictionary
+      addservertoquery[torun`queryid;torun`available;IDs];
+      // send off the queries
+      sendquerytoserver[torun`queryid;torun`query;handles];
+    ];
+  ];
+ };
+
+runnextquery:runquery
+
+
+// when a new connection is opened, add client details
+po:{addclientdetails[x]}
+
+// called when a handle is closed
+pc:{
+  removeclienthandle[x];
+  removeserverhandle[x];}
+
+pgs:{.gw.call,:enlist[x]!enlist y};
+
+// override message handlers
+.z.pc:{x@y;.gw.pc[y]}@[value;`.z.pc;{{[x]}}];
+.z.po:{x@y;.gw.po[y]}@[value;`.z.po;{{[x]}}];
+.z.pg:{.gw.pgs[.z.w;1b];x@y}@[value;`.z.pg;{{[x]}}];
+.z.ps:{.gw.pgs[.z.w;0b];x@y}@[value;`.z.ps;{{[x]}}];
+// only wrap .z.ws if it is already defined
+if[@[{value x;1b};`.z.ws;{0b}];.z.ws:{.gw.pgs[.z.w;0b];x@y}.z.ws];
+
+// START UP
+// initialise connections
+.servers.startup[]
+
+/-check if the gateway has connected to discovery process, block the process until a connection is established
+while[0 = count .servers.getservers[`proctype;`discovery;()!();0b;1b];
+  /-while not connected make the process sleep for X seconds and then run the subscribe function again
+  .os.sleep[5];
+  /-run the servers startup code again (to make connection to discovery)
+  .servers.startup[];
+  .servers.retrydiscovery[]]
+
+// add servers from the standard connections table
+addserversfromconnectiontable:{
+  {.gw.addserverattr'[x`w;x`proctype;x`attributes]}[select w,proctype,attributes from .servers.SERVERS where ((proctype in x) or x~`ALL),not w in ((0;0Ni),exec handle from .gw.servers where active)];}
+
+// When new connections come in from the discovery service, try to reconnect
+.servers.addprocscustom:{[connectiontab;procs]
+  // retry connections
+  .servers.retry[];
+  // add any new connections
+  .gw.addserversfromconnectiontable[.servers.CONNECTIONS];
+  // See if any queries can now be run
+  runnextquery[]}
+
+addserversfromconnectiontable[.servers.CONNECTIONS]
+
+// Join active .gw.servers to .servers.SERVERS table
+activeservers:{lj[select from .gw.servers where active;`handle xcol `w xkey .servers.SERVERS]}
+
+\d .
+
+// functions called by end-of-day processes
+
+reloadstart:{
+  .lg.o[`reload;"reload start called"];
+  /- set eod variable to active/true
+  .gw.seteod[1b];
+  /- extract ids of queries not yet returned
+  qids:exec queryid from .gw.queryqueue where 1 D and - -> .
if applicable)
+  dict:@[dict;`starttime`endtime inter k;{x:ssr[x;"T";"D"];x:ssr[x;"-";"."];value x}];
+  // retrieve aggregations
+  if[`aggregations in k;dict[`aggregations]:value dict[`aggregations]];
+  // Convert timebar
+  if[`timebar in k;dict[`timebar]:@[value dict[`timebar];1+til 2;{:`$x}]];
+  // Convert the filters key
+  if[`filters in k;dict[`filters]:filterskey dict`filters];
+  //output
+  :dict};
+
+quotefinder:{y[2#where y>x]}
+
+filterskey:{[filtersstrings]
+  likelist:ss[filtersstrings;"like"];
+  if[0=count likelist;:value filtersstrings];
+  // Get the location of all the apostrophes
+  apostlist:ss[filtersstrings;"'"];
+  // Get the location of all the likes
+  swaplist:raze {y[2#where y>x]}[;apostlist] each likelist;
+  // Swap the ' to "
+  filtersstrings:@[filtersstrings;swaplist;:;"\""];
+  // Convert the string to a dict
+  :value filtersstrings
+ };
diff --git a/code/querygateway/dataaccess.q b/code/querygateway/dataaccess.q
new file mode 100644
index 000000000..3d7054acf
--- /dev/null
+++ b/code/querygateway/dataaccess.q
@@ -0,0 +1,211 @@
+\d .dataaccess
+
+forceservers:0b;
+
+// dictionary containing aggregate functions needed to calculate map-reducible
+// values over multiple processes
+aggadjust:(!). flip(
+  (`avg; {flip(`sum`count;2#x)});
+  (`cor; {flip(`wsum`count`sum`sum`sumsq`sumsq;@[x;(enlist(0;1);0;0;1;0;1)])});
+  (`count; `);
+  (`cov; {flip(`wsum`count`sum`sum;@[x;(enlist(0;1);0;0;1)])});
+  (`dev; {flip(`sumsq`count`sum;3#x)});
+  (`distinct;`);
+  (`first; `);
+  (`last; `);
+  (`max; `);
+  (`min; `);
+  (`prd; `);
+  (`sum; `);
+  (`var; {flip(`sumsq`count`sum;3#x)});
+  (`wavg; {flip(`wsum`sum;(enlist(x 0;x 1);x 0))});
+  (`wsum; {enlist(`wsum;enlist(x 0;x 1))}));
+
+// function to make symbols strings with an upper case first letter
+camel:{$[11h~type x;@[;0;upper]each string x;@[string x;0;upper]]};
+// function that creates aggregation where X(X1,X2)=X(X(X1),X(X2)) where X is
+// the aggregation and X1 and X2 are non overlapping subsets of a list
+absagg:{enlist[`$x,y]!enlist(value x;`$x,y)};
+
+// functions to calculate avg, cov and var in mapaggregate dictionary
+avgf:{(%;(sum;`$"sum",x);scx y)};
+covf:{(-;(%;swsum[x;y];scx x);(*;avgf[x;x];avgf[y;x]))};
+varf:{(-;(%;(sum;`$"sumsq",y);scx x);(xexp;avgf[y;x];2))};
+// functions to sum counts and wsums in mapaggregate dictionary
+scx:{(sum;`$"count",x)};
+swsum:{(sum;`$"wsum",x,y)}
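+
+// Illustrative decomposition (editor's note, not part of the original file): an avg over one
+// column is rewritten into its map components sum and count, e.g.
+//   q)aggadjust[`avg] enlist `price
+//   (`sum`price;`count`price)
+// so each process computes sum price and count price, which recombine exactly on the gateway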
+
+// dictionary containing the functions needed to aggregate results together for
+// map-reducible aggregations
+mapaggregate:(!). flip(
+  (`avg; {enlist[`$"avg",x]!enlist(%;(sum;`$"sum",x);scx x)});
+  (`cor; {enlist[`$"cor",x,w]!enlist(%;covf[x;w];(*;(sqrt;varf[x;x]);(sqrt;varf[(x:x 0);w:x 1])))});
+  (`count; {enlist[`$"count",x]!enlist scx x});
+  (`cov; {enlist[`$"cov",x,w]!enlist covf[x:x 0;w:x 1]});
+  (`dev; {enlist[`$"dev",x]!enlist(sqrt;varf[x;x])});
+  (`first; {enlist[`$"first",x]!enlist(*:;`$"first",x)});
+  (`last; {absagg["last";x]});
+  (`max; {absagg["max";x]});
+  (`min; {absagg["min";x]});
+  (`prd; {absagg["prd";x]});
+  (`sum; {absagg["sum";x]});
+  (`var; {enlist[`$"var",x]!enlist varf[x;x]});
+  (`wavg; {enlist[`$"wavg",x,w]!enlist(%;swsum[x:x 0;w:x 1];(sum;`$"sum",x))});
+  (`wsum; {enlist[`$"wsum",x,w]!enlist swsum[x:x 0;w:x 1]}));
+
+// function to convert sorting
+go:{if[`asc=x[0];:(xasc;x[1])];:(xdesc;x[1])};
+
+// Full generality dataaccess function in the gateway
+getdata:{[o]
+  // Input checking in the gateway
+  reqno:.requests.initlogger[o];
+  o:@[.checkinputs.checkinputs;o;.requests.error[reqno]];
+  // Get the Procs
+  if[not `procs in key o;o[`procs]:attributesrouting[o;partdict[o]]];
+  // Get Default process behavior
+  default:`timeout`postback`sublist`getquery`queryoptimisation`postprocessing!(0Wn;();0W;0b;1b;{:x;});
+  // Use upserting logic to determine behaviour
+  options:default,o;
+  if[`ordering in key o;options[`ordering]: go each options`ordering];
+  o:adjustqueries[o;partdict o];
+  options[`mapreduce]:0b;
+  gr:$[`grouping in key options;options`grouping;`];
+  if[`aggregations in key options;
+    if[all key[options`aggregations]in key aggadjust;
+      options[`mapreduce]:not`date in gr]];
+  // Execute the queries
+  if[options`getquery;
+    $[.gw.call .z.w;
+      :.gw.syncexec[(`.dataaccess.buildquery;o);options[`procs]];
+      :.gw.asyncexec[(`.dataaccess.buildquery;o);options[`procs]]]];
+  :$[.gw.call .z.w;
+    // if sync
+    .gw.syncexecjt[(`getdata;o);options[`procs];autojoin[options];options[`timeout]];
+    // if async
+    .gw.asyncexecjpt[(`getdata;o);options[`procs];autojoin[options];options[`postback];options[`timeout]]];
+ };
+
+// join results together if from multiple processes
+autojoin:{[options]
+  // if there is only one proc queried, output the table
+  if[1=count options`procs;:first];
+  // if there is no need for map-reducible adjustment, return razed results
+  :$[not options`mapreduce;razeresults[options;];mapreduceres[options;]];
+ };
+
+// raze results and call process res to apply postprocessing and sublist
+razeresults:{[options;res]
+  res:raze res;
+  processres[options;res]
+ };
+
+//apply sublist and post processing to joined results
+processres:{[options;res]
+  res:(options`postprocessing)res;
+  :$[(options`sublist)<>0W;(options`sublist) sublist res;res];
+ };
+
+// function to correctly reduce two tables to one
+mapreduceres:{[options;res]
+  // raze the result sets together
+  res:$[all 99h=type each res;
+    (){x,0!y}/res;
+    (),/res];
+  aggs:options`aggregations;
+  aggs:flip(key[aggs]where count each value aggs;raze aggs);
+  // distinct may be present as only agg, so apply distinct again
+  if[all`distinct=first each aggs;:?[res;();1b;()]];
+  // collecting the appropriate grouping argument for map-reduce aggs
+  gr:$[all`grouping`timebar in key options;
+    a!a:options[`timebar;2],options`grouping;
+    `grouping in key options;
+    a!a:(),options`grouping;
+    `timebar in key options;
+    a!a:(),options[`timebar;2];
+    0b];
+  // select aggs by gr from res
+  res:?[res;();gr;raze{mapaggregate[x 0;camel x 1]}'[aggs]];
+  //apply sublist and postprocessing to map reduced results
+  processres[options;res]
+ };
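+
+// Worked check that the avg rewrite above is exact (editor's note, hypothetical data split
+// across two processes): avg x = sum[x]%count x, and the parts recombine as
+//   q)(sum[1 2 3]+sum 4 5)%count[1 2 3]+count 4 5
+//   3f
+// which matches avg 1 2 3 4 5 run on a single process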
+
+
+// Dynamic routing finds all processes with relevant data
+attributesrouting:{[options;procdict]
+  // Get the tablename and timespan
+  timespan:`date$options[`starttime`endtime];
+  // See if any of the provided partitions are within the requested ones
+  procdict:{[x;timespan] (all x within timespan) or any timespan within x}[;timespan] each procdict;
+  // Only return appropriate dates
+  types:(key procdict) where value procdict;
+  // If the dates are out of scope of processes then error
+  if[0=count types;
+    '`$"gateway error - no info found for that table name and time range. Either the table does not exist, attributes are incorrect in .gw.servers on the gateway, or the date range is outside the ones present"
+  ];
+  :types;
+ };
+
+// Generates a dictionary of `tablename!(mindate;maxdate)
+partdict:{[input]
+  tabname:input[`tablename];
+  // Remove duplicate servertypes from the gw.servers
+  servers:select from .gw.servers where i=(first;i)fby servertype;
+  // extract the procs which have the table defined
+  servers:select from servers where {[x;tabname]tabname in @[x;`tables]}[;tabname] each attributes;
+  // Create a dictionary of the attributes against servertypes
+  procdict:(exec servertype from servers)!(exec attributes from servers)@'(key each exec attributes from servers)[;0];
+  // If the response is a dictionary index into the tablename
+  procdict:@[procdict;key procdict;{[x;tabname]if[99h=type x;:x[tabname]];:x}[;tabname]];
+  // returns the dictionary as min date/ max date
+  procdict:asc @[procdict;key procdict;{:(min x; max x)}];
+  // prevents overlap if more than one process contains a specified date
+  if[1<count p:options`procs;:options];
+  // get the date casting where relevant
+  st:$[a:-14h~tp:type start:options`starttime;start;`date$start];
+  et:$[a;options`endtime;`date$options`endtime];
+  // get the dates that are required by each process
+  dates:group key[part]where each{within[y;]each value x}[part]'[l:st+til 1+et-st];
+  dates:l{(min x;max x)}'[dates];
+  // if start/end time not a date, then adjust dates parameter for the
+  // correct types
+  if[not a;
+    // converts dates dictionary to timestamps/datetimes
+    dates:$[-15h~tp;{"z"$x};::]{(0D+x 0;x[1]+1D-1)}'[dates];
+    // convert first and last timestamp to start and end time
+    dates:@[dates;f;:;(start;dates[f:first key dates;1])];
+    dates:@[dates;l;:;(dates[l:last key dates;0];options`endtime)]];
+  // adjust map-reducible aggregations to get correct components
+  if[(1type att;
+    // it's a list of servertypes e.g. `rdb`hdb
+    // check if user attributes are a symbol list
+    if[not 11h=abs type att;
+      '"Servertype should be given as either a dictionary (type 99h) or a symbol list (11h)"
+    ];
+    servertype:distinct att,();
+    //list of active servers
+    activeservers:exec distinct servertype from .gw.servers where active;
+    //list of all servers
+    allservers:exec distinct servertype from .gw.servers;
+    activeserversmsg:". Available servers include: ",", " sv string activeservers;
+    //check if a null argument is passed
+    if[any null att;'"A null server cannot be passed as an argument",activeserversmsg];
+    //if any requested servers are missing then:
+    //if requested server does not exist, return error with list of available servers
+    //if requested server exists but is currently inactive, return error with list of available servers
+    if[count servertype except activeservers;
+      '"the following ",$[max not servertype in allservers;
+        "are not valid servers: ",", " sv string servertype except allservers;
+        "requested servers are currently inactive: ",", " sv string servertype except activeservers
+      ],activeserversmsg;
+    ];
+    :(exec serverid by servertype from .gw.servers where active)[servertype];
+  ];
+
+  serverids:$[`servertype in key att;
+    raze getserveridstype[delete servertype from att] each (),att`servertype;
+    getserveridstype[att;`all]];
+
+  if[all 0=count each serverids;'"no servers match requested attributes"];
+  :serverids;
+ }
+
+getserveridstype:{[att;typ]
+  // default values
+  besteffort:1b;
+  attype:`cross;
+
+  servers:$[typ=`all;
+    exec serverid!attributes from .gw.servers where active;
+    exec serverid!attributes from .gw.servers where active,servertype=typ];
+
+  if[`besteffort in key att;
+    if[-1h=type att`besteffort;besteffort:att`besteffort];
+    att:delete besteffort from att;
+  ];
+  if[`attributetype in key att;
+    if[-11h=type att`attributetype;attype:att`attributetype];
+    att:delete attributetype from att;
+  ];
+
+  res:$[attype=`independent;getserversindependent[att;servers;besteffort];
+    getserverscross[att;servers;besteffort]];
+
+  serverids:first value flip $[99h=type res; key res; res];
+  if[all 0=count each serverids;'"no servers match ",string[typ]," requested attributes"];
+  :serverids;
+ }
+
+/- build a cross product from a nested dictionary
+buildcross:{(cross/){flip (enlist y)#x}[x] each key x}
+
+/- given a dictionary of requirements and a list of attribute dictionaries
+/- work out which servers we need to hit to satisfy each requirement
+/- we want to satisfy the cross product of requirements - so each attribute has to be available with each other attribute
+/- e.g. each symbol has to be available within each specified date
+getserverscross:{[req;att;besteffort]
+
+  if[0=count req; :([]serverid:enlist key att)];
+
+  s:getserversinitial[req;att];
+
+  /- build the cross product of requirements
+  reqcross:buildcross[req];
+
+  /- calculate the cross product of data contributed by each source
+  /- and drop it from the list of stuff that is required
+  util:flip `remaining`found!flip ({[x;y;z] (y[0] except found; y[0] inter found:$[0=count y[0];y[0];buildcross x@'where each z])}[req]\)[(reqcross;());value s];
+
+  /- check if everything is done
+  if[(count last util`remaining) and not besteffort;
+    '"getserverscross: cannot satisfy query as the cross product of all attributes can't be matched"];
+  /- remove any rows which don't add value
+  s:1!(0!s) w:where not 0=count each util`found;
+  /- return the parameters which should be queried for
+  (key s)!distinct each' flip each util[w]`found
+ }
diff --git a/code/querygateway/kxdash.q b/code/querygateway/kxdash.q
new file mode 100644
index 000000000..d978aa509
--- /dev/null
+++ b/code/querygateway/kxdash.q
@@ -0,0 +1,61 @@
+\d .kxdash
+enabled:@[value;`enabled;{0b}];
+
+// use this to store the additional params that the kx dashboards seem to send in
+dashparams:`o`w`r`limit!(0;0i;0i;0W)
+
+// function to be called from the dashboards
+dashexec:{[q;s;j]
+  .gw.asyncexecjpt[(dashremote;q;dashparams);(),s;dashjoin[j];();0Wn]
+ }
+
+// execute the request
+// return a dict of status and result, along with the params
+// add a flag to the start of the list to stop dictionaries collapsing
+// to tables in the join function
+dashremote:{[q;dashparams]
+  (`kxdash;dashparams,`status`result!@[{(1b;value x)};q;{(0b;x)}])
+ }
+
+// join function used for dashboard results
+dashjoin:{[joinfunc;r]
+  $[min r[;1;`status];
+    (`.dash.rcv_msg;r[0;1;`w];r[0;1;`o];r[0;1;`r];r[0;1;`limit] sublist joinfunc r[;1;`result]);
+    (`.dash.snd_err;r[0;1;`w];r[0;1;`r];r[0;1;`result])]
+ }
+
+dashps:{
+  // check the query coming in meets the format
+  $[@[{`f`w`r`x`u~first 1_ value first x};x;0b];
+    // pull out the values we need to return to the dashboards
+    [dashparams::`o`w`r`limit!(last value x 1;x 2;x 3;x[4;0]);
+    // execute the query part, which must look something like
+    // .kxdash.dashexec["select from t";`rdb`hdb;raze]
+    value x[4;1];
+    ];
+    //
+    value x]
+ }
+
+// need this to handle queries that only hit one backend process
+// reformat those responses to look the same
+formatresponse:{[status;sync;result]
+  if[`kxdash~first result;
+    res:last result;
+    :$[res`status;
+      (`.dash.rcv_msg;res`w;res`o;res`r;res`result);
+      (`.dash.snd_err;res`w;res`r;res`result)]];
+  $[not[status]and sync;'result;result]}
+
+init:{
+  // KX dashboards are expecting getFunctions to be defined on the process
+  .api.getFunctions:@[value;`.api.getFunctions;{{:()}}];
+  // Reset format response
+  .gw.formatresponse:formatresponse;
+  // incorporate dashps into the .z.ps definition
+  .z.ps:{x@y;.kxdash.dashps y}@[value;`.z.ps;{{value x}}];
+ };
+
+if[enabled;init[]];
diff --git a/code/queryhdb/queryhdbstandard.q b/code/queryhdb/queryhdbstandard.q
new file mode 100644
index 000000000..68cf3de2d
--- /dev/null
+++ b/code/queryhdb/queryhdbstandard.q
@@ -0,0 +1,7 @@
+// reload function
+reload:{
+  .lg.o[`reload;"reloading QUERYHDB"];
+  system"l ."}
+
+// Get the relevant QUERYHDB attributes
+.proc.getattributes:{`date`tables!(@[value;`date;`date$()];tables[])}
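+
+// Illustrative output (editor's note, hypothetical partitions): in an hdb with date partitions
+// 2020.01.01 2020.01.02 and a single trade table, .proc.getattributes[] would return
+//   `date`tables!(2020.01.01 2020.01.02;enlist `trade)
+// which is what the gateway's attribute-based routing (partdict/attributesrouting) consumes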
diff --git a/code/querytp/pubsub.q b/code/querytp/pubsub.q
new file mode 100644
index 000000000..2f05ff55c
--- /dev/null
+++ b/code/querytp/pubsub.q
@@ -0,0 +1,30 @@
+// Get pubsub common code
+.proc.loadf[getenv[`KDBCODE],"/common/pubsub.q"];
+
+// Define UPD and ZTS wrapper functions
+
+// Check for end of day/period and call inner UPD function
+.stpps.upd.def:{[t;x]
+  if[.stplg.nextendUTC`create); // dict r contains icounts & not using own logfile
+    subtabs:$[subscribeto~`;key r`icounts;subscribeto],();
+    .u.jcounts::.u.icounts::$[0=count r`icounts;()!();subtabs!enlist [r`icounts]subtabs];
+  ]
+ ];
+ }
+
+// Initialise chained STP
+init:{
+  // Load in timer and subscription code and set top-level end of day/period functions
+  .proc.loadf[getenv[`KDBCODE],"/common/timer.q"];
+  .proc.loadf[getenv[`KDBCODE],"/common/subscriptions.q"];
+  `endofperiod set {[x;y;z] .stplg.endofperiod[x;y;z]};
+  `endofday set {[x;y] .stplg.endofday[x;y]};
+
+  // Initialise connections and subscribe to main STP
+  .servers.startupdepnamecycles[.sctp.tickerplantname;.sctp.tpconnsleep;.sctp.tpcheckcycles];
+  .sctp.subscribe[];
+ };
+
+\d .
+
+// Make the SCTP die if the main STP dies
+.z.pc:{[f;x]
+  @[f;x;()];
+  if[.sctp.chainedtp;
+    if[.sctp.tph=x; .lg.e[`.z.pc;"lost connection to tickerplant : ",string .sctp.tickerplantname];exit 1]
+  ]
+ } @[value;`.z.pc;{{}}];
+
+// Extract data from incoming table as a list
+upd:{[t;x]
+  x:value flip x;
+  .u.upd[t;x]
+ }
\ No newline at end of file
diff --git a/code/querytp/stplog.q b/code/querytp/stplog.q
new file mode 100644
index 000000000..ff08f6e89
--- /dev/null
+++ b/code/querytp/stplog.q
@@ -0,0 +1,296 @@
+// Utilities for periodic tp logging in stp process
+
+// Live logs and handles to logs for each table
+currlog:([tbl:`symbol$()]logname:`symbol$();handle:`int$())
+
+// View of log file handles for faster lookups
+loghandles::exec tbl!handle from currlog
+
+\d .stplg
+
+// Name of error log file
+errorlogname:@[value;`.stplg.errorlogname;`segmentederrorlogfile]
+
+// Create stp log directory
+// Log structure `:stplogs/date/tabname_time
+createdld:{[name;date]
+  if[not count dir:hsym `$getenv[`KDBTPLOG];.lg.e[`stp;"log directory not defined"];exit 1];
+  .os.md dir;
+  .os.md .stplg.dldir:` sv dir,`$raze/[string name,"_",date];
+ };
+
+// Functions to generate log names in one of five modes
+
+// Generate standardised timestamp string for log names
+gentimeformat:{(raze string "dv"$x) except ".:"};
+
+// Tabperiod mode - TP log rolled periodically (default 1 hr), 1 log per table (default setting)
+.stplg.logname.tabperiod:{[dir;tab;p] ` sv (hsym dir;`$raze string (.proc.procname;"_";tab),.stplg.gentimeformat[p]) };
+
+// Standard TP mode - write all tables to single log, roll daily
+.stplg.logname.singular:{[dir;tab;p] ` sv (hsym dir;`$raze string .proc.procname,"_",.stplg.gentimeformat[p]) };
+
+// Periodic-only mode - write all tables to single log, roll periodically intraday
+.stplg.logname.periodic:{[dir;tab;p] ` sv (hsym dir;`$raze string .proc.procname,"_periodic",.stplg.gentimeformat[p]) };
+
+// Tabular-only mode - write tables to separate logs, roll daily
+.stplg.logname.tabular:{[dir;tab;p] ` sv (hsym dir;`$raze string (.proc.procname;"_";tab),.stplg.gentimeformat[p]) };
+
+// Custom mode - mixed periodic/tabular mode
+// Tables are defined as periodic, tabular, tabperiod or none in config file stpcustom.csv
+// Tables not specified in csv are not logged
+.stplg.logname.custom:{[dir;tab;p] .stplg.logname[.stplg.custommode tab][dir;tab;p] };
+
+// If in error mode, create an error log name using .stplg.errorlogname
+.stplg.logname.error:{[dir;ename;p] ` sv (hsym dir;`$raze string (.proc.procname;"_";ename),.stplg.gentimeformat[p]) };
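+
+// Illustrative log names produced by the modes above (editor's note, assuming a hypothetical
+// .proc.procname of stp1 and log directory stplogs/stp1_20200101):
+//   q).stplg.gentimeformat[2020.01.01D09:30:15.000]
+//   "20200101093015"
+//   q).stplg.logname.tabperiod[`$"stplogs/stp1_20200101";`trade;2020.01.01D09:30:15.000]
+//   `:stplogs/stp1_20200101/stp1_trade20200101093015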
+
+// Update and timer functions in three batch modes ////////////////////////////////////
+// preserve pre-existing definitions
+upd:@[value;`.stplg.upd;enlist[`]!enlist ()];
+zts:@[value;`.stplg.zts;enlist[`]!enlist ()];
+
+// Functions to add columns on updates
+updtab:@[value;`.stplg.updtab;enlist[`]!enlist {(enlist(count first x)#y),x}]
+
+// If set to memorybatch, publish and write to disk will be run in batches
+// insert to table in memory, on a timer flush the table to disk and publish, update counts
+upd[`memorybatch]:{[t;x;now]
+  t insert updtab[t] . (x;now);
+ };
+
+zts[`memorybatch]:{
+  {[t]
+    if[count value t;
+      `..loghandles[t] enlist (`upd;t;value flip value t);
+      @[`.stplg.msgcount;t;+;1];
+      @[`.stplg.rowcount;t;+;count value t];
+      .stpps.pubclear[t]];
+  }each .stpps.t;
+ };
+
+// Standard batch mode - write to disk immediately, publish in batches
+upd[`defaultbatch]:{[t;x;now]
+  t insert x:.stplg.updtab[t] . (x;now);
+  `..loghandles[t] enlist(`upd;t;x);
+  // track tmp counts, and add these after publish
+  @[`.stplg.tmpmsgcount;t;+;1];
+  @[`.stplg.tmprowcount;t;+;count first x];
+ };
+
+zts[`defaultbatch]:{
+  // publish and clear all tables, increment counts
+  .stpps.pubclear[.stpps.t];
+  // after data has been published, update the counts
+  .stplg.msgcount+:.stplg.tmpmsgcount;
+  .stplg.rowcount+:.stplg.tmprowcount;
+  // reset temp counts
+  .stplg.tmpmsgcount:.stplg.tmprowcount:()!();
+ };
+
+// Immediate mode - publish and write immediately
+upd[`immediate]:{[t;x;now]
+  x:updtab[t] . (x;now);
+  `..loghandles[t] enlist(`upd;t;x);
+  x:$[0h>type last x;enlist;flip] .stpps.tabcols[t]!x;
+  @[`.stplg.msgcount;t;+;1];
+  @[`.stplg.rowcount;t;+;count x];
+  .stpps.pub[t;x]
+ };
+
+zts[`immediate]:{}
+
+//////////////////////////////////////////////////////////////////////////////////////
+
+// Functions to obtain logs for client replay ////////////////////////////////////////
+// replaylog called from client-side, returns nested list of logcounts and lognames
+replaylog:{[t]
+  getlogs[replayperiod][t]
+ }
+
+// alternative replay allows for 'pass through logging'
+// if SCTP not producing logs, subscribers replay from STP log files
+if[.sctp.loggingmode=`parent;
+  replaylog:{[t]
+    .sctp.tph (`.stplg.replaylog; t)
+  }
+ ]
+
+getlogs:enlist[`]!enlist ()
+
+// If replayperiod set to `period, only replay logs for current logging period
+getlogs[`period]:{[t]
+  distinct flip (.stplg.msgcount;exec tbl!logname from `..currlog where tbl in t)@\:t
+ };
+
+// If replayperiod set to `day, replay all of today's logs
+getlogs[`day]:{[t]
+  // set the msgcount to 0Wj for all logs which have closed
+  lnames:select seq,tbls,logname,msgcount:0Wj from .stpm.metatable where any each tbls in\: t;
+  // Meta table does not store counts for live logs, so these are populated here
+  lnames:update msgcount:sum each .stplg.msgcount[tbls] from lnames where seq=.stplg.i;
+  flip value exec `long$msgcount,logname from lnames
+ };
+
+//////////////////////////////////////////////////////////////////////////////////////
+
+// Open log for a single table at start of logging period
+openlog:{[multilog;dir;tab;p]
+  lname:logname[multilog][dir;tab;p];
+  .lg.o[`openlog;"opening logfile: ",string lname];
+  h:$[(notexists:not type key lname)or null h0:exec first handle from `..currlog where logname=lname;
+    [if[notexists;.[lname;();:;()]];hopen lname];
+    h0
+  ];
+  `..currlog upsert (tab;lname;h);
+ };
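+
+// Illustrative replay interaction for replaylog above (editor's sketch, hypothetical counts and
+// paths): a subscriber asking to replay the trade table gets (msgcount;logname) pairs back, e.g.
+//   q)h(`.stplg.replaylog;enlist `trade)
+//   ((42;`:stplogs/stp1_20200101/stp1_trade20200101090000))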
+// Error log for failed updates in error mode
+openlogerr:{[dir]
+  lname:.[.stplg.logname.error;(dir;.stplg.errorlogname;.z.p+.eodtime.dailyadj);{.lg.e[`openlogerr;"failed to make error log: ",x]}];
+  if[not type key lname;.[lname;();:;()]];
+  h:@[{hopen x};lname;{.lg.e[`openlogerr;"failed to open handle to error log with error: ",x]}];
+  `..currlog upsert (errorlogname;lname;h);
+ };
+
+// Log failed message and error type in error mode
+badmsg:{[e;t;x]
+  .lg.o[`upd;"Bad message received, error: ",e];
+  `..loghandles[errorlogname] enlist(`upderr;t;x);
+ };
+
+closelog:{[tab]
+  if[null h:`..currlog[tab;`handle];.lg.o[`closelog;"no open handle to log file"];:()];
+  .lg.o[`closelog;"closing log file ",string `..currlog[tab;`logname]];
+  @[hclose;h;{.lg.e[`closelog;"handle already closed"]}];
+  update handle:0N from `..currlog where tbl=tab;
+ };
+
+// Roll all logs at end of logging period
+rolllog:{[multilog;dir;tabs;p]
+  .stpm.updmeta[multilog][`close;tabs;p];
+  closelog each tabs;
+  @[`.stplg.msgcount;tabs;:;0];
+  {[m;d;t]
+    .[openlog;(m;d;t;currperiod);
+      {.lg.e[`stp;"failed to open log for table ",string[y],": ",x]}[;t]]
+  }[multilog;dir;]each tabs;
+  .stpm.updmeta[multilog][`open;tabs;p];
+ };
+
+// Creates dictionary of process data to be used at endofday/endofperiod - configurable but default provided
+endofdaydata:@[value;`.stplg.endofdaydata;{ {`proctype`procname`tables!(.proc.proctype;.proc.procname;.stpps.t)} }];
+
+// endofperiod function defined in SCTP
+// passes on eop messages to subscribers and rolls logs
+endofperiod:{[currentpd;nextpd;data]
+  .lg.o[`endofperiod;"flushing remaining data to subscribers and clearing tables"];
+  .stpps.pubclear[.stplg.t];
+  .lg.o[`endofperiod;"executing end of period for ",.Q.s1 `currentperiod`nextperiod!(currentpd;nextpd)];
+  .stpps.endp[currentpd;nextpd;data]; // sends endofperiod message to subscribers
+  currperiod::nextpd; // increments current period
+  if[.sctp.loggingmode=`create;periodrollover[data]] // logs only rolled if in create mode
+ };
+
+// stp runs function to send out end of period messages and roll logs
+// eop log roll is stopped if eod is also going to be triggered (roll is not stopped in SCTP)
+stpeoperiod:{[currentpd;nextpd;data;rolllogs]
+  .lg.o[`stpeoperiod;"passing on endofperiod message to subscribers"];
+  .stpps.endp[currentpd;nextpd;data]; // sends endofperiod message to subscribers
+  currperiod::nextperiod; // increments current period
+  if[(data`p)>nextperiod::multilogperiod+currperiod;
+    system"t 0";'"next period is in the past"]; // timer off
+  getnextendUTC[]; // grabs next end time
+  if[rolllogs;periodrollover[data]]; // roll if appropriate
+  .lg.o[`stpeoperiod;"end of period complete, new values for current and next period are ",.Q.s1 (currentpd;nextpd)];
+ }
+
+// common eop log rolling logic for STP and SCTP
+periodrollover:{[data]
+  i+::1; // increments log seq number
+  rolllog[multilog;dldir;rolltabs;data`p];
+ }
+
+// endofday function defined in SCTP
+// passes on eod messages to subscribers and rolls logs
+endofday:{[date;data]
+  .lg.o[`endofday;"flushing remaining data to subscribers and clearing tables"];
+  .stpps.pubclear[.stplg.t];
+  .stpps.end[date;data]; // sends endofday message to subscribers
+  dayrollover[data];
+ }
+
+// STP runs function to send out eod messages and roll logs
+stpeod:{[date;data]
+  .lg.o[`stpeod;"executing end of day for ",.Q.s1 .eodtime.d];
+  .stpps.end[date;data]; // sends endofday message to subscribers
+  dayrollover[data];
+ }
+
+// common eod log rolling logic for STP and SCTP
+dayrollover:{[data]
+  if[(data`p)>.eodtime.nextroll:.eodtime.getroll[data`p];
+    system"t 0";'"next roll is in the past"]; // timer off
+  getnextendUTC[]; // grabs next end time
+  .eodtime.d+:1; // increment current day
+  .stpm.updmeta[multilog][`close;logtabs;(data`p)+.eodtime.dailyadj]; // update meta tables
+  .stpm.metatable:0#.stpm.metatable;
+  closelog each logtabs; // close current day logs
+  init[string .proc.procname]; // reinitialise process
+  .lg.o[`dayrollover;"end of day complete, new value for date is ",.Q.s1 .eodtime.d];
+ }
+
+// get the next end time to compare to
+getnextendUTC:{nextendUTC::-1+min(.eodtime.nextroll;nextperiod - .eodtime.dailyadj)}
+
+checkends:{
+  // jump out early if don't have to do either
+  if[nextendUTC > x; :()];
+  // check for endofperiod
+  if[nextperiod < x1:x+.eodtime.dailyadj; stpeoperiod[.stplg`currperiod;.stplg`nextperiod;.stplg.endofdaydata[],(enlist `p)!enlist x1;not .eodtime.nextroll < x]];
+  // check for endofday
+  if[.eodtime.nextroll < x;if[.eodtime.d<("d"$x)-1;system"t 0";'"more than one day?"]; stpeod[.eodtime.d;.stplg.endofdaydata[],(enlist `p)!enlist x]];
+ };
+
+init:{[dbname]
+  t::tables[`.]except `currlog;
+  msgcount::rowcount::t!count[t]#0;
+  tmpmsgcount::tmprowcount::(`symbol$())!`long$();
+  logtabs::$[multilog~`custom;key custommode;t];
+  rolltabs::$[multilog~`custom;logtabs except where custommode in `tabular`singular;t];
+  currperiod::multilogperiod xbar .z.p+.eodtime.dailyadj;
+  nextperiod::multilogperiod+currperiod;
+  getnextendUTC[];
+  i::1;
+  seqnum::0;
+
+  if[(value `..createlogs) or .sctp.loggingmode=`create;
+    createdld[dbname;.eodtime.d];
+    openlog[multilog;dldir;;.z.p+.eodtime.dailyadj]each logtabs;
+    // If appropriate, roll error log
+    if[.stplg.errmode;openlogerr[dldir]];
+    // read in the meta table from disk
+    .stpm.metatable:@[get;hsym`$string[.stplg.dldir],"/stpmeta";0#.stpm.metatable];
+    // set log sequence number to the max of what we've found
+    i::1+ -1|exec max seq from .stpm.metatable;
+    // add the info to the meta table
+    .stpm.updmeta[multilog][`open;logtabs;.z.p+.eodtime.dailyadj];
+  ]
+
+  // set loghandles to null if sctp is not creating logs
+  if[.sctp.chainedtp and not .sctp.loggingmode=`create;
+    `..loghandles set t! (count t) # enlist (::)
+  ]
+ };
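+
+// Illustrative period arithmetic for the initialisation above (editor's note, assuming a
+// hypothetical multilogperiod of one hour): xbar snaps the current time to the period start,
+//   q)0D01 xbar 2020.01.01D09:42:13.000
+//   2020.01.01D09:00:00.000000000
+// so currperiod is the start of the current hour and nextperiod the start of the following one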
+
+\d .
+
+// Close logs on clean exit
+.z.exit:{
+  if[not x~0i;.lg.e[`stpexit;"Bad exit!"];:()];
+  .lg.o[`stpexit;"Exiting process"];
+  // exit before logs are touched if process is an sctp NOT in create mode
+  if[.sctp.chainedtp and not .sctp.loggingmode=`create; :()];
+  .lg.o[`stpexit;"Closing off log files"];
+  .stpm.updmeta[.stplg.multilog][`close;.stpps.t;.z.p];
+  .stplg.closelog each .stpps.t;
+ }
diff --git a/code/querytp/stpmeta.q b/code/querytp/stpmeta.q
new file mode 100644
index 000000000..7adf3f4ed
--- /dev/null
+++ b/code/querytp/stpmeta.q
@@ -0,0 +1,49 @@
+// API for writing logfile meta data
+// Metatable keeps info on all opened logs, the tables which feed each log, and the number of messages written to each log
+
+\d .stpm
+
+metatable:([]seq:`int$();logname:`$();start:`timestamp$();end:`timestamp$();tbls:();msgcount:`int$();schema:();additional:())
+
+// Functions to update meta data for all logs in each logging mode
+// Meta is updated only when opening and closing logs
+updmeta:enlist[`]!enlist ()
+
+updmeta[`tabperiod]:{[x;t;p]
+  getmeta[x;p;;]'[enlist each t;`..currlog[([]tbl:t)]`logname];
+  setmeta[.stplg.dldir;metatable];
+ };
+
+updmeta[`singular]:{[x;t;p]
+  getmeta[x;p;t;`..currlog[first t]`logname];
+  setmeta[.stplg.dldir;metatable];
+ };
+
+updmeta[`periodic]:updmeta[`singular]
+
+updmeta[`tabular]:updmeta[`tabperiod]
+
+updmeta[`custom]:{[x;t;p]
+  pertabs:where `periodic=.stplg.custommode;
+  updmeta[`periodic][x;t inter pertabs;p];
+  updmeta[`tabular][x;t except pertabs;p]
+ };
+
+// Logname, start time, table names and schema populated on opening
+// End time and final message count updated on close
+// Sequence number increments by one on log period rollover
+getmeta:{[x;p;t;ln]
+  if[x~`open;
+    s:((),t)!(),.stpps.schemas[t];
+    `.stpm.metatable upsert (.stplg.i;ln;p;0Np;t;0;s;enlist ()!());
+  ];
+  if[x~`close;
+    update end:p,msgcount:sum .stplg.msgcount[t] from `.stpm.metatable where logname = ln
+  ]
+ };
+
+setmeta:{[dir;mt]
+  t:(hsym`$string[dir],"/stpmeta");
+  .[{x set y};(t;mt);{.lg.e[`setmeta;"Failed to set metatable with error: ",x]}];
+ };
+
diff --git a/config/passwords/queryfeed.txt b/config/passwords/queryfeed.txt
new file mode 100644
index 000000000..02e2087f4
--- /dev/null
+++ b/config/passwords/queryfeed.txt
@@ -0,0 +1 @@
+queryfeed:pass
diff --git a/config/passwords/querygateway.txt b/config/passwords/querygateway.txt
new file mode 100644
index 000000000..0d1247cd5
--- /dev/null
+++ b/config/passwords/querygateway.txt
@@ -0,0 +1 @@
+querygateway:pass
diff --git a/config/passwords/queryhdb.txt b/config/passwords/queryhdb.txt
new file mode 100644
index 000000000..9e21bd1eb
--- /dev/null
+++ b/config/passwords/queryhdb.txt
@@ -0,0 +1 @@
+queryhdb:pass
diff --git a/config/passwords/queryrdb.txt b/config/passwords/queryrdb.txt
new file mode 100644
index 000000000..ea3ecb8ef
--- /dev/null
+++ b/config/passwords/queryrdb.txt
@@ -0,0 +1 @@
+queryrdb:pass
diff --git a/config/passwords/querytp.txt b/config/passwords/querytp.txt
new file mode 100644
index 000000000..5171c10f4
--- /dev/null
+++ b/config/passwords/querytp.txt
@@ -0,0 +1 @@
+querytp:pass
diff --git a/config/querysort.csv b/config/querysort.csv
new file mode 100644
index 000000000..4170a28c1
--- /dev/null
+++ b/config/querysort.csv
@@ -0,0 +1,3 @@
+tabname,att,column,sort
+default,p,procname,1
+default,,time,1
diff --git a/config/querytrack.txt b/config/querytrack.txt
new file mode 100644
index 000000000..0032cedcc
--- /dev/null
+++ b/config/querytrack.txt
@@ -0,0 +1,6 @@
+rdb1
+hdb1
+rdb2
+hdb2
+gateway1 + diff --git a/config/settings/chainedtp.q b/config/settings/chainedtp.q index ced208538..b4dc3585d 100644 --- a/config/settings/chainedtp.q +++ b/config/settings/chainedtp.q @@ -23,3 +23,6 @@ enabled:0b /- disable heartbeating /- Configuration used by the usage functions - logging of client interaction \d .usage enabled:0b /- switch off the usage logging + +\d .queries +enabled:0b /- disable query tracking diff --git a/config/settings/default.q b/config/settings/default.q index ec55d8591..6af8b937f 100644 --- a/config/settings/default.q +++ b/config/settings/default.q @@ -20,6 +20,8 @@ logtodisk:1b // whether to log to disk or not logtomemory:1b // write query logs to memory ignore:1b // check the ignore list for functions to ignore ignorelist:(`upd;"upd") // the list of functions to ignore in async calls +allowedusers:`angus`michael`stephen +ignoreusers:raze (`discovery`dqc`dqe`gateway`mburns`monitor`queryfeed`querygateway`queryrdb`rdb`reporter`segmentedtickerplant`sort`wdb;.z.u;`) // clients to ignore for query logging flushinterval:0D00:30:00 // default value for how often to flush the in-memory logs flushtime:1D00 // default value for how long to persist the in-memory logs. Set to 0D for no flushing suppressalias:0b // whether to suppress the log file alias creation @@ -41,6 +43,12 @@ MAXIDLE:`long$0D // handles which haven't been used in this length of time will AUTORECONNECT:0b // whether to reconnect to processes previously subscribed to checksubscriptionperiod:0D00:00:10 // how frequently to check subscriptions are still connected - 0D means don't check +//queries config +\d .queries +enabled:1b +ignore:1b +ignorelist:(`upd;"upd") + // Permissions configuration \d .pm enabled:0b @@ -183,4 +191,4 @@ SAVEFILE:`:KUTR.csv; // test results savefile // data striping \d .ds -numseg:0i; // default value for process.csv to overwrite and pubsub.q check \ No newline at end of file +numseg:0i; // default value for process.csv to overwrite and pubsub.q check diff --git a/config/settings/queryfeed.q b/config/settings/queryfeed.q new file mode 100644 index 000000000..58ca14222 --- /dev/null +++ b/config/settings/queryfeed.q @@ -0,0 +1,11 @@ +\d .servers +enabled:1b +CONNECTIONS:`querytp; // Feedhandler connects to the query-tickerplant +HOPENTIMEOUT:30000 + +\d . +subprocs:"S"$read0 hsym `$(getenv `KDBCONFIG),"/querytrack.txt"; // List of procs for query-tickerplant to subscribe to +reloadenabled:1b + +\d .usage +enabled:0b diff --git a/config/settings/querygateway.q b/config/settings/querygateway.q new file mode 100644 index 000000000..1b77a1080 --- /dev/null +++ b/config/settings/querygateway.q @@ -0,0 +1,24 @@ +// Default configuration for the gateway process + +\d .gw +// if error & sync message, throws an error. Else passes result as normal +// status - 1b=success, 0b=error. 
sync - 1b=sync, 0b=async +formatresponse:{[status;sync;result]$[not[status]and sync;'result;result]}; +synccallsallowed:0b // whether synchronous calls are allowed +querykeeptime:0D00:30 // the time to keep completed queries before they are cleared +errorprefix:"error: " // the prefix for clients to look for in error strings +clearinactivetime:0D01:00 // the time to keep inactive handle data + +\d .kxdash +enabled:0b // Functionality for parsing and handling kx dashboard queries - disabled by default + +\d .proc +loadprocesscode:1b // whether to load the process specific code defined at ${KDBCODE}/{process type} + +// Server connection details +\d .servers +CONNECTIONS:`queryrdb`queryhdb`gateway // list of connections to make at start up +RETRY:0D00:01 // period on which to retry dead connections. If 0, no reconnection attempts + +\d .aqrest +loadexecute:0b // Whether to reset .aqrest.execute diff --git a/config/settings/queryhdb.q b/config/settings/queryhdb.q new file mode 100644 index 000000000..e89d9a53e --- /dev/null +++ b/config/settings/queryhdb.q @@ -0,0 +1,9 @@ +// Bespoke HDB config + +\d .proc +loadprocesscode:1b // whether to load the process specific code defined at ${KDBCODE}/{process type} + +// Server connection details +\d .servers +CONNECTIONS:() // list of connections to make at start up +STARTUP:1b // create connections diff --git a/config/settings/queryrdb.q b/config/settings/queryrdb.q new file mode 100644 index 000000000..166e1dc3c --- /dev/null +++ b/config/settings/queryrdb.q @@ -0,0 +1,37 @@ +// Bespoke RDB config +\d .rdb +ignorelist:`heartbeat`logmsg`.usage.usage //list of tables to ignore when saving to disk +hdbtypes:`queryhdb //list of hdb types to look for and call in hdb reload +hdbnames:`queryhdb1 //list of hdb names to search for and call in hdb reload +tickerplanttypes:`querytp //list of tickerplant types to try and make a connection to +gatewaytypes:`querygateway //list of gateway types to try and make a connection to +checktpperiod:0D00:00:05 //how often to check for tickerplant connection +onlyclearsaved:0b //if true, eod writedown will only clear tables which have been successfully saved to disk +subscribeto:` //a list of tables to subscribe to, default (`) means all tables +subscribesyms:` //a list of syms to subscribe for, (`) means all syms +savetables:1b //if true tables will be saved at end of day, if false tables will not be saved, only wiped +garbagecollect:1b //if true .Q.gc will be called after each writedown - tradeoff: latency vs memory usage +upd:insert //value of upd +hdbdir:hsym `$getenv[`KDBQUERYHDB] //the location of the hdb directory +replaylog:1b //replay the tickerplant log file +schema:1b //retrieve the schema from the tickerplant +tpconnsleepintv:10 //number of seconds between attempts to connect to the tp +gc:1b //if true .Q.gc will be called after each writedown - tradeoff: latency vs memory usage +sortcsv:hsym first .proc.getconfigfile["querysort.csv"] //location of csv file +reloadenabled:0b //if true, the RDB will not save when .u.end is called but + //will clear its data using reload function (called by the WDB) +parvaluesrc:`log //where to source the rdb partition value, can be log (from tp log file name), + //tab (from the first value in the time column of the table that is subscribed for) + //anything else will return a null date which will be filled by pardefault +pardefault:.z.D //if the src defined in parvaluesrc returns null, use this default date instead +tpcheckcycles:0W //specify the number of times the process will check for an 
available tickerplant +subfiltered:0b //allows subscription filters to be loaded and applied in the rdb +connectonstart:1b //rdb connects to tickerplant as soon as it is started +\d .usage +querytrack:0b +\d .proc +loadprocesscode:1b // whether to load the process specific code defined at ${KDBCODE}/{process type} +// Server connection details +\d .servers +CONNECTIONS:`queryhdb`querygateway // list of connections to make at start up +STARTUP:1b // create connections diff --git a/config/settings/querytp.q b/config/settings/querytp.q new file mode 100644 index 000000000..b4477eafc --- /dev/null +++ b/config/settings/querytp.q @@ -0,0 +1,43 @@ +// Segmented TP config + +\d .stplg + +multilog:`tabperiod; // [tabperiod|none|periodic|tabular|custom] +multilogperiod:0D01; // Length of period for STP periodic logging modes +errmode:1b; // Enable error mode for STP +batchmode:`defaultbatch; // [memorybatch|defaultbatch|immediate] +replayperiod:`day // [period|day|prior] +customcsv:hsym first .proc.getconfigfile["stpcustom.csv"]; // Location for custom logging mode csv + +\d .proc +loadcommoncode:0b // do not load common code +loadprocesscode:1b // load process code +logroll:0b // do not roll logs + +// Configuration used by the usage functions - logging of client interaction +\d .usage +enabled:0b // switch off the usage logging + +// Client tracking configuration +// This is the only thing we want to do +// and only for connections being opened and closed +\d .clients +enabled:1b // whether client tracking is enabled +opencloseonly:1b // only log open and closing of connections + +// Server connection details +\d .servers +enabled:0b // disable server tracking + +\d .timer +enabled:0b // disable the timer + +\d .hb +enabled:0b // disable heartbeating + +\d .zpsignore +enabled:0b // disable zpsignore - zps should be empty + +\d .queries +enabled:0b + diff --git a/config/settings/segmentedchainedtickerplant.q b/config/settings/segmentedchainedtickerplant.q index 051fe8b75..9e47c93fc 100644 --- a/config/settings/segmentedchainedtickerplant.q +++ b/config/settings/segmentedchainedtickerplant.q @@ -38,3 +38,6 @@ enabled:1b // switch on subscribercutoff \d .servers CONNECTIONS,:`segmentedtickerplant CONNECTIONSFROMDISCOVERY:1b + +\d .queries +enabled:0b // disable query tracking diff --git a/config/settings/segmentedtickerplant.q b/config/settings/segmentedtickerplant.q index 72c28bda6..ab3791e25 100644 --- a/config/settings/segmentedtickerplant.q +++ b/config/settings/segmentedtickerplant.q @@ -36,4 +36,7 @@ enabled:0b // disable the timer enabled:0b // disable heartbeating \d .zpsignore -enabled:0b // disable zpsignore - zps should be empty +enabled:0b // disable zpsignore - zps should be empty + +\d .queries +enabled:0b // disable query tracking diff --git a/config/settings/tickerplant.q b/config/settings/tickerplant.q index 5119ffcac..ef356a4ee 100644 --- a/config/settings/tickerplant.q +++ b/config/settings/tickerplant.q @@ -28,3 +28,5 @@ enabled:0b // disable heartbeating \d .zpsignore enabled:0b // disable zpsignore - zps should be empty +\d .queries +enabled:0b // disable query tracking diff --git a/docs/QueryManagement.md b/docs/QueryManagement.md new file mode 100644 index 000000000..b241bcac0 --- /dev/null +++ b/docs/QueryManagement.md @@ -0,0 +1,94 @@ + + +## Functionality Overview + +Query Logging Management is an addition to TorQ to enhance the current query logging system. 
The aim of this tool is to provide access to information about queries that are sent to specific processes throughout the day, allowing for query analysis to be carried out. + +For each query executed we want access to: + +- The time of the query +- The amount of time the query took to run +- Username of the person running the query +- IP address of the person running the query +- Host of the process that was being queried +- Name of the process that was being queried +- Process type of the process that was being queried +- The query that was being executed including any parameters + +This query logging functionality can be enabled or disabled for specific processes using variables within TorQ config files. + +## Architecture + +The architecture of the Query Logging Management framework is shown in the following diagram: + +![QueryLoggingManagementArchitecture](graphics/torq-qlm_architecture.PNG) + +## Processes + +#### Query Feed + +If enabled within a TorQ stack, the Query Feed process subscribes to updates to the .usage.usage table (defined in the logusage.q handler script) from all enabled TorQ processes. + +Once a connection to the Query Tickerplant has been set up, the Query Feed sends a message executing the .u.upd function, which inserts the collected .usage.usage data into the usage table. + +#### Query Tickerplant + +The Query Tickerplant process receives updates from the Query Feed process regarding the usage table and publishes them on to any subscribing processes, operating in the same way as a standard tickerplant. + +#### Query RDB + +The Query RDB works like a normal RDB process, receiving usage table messages from the Tickerplant and storing them in memory so they can be queried. At end of day, the usage table is saved down to disk to be loaded into the HDB. + +#### Query HDB + +The Query HDB loads historical usage data from disk so that long-term query information can be queried. + +#### Query Gateway + +The Query Gateway process connects to the Query RDB and HDB processes to load-balance queries and provide access to query information spanning both historical and real-time data. The Query Gateway process also contains a number of analytics functions to return specific aggregations from the usage table (e.g. how many queries were executed for each process on a specific day). + +## Table Contents + +The Query usage table contains the following columns: + +- time: The time that the query was received or when it finished executing + +- runtime: The time taken for the query to be executed + +- zcmd: The .z function that was invoked + +- proctype: The process type of the process being queried + +- procname: The name of the process being queried + +- status: Denotes whether the entry was logged before the query executed (b), after it executed successfully (c) or after it failed with an error (e) + +- ip: The IP address of the process being queried + +- user: The user who is executing the query + +- handle: The handle over which the query is passed + +- cmd: The query and its arguments as a string + +- mem: Memory information connected to the query execution + +- sz: The size in bytes of the result from the query being executed + +- error: An error string, if an error occurred (i.e. status is `e`) + +## Setup + +The TorQ Query Management logging tool can be added on top of any existing TorQ setup, or the full TorQ application including query logging can be integrated with data sources and feeds. 
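+
+Query tracking is switched on or off per process via the `.queries` namespace in the process settings files. A minimal sketch of the relevant flags, mirroring the defaults added in config/settings/default.q (override these in the per-process settings files as required):
+
+    \d .queries
+    enabled:1b                // whether query tracking is enabled for this process
+    ignore:1b                 // check the ignore list for functions to ignore
+    ignorelist:(`upd;"upd")   // the list of functions to ignore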
+ +As mentioned above, there are 5 processes that need to be initialised: Query Tickerplant, Query RDB, Query Feed, Query HDB and Query Gateway. An example of how these would be added to a process csv file is displayed below: + + localhost,{KDBBASEPORT}+24,querytp,querytp1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,0,,,${KDBCODE}/processes/segmentedtickerplant.q,1,-schemafile ${TORQAPPHOME}/querydatabase.q -tplogdir ${KDBTPLOG} + localhost,{KDBBASEPORT}+25,queryrdb,queryrdb1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,60,4000,${KDBCODE}/processes/rdb.q,1, + localhost,{KDBBASEPORT}+26,queryfeed,queryfeed1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,60,4000,${KDBCODE}/processes/queryfeed.q,1, + localhost,{KDBBASEPORT}+27,queryhdb,queryhdb1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,60,4000,${KDBQUERYHDB},1, + localhost,{KDBBASEPORT}+28,querygateway,querygateway1,${TORQAPPHOME}/appconfig/passwords/accesslist.txt,1,1,,4000,${KDBCODE}/processes/querygateway.q,1, + +Each of these processes has its own unique process type and process file (except the Query RDB, which uses the rdb.q file). Functionality for these 5 processes can be refined within each process config file. + +The Query Management logging tool is fully customisable, so users can select the specific processes whose queries they want monitored. To activate this functionality for a process, add the process name to the querytrack.txt file located in the TorQ/config/ directory. This file is loaded into the query feed process and used to activate Query Management Logging for each of the specified processes (see the example query at the end of this document). diff --git a/docs/graphics/torq-qlm_architecture.PNG b/docs/graphics/torq-qlm_architecture.PNG new file mode 100644 index 000000000..976a41811 Binary files a/docs/graphics/torq-qlm_architecture.PNG and b/docs/graphics/torq-qlm_architecture.PNG differ
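A hedged sketch of the kind of aggregation the query gateway analytics are described as providing, run here as a plain q-sql query against the usage table documented above. The handle, the port number and querying the RDB directly (rather than calling a named gateway analytic) are illustrative assumptions:

    / hypothetical connection to the query RDB (or the query gateway)
    h:hopen `::9998
    / completed queries per process for today; status "c" marks successful execution
    h"select queries:count i by procname from usage where time.date=.z.d,status=\"c\""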