8 changes: 8 additions & 0 deletions packages/mimecast/changelog.yml
@@ -1,4 +1,12 @@
# newer versions go on top
- version: "3.2.3"
changes:
- description: Refactored the CEL program for siem_logs data stream to improve memory usage.
type: bugfix
link: https://github.com/elastic/integrations/pull/16308
- description: Refactored the CEL program for cloud_integrated_logs data stream to improve memory usage.
type: bugfix
link: https://github.com/elastic/integrations/pull/16308
Suggested change
    - description: Refactored the CEL program for siem_logs data stream to improve memory usage.
      type: bugfix
      link: https://github.com/elastic/integrations/pull/16308
    - description: Refactored the CEL program for cloud_integrated_logs data stream to improve memory usage.
      type: bugfix
      link: https://github.com/elastic/integrations/pull/16308
    - description: Refactored the CEL program for cloud_integrated_logs and siem_logs data streams to improve memory usage.
      type: bugfix
      link: https://github.com/elastic/integrations/pull/16308

- version: "3.2.2"
changes:
- description: Allow setting of `batch size` in all datastreams.
packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs
@@ -31,6 +31,10 @@ program: |
  // If it is changed here changes should be reflected in the other data
  // streams. Do not differentiate the logic between these data streams
  // lightly; use the state variable for this unless absolutely required.
  //
  // Memory optimization: This program processes ONE batch file per execution
  // cycle to avoid memory pressure. Batch URLs are stored in cursor.blobs
  // and processed one at a time.
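  //
  // For illustration only (values invented), a cursor captured mid-run
  // while draining a batch list could look like:
  //
  //   {"blobs": ["https://storage.example.com/batch-2.gz"], "work_list": [{"type": "receipt"}]}
  //
  // Each run pops blobs[0], publishes its events, and persists the rest
  // of the queue in the cursor for the next run.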
  state.with(
    (
      (has(state.?token.expires) && now() < timestamp(state.token.expires)) ?
@@ -73,87 +77,98 @@ program: |
        :
          work_list
      ).as(work_list,
        get_request(
          state.url.trim_right("/") + state.path + "?" + {
            "type": [work_list[0].type],
            ?"nextPage": work_list[0].?next.optMap(next, [next]),
            ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]),
            ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]),
            ?"pageSize": state.?page_size.optMap(size, [string(int(size))]),
          }.format_query()
        ).with({
          "Header": {
            "Authorization": ["Bearer " + token.access_token],
            "Accept": ["application/json"],
            "Content-Type": ["application/json"],
          }
        }).do_request().as(resp, resp.StatusCode == 200 ?
          bytes(resp.Body).decode_json().as(body,
            {
              "events": body.value.map(b, has(b.url),
                get(b.url).as(batch, batch.StatusCode == 200 ?
                  bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e,
                    {
                      "message": dyn(e.encode_json()),
                    }
                  )
                :
                  [{
                    "error": {
                      "code": string(batch.StatusCode),
                      "id": string(batch.Status),
                      "message": "GET " + b.url + ": " + (
                        size(batch.Body) != 0 ?
                          string(batch.Body)
                        :
                          string(batch.Status) + ' (' + string(batch.StatusCode) + ')'
                      ),
                    },
                  }]
                )
              ).flatten(),
              "cursor": {
                "work_list": (
                  "@nextPage" in body && size(body.value) != 0 ?
                    [work_list[0].with({"next": body["@nextPage"]})]
                  :
                    []
                ) + tail(work_list),
              },
              "token": {
                "access_token": token.access_token,
                "expires": token.expires,
              },
              "want_more": "@nextPage" in body && size(body.value) != 0,
            }.as(to_publish, to_publish.with({
              "want_more": to_publish.want_more || size(to_publish.cursor.work_list) != 0,
            }))
          ).as(state,
            // Check whether we still need to get more, but have
            // no event for this type. If we do, populate events
            // with a place-holder to be discarded by the ingest
            // pipeline.
            state.want_more && size(state.events) == 0 ?
              state.with({"events": [{"message": "want_more"}]})
            :
              state
          )
        :
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "GET " + state.path + ": " + (
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
        state.?cursor.blobs.orValue([]).as(blobs,
          // Process blobs first if any are queued, otherwise fetch batch lists.
          (size(blobs) > 0) ?
            // Download a single batch file.
            get(blobs[0]).as(batch, (batch.StatusCode == 200) ?
              {
                "events": bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e,
                  {
                    "message": dyn(e.encode_json()),
                  }
                ),
              },
            },
            "want_more": false,
          }
"cursor": {
"blobs": tail(blobs),
"work_list": work_list,
},
"token": {
"access_token": token.access_token,
"expires": token.expires,
},
"want_more": size(tail(blobs)) != 0 || size(work_list) != 0,
Suggested change
"want_more": size(tail(blobs)) != 0 || size(work_list) != 0,
"want_more": size(blobs) > 1 || size(work_list) != 0,

in both files
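The two predicates agree for every list, since tail(blobs) drops the first
element; the suggested form just skips building the intermediate list. A quick
sketch with invented values (mito CEL):

    ["u1", "u2"].as(blobs, size(tail(blobs)) != 0)  // true, allocates tail(blobs)
    ["u1", "u2"].as(blobs, size(blobs) > 1)         // true, no intermediate list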

              }
            :
              {
                "events": {
                  "error": {
                    "code": string(batch.StatusCode),
                    "id": string(batch.Status),
                    "message": "GET " + blobs[0] + ": " + (
                      (size(batch.Body) != 0) ?
                        string(batch.Body)
                      :
                        string(batch.Status) + " (" + string(batch.StatusCode) + ")"
                    ),
                  },
                },
                "want_more": false,
              }
            )
          :
            // No blobs queued, fetch batch list from API.
            get_request(
              state.url.trim_right("/") + state.path + "?" + {
                "type": [work_list[0].type],
                ?"nextPage": work_list[0].?next.optMap(next, [next]),
                ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]),
                ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]),
                ?"pageSize": state.?page_size.optMap(size, [string(int(size))]),
              }.format_query()
            ).with({
              "Header": {
                "Authorization": ["Bearer " + token.access_token],
                "Accept": ["application/json"],
                "Content-Type": ["application/json"],
              }
            }).do_request().as(resp, (resp.StatusCode == 200) ?
              bytes(resp.Body).decode_json().as(body,
                {
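                  // Placeholder event to keep the run going; the ingest
                  // pipeline discards it (the same mechanism the pre-refactor
                  // program used when a poll produced no events).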
"events": [{"message": "want_more"}],
"cursor": {
"blobs": body.value.map(b, b.url),
"work_list": (
// If there are more pages, add pagination work item.
("@nextPage" in body && size(body.value) != 0) ?
[work_list[0].with({"next": body["@nextPage"]})]
:
[]
) + tail(work_list),
},
"token": {
"access_token": token.access_token,
"expires": token.expires,
},
"want_more": size(body.value) != 0 || size(tail(work_list)) != 0,
}
)
:
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "GET " + state.path + ": " + (
(size(resp.Body) != 0) ?
string(resp.Body)
:
string(resp.Status) + " (" + string(resp.StatusCode) + ")"
),
},
},
"want_more": false,
}
)
)
)
)
@@ -11,3 +11,6 @@ data_stream:
    enable_request_tracer: true
assert:
  hit_count: 1
skip:
  reason: "The fleet health status changes to degraded when the HTTPJSON template's value evaluation comes up empty or logs a failure, which leads to system test failures but does not interrupt the data flow."
  link: https://github.com/elastic/beats/issues/45664
175 changes: 95 additions & 80 deletions packages/mimecast/data_stream/siem_logs/agent/stream/cel.yml.hbs
@@ -31,6 +31,10 @@ program: |
  // If it is changed here changes should be reflected in the other data
  // streams. Do not differentiate the logic between these data streams
  // lightly; use the state variable for this unless absolutely required.
  //
  // Memory optimization: This program processes ONE batch file per execution
  // cycle to avoid memory pressure. Batch URLs are stored in cursor.blobs
  // and processed one at a time.
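  //
  // For illustration only (values invented), a cursor captured mid-run
  // while draining a batch list could look like:
  //
  //   {"blobs": ["https://storage.example.com/batch-2.gz"], "work_list": [{"type": "receipt"}]}
  //
  // Each run pops blobs[0], publishes its events, and persists the rest
  // of the queue in the cursor for the next run.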
  state.with(
    (
      (has(state.?token.expires) && now() < timestamp(state.token.expires)) ?
@@ -73,87 +77,98 @@ program: |
        :
          work_list
      ).as(work_list,
        get_request(
          state.url.trim_right("/") + state.path + "?" + {
            "type": [work_list[0].type],
            ?"nextPage": work_list[0].?next.optMap(next, [next]),
            ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]),
            ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]),
            ?"pageSize": state.?page_size.optMap(size, [string(int(size))]),
          }.format_query()
        ).with({
          "Header": {
            "Authorization": ["Bearer " + token.access_token],
            "Accept": ["application/json"],
            "Content-Type": ["application/json"],
          }
        }).do_request().as(resp, resp.StatusCode == 200 ?
          bytes(resp.Body).decode_json().as(body,
            {
              "events": body.value.map(b, has(b.url),
                get(b.url).as(batch, batch.StatusCode == 200 ?
                  bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e,
                    {
                      "message": dyn(e.encode_json()),
                    }
                  )
                :
                  [{
                    "error": {
                      "code": string(batch.StatusCode),
                      "id": string(batch.Status),
                      "message": "GET " + b.url + ": " + (
                        size(batch.Body) != 0 ?
                          string(batch.Body)
                        :
                          string(batch.Status) + ' (' + string(batch.StatusCode) + ')'
                      ),
                    },
                  }]
                )
              ).flatten(),
              "cursor": {
                "work_list": (
                  "@nextPage" in body && size(body.value) != 0 ?
                    [work_list[0].with({"next": body["@nextPage"]})]
                  :
                    []
                ) + tail(work_list),
              },
              "token": {
                "access_token": token.access_token,
                "expires": token.expires,
              },
              "want_more": "@nextPage" in body && size(body.value) != 0,
            }.as(to_publish, to_publish.with({
              "want_more": to_publish.want_more || size(to_publish.cursor.work_list) != 0,
            }))
          ).as(state,
            // Check whether we still need to get more, but have
            // no event for this type. If we do, populate events
            // with a place-holder to be discarded by the ingest
            // pipeline.
            state.want_more && size(state.events) == 0 ?
              state.with({"events": [{"message": "want_more"}]})
            :
              state
          )
        :
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "GET " + state.path + ": " + (
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
        state.?cursor.blobs.orValue([]).as(blobs,
          // Process blobs first if any are queued, otherwise fetch batch lists.
          (size(blobs) > 0) ?
            // Download a single batch file.
            get(blobs[0]).as(batch, (batch.StatusCode == 200) ?
              {
                "events": bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e,
                  {
                    "message": dyn(e.encode_json()),
                  }
                ),
              },
            },
            "want_more": false,
          }
"cursor": {
"blobs": tail(blobs),
"work_list": work_list,
},
"token": {
"access_token": token.access_token,
"expires": token.expires,
},
"want_more": size(tail(blobs)) != 0 || size(work_list) != 0,
              }
            :
              {
                "events": {
                  "error": {
                    "code": string(batch.StatusCode),
                    "id": string(batch.Status),
                    "message": "GET " + blobs[0] + ": " + (
                      (size(batch.Body) != 0) ?
                        string(batch.Body)
                      :
                        string(batch.Status) + " (" + string(batch.StatusCode) + ")"
                    ),
                  },
                },
                "want_more": false,
              }
            )
          :
            // No blobs queued, fetch batch list from API.
            get_request(
              state.url.trim_right("/") + state.path + "?" + {
                "type": [work_list[0].type],
                ?"nextPage": work_list[0].?next.optMap(next, [next]),
                ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]),
                ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]),
                ?"pageSize": state.?page_size.optMap(size, [string(int(size))]),
              }.format_query()
            ).with({
              "Header": {
                "Authorization": ["Bearer " + token.access_token],
                "Accept": ["application/json"],
                "Content-Type": ["application/json"],
              }
            }).do_request().as(resp, (resp.StatusCode == 200) ?
              bytes(resp.Body).decode_json().as(body,
                {
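                  // Placeholder event to keep the run going; the ingest
                  // pipeline discards it (the same mechanism the pre-refactor
                  // program used when a poll produced no events).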
"events": [{"message": "want_more"}],
"cursor": {
"blobs": body.value.map(b, b.url),
"work_list": (
// If there are more pages, add pagination work item.
("@nextPage" in body && size(body.value) != 0) ?
[work_list[0].with({"next": body["@nextPage"]})]
:
[]
) + tail(work_list),
},
"token": {
"access_token": token.access_token,
"expires": token.expires,
},
"want_more": size(body.value) != 0 || size(tail(work_list)) != 0,
}
)
:
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "GET " + state.path + ": " + (
(size(resp.Body) != 0) ?
string(resp.Body)
:
string(resp.Status) + " (" + string(resp.StatusCode) + ")"
),
},
},
"want_more": false,
}
)
)
)
)