opensearch-export-csv
Download
#!/usr/bin/env python3

import csv
import datetime

import boto3

from opensearchpy import OpenSearch
from opensearchpy import RequestsHttpConnection
from opensearchpy.helpers import scan
from requests_aws4auth import AWS4Auth


region_name = "us-east-1"
service = "es"
domain_name = "my_domain"

aws = boto3.Session()
credentials = aws.get_credentials()
opensearch = aws.client("opensearch", region_name=region_name)
r = opensearch.describe_domain(DomainName=domain_name)
host = r["DomainStatus"]["Endpoint"]

awsauth = AWS4Auth(
    # access_id=credentials.access_key,
    # secret_key=credentials.secret_key,
    # session_token=credentials.token,
    region=region_name,
    service=service,
    refreshable_credentials=credentials,
)
search = OpenSearch(
    hosts=[{"host": host, "port": 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

match_host = {
    "bool": {
        "should": [
            {"match_phrase": {"ClientRequestHost": "example-1.com"}},
            {"match_phrase": {"ClientRequestHost": "example-2.com"}},
        ],
        "minimum_should_match": 1,
    },
}
match_uri = {"match_phrase": {"ClientRequestURI": {"query": "app/home"}}}
match_ip = {"match": {"ClientIP": {"query": "1.1.1.1"}}}
start = datetime.datetime.now(tz=datetime.timezone.utc)
end = start - datetime.timedelta(days=7)
match_range = {
    "range": {
        "EdgeStartTimestamp": {
            "format": "strict_date_optional_time",
            "gte": start.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
            "lte": end.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
        },
    },
}

search_index = "cloudflare-*"
search_query = {
    "size": 1000,
    "sort": [{"EdgeStartTimestamp": {"order": "desc", "unmapped_type": "boolean"}}],
    "docvalue_fields": [
        {"field": "@timestamp", "format": "date_time"},
        {"field": "EdgeEndTimestamp", "format": "date_time"},
        {"field": "EdgeStartTimestamp", "format": "date_time"},
        {"field": "cloudflare.edge.end.timestamp", "format": "date_time"},
        {"field": "cloudflare.edge.start.timestamp", "format": "date_time"},
        {"field": "cloudflare.origin.response.http.expires", "format": "date_time"},
        {"field": "cloudflare.origin.response.http.last_modified", "format": "date_time"},
        {"field": "event.end", "format": "date_time"},
        {"field": "event.start", "format": "date_time"},
    ],
    "query": {
        "bool": {
            "must": [
                match_host,
                match_uri,
                match_ip,
                match_range,
            ],
            "filter": [{"match_all": {}}],
            "should": [],
            "must_not": [],
        }
    },
}
fields = [
    "EdgeStartTimestamp",
    "EdgeEndTimestamp",
    "ClientIP",
    "ClientDeviceType",
    "ClientRequestProtocol",
    "ClientRequestMethod",
    "ClientRequestHost",
    "ClientRequestURI",
    "ClientRequestReferer",
    "ClientRequestUserAgent",
    "OriginResponseStatus",
]

items = [*scan(search, query=search_query, index=search_index)]
with open("result.csv", mode="w", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fields)
    writer.writeheader()
    for item in items:
        writer.writerow({k: item["_source"][k] for k in fields})