Upload large item data

The Coveo Crawling Module uses the Push API to push items to the Coveo Platform. If the compressed item data that you add with a pre-push extension exceeds 5MB, upload it to a temporary storage container provided by the Push API. The example pre-push extension script below demonstrates how to upload large item data using the Push API.

The script creates an Extensions folder under the COVEO_LOGS_ROOT folder (if it doesn’t already exist) and a subfolder named after the source ID. The script logs relevant information about each crawled item in a .log file in that folder.

Tip — Leading practice

Apply the extension to a duplicate of your production source with a name that clearly indicates it’s for testing purposes only. In this test source, crawl only a small subset of content for faster debugging and to limit the log file size.

Only after fully testing and validating the pre-push extension in the test source should you apply it to your production source.

After you apply this extension, rebuild the source.

# Import required Python libraries. Note: Add non-Python standard libraries to the requirements.txt file.
import os
import base64
import zlib
import json
import requests
import logging
from logging.handlers import TimedRotatingFileHandler

# Initialize rotating file logging.
# Log files are written under <COVEO_LOGS_ROOT>/Extensions/<SOURCE_ID>/.
# NOTE(review): COVEO_LOGS_ROOT has no default here — if the variable is unset,
# os.path.join receives None and raises TypeError. Assumed to always be set by
# the Crawling Module runtime; confirm.
log_folder = os.path.join(
    os.getenv("COVEO_LOGS_ROOT"),
    "Extensions",
    os.getenv("SOURCE_ID", "unknown")
)
os.makedirs(log_folder, exist_ok=True)  # idempotent: no error if the folder already exists

# One log file per crawl operation, named "<operation type>_<operation id>.log".
fname = f"{os.getenv('OPERATION_TYPE','unknown')}_{os.getenv('OPERATION_ID','unknown')}.log"
fpath = os.path.join(log_folder, fname)

# Rotate the log file daily at midnight; rotated files get a date suffix.
handler = TimedRotatingFileHandler(fpath, when="midnight")
handler.suffix = "%Y-%m-%d"

