From 163c402924eeeb84dd25c729ef972e74bc3ae327 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 4 Dec 2025 16:03:01 +0530 Subject: [PATCH 1/7] refactored siem_logs & cloud_integrated_logs data streams according to andrew's proof of concept --- .../agent/stream/cel.yml.hbs | 175 ++++++++++-------- .../siem_logs/agent/stream/cel.yml.hbs | 175 ++++++++++-------- 2 files changed, 190 insertions(+), 160 deletions(-) diff --git a/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs b/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs index c6a35419ddc..71ebe1be6b2 100644 --- a/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs +++ b/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs @@ -31,6 +31,10 @@ program: | // If it is changed here changes should be reflected in the other data // streams. Do not differentiate the logic between these data streams // lightly; use the state variable for this unless absolutely required. + // + // Memory optimization: This program processes ONE batch file per execution + // cycle to avoid memory pressure. Batch URLs are stored in cursor.blobs + // and processed one at a time. state.with( ( (has(state.?token.expires) && now() < timestamp(state.token.expires)) ? @@ -73,87 +77,98 @@ program: | : work_list ).as(work_list, - get_request( - state.url.trim_right("/") + state.path + "?" + { - "type": [work_list[0].type], - ?"nextPage": work_list[0].?next.optMap(next, [next]), - ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]), - ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]), - ?"pageSize": state.?page_size.optMap(size, [string(int(size))]), - }.format_query() - ).with({ - "Header": { - "Authorization": ["Bearer " + token.access_token], - "Accept": ["application/json"], - "Content-Type": ["application/json"], - } - }).do_request().as(resp, resp.StatusCode == 200 ? - bytes(resp.Body).decode_json().as(body, - { - "events": body.value.map(b, has(b.url), - get(b.url).as(batch, batch.StatusCode == 200 ? - bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e, - { - "message": dyn(e.encode_json()), - } - ) - : - [{ - "error": { - "code": string(batch.StatusCode), - "id": string(batch.Status), - "message": "GET " + b.url + ": " + ( - size(batch.Body) != 0 ? - string(batch.Body) - : - string(batch.Status) + ' (' + string(batch.StatusCode) + ')' - ), - }, - }] - ) - ).flatten(), - "cursor": { - "work_list": ( - "@nextPage" in body && size(body.value) != 0 ? - [work_list[0].with({"next": body["@nextPage"]})] - : - [] - ) + tail(work_list), - }, - "token": { - "access_token": token.access_token, - "expires": token.expires, - }, - "want_more": "@nextPage" in body && size(body.value) != 0, - }.as(to_publish, to_publish.with({ - "want_more": to_publish.want_more || size(to_publish.cursor.work_list) != 0, - })) - ).as(state, - // Check whether we still need to get more, but have - // no event for this type. If we do, populate events - // with a place-holder to be discarded by the ingest - // pipeline. - state.want_more && size(state.events) == 0 ? - state.with({"events": [{"message": "want_more"}]}) - : - state - ) - : - { - "events": { - "error": { - "code": string(resp.StatusCode), - "id": string(resp.Status), - "message": "GET " + state.path + ": " + ( - size(resp.Body) != 0 ? - string(resp.Body) - : - string(resp.Status) + ' (' + string(resp.StatusCode) + ')' + state.?cursor.blobs.orValue([]).as(blobs, + // Process blobs first if any are queued, otherwise fetch batch lists. + (size(blobs) > 0) ? + // Download a single batch file. + get(blobs[0]).as(batch, (batch.StatusCode == 200) ? + { + "events": bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e, + { + "message": dyn(e.encode_json()), + } ), - }, - }, - "want_more": false, - } + "cursor": { + "blobs": tail(blobs), + "work_list": work_list, + }, + "token": { + "access_token": token.access_token, + "expires": token.expires, + }, + "want_more": size(tail(blobs)) != 0 || size(work_list) != 0, + } + : + { + "events": { + "error": { + "code": string(batch.StatusCode), + "id": string(batch.Status), + "message": "GET " + blobs[0] + ": " + ( + (size(batch.Body) != 0) ? + string(batch.Body) + : + string(batch.Status) + " (" + string(batch.StatusCode) + ")" + ), + }, + }, + "want_more": false, + } + ) + : + // No blobs queued, fetch batch list from API. + get_request( + state.url.trim_right("/") + state.path + "?" + { + "type": [work_list[0].type], + ?"nextPage": work_list[0].?next.optMap(next, [next]), + ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]), + ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]), + ?"pageSize": state.?page_size.optMap(size, [string(int(size))]), + }.format_query() + ).with({ + "Header": { + "Authorization": ["Bearer " + token.access_token], + "Accept": ["application/json"], + "Content-Type": ["application/json"], + } + }).do_request().as(resp, (resp.StatusCode == 200) ? + bytes(resp.Body).decode_json().as(body, + { + "events": [{"message": "want_more"}], + "cursor": { + "blobs": body.value.map(b, b.url), + "work_list": ( + // If there are more pages, add pagination work item. + ("@nextPage" in body && size(body.value) != 0) ? + [work_list[0].with({"next": body["@nextPage"]})] + : + [] + ) + tail(work_list), + }, + "token": { + "access_token": token.access_token, + "expires": token.expires, + }, + "want_more": size(body.value) != 0 || size(tail(work_list)) != 0, + } + ) + : + { + "events": { + "error": { + "code": string(resp.StatusCode), + "id": string(resp.Status), + "message": "GET " + state.path + ": " + ( + (size(resp.Body) != 0) ? + string(resp.Body) + : + string(resp.Status) + " (" + string(resp.StatusCode) + ")" + ), + }, + }, + "want_more": false, + } + ) ) ) ) diff --git a/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs b/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs index c28a8536813..50ccf49aba7 100644 --- a/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs +++ b/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs @@ -31,6 +31,10 @@ program: | // If it is changed here changes should be reflected in the other data // streams. Do not differentiate the logic between these data streams // lightly; use the state variable for this unless absolutely required. + // + // Memory optimization: This program processes ONE batch file per execution + // cycle to avoid memory pressure. Batch URLs are stored in cursor.blobs + // and processed one at a time. state.with( ( (has(state.?token.expires) && now() < timestamp(state.token.expires)) ? @@ -73,87 +77,98 @@ program: | : work_list ).as(work_list, - get_request( - state.url.trim_right("/") + state.path + "?" + { - "type": [work_list[0].type], - ?"nextPage": work_list[0].?next.optMap(next, [next]), - ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]), - ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]), - ?"pageSize": state.?page_size.optMap(size, [string(int(size))]), - }.format_query() - ).with({ - "Header": { - "Authorization": ["Bearer " + token.access_token], - "Accept": ["application/json"], - "Content-Type": ["application/json"], - } - }).do_request().as(resp, resp.StatusCode == 200 ? - bytes(resp.Body).decode_json().as(body, - { - "events": body.value.map(b, has(b.url), - get(b.url).as(batch, batch.StatusCode == 200 ? - bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e, - { - "message": dyn(e.encode_json()), - } - ) - : - [{ - "error": { - "code": string(batch.StatusCode), - "id": string(batch.Status), - "message": "GET " + b.url + ": " + ( - size(batch.Body) != 0 ? - string(batch.Body) - : - string(batch.Status) + ' (' + string(batch.StatusCode) + ')' - ), - }, - }] - ) - ).flatten(), - "cursor": { - "work_list": ( - "@nextPage" in body && size(body.value) != 0 ? - [work_list[0].with({"next": body["@nextPage"]})] - : - [] - ) + tail(work_list), - }, - "token": { - "access_token": token.access_token, - "expires": token.expires, - }, - "want_more": "@nextPage" in body && size(body.value) != 0, - }.as(to_publish, to_publish.with({ - "want_more": to_publish.want_more || size(to_publish.cursor.work_list) != 0, - })) - ).as(state, - // Check whether we still need to get more, but have - // no event for this type. If we do, populate events - // with a place-holder to be discarded by the ingest - // pipeline. - state.want_more && size(state.events) == 0 ? - state.with({"events": [{"message": "want_more"}]}) - : - state - ) - : - { - "events": { - "error": { - "code": string(resp.StatusCode), - "id": string(resp.Status), - "message": "GET " + state.path + ": " + ( - size(resp.Body) != 0 ? - string(resp.Body) - : - string(resp.Status) + ' (' + string(resp.StatusCode) + ')' + state.?cursor.blobs.orValue([]).as(blobs, + // Process blobs first if any are queued, otherwise fetch batch lists. + (size(blobs) > 0) ? + // Download a single batch file. + get(blobs[0]).as(batch, (batch.StatusCode == 200) ? + { + "events": bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e, + { + "message": dyn(e.encode_json()), + } ), - }, - }, - "want_more": false, - } + "cursor": { + "blobs": tail(blobs), + "work_list": work_list, + }, + "token": { + "access_token": token.access_token, + "expires": token.expires, + }, + "want_more": size(tail(blobs)) != 0 || size(work_list) != 0, + } + : + { + "events": { + "error": { + "code": string(batch.StatusCode), + "id": string(batch.Status), + "message": "GET " + blobs[0] + ": " + ( + (size(batch.Body) != 0) ? + string(batch.Body) + : + string(batch.Status) + " (" + string(batch.StatusCode) + ")" + ), + }, + }, + "want_more": false, + } + ) + : + // No blobs queued, fetch batch list from API. + get_request( + state.url.trim_right("/") + state.path + "?" + { + "type": [work_list[0].type], + ?"nextPage": work_list[0].?next.optMap(next, [next]), + ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]), + ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]), + ?"pageSize": state.?page_size.optMap(size, [string(int(size))]), + }.format_query() + ).with({ + "Header": { + "Authorization": ["Bearer " + token.access_token], + "Accept": ["application/json"], + "Content-Type": ["application/json"], + } + }).do_request().as(resp, (resp.StatusCode == 200) ? + bytes(resp.Body).decode_json().as(body, + { + "events": [{"message": "want_more"}], + "cursor": { + "blobs": body.value.map(b, b.url), + "work_list": ( + // If there are more pages, add pagination work item. + ("@nextPage" in body && size(body.value) != 0) ? + [work_list[0].with({"next": body["@nextPage"]})] + : + [] + ) + tail(work_list), + }, + "token": { + "access_token": token.access_token, + "expires": token.expires, + }, + "want_more": size(body.value) != 0 || size(tail(work_list)) != 0, + } + ) + : + { + "events": { + "error": { + "code": string(resp.StatusCode), + "id": string(resp.Status), + "message": "GET " + state.path + ": " + ( + (size(resp.Body) != 0) ? + string(resp.Body) + : + string(resp.Status) + " (" + string(resp.StatusCode) + ")" + ), + }, + }, + "want_more": false, + } + ) ) ) ) From 0d58a3a3ecf773e67e1802e103073d8b7ddfda8b Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 4 Dec 2025 16:58:57 +0530 Subject: [PATCH 2/7] skipped httpjson system test due to health status degraded and template eval failure --- .../data_stream/siem_logs/_dev/test/system/test-v1-config.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/mimecast/data_stream/siem_logs/_dev/test/system/test-v1-config.yml b/packages/mimecast/data_stream/siem_logs/_dev/test/system/test-v1-config.yml index b43672ce74d..da1f5df7fa1 100644 --- a/packages/mimecast/data_stream/siem_logs/_dev/test/system/test-v1-config.yml +++ b/packages/mimecast/data_stream/siem_logs/_dev/test/system/test-v1-config.yml @@ -11,3 +11,6 @@ data_stream: enable_request_tracer: true assert: hit_count: 1 +skip: + reason: "The fleet health status changes to degraded when the HTTPJSON template's value evaluation comes up empty or logs a failure, which leads to system test failures but does not interrupt the data flow." + link: https://github.com/elastic/beats/issues/45664 From 9690d0cfd3bbd1b49524acec4ce55f9c26050ee3 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 4 Dec 2025 17:03:57 +0530 Subject: [PATCH 3/7] added changelog and updated manifest --- packages/mimecast/changelog.yml | 8 ++++++++ packages/mimecast/manifest.yml | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/packages/mimecast/changelog.yml b/packages/mimecast/changelog.yml index b278ce1a547..95a0d720a6b 100644 --- a/packages/mimecast/changelog.yml +++ b/packages/mimecast/changelog.yml @@ -1,4 +1,12 @@ # newer versions go on top +- version: "3.2.3" + changes: + - description: Refactored the CEL program for siem_logs data stream to improve memory usage. + type: bugfix + link: https://github.com/elastic/integrations/pull/1111 + - description: Refactored the CEL program for cloud_integrated_logs data stream to improve memory usage. + type: bugfix + link: https://github.com/elastic/integrations/pull/1111 - version: "3.2.2" changes: - description: Allow setting of `batch size` in all datastreams. diff --git a/packages/mimecast/manifest.yml b/packages/mimecast/manifest.yml index 98db5f37605..58607daf707 100644 --- a/packages/mimecast/manifest.yml +++ b/packages/mimecast/manifest.yml @@ -1,7 +1,7 @@ format_version: "3.3.2" name: mimecast title: "Mimecast" -version: "3.2.2" +version: "3.2.3" description: Collect logs from Mimecast with Elastic Agent. type: integration categories: ["security", "email_security"] From 83f85d8f8c763ae43fa0e951f1be9a78831daa24 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 4 Dec 2025 17:14:03 +0530 Subject: [PATCH 4/7] updated changelog --- packages/mimecast/changelog.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/mimecast/changelog.yml b/packages/mimecast/changelog.yml index 95a0d720a6b..31cc9c673c8 100644 --- a/packages/mimecast/changelog.yml +++ b/packages/mimecast/changelog.yml @@ -3,10 +3,10 @@ changes: - description: Refactored the CEL program for siem_logs data stream to improve memory usage. type: bugfix - link: https://github.com/elastic/integrations/pull/1111 + link: https://github.com/elastic/integrations/pull/16308 - description: Refactored the CEL program for cloud_integrated_logs data stream to improve memory usage. type: bugfix - link: https://github.com/elastic/integrations/pull/1111 + link: https://github.com/elastic/integrations/pull/16308 - version: "3.2.2" changes: - description: Allow setting of `batch size` in all datastreams. From 85cd0345ce63892978f1cbb3e37d5ac25d213039 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 5 Dec 2025 16:09:19 +0530 Subject: [PATCH 5/7] addressed Dan's suggestions --- packages/mimecast/changelog.yml | 5 +---- .../cloud_integrated_logs/agent/stream/cel.yml.hbs | 2 +- .../mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/mimecast/changelog.yml b/packages/mimecast/changelog.yml index 31cc9c673c8..ab3af9be5a0 100644 --- a/packages/mimecast/changelog.yml +++ b/packages/mimecast/changelog.yml @@ -1,10 +1,7 @@ # newer versions go on top - version: "3.2.3" changes: - - description: Refactored the CEL program for siem_logs data stream to improve memory usage. - type: bugfix - link: https://github.com/elastic/integrations/pull/16308 - - description: Refactored the CEL program for cloud_integrated_logs data stream to improve memory usage. + - description: Refactored the CEL program for cloud_integrated_logs and siem_logs data streams to improve memory usage. type: bugfix link: https://github.com/elastic/integrations/pull/16308 - version: "3.2.2" diff --git a/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs b/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs index 71ebe1be6b2..3501f963330 100644 --- a/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs +++ b/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs @@ -96,7 +96,7 @@ program: | "access_token": token.access_token, "expires": token.expires, }, - "want_more": size(tail(blobs)) != 0 || size(work_list) != 0, + "want_more": size(blobs) > 1 || size(work_list) != 0, } : { diff --git a/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs b/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs index 50ccf49aba7..cf922e582b0 100644 --- a/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs +++ b/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs @@ -96,7 +96,7 @@ program: | "access_token": token.access_token, "expires": token.expires, }, - "want_more": size(tail(blobs)) != 0 || size(work_list) != 0, + "want_more": size(blobs) > 1 || size(work_list) != 0, } : { From e0d2adb32b6bdd8c71c1f252468709a49f673171 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 10 Dec 2025 15:44:35 +0530 Subject: [PATCH 6/7] addressed Andrew's suggestions and addressed handling of 404 error codes --- .../agent/stream/cel.yml.hbs | 15 +++++++++++++++ .../siem_logs/agent/stream/cel.yml.hbs | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs b/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs index 3501f963330..2046cec2a4c 100644 --- a/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs +++ b/packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs @@ -98,7 +98,22 @@ program: | }, "want_more": size(blobs) > 1 || size(work_list) != 0, } + : (batch.StatusCode == 404) ? + // 404 Not Found - skip this blob and continue immediately with remaining blobs + { + "events": [{"message": "want_more"}], + "cursor": { + "blobs": tail(blobs), + "work_list": work_list, + }, + "token": { + "access_token": token.access_token, + "expires": token.expires, + }, + "want_more": size(blobs) > 1 || size(work_list) != 0, + } : + // Other errors - emit error, stop processing { "events": { "error": { diff --git a/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs b/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs index cf922e582b0..16501d6343d 100644 --- a/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs +++ b/packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs @@ -98,7 +98,22 @@ program: | }, "want_more": size(blobs) > 1 || size(work_list) != 0, } + : (batch.StatusCode == 404) ? + // 404 Not Found - skip this blob and continue immediately with remaining blobs + { + "events": [{"message": "want_more"}], + "cursor": { + "blobs": tail(blobs), + "work_list": work_list, + }, + "token": { + "access_token": token.access_token, + "expires": token.expires, + }, + "want_more": size(blobs) > 1 || size(work_list) != 0, + } : + // Other errors - emit error, stop processing { "events": { "error": { From e4e9b16fc6a9d280b562f9fdb1bc918a6bafa6f1 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 10 Dec 2025 15:46:45 +0530 Subject: [PATCH 7/7] removed skip, as this was used for internal testing --- .../data_stream/siem_logs/_dev/test/system/test-v1-config.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/mimecast/data_stream/siem_logs/_dev/test/system/test-v1-config.yml b/packages/mimecast/data_stream/siem_logs/_dev/test/system/test-v1-config.yml index da1f5df7fa1..b43672ce74d 100644 --- a/packages/mimecast/data_stream/siem_logs/_dev/test/system/test-v1-config.yml +++ b/packages/mimecast/data_stream/siem_logs/_dev/test/system/test-v1-config.yml @@ -11,6 +11,3 @@ data_stream: enable_request_tracer: true assert: hit_count: 1 -skip: - reason: "The fleet health status changes to degraded when the HTTPJSON template's value evaluation comes up empty or logs a failure, which leads to system test failures but does not interrupt the data flow." - link: https://github.com/elastic/beats/issues/45664