Read and Write Lance Dataset¶
Lance dataset APIs follow the PyArrow API conventions.
Writing Lance Dataset¶
As in PyArrow, the simplest way to create a Lance dataset is to write a pyarrow.Table via lance.write_dataset().
import lance
import pyarrow as pa

table = pa.Table.from_pylist([{"name": "Alice", "age": 20},
                              {"name": "Bob", "age": 30}])
lance.write_dataset(table, "./alice_and_bob.lance")
If the dataset is too large to fit in memory, lance.write_dataset() also supports writing from an iterator of pyarrow.RecordBatch.
import lance
import pyarrow as pa

def producer():
    yield pa.RecordBatch.from_pylist([{"name": "Alice", "age": 20}])
    yield pa.RecordBatch.from_pylist([{"name": "Bob", "age": 30}])

schema = pa.schema([
    pa.field("name", pa.string()),
    pa.field("age", pa.int64()),
])

lance.write_dataset(producer(), "./alice_and_bob.lance", schema=schema)
lance.write_dataset() supports writing a pyarrow.Table, pandas.DataFrame, pyarrow.Dataset, or Iterator[pyarrow.RecordBatch]. Check its doc for more details.
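For example, a pandas DataFrame can be written in exactly the same way. A minimal sketch (the mode="overwrite" argument, which replaces any existing dataset at the path, is optional):

import lance
import pandas as pd

df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [20, 30]})
lance.write_dataset(df, "./alice_and_bob.lance", mode="overwrite")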
Adding new columns¶
New columns can be merged into an existing dataset using lance.Dataset.merge().
This allows filling in additional columns without having to rewrite the whole dataset.
To use the merge method, provide a new table that includes the columns you
want to add, and a column name to use for joining the new data to the existing
dataset.
For example, imagine we have a dataset of embeddings and ids:
import lance
import pyarrow as pa
import numpy as np
table = pa.table({
"id": pa.array([1, 2, 3]),
"embedding": pa.array([np.array([1, 2, 3]), np.array([4, 5, 6]),
np.array([7, 8, 9])])
})
dataset = lance.write_dataset(table, "embeddings")
Now if we want to add a column of labels we have generated, we can do so by merging a new table:
new_data = pa.table({
"id": pa.array([1, 2, 3]),
"label": pa.array(["horse", "rabbit", "cat"])
})
dataset.merge(new_data, "id")
dataset.to_table().to_pandas()
id embedding label
0 1 [1, 2, 3] horse
1 2 [4, 5, 6] rabbit
2 3 [7, 8, 9] cat
Deleting rows¶
Lance supports deleting rows from a dataset using a SQL filter. For example, to delete Bob’s row from the dataset above, one could use:
import lance

dataset = lance.dataset("./alice_and_bob.lance")
dataset.delete("name = 'Bob'")
lance.LanceDataset.delete() supports the same filters as described in Filter push-down.
Rows are deleted by marking them as deleted in a separate deletion index. This is faster than rewriting the files and also avoids invalidating any indices that point to those files. Any subsequent queries will not return the deleted rows.
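Continuing the example above, a quick read confirms that only Alice's row remains:

dataset.to_table().to_pandas()
#     name  age
# 0  Alice   20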
Warning
Do not read datasets with deleted rows using Lance versions prior to 0.5.0, as they will return the deleted rows. This is fixed in 0.5.0 and later.
Committing mechanisms for S3¶
Most supported storage systems (e.g. local file system, Google Cloud Storage,
Azure Blob Store) natively support atomic commits, which prevent concurrent
writers from corrupting the dataset. However, S3 does not support this natively.
To work around this, you may provide a locking mechanism that Lance can use to
lock the table while performing a write. To do so, implement a
context manager that acquires and releases a lock, and pass it to the
commit_lock parameter of lance.write_dataset().
Note
In order for the locking mechanism to work, all writers must use the same exact mechanism. Otherwise, Lance will not be able to detect conflicts.
On entering, the context manager should acquire the lock on the table. The table
version being committed is passed in as an argument, which may be used if the
locking service wishes to keep track of the current version of the table, but
this is not required. If the table is already locked by another transaction,
it should wait until it is unlocked, since the other transaction may fail. Once
unlocked, it should either lock the table or, if the lock keeps track of the
current version of the table, raise a CommitConflictError if the requested
version has already been committed.
To prevent poisoned locks, it’s recommended to set a timeout on the locks. That way, if a process crashes while holding the lock, the lock will be released eventually. The timeout should be no less than 30 seconds.
from contextlib import contextmanager

@contextmanager
def commit_lock(version: int):
    # Acquire the lock
    my_lock.acquire()
    try:
        yield
    finally:
        # Release the lock whether the commit succeeded or failed
        my_lock.release()

lance.write_dataset(data, "s3://bucket/path/", commit_lock=commit_lock)
When the context manager is exited, it will raise an exception if the commit failed. This might be because of a network error or if the version has already been written. Either way, the context manager should release the lock. Use a try/finally block to ensure that the lock is released.
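If your locking service also tracks the latest committed version, the context manager can reject a conflicting commit up front, as described above. The sketch below is illustrative only: my_lock_service and its acquire/release/latest_committed_version/record_commit methods are hypothetical stand-ins for an external lock shared by all writers, and the conflict is raised as a plain RuntimeError rather than a specific Lance exception type.

from contextlib import contextmanager

@contextmanager
def versioned_commit_lock(version: int):
    # Acquire the shared lock with a timeout so a crashed writer
    # cannot hold it forever (see the recommendation above).
    my_lock_service.acquire(timeout=30)
    try:
        if my_lock_service.latest_committed_version() >= version:
            # The requested version has already been committed by another
            # writer, so fail fast instead of overwriting it.
            raise RuntimeError(f"version {version} already committed")
        yield
        # Only reached if the commit succeeded.
        my_lock_service.record_commit(version)
    finally:
        my_lock_service.release()

lance.write_dataset(data, "s3://bucket/path/", commit_lock=versioned_commit_lock)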
Warning
Lance _detects_ but does not yet resolve conflicts. So if there are multiple writers, only one will succeed if they try to commit at exactly the same time. This will be fixed in future releases.
Reading Lance Dataset¶
To open a Lance dataset, use the lance.dataset()
function:
import lance

ds = lance.dataset("s3://bucket/path/imagenet.lance")
# Or local path
ds = lance.dataset("./imagenet.lance")

Note
Lance supports the local file system, AWS S3 (s3) and Google Cloud Storage (gs) as storage backends at the moment. See storages for more details.
The most straightforward way to read a Lance dataset is to load the entire dataset into memory with the lance.LanceDataset.to_table() method.
table = ds.to_table()
Because Lance is a high-performance columnar format, it can efficiently read subsets of the dataset using column (projection) push-down and filter (predicate) push-down.
table = ds.to_table(
    columns=["image", "label"],
    filter="label = 2 AND text IS NOT NULL",
    limit=1000,
    offset=3000,
)
Lance understands the cost of reading heavy columns such as image. Consequently, it employs an optimized query plan to execute the operation efficiently.
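For example, a query that projects only light-weight columns never has to decode the heavy image data; a minimal sketch reusing the dataset above:

# Only the projected columns are read from storage, so the heavy
# "image" column is never decoded for this query.
labels = ds.to_table(columns=["label"], filter="label = 2")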
Iterative Read¶
If the dataset is too large to fit in memory, you can read it in batches
using the lance.LanceDataset.to_batches()
method:
for batch in ds.to_batches(columns=["image"], filter="label = 10"):
    # do something with batch
    compute_on_batch(batch)
Unsurprisingly, to_batches() takes the same parameters as to_table().
Filter push-down¶
Lance supports standard SQL expressions as predicates for dataset filtering. By pushing SQL predicates down to the storage layer, the overall I/O load during a scan is significantly reduced.
Currently, Lance supports a growing list of expressions:
- >, >=, <, <=, =
- AND, OR, NOT
- IS NULL, IS NOT NULL
- IS TRUE, IS NOT TRUE, IS FALSE, IS NOT FALSE
- IN
- LIKE, NOT LIKE
- regexp_match(column, pattern)
- CAST
For example, the following filter string is acceptable:
((label IN [10, 20]) AND (note.email IS NOT NULL)) OR NOT note.created
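Such a filter string can be passed to any of the scan methods shown earlier; for instance (the column names simply mirror the earlier examples):

table = ds.to_table(
    columns=["image", "label"],
    filter="label >= 10 AND label <= 20 AND text IS NOT NULL",
)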
If your column name contains special characters or is a SQL keyword,
you can use backticks (`) to escape it. For nested fields, each segment of the
path must be wrapped in backticks.
`CUBE` = 10 AND `column name with space` IS NOT NULL AND `nested with space`.`inner with space` < 2
Warning
Field names containing periods (.) are not supported.
Literals for dates, timestamps, and decimals can be written by placing the string value after the type name. For example:
date_col = date '2021-01-01' and timestamp_col = timestamp '2021-01-01 00:00:00' and decimal_col = decimal(8,3) '1.000'
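For instance, assuming a timestamp column named created_at (a hypothetical name for illustration), such a literal can be used directly in a filter:

recent = ds.to_table(filter="created_at >= timestamp '2021-01-01 00:00:00'")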
For timestamp columns, the precision can be specified as a number in the type parameter. Microsecond precision (6) is the default.
SQL | Time unit
---|---
timestamp(0) | Seconds
timestamp(3) | Milliseconds
timestamp(6) | Microseconds
timestamp(9) | Nanoseconds
Lance internally stores data in Arrow format. The mapping from SQL types to Arrow is:
SQL type | Arrow type
---|---
boolean | bool
tinyint | int8
smallint | int16
int or integer | int32
bigint | int64
float | float32
double | float64
decimal(precision, scale) | decimal128
date | date32
timestamp | timestamp (see precision mapping in previous table)
string or varchar | string
binary | binary
Random read¶
One distinctive feature of Lance, as a columnar format, is that it allows you to read random samples quickly.
# Access the 2nd, 101st and 501st rows
data = ds.take([1, 100, 500], columns=["image", "label"])
Fast random access to individual rows plays a crucial role in workflows such as random sampling and shuffling in ML training. It also enables users to build secondary indices for swift query execution.
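For instance, a random training sample can be drawn with take(); a minimal sketch (the column names follow the earlier examples):

import numpy as np

# Pick 1024 distinct random row indices and fetch only those rows.
num_rows = ds.count_rows()
indices = np.random.default_rng().choice(num_rows, size=1024, replace=False)
sample = ds.take(indices.tolist(), columns=["image", "label"])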