# Timestamped entries with millisecond precision,
# e.g. "2024-01-01 12:00:00.123 INFO message".
formatter = logging.Formatter(
    fmt="%(asctime)s.%(msecs)03d %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
handler.setFormatter(formatter)

# Route all root-logger records at INFO and above to the rotating file handler.
logging.basicConfig(level=logging.INFO, handlers=[handler])

# Push API request configuration.
# See https://docs.coveo.com/en/2976/#data-residency-configuration for the list of Push API v1 URL endpoints.
# See https://docs.coveo.com/en/1718#create-an-api-key for API key creation details.
config = {
    'PUSH_API_URL': 'https://api.cloud.coveo.com/push/v1',
    'ORG_ID': os.getenv('ORGANIZATION_ID'),  # provided by the Crawling Module environment
    'API_KEY': '<Your API Key Here>'         # replace with an API key that has Push API privileges
}

# Push API size limits for item data.
MAXIMUM_INLINED_COMPRESSED_DATA_SIZE_IN_BYTES = 5 * 1024 * 1024     # 5MB — max size for inline CompressedBinaryData
MAXIMUM_SUPPORTED_DATA_SIZE_IN_BYTES = 250 * 1024 * 1024           # 250MB — max size via file container upload

# Helper: Upload large item content
def upload_document_content(container, content):
    """Upload compressed item data to the pre-signed upload URI of a file container.

    Args:
        container: File container dict returned by the Push API; must contain
            an "uploadUri" key (pre-signed S3 PUT URL).
        content: Compressed item data (bytes) to upload.

    Raises:
        requests.HTTPError: If the storage service rejects the upload (non-2xx).
        requests.Timeout: If the upload does not complete within the timeout.
    """
    logging.info("Uploading compressed content to S3 container...")

    # Headers required by the Push API file container contract: server-side
    # encryption must be requested explicitly on the PUT.
    request_headers = {
        "x-amz-server-side-encryption": "AES256",
        "Content-Type": "application/octet-stream"
    }

    # Always set a timeout so a stalled upload cannot hang the extension
    # (and the whole crawl) indefinitely. 300s allows for large payloads.
    response = requests.put(
        container["uploadUri"],
        data=content,
        headers=request_headers,
        timeout=300
    )
    response.raise_for_status()

    logging.info("Upload completed successfully.")

# Helper: Request a file container from Push API
def get_file_container():
    """Request a temporary file container from the Push API.

    Returns:
        dict: Parsed container descriptor; on success it includes the
        "uploadUri" to PUT the data to and the "fileId" to reference it with.

    Raises:
        requests.HTTPError: If the Push API returns a non-2xx response.
        requests.Timeout: If the request does not complete within the timeout.
    """
    logging.info("Requesting file container from Push API...")

    request_headers = {
        "content-type": "application/json",
        "Authorization": "Bearer " + config["API_KEY"]
    }

    request_url = f"{config['PUSH_API_URL']}/organizations/{config['ORG_ID']}/files"
    # Timeout prevents the extension from hanging indefinitely on a stalled request.
    response = requests.post(request_url, headers=request_headers, timeout=60)
    response.raise_for_status()

    logging.info("Container request succeeded.")
    # response.json() decodes using the response charset — preferred over the
    # previous manual json.loads(response.content).
    return response.json()

# -----------------------------------------------------------------
# Extension entry point. The do_extension function must be defined.
# -----------------------------------------------------------------
def do_extension(body):
    """Pre-push extension entry point.

    Fetches demo content, compresses it with zlib, and attaches it to the
    item either inline (CompressedBinaryData, compressed size <= 5MB) or via
    a Push API file container (CompressedBinaryDataFileId, <= 250MB).
    Content larger than 250MB compressed is not added.

    Args:
        body: The item body dict provided by the Crawling Module.

    Returns:
        The (possibly modified) item body dict.
    """
    document_id = body.get("DocumentId", "<missing>")
    logging.info("BEGIN processing item: %s", document_id)

    # Fetch file content. URL used here for demonstration purposes only.
    url = "http://httpbin.org/html"
    logging.info("Fetching content from: %s", url)

    try:
        # Timeout prevents a stalled download from blocking the pipeline;
        # raise_for_status ensures an HTTP error page isn't compressed and
        # indexed as if it were the real content (the original code skipped
        # the status check).
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        file_data = response.content
        logging.info("Fetched %d bytes.", len(file_data))
    except Exception as ex:
        logging.error("Failed to fetch content: %s", ex)
        return body

    # Basic size check
    if not file_data:
        logging.warning("Downloaded content is empty; skipping compression.")
        return body

    # Compress
    compressed = zlib.compress(file_data)
    compressed_size = len(compressed)

    logging.info(
        "Compression finished. Original size: %d bytes, Compressed size: %d bytes.",
        len(file_data), compressed_size
    )

    # Tell the Push API how the binary data is compressed.
    body["CompressionType"] = "ZLIB"

    # Case 1 — Inline compressed content (< 5MB)
    if compressed_size <= MAXIMUM_INLINED_COMPRESSED_DATA_SIZE_IN_BYTES:
        logging.info("Compressed size %d bytes fits inline. Adding CompressedBinaryData.", compressed_size)

        body["CompressedBinaryData"] = base64.b64encode(compressed).decode()

    # Case 2 — Must upload to S3 container (< 250MB)
    elif compressed_size <= MAXIMUM_SUPPORTED_DATA_SIZE_IN_BYTES:
        logging.info(
            "Compressed size %d bytes too large to inline; requesting container upload.",
            compressed_size
        )

        container = get_file_container()

        if "fileId" in container:
            logging.info("Received container fileId: %s", container["fileId"])

            # Upload compressed data to the container. The fileId is attached
            # to the item only AFTER the upload succeeds — the original code
            # set it first, so a failed upload left the item referencing a
            # container that held no content.
            try:
                upload_document_content(container, compressed)
                body["CompressedBinaryDataFileId"] = container["fileId"]
            except Exception as ex:
                logging.error("Upload to container failed: %s", ex)

        else:
            logging.error("Push API did not return a fileId; upload impossible.")

    # Case 3 — Too large even for container
    else:
        logging.error(
            "Compressed content too large (%d bytes). Maximum supported size is %d bytes. "
            "Document content will NOT be added.",
            compressed_size,
            MAXIMUM_SUPPORTED_DATA_SIZE_IN_BYTES
        )

    logging.info("END processing item: %s", document_id)
    return body