Upload large item data
The Coveo Crawling Module uses the Push API to push items to the Coveo Platform. If the compressed item data that you add with a pre-push extension exceeds 5MB, upload it to a temporary storage container provided by the Push API. The example pre-push extension script below demonstrates how to upload large item data using the Push API.
The script creates an Extensions folder under the COVEO_LOGS_ROOT folder (if it doesn’t already exist) and a subfolder named after the source ID.
The script logs relevant information about each crawled item in a .log file in that folder.
> **Leading practice**
>
> Apply the extension to a duplicate of your production source with a name that clearly indicates it's for testing purposes only. In this test source, crawl only a small subset of content for faster debugging and to limit the log file size. Only after fully testing and validating the pre-push extension in the test source should you apply it to your production source.
After you apply this extension, rebuild the source.
# Import required Python libraries. Note: Add non-Python standard libraries to the requirements.txt file.
import os
import base64
import zlib
import json
import requests
import logging
from logging.handlers import TimedRotatingFileHandler
# Set up daily-rotating file logging under <COVEO_LOGS_ROOT>/Extensions/<source ID>.
_log_dir = os.path.join(
    os.getenv("COVEO_LOGS_ROOT"),
    "Extensions",
    os.getenv("SOURCE_ID", "unknown"),
)
os.makedirs(_log_dir, exist_ok=True)

# One log file per crawl operation, named <OPERATION_TYPE>_<OPERATION_ID>.log.
_log_file = os.path.join(
    _log_dir,
    f"{os.getenv('OPERATION_TYPE', 'unknown')}_{os.getenv('OPERATION_ID', 'unknown')}.log",
)

