🚧 Please note that this package is in a pre-release stage. Until version 1.x is officially released, the API should be considered unstable.
vastdb
is a Python-based SDK for interacting with VAST Database and VAST Catalog, enabling schema and table management, efficient ingestion, and querying and modification of columnar data. For more details, see our whitepaper.
- Linux client with Python 3.9+ and network access to the VAST cluster
- Virtual IP pool configured with DNS service
- S3 access & secret keys on VAST cluster
- Tabular identity policy with the proper permissions
pip install vastdb
Also, see our release notes.
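The quickstart below assumes the S3 credentials are available as Python variables. For example, they can be read from environment variables (the variable names here are illustrative, following common AWS conventions):

import os

# Illustrative: load the VAST cluster's S3 credentials from the environment
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']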
Creating schemas and tables + basic inserts and selects:
import pyarrow as pa
import vastdb
session = vastdb.connect(
    endpoint='http://vip-pool.v123-xy.VastENG.lab',
    access=AWS_ACCESS_KEY_ID,
    secret=AWS_SECRET_ACCESS_KEY)

with session.transaction() as tx:
    bucket = tx.bucket("bucket-name")

    schema = bucket.create_schema("schema-name")
    print(bucket.schemas())

    columns = pa.schema([
        ('c1', pa.int16()),
        ('c2', pa.float32()),
        ('c3', pa.utf8()),
    ])
    table = schema.create_table("table-name", columns)
    print(schema.tables())
    print(table.columns())

    arrow_table = pa.table(schema=columns, data=[
        [111, 222, 333],
        [0.5, 1.5, 2.5],
        ['a', 'bb', 'ccc'],
    ])
    table.insert(arrow_table)

    result = pa.Table.from_batches(table.select())  # SELECT * FROM t
    assert result == arrow_table

    # the transaction is automatically committed when exiting the context
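Conversely, if an exception escapes the context, the transaction is not committed. A minimal sketch of that behavior, assuming the usual context-manager rollback semantics (the RuntimeError is just a stand-in):

try:
    with session.transaction() as tx:
        schema = tx.bucket("bucket-name").schema("schema-name")
        schema.create_table("temp-table", columns)
        raise RuntimeError("abort")  # nothing above should be committed
except RuntimeError:
    pass  # 'temp-table' should not appear in schema.tables() afterwards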
Our SDK supports predicate and projection pushdown:
from ibis import _
# SELECT c1 FROM t WHERE (c2 > 2) AND (c3 IS NULL)
table.select(columns=['c1'],
             predicate=(_.c2 > 2) & _.c3.isnull())

# SELECT c2, c3 FROM t WHERE (c2 BETWEEN 0 AND 1) OR (c2 > 10)
table.select(columns=['c2', 'c3'],
             predicate=(_.c2.between(0, 1)) | (_.c2 > 10))
# SELECT * FROM t WHERE c3 LIKE '%substring%'
table.select(predicate=_.c3.contains('substring'))
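As with a full scan, a pushed-down select returns a stream of record batches that can be materialized into a single PyArrow table:

# Materialize a filtered, projected scan into one in-memory table
result = pa.Table.from_batches(
    table.select(columns=['c1'], predicate=(_.c2 > 2) & _.c3.isnull()))
print(result)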
It is possible to efficiently create a table from a Parquet file (without copying it via the client):
import tempfile

import boto3
import pyarrow.parquet as pq
from vastdb import util

s3 = boto3.client('s3')  # an S3 client configured against the VAST cluster
with tempfile.NamedTemporaryFile() as f:
    pq.write_table(arrow_table, f.name)
    s3.put_object(Bucket='bucket-name', Key='staging/file.parquet', Body=f)

with session.transaction() as tx:
    schema = tx.bucket('bucket-name').schema('schema-name')
    table = util.create_table_from_files(
        schema=schema, table_name='imported-table',
        parquet_files=['/bucket-name/staging/file.parquet'])
We can import multiple files concurrently into a table (by utilizing multiple CNodes' cores):
with session.transaction() as tx:
    schema = tx.bucket('bucket-name').schema('schema-name')
    table = util.create_table_from_files(
        schema=schema, table_name='large-imported-table',
        parquet_files=[f'/bucket-name/staging/file{i}.parquet' for i in range(10)])
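One way to sanity-check an import is to re-open the table in a new transaction and count the rows that arrived:

with session.transaction() as tx:
    table = tx.bucket('bucket-name').schema('schema-name').table('large-imported-table')
    imported = pa.Table.from_batches(table.select())
    print(f"imported {imported.num_rows} rows")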
Table.select()
returns a stream of PyArrow record batches, which can be directly exported into a Parquet file:
import contextlib

batches = table.select()
with contextlib.closing(pq.ParquetWriter('/path/to/file.parquet', batches.schema)) as writer:
    for batch in batches:
        writer.write_batch(batch)
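If streaming is not required, the stream can also be drained in one call; this sketch assumes the returned object is a regular pa.RecordBatchReader (as its .schema attribute above suggests):

# Materialize the whole stream at once instead of batch-by-batch
result = table.select().read_all()
pq.write_table(result, '/path/to/file.parquet')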
We can use DuckDB to post-process the resulting stream of PyArrow record batches:
from ibis import _
import duckdb
conn = duckdb.connect()
batches = table.select(columns=['c1'], predicate=(_.c2 > 2))
print(conn.execute("SELECT sum(c1) FROM batches").arrow())
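Instead of relying on DuckDB's replacement scan picking up the local variable, the Arrow stream can also be registered explicitly as a named view (DuckDB's register() accepts Arrow objects); the table name 't' here is arbitrary:

batches = table.select(columns=['c2', 'c3'])
conn.register('t', batches)  # expose the Arrow stream to SQL as 't'
print(conn.execute("SELECT c3, avg(c2) AS avg_c2 FROM t GROUP BY c3").arrow())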
We can create, list and delete available semi-sorted projections:
p = table.create_projection('proj', sorted=['c3'], unsorted=['c1'])
print(table.projections())
print(p.get_stats())
p.drop()
It is possible to use snapshots for accessing the database:
snaps = bucket.list_snapshots()
batches = snaps[0].schema('schema-name').table('table-name').select()
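Since a snapshot exposes the same schema/table API as the live bucket, the two can be compared directly; for example, a row-count delta:

live = pa.Table.from_batches(
    tx.bucket('bucket-name').schema('schema-name').table('table-name').select())
snap = pa.Table.from_batches(
    snaps[0].schema('schema-name').table('table-name').select())
print(f"rows added since the snapshot: {live.num_rows - snap.num_rows}")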
The VAST Catalog can be queried as a regular table:
table = pa.Table.from_batches(tx.catalog.select(['element_type']))
df = table.to_pandas()
total_elements = len(df)
print(f"Total elements in the catalog: {total_elements}")
file_count = (df['element_type'] == 'FILE').sum()
print(f"Number of files/objects: {file_count}")
distinct_elements = df['element_type'].unique()
print("Distinct element types on the system:")
print(distinct_elements)
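A per-type breakdown follows directly from the same DataFrame:

# Count catalog elements per type using pandas
print(df['element_type'].value_counts())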
See the following blog posts for more examples: