An indexing pipeline builds indexes derived from source data. The index should always converge to the current version of the source data: once a new version of the source data is processed by the pipeline, no data derived from previous versions should remain in the target index storage. This is the data consistency requirement for an indexing pipeline.
If you like this post and our work, please drop a ⭐ star https://github.com/cocoindex-io/cocoindex to support us ❤️. Thank you so much with a warm coconut hug 🥥🤗
Compared to building single-process, in-memory transformation logic, an indexing pipeline faces two additional challenges in maintaining data consistency:
Temporally, it's long-running: execution may span multiple processes. An earlier run may terminate in the middle, e.g. due to a SIGTERM, a failure to commit to the target storage, or a machine reboot. When the process restarts later, it should pick up the existing states and keep moving the target storage toward the desired state. This is both tedious and error-prone: for example, if a piece of source data is only partially processed and then gets updated before the restarted pipeline processes it again, how can we make sure states derived from the old version are properly cleared?
Spatially, execution happens concurrently, sometimes distributed across multiple machines for large workloads. This introduces the problem of out-of-order processing. Consider a scenario where your source data gets updated rapidly:
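As a minimal illustration (plain Python, not CocoIndex code; the naive commit function below is hypothetical), two workers may pick up two consecutive versions of the same source entry, and the worker holding the older version may finish last:

```python
# Minimal illustration of an out-of-order commit leaving stale data in an index.
# `index` stands in for the target index storage; `commit` is a naive last-writer-wins write.

index = {}

def commit(source_key, version, derived_value):
    index[source_key] = (version, derived_value)  # blindly overwrite whatever is there

# Source "doc1" changes from v1 to v2 while v1 is still being processed by another worker.
commit("doc1", version=2, derived_value="embedding of doc1 v2")  # worker B finishes first
commit("doc1", version=1, derived_value="embedding of doc1 v1")  # worker A finishes last

print(index["doc1"])  # (1, 'embedding of doc1 v1'): the index stays stale until doc1 changes again
```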
Data inconsistency in indexing systems can lead to several critical issues: stale or duplicated entries showing up in query results, storage leaked to derived data that is never cleaned up, and results that mix old and new versions of the same source.
If all your state management and storage systems are within a single transactional system (e.g., PostgreSQL), you can leverage database transactions for consistency. However, this is often not the case in real-world scenarios. For example, you may use an internal storage (e.g. PostgreSQL) to track per-source metadata, and an external vector store (e.g. Pinecone or Milvus) to store vector embeddings.
There are extra challenges around this:
In the internal storage, we have to keep track of all the keys in the target vector store that are derived from each source key (sketched below). This is necessary when source data is updated or deleted: we need to locate the previously derived keys and delete those that no longer exist in the output of the new version.
However, the internal storage is not transactional with the external vector store, since they are two independent storage systems. If the process terminates abnormally in the middle, the states in the two systems can go out of sync.
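As a rough sketch of this bookkeeping (a hypothetical layout, with a plain dict standing in for a table in PostgreSQL):

```python
# Hypothetical layout of the internal tracking storage, keyed by source key.
tracking = {
    "docs/a.md": {
        # Keys previously written to the external vector store for this source entry.
        # When docs/a.md is updated or deleted, this tells us which derived rows may
        # have become stale and need to be removed from the vector store.
        "target_keys": ["docs/a.md#chunk0", "docs/a.md#chunk1"],
    },
}
```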
So we need to carefully design the order of write operations during commit, to avoid data leakage even when the process is only partially executed:
Phase 1: update bookkeeping in the internal storage first, recording the newly derived target keys (merged with the previously tracked ones) before anything is written to the target store.
Phase 2: upsert the newly derived rows into the target store.
Phase 3: delete stale rows from the target store, and only then trim the tracked keys in the internal storage down to the current set.
This ensures a key invariant: the keys tracked in the internal storage are always a superset of the keys that actually exist in the target store. All data in the target store is therefore always tracked, so it can never be leaked.
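A sketch of such a commit, with hypothetical tracking_db and target_store interfaces (ordinal checks and error handling omitted for brevity):

```python
# Sketch of a commit ordered to preserve the invariant:
# keys tracked in the internal storage are always a superset of keys in the target store.
# `tracking_db` and `target_store` are hypothetical interfaces, not a specific library.

def commit(source_key, new_rows, tracking_db, target_store):
    new_keys = set(new_rows)
    old_keys = tracking_db.get_tracked_keys(source_key)

    # Phase 1: bookkeeping first. Track the union of old and new keys before touching the target.
    tracking_db.set_tracked_keys(source_key, old_keys | new_keys)

    # Phase 2: upsert derived rows into the target store.
    # If we crash during or after this phase, every row that reached the target is already tracked.
    for key, value in new_rows.items():
        target_store.upsert(key, value)

    # Phase 3: delete stale rows from the target store, then shrink tracking to the new keys.
    # Deleting from the target before untracking keeps the superset invariant at every step.
    for key in old_keys - new_keys:
        target_store.delete(key)
    tracking_db.set_tracked_keys(source_key, new_keys)
```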
Ordinals are unique identifiers that establish a strict ordering of data updates in your pipeline. They help bookkeep the sequence and timing of changes to ensure consistency. Common examples include file modification times, database sequence numbers or log sequence numbers (LSNs), and message offsets in a queue such as Kafka.
Bookkeep ordinals carefully at two key stages: capture the ordinal when reading each version of the source data, and compare it against the last committed ordinal when committing the derived results. This ordinal bookkeeping happens in Phase 1, which rejects out-of-order commits: a rejected commit makes no updates in Phase 1, and Phases 2 and 3 are not executed at all.
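A sketch of the check, again with a hypothetical tracking_db interface (in practice the compare-and-set would run inside the internal storage's transaction):

```python
# Sketch of rejecting out-of-order commits with ordinals (hypothetical interface).
# The internal storage keeps, per source key, the ordinal of the last committed version.

def begin_commit(source_key, ordinal, tracking_db):
    last = tracking_db.get_committed_ordinal(source_key)  # None if never committed
    if last is not None and ordinal <= last:
        # A result derived from this or a newer version was already committed:
        # reject the commit, so Phases 2 and 3 never run for the stale version.
        return False
    tracking_db.set_committed_ordinal(source_key, ordinal)
    return True
```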
When multiple processings of the same source key are in flight, writes to the target storage (Phase 2 above) may arrive out of order, so results derived from an old version can overwrite newer ones.
To avoid this, we need to keep rows for multiple versions in the target storage and use soft deletion: instead of physically removing a row, mark it with a `deleted` field set. Specifically:
The `deleted` field is set for rows of deleted versions, and not touched for rows of new versions.
Queries on the target storage should skip rows whose `deleted` field is set.
Besides, we need an offline GC process to garbage collect rows in the target storage with the `deleted` field set. Because of this, the `deleted` field can be a timestamp of the deletion time, so we can decide when to GC a specific version based on it.
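A sketch of this scheme, with a hypothetical target_store interface and an assumed row layout (a value plus a nullable deleted timestamp):

```python
import time

# Sketch of soft deletion in the target storage (hypothetical interface and row layout).

def write_new_version(target_store, rows):
    # Rows of a new version are written as live rows; the `deleted` field is not touched.
    for key, value in rows.items():
        target_store.upsert(key, {"value": value, "deleted": None})

def soft_delete_old_version(target_store, stale_keys):
    # Rows of obsolete versions are marked with the deletion time instead of being removed.
    now = time.time()
    for key in stale_keys:
        target_store.update(key, {"deleted": now})

def query_live(target_store, match):
    # Queries skip rows whose `deleted` field is set.
    return [row for row in target_store.search(match) if row["deleted"] is None]

def garbage_collect(target_store, retention_seconds):
    # Offline GC: physically remove rows whose deletion time is old enough.
    cutoff = time.time() - retention_seconds
    for key, row in target_store.scan():
        if row["deleted"] is not None and row["deleted"] < cutoff:
            target_store.delete(key)
```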
The discussions above only cover part of the complexity of building and maintaining long-running or distributed indexing pipelines. There are other complexities, compared to building single-process in-memory transformation logic, such as:
The CocoIndex framework aims to handle these complexities so users can focus on pure transformations:
Users write transformations as pure functions
The framework handles the rest: bookkeeping of derived data and ordinals, ordering of commits, cleanup of stale states in the target storage, and keeping the target index consistent and fresh.
By exposing a data-driven programming paradigm, CocoIndex allows developers to focus on their core business logic while maintaining data consistency and freshness.
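For illustration only (these signatures are hypothetical and not the CocoIndex API), a transformation in this paradigm is a pure function from a source document to derived rows; everything about incremental updates, ordering, and cleanup stays in the framework:

```python
# Illustrative only: hypothetical signatures, not the CocoIndex API.
# The user supplies a pure function from a source document to derived rows;
# the framework decides when to run it, what to commit, and what to clean up.

def embed(text: str) -> list[float]:
    # Stand-in for a real embedding model call.
    return [float(len(text))]

def transform(doc_id: str, content: str) -> dict[str, list[float]]:
    # Pure: the output depends only on the input document, with no side effects.
    chunks = [content[i:i + 500] for i in range(0, len(content), 500)]
    return {f"{doc_id}#chunk{n}": embed(chunk) for n, chunk in enumerate(chunks)}
```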