_handler = TimedRotatingFileHandler(_log_file, when="midnight")
_handler.suffix = "%Y-%m-%d"  # date stamp appended to rotated files
_handler.setFormatter(
    logging.Formatter(
        fmt="%(asctime)s.%(msecs)03d %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
)
logging.basicConfig(level=logging.INFO, handlers=[_handler])
# Push API request configuration
# See https://docs.coveo.com/en/2976/#data-residency-configuration for list of Push API v1 URL endpoints.
# See https://docs.coveo.com/en/1718#create-an-api-key for API key creation details.
config = {
'PUSH_API_URL': 'https://api.cloud.coveo.com/push/v1',  # Push API v1 base URL (adjust for your data residency)
'ORG_ID': os.getenv('ORGANIZATION_ID'),  # organization ID injected by the Crawling Module environment
'API_KEY': '<Your API Key Here>'  # replace with an API key that has Push API privileges
}
# Push API limit: compressed item data up to this size can be inlined in the item payload.
MAXIMUM_INLINED_COMPRESSED_DATA_SIZE_IN_BYTES = 5 * 1024 * 1024 # 5MB
# Push API limit: larger data must go through a file container, up to this size.
MAXIMUM_SUPPORTED_DATA_SIZE_IN_BYTES = 250 * 1024 * 1024 # 250MB
# Helper: Upload large item content
def upload_document_content(container, content):
    """Upload compressed item data to the pre-signed Amazon S3 URI of a file container.

    Args:
        container (dict): File container returned by the Push API; must contain
            an "uploadUri" key (pre-signed S3 PUT URL).
        content (bytes): zlib-compressed item data to upload.

    Raises:
        requests.HTTPError: If S3 rejects the upload (non-2xx response).
        requests.Timeout: If the upload doesn't complete within the timeout.
    """
    logging.info("Uploading compressed content to S3 container...")
    # Headers required when uploading to a Push API file container:
    # server-side AES-256 encryption and a binary content type.
    request_headers = {
        "x-amz-server-side-encryption": "AES256",
        "Content-Type": "application/octet-stream"
    }
    # requests has no default timeout; without one, a stalled S3 transfer
    # would hang the extension (and the crawl) indefinitely.
    response = requests.put(
        container["uploadUri"],
        data=content,
        headers=request_headers,
        timeout=300
    )
    response.raise_for_status()
    logging.info("Upload completed successfully.")
# Helper: Request a file container from Push API
def get_file_container():
    """Request a temporary file container from the Push API.

    Returns:
        dict: Parsed JSON response. Expected to contain "uploadUri" (pre-signed
        S3 upload URL) and "fileId" (identifier used to reference the uploaded
        data in the item payload).

    Raises:
        requests.HTTPError: If the Push API returns a non-2xx response.
        requests.Timeout: If the request doesn't complete within the timeout.
    """
    logging.info("Requesting file container from Push API...")
    request_headers = {
        "content-type": "application/json",
        "Authorization": "Bearer " + config["API_KEY"]
    }
    request_url = f"{config['PUSH_API_URL']}/organizations/{config['ORG_ID']}/files"
    # requests has no default timeout; bound the call so an unresponsive
    # endpoint can't hang the extension indefinitely.
    response = requests.post(request_url, headers=request_headers, timeout=60)
    response.raise_for_status()
    logging.info("Container request succeeded.")
    # response.json() parses the body exactly like json.loads(response.content).
    return response.json()
# -----------------------------------------------------------------
# Extension entry point. The do_extension function must be defined.
# -----------------------------------------------------------------
def do_extension(body):
    """Attach (possibly large) compressed content to the item being pushed.

    Fetches demonstration content, compresses it with zlib, and then either
    inlines it (<= 5MB compressed) as CompressedBinaryData or uploads it to a
    Push API file container (<= 250MB compressed) referenced through
    CompressedBinaryDataFileId. Failures are logged and the item is returned
    unmodified (best-effort) rather than failing the crawl.

    Args:
        body (dict): The item payload provided by the Crawling Module.

    Returns:
        dict: The (possibly updated) item payload.
    """
    document_id = body.get("DocumentId", "<missing>")
    logging.info("BEGIN processing item: %s", document_id)
    # Fetch file content. URL used here for demonstration purposes only.
    url = "http://httpbin.org/html"
    logging.info("Fetching content from: %s", url)
    try:
        # Timeout so a stalled download can't block the crawl indefinitely.
        file_data = requests.get(url, timeout=60).content
        logging.info("Fetched %d bytes.", len(file_data))
    except Exception as ex:
        # Best-effort: push the item without added content rather than failing.
        logging.error("Failed to fetch content: %s", ex)
        return body
    # Basic size check
    if len(file_data) == 0:
        logging.warning("Downloaded content is empty; skipping compression.")
        return body
    # Compress
    compressed = zlib.compress(file_data)
    compressed_size = len(compressed)
    logging.info(
        "Compression finished. Original size: %d bytes, Compressed size: %d bytes.",
        len(file_data), compressed_size
    )
    # Set compression type
    body["CompressionType"] = "ZLIB"
    # Case 1 — Inline compressed content (< 5MB)
    if compressed_size <= MAXIMUM_INLINED_COMPRESSED_DATA_SIZE_IN_BYTES:
        logging.info("Compressed size %d bytes fits inline. Adding CompressedBinaryData.", compressed_size)
        body["CompressedBinaryData"] = base64.b64encode(compressed).decode()
    # Case 2 — Must upload to S3 container (< 250MB)
    elif compressed_size <= MAXIMUM_SUPPORTED_DATA_SIZE_IN_BYTES:
        logging.info(
            "Compressed size %d bytes too large to inline; requesting container upload.",
            compressed_size
        )
        container = get_file_container()
        if "fileId" in container:
            logging.info("Received container fileId: %s", container["fileId"])
            # Upload compressed data to the container
            try:
                upload_document_content(container, compressed)
            except Exception as ex:
                logging.error("Upload to container failed: %s", ex)
            else:
                # Reference the container only after the upload succeeds;
                # setting the fileId before uploading (as a naive ordering
                # would) could leave the item pointing at an empty container.
                body["CompressedBinaryDataFileId"] = container["fileId"]
        else:
            logging.error("Push API did not return a fileId; upload impossible.")
    # Case 3 — Too large even for container
    else:
        logging.error(
            "Compressed content too large (%d bytes). Maximum supported size is %d bytes. "
            "Document content will NOT be added.",
            compressed_size,
            MAXIMUM_SUPPORTED_DATA_SIZE_IN_BYTES
        )
    logging.info("END processing item: %s", document_id)
    return body