Indexing Pipeline Extension State
An IPE may need to perform web requests to access an external resource required for its execution, such as an authorization token or a catalog.
To avoid flooding the remote server with requests, you can keep and share an IPE state between executions using the `state` keyword.
The `state` must be a Python dictionary and is limited to 32 KB. A state that exceeds this limit or contains invalid types is dropped.
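Because an oversized or invalid state is dropped, it can help to check a candidate state before assigning it. The following is a minimal sketch, not a platform API. It assumes the 32 KB limit can be approximated by the UTF-8 size of the JSON-encoded dictionary, and that JSON-serializable values are the valid types. Both are assumptions, and the platform's actual measurement and accepted types may differ. `state_fits` and `MAX_STATE_BYTES` are hypothetical names.

```python
import json

# Assumption: the 32 KB limit is approximated by the size of the
# JSON-encoded state in UTF-8 bytes. The platform's exact measurement may differ.
MAX_STATE_BYTES = 32 * 1024


def state_fits(candidate):
    """Return True if candidate is a dict that is JSON-serializable and,
    once encoded, is no larger than MAX_STATE_BYTES."""
    if not isinstance(candidate, dict):
        return False
    try:
        encoded = json.dumps(candidate).encode("utf-8")
    except (TypeError, ValueError):
        # Values JSON can't represent (e.g. sets) or circular references.
        return False
    return len(encoded) <= MAX_STATE_BYTES
```

In an extension, you might build the updated dictionary first, for example `candidate = {**state, "catalog": catalog}`, and only write `state["catalog"] = catalog` when `state_fits(candidate)` returns `True` (`catalog` is a hypothetical value).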
The state is shared between the instances of a single IPE bound to a single source. For example, a state stored for `source_1` can't be accessed from `source_2`, and a state stored for `IPE_1` can't be accessed from `IPE_2`.
The state is cleared every 24 hours. To give state entries a shorter lifetime, you must implement your own expiration logic, for example by storing an expiration timestamp alongside the value:
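```python
import time

token = state.get("token")
# "ttl" holds the token's expiration timestamp (seconds since the epoch); 0 if unset.
ttl = state.get("ttl", 0)
now = time.time()

if now < ttl:
    log(f"token is valid: {token}")
    # ...
else:
    log("token has expired, will renew for 60min")
    state["token"] = "new_token"  # replace with the token returned by your request
    state["ttl"] = now + 3600  # expires in 3600 s (60 min)
    # ...
```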
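The expiration pattern above can be generalized into a small helper. This is a sketch, assuming `state` behaves like a plain Python dictionary; `get_or_refresh` and the `fetch` callback (for example, a function that requests a new token) are hypothetical names, not part of the IPE API. It stores the value and its expiration timestamp under two flat keys, mirroring the `token`/`ttl` pair in the example.

```python
import time


def get_or_refresh(state, key, fetch, lifetime_seconds, now=None):
    """Return state[key] if it hasn't expired; otherwise call fetch(),
    store the result with a new expiration timestamp, and return it.

    state: a plain dict (such as the IPE state dictionary).
    fetch: hypothetical zero-argument callable returning a fresh value.
    """
    if now is None:
        now = time.time()
    expires_key = key + "_expires_at"
    # A missing expiration timestamp counts as expired.
    if key in state and now < state.get(expires_key, 0):
        return state[key]
    value = fetch()
    state[key] = value
    state[expires_key] = now + lifetime_seconds
    return value
```

In an extension, a call might look like `token = get_or_refresh(state, "token", request_token, 3600)`, where `request_token` is your own hypothetical function. Because state updates can take up to a minute to sync across machines, several instances may still refresh the value during that window, so `fetch` should tolerate being called more than once.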
Because IPEs run on multiple machines simultaneously, it can take up to a minute for state updates to sync across all running instances of the IPE. If logs from different executions show varying or empty state values, this is expected: the state is still being synchronized.
The following extension and tables show a simplified timeline of how the state synchronizes across machines.
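```python
import random

# Read the shared value; 0 means no machine has set it yet (or it hasn't synced).
value = state.get("value", 0)
log(f"value is {value}")
if not value:
    # Pick a new value and store it in the shared state.
    value = random.randint(1, 10)
    state["value"] = value
    log(f"new value is {value}")
```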
| Time (seconds) | Machine A | Machine B | Machine C |
|---|---|---|---|
| t0 | value is 0 | value is 0 | value is 0 |
| t1 | new value is 4 | new value is 2 | new value is 9 |
| t2 | value is 4 | value is 2 | value is 9 |
| t3 | value is 4 | value is 2 | value is 9 |
| t4 | value is 4 | value is 2 | value is 9 |
After one minute, the state is synced across machines A, B, and C. The value that's kept is the one written by the last machine to update the state (here, Machine B).
| Time (seconds) | Machine A | Machine B | Machine C |
|---|---|---|---|
| t61 | value is 2 | value is 2 | value is 2 |
| t62 | value is 2 | value is 2 | value is 2 |
| t63 | value is 2 | value is 2 | value is 2 |
| t64 | value is 2 | value is 2 | value is 2 |
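The outcome in the second table follows a last-writer-wins rule. The toy model below reproduces it; it is not Coveo's synchronization mechanism. It assumes each state update carries a timestamp and that the update with the latest timestamp is the one kept. The machine names match the tables, but the timestamps are hypothetical values chosen so that Machine B writes last.

```python
def merge_last_writer_wins(writes):
    """Simulate the sync outcome described above.

    writes: list of (machine, timestamp, value) tuples, one per state update.
    Returns the value from the update with the latest timestamp, or None.
    """
    if not writes:
        return None
    _, _, value = max(writes, key=lambda w: w[1])
    return value


# Hypothetical timestamps: Machine B writes last, as in the tables.
writes = [("A", 1.0, 4), ("B", 1.3, 2), ("C", 1.1, 9)]
print(merge_last_writer_wins(writes))  # 2
```

One practical consequence: a value an instance writes during the sync window can be overwritten by another machine, so an extension shouldn't assume that the value it just wrote is the one that persists.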