diff --git a/packages/mimecast/changelog.yml b/packages/mimecast/changelog.yml index b278ce1a547..ab3af9be5a0 100644 --- a/packages/mimecast/changelog.yml +++ b/packages/mimecast/changelog.yml @@ -1,4 +1,9 @@ # newer versions go on top +- version: "3.2.3" + changes: + - description: Refactored the CEL program for cloud_integrated_logs and siem_logs data streams to improve memory usage. + type: bugfix + link: https://github.com/elastic/integrations/pull/16308 - version: "3.2.2" changes: - description: Allow setting of `batch size` in all datastreams. diff --git a/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs b/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs index c6a35419ddc..2046cec2a4c 100644 --- a/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs +++ b/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs @@ -31,6 +31,10 @@ program: | // If it is changed here changes should be reflected in the other data // streams. Do not differentiate the logic between these data streams // lightly; use the state variable for this unless absolutely required. + // + // Memory optimization: This program processes ONE batch file per execution + // cycle to avoid memory pressure. Batch URLs are stored in cursor.blobs + // and processed one at a time. state.with( ( (has(state.?token.expires) && now() < timestamp(state.token.expires)) ? @@ -73,87 +77,113 @@ program: | : work_list ).as(work_list, - get_request( - state.url.trim_right("/") + state.path + "?" + { - "type": [work_list[0].type], - ?"nextPage": work_list[0].?next.optMap(next, [next]), - ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]), - ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]), - ?"pageSize": state.?page_size.optMap(size, [string(int(size))]), - }.format_query() - ).with({ - "Header": { - "Authorization": ["Bearer " + token.access_token], - "Accept": ["application/json"], - "Content-Type": ["application/json"], - } - }).do_request().as(resp, resp.StatusCode == 200 ? - bytes(resp.Body).decode_json().as(body, - { - "events": body.value.map(b, has(b.url), - get(b.url).as(batch, batch.StatusCode == 200 ? - bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e, - { - "message": dyn(e.encode_json()), - } - ) - : - [{ - "error": { - "code": string(batch.StatusCode), - "id": string(batch.Status), - "message": "GET " + b.url + ": " + ( - size(batch.Body) != 0 ? - string(batch.Body) - : - string(batch.Status) + ' (' + string(batch.StatusCode) + ')' - ), - }, - }] - ) - ).flatten(), - "cursor": { - "work_list": ( - "@nextPage" in body && size(body.value) != 0 ? - [work_list[0].with({"next": body["@nextPage"]})] - : - [] - ) + tail(work_list), - }, - "token": { - "access_token": token.access_token, - "expires": token.expires, - }, - "want_more": "@nextPage" in body && size(body.value) != 0, - }.as(to_publish, to_publish.with({ - "want_more": to_publish.want_more || size(to_publish.cursor.work_list) != 0, - })) - ).as(state, - // Check whether we still need to get more, but have - // no event for this type. If we do, populate events - // with a place-holder to be discarded by the ingest - // pipeline. - state.want_more && size(state.events) == 0 ? 
- state.with({"events": [{"message": "want_more"}]}) - : - state - ) - : - { - "events": { - "error": { - "code": string(resp.StatusCode), - "id": string(resp.Status), - "message": "GET " + state.path + ": " + ( - size(resp.Body) != 0 ? - string(resp.Body) - : - string(resp.Status) + ' (' + string(resp.StatusCode) + ')' + state.?cursor.blobs.orValue([]).as(blobs, + // Process blobs first if any are queued, otherwise fetch batch lists. + (size(blobs) > 0) ? + // Download a single batch file. + get(blobs[0]).as(batch, (batch.StatusCode == 200) ? + { + "events": bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e, + { + "message": dyn(e.encode_json()), + } ), - }, - }, - "want_more": false, - } + "cursor": { + "blobs": tail(blobs), + "work_list": work_list, + }, + "token": { + "access_token": token.access_token, + "expires": token.expires, + }, + "want_more": size(blobs) > 1 || size(work_list) != 0, + } + : (batch.StatusCode == 404) ? + // 404 Not Found - skip this blob and continue immediately with remaining blobs + { + "events": [{"message": "want_more"}], + "cursor": { + "blobs": tail(blobs), + "work_list": work_list, + }, + "token": { + "access_token": token.access_token, + "expires": token.expires, + }, + "want_more": size(blobs) > 1 || size(work_list) != 0, + } + : + // Other errors - emit error, stop processing + { + "events": { + "error": { + "code": string(batch.StatusCode), + "id": string(batch.Status), + "message": "GET " + blobs[0] + ": " + ( + (size(batch.Body) != 0) ? + string(batch.Body) + : + string(batch.Status) + " (" + string(batch.StatusCode) + ")" + ), + }, + }, + "want_more": false, + } + ) + : + // No blobs queued, fetch batch list from API. + get_request( + state.url.trim_right("/") + state.path + "?" + { + "type": [work_list[0].type], + ?"nextPage": work_list[0].?next.optMap(next, [next]), + ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]), + ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]), + ?"pageSize": state.?page_size.optMap(size, [string(int(size))]), + }.format_query() + ).with({ + "Header": { + "Authorization": ["Bearer " + token.access_token], + "Accept": ["application/json"], + "Content-Type": ["application/json"], + } + }).do_request().as(resp, (resp.StatusCode == 200) ? + bytes(resp.Body).decode_json().as(body, + { + "events": [{"message": "want_more"}], + "cursor": { + "blobs": body.value.map(b, b.url), + "work_list": ( + // If there are more pages, add pagination work item. + ("@nextPage" in body && size(body.value) != 0) ? + [work_list[0].with({"next": body["@nextPage"]})] + : + [] + ) + tail(work_list), + }, + "token": { + "access_token": token.access_token, + "expires": token.expires, + }, + "want_more": size(body.value) != 0 || size(tail(work_list)) != 0, + } + ) + : + { + "events": { + "error": { + "code": string(resp.StatusCode), + "id": string(resp.Status), + "message": "GET " + state.path + ": " + ( + (size(resp.Body) != 0) ? 
+ string(resp.Body) + : + string(resp.Status) + " (" + string(resp.StatusCode) + ")" + ), + }, + }, + "want_more": false, + } + ) ) ) ) diff --git a/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs b/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs index c28a8536813..16501d6343d 100644 --- a/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs +++ b/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs @@ -31,6 +31,10 @@ program: | // If it is changed here changes should be reflected in the other data // streams. Do not differentiate the logic between these data streams // lightly; use the state variable for this unless absolutely required. + // + // Memory optimization: This program processes ONE batch file per execution + // cycle to avoid memory pressure. Batch URLs are stored in cursor.blobs + // and processed one at a time. state.with( ( (has(state.?token.expires) && now() < timestamp(state.token.expires)) ? @@ -73,87 +77,113 @@ program: | : work_list ).as(work_list, - get_request( - state.url.trim_right("/") + state.path + "?" + { - "type": [work_list[0].type], - ?"nextPage": work_list[0].?next.optMap(next, [next]), - ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]), - ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]), - ?"pageSize": state.?page_size.optMap(size, [string(int(size))]), - }.format_query() - ).with({ - "Header": { - "Authorization": ["Bearer " + token.access_token], - "Accept": ["application/json"], - "Content-Type": ["application/json"], - } - }).do_request().as(resp, resp.StatusCode == 200 ? - bytes(resp.Body).decode_json().as(body, - { - "events": body.value.map(b, has(b.url), - get(b.url).as(batch, batch.StatusCode == 200 ? - bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e, - { - "message": dyn(e.encode_json()), - } - ) - : - [{ - "error": { - "code": string(batch.StatusCode), - "id": string(batch.Status), - "message": "GET " + b.url + ": " + ( - size(batch.Body) != 0 ? - string(batch.Body) - : - string(batch.Status) + ' (' + string(batch.StatusCode) + ')' - ), - }, - }] - ) - ).flatten(), - "cursor": { - "work_list": ( - "@nextPage" in body && size(body.value) != 0 ? - [work_list[0].with({"next": body["@nextPage"]})] - : - [] - ) + tail(work_list), - }, - "token": { - "access_token": token.access_token, - "expires": token.expires, - }, - "want_more": "@nextPage" in body && size(body.value) != 0, - }.as(to_publish, to_publish.with({ - "want_more": to_publish.want_more || size(to_publish.cursor.work_list) != 0, - })) - ).as(state, - // Check whether we still need to get more, but have - // no event for this type. If we do, populate events - // with a place-holder to be discarded by the ingest - // pipeline. - state.want_more && size(state.events) == 0 ? - state.with({"events": [{"message": "want_more"}]}) - : - state - ) - : - { - "events": { - "error": { - "code": string(resp.StatusCode), - "id": string(resp.Status), - "message": "GET " + state.path + ": " + ( - size(resp.Body) != 0 ? - string(resp.Body) - : - string(resp.Status) + ' (' + string(resp.StatusCode) + ')' + state.?cursor.blobs.orValue([]).as(blobs, + // Process blobs first if any are queued, otherwise fetch batch lists. + (size(blobs) > 0) ? + // Download a single batch file. + get(blobs[0]).as(batch, (batch.StatusCode == 200) ? 
+ { + "events": bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e, + { + "message": dyn(e.encode_json()), + } ), - }, - }, - "want_more": false, - } + "cursor": { + "blobs": tail(blobs), + "work_list": work_list, + }, + "token": { + "access_token": token.access_token, + "expires": token.expires, + }, + "want_more": size(blobs) > 1 || size(work_list) != 0, + } + : (batch.StatusCode == 404) ? + // 404 Not Found - skip this blob and continue immediately with remaining blobs + { + "events": [{"message": "want_more"}], + "cursor": { + "blobs": tail(blobs), + "work_list": work_list, + }, + "token": { + "access_token": token.access_token, + "expires": token.expires, + }, + "want_more": size(blobs) > 1 || size(work_list) != 0, + } + : + // Other errors - emit error, stop processing + { + "events": { + "error": { + "code": string(batch.StatusCode), + "id": string(batch.Status), + "message": "GET " + blobs[0] + ": " + ( + (size(batch.Body) != 0) ? + string(batch.Body) + : + string(batch.Status) + " (" + string(batch.StatusCode) + ")" + ), + }, + }, + "want_more": false, + } + ) + : + // No blobs queued, fetch batch list from API. + get_request( + state.url.trim_right("/") + state.path + "?" + { + "type": [work_list[0].type], + ?"nextPage": work_list[0].?next.optMap(next, [next]), + ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]), + ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]), + ?"pageSize": state.?page_size.optMap(size, [string(int(size))]), + }.format_query() + ).with({ + "Header": { + "Authorization": ["Bearer " + token.access_token], + "Accept": ["application/json"], + "Content-Type": ["application/json"], + } + }).do_request().as(resp, (resp.StatusCode == 200) ? + bytes(resp.Body).decode_json().as(body, + { + "events": [{"message": "want_more"}], + "cursor": { + "blobs": body.value.map(b, b.url), + "work_list": ( + // If there are more pages, add pagination work item. + ("@nextPage" in body && size(body.value) != 0) ? + [work_list[0].with({"next": body["@nextPage"]})] + : + [] + ) + tail(work_list), + }, + "token": { + "access_token": token.access_token, + "expires": token.expires, + }, + "want_more": size(body.value) != 0 || size(tail(work_list)) != 0, + } + ) + : + { + "events": { + "error": { + "code": string(resp.StatusCode), + "id": string(resp.Status), + "message": "GET " + state.path + ": " + ( + (size(resp.Body) != 0) ? + string(resp.Body) + : + string(resp.Status) + " (" + string(resp.StatusCode) + ")" + ), + }, + }, + "want_more": false, + } + ) ) ) ) diff --git a/packages/mimecast/manifest.yml b/packages/mimecast/manifest.yml index 98db5f37605..58607daf707 100644 --- a/packages/mimecast/manifest.yml +++ b/packages/mimecast/manifest.yml @@ -1,7 +1,7 @@ format_version: "3.3.2" name: mimecast title: "Mimecast" -version: "3.2.2" +version: "3.2.3" description: Collect logs from Mimecast with Elastic Agent. type: integration categories: ["security", "email_security"]
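For readers following the change above: both data streams previously listed a page of batch files and downloaded every referenced file within a single CEL evaluation; the refactor turns this into a small state machine that touches at most one batch file per evaluation, keeping the remaining batch URLs in cursor.blobs and the pending page/type work in cursor.work_list. The sketch below is a rough Python rendering of that control flow, not the actual CEL program or the Mimecast API; download_blob, decode_ndjson_gzip, and fetch_batch_list are hypothetical stand-ins for the get()/get_request() calls in the templates.

# Sketch only: one-batch-per-cycle polling loop mirroring the cursor.blobs /
# work_list split in the refactored CEL programs. Helper names are hypothetical.

def poll_once(state: dict) -> dict:
    blobs = state.get("blobs", [])
    work_list = state["work_list"]

    if blobs:
        # Drain exactly one queued batch URL per cycle to bound memory use.
        url, rest = blobs[0], blobs[1:]
        status, body = download_blob(url)                # hypothetical helper
        if status == 200:
            state["events"] = decode_ndjson_gzip(body)   # hypothetical helper
        elif status == 404:
            state["events"] = []                         # expired batch; skip it
        else:
            raise RuntimeError(f"GET {url}: {status}")   # stop on other errors
        state["blobs"] = rest
        state["want_more"] = bool(rest) or bool(work_list)
    else:
        # No batches queued: list the next page of batch URLs for the current
        # log type and enqueue them without downloading anything yet.
        item = work_list[0]
        page = fetch_batch_list(item["type"], item.get("next"))  # hypothetical
        state["blobs"] = [b["url"] for b in page["value"]]
        if page.get("@nextPage") and page["value"]:
            # More pages for this type: keep it at the head with its cursor.
            state["work_list"] = [dict(item, next=page["@nextPage"])] + work_list[1:]
        else:
            state["work_list"] = work_list[1:]
        state["events"] = []
        state["want_more"] = bool(page["value"]) or bool(work_list[1:])
    return state

The trade-off is that each evaluation holds at most one decompressed batch in memory instead of a whole page's worth, at the cost of more evaluations per page; want_more signals the input to run the next evaluation immediately rather than waiting for the next interval, so overall throughput is preserved while peak memory drops.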