Documentation
See the API Docs.
Installing
pip install marlinfs
Usage
Login
marlin.login()
Batch Ingestion
transform_client = marlin.transform_client(namespace, name, version, entities)


@transform_client.process_function
def process():
    """Example batch-ingestion function registered via ``process_function``.

    Declares one dependency, demonstrates the four read calls available on
    it (ingestion/event time, by integer timestamp or by date string),
    commits through the transform client, and returns ``df``.
    """
    dep1 = transform_client.add_dependency('n1', 't1', 'v1', ['f1', 'f2'])

    # Reading by timestamp (start, end).
    ingestion_time_read = dep1.read_by_ingestion_ts(1612140982, 1612150982)
    event_time_read = dep1.read_by_event_ts(1612140982, 1612150982)

    # Reading by date string (start, end).
    ingestion_date_read = dep1.read_by_ingestion_date("2021-01-02-03", "2021-01-02-04")
    event_date_read = dep1.read_by_event_date("2021-01-02-03", "2021-01-02-04")

    # Commit metadata and store data.
    transform_client.commit()

    # NOTE(review): `df` is not defined in this snippet -- presumably it is the
    # DataFrame you build from the reads above; confirm before copying.
    #
    # By default, df is assumed to contain an `event_timestamp` column holding
    # date strings in this format: 2021-01-02-03.
    # To pass a different date format, change the return statement to:
    #     return df, {'date_format': 'str_date', 'str_date_format_type': '<python date format>'}
    # where <python date format> is e.g. %Y-%m-%d.
    # To pass event_timestamp in seconds, change the return statement to:
    #     return df, {'date_format': 'seconds'}
    return df
Batch Serving
batch_serving_client = marlin.batch_training_client(namespace, name, version)
# Alternative client for scoring instead of training:
# batch_serving_client = marlin.batch_scoring_client(namespace, name, version)


@batch_serving_client.serving_function
def process():
    """Example batch-serving function registered via ``serving_function``.

    Declares two dependencies and demonstrates the point-in-time join calls
    against an entity DataFrame whose ``target_timestamp`` column holds
    either date strings or integer timestamps, then commits.
    """
    entity_df = None  # Some entity df

    dep1 = batch_serving_client.add_dependency('n1', 't1', 'v1', ['f1', 'f2'])
    dep2 = batch_serving_client.add_dependency('n2', 't1', 'v1', ['f1', 'f2'])

    # Joins with target_timestamp given as date strings.
    entity_df = pd.DataFrame([
        [1, 1, 1, 1, "2021-01-02-03"],
        [1, 1, 1, 1, "2021-01-02-03"],
    ], columns=['A', 'B', 'C', 'D', 'target_timestamp'])
    dep1.point_in_time_join_by_date(entity_df)
    dep1.point_in_time_join_across_inputs_by_date(entity_df, [dep2])

    # Joins with target_timestamp given as integer timestamps.
    entity_df = pd.DataFrame([
        [1, 1, 1, 1, 1612140982],
        [1, 1, 1, 1, 1612140982],
    ], columns=['A', 'B', 'C', 'D', 'target_timestamp'])
    dep1.point_in_time_join_by_ts(entity_df)
    dep1.point_in_time_join_across_inputs_by_ts(entity_df, [dep2])

    # To commit metadata
    batch_serving_client.commit()
Exploration Client
# Obtain an exploration client and look up two existing transforms.
exploration_client = marlin.exploration_client()
tf1 = exploration_client.get_transform('n1', 't1', 'v1')
tf2 = exploration_client.get_transform('n2', 't2', 'v1')

# Point-in-time joins where target_timestamp holds date strings.
entity_df = pd.DataFrame(
    data=[
        [1, 1, 1, 1, "2021-01-02-03"],
        [1, 1, 1, 1, "2021-01-02-03"],
    ],
    columns=['A', 'B', 'C', 'D', 'target_timestamp'],
)
tf1.point_in_time_join_by_date(entity_df)
tf1.point_in_time_join_across_inputs_by_date(entity_df, [tf2])

# Point-in-time joins where target_timestamp holds integer timestamps.
entity_df = pd.DataFrame(
    data=[
        [1, 1, 1, 1, 1612140982],
        [1, 1, 1, 1, 1612140982],
    ],
    columns=['A', 'B', 'C', 'D', 'target_timestamp'],
)
tf1.point_in_time_join_by_ts(entity_df)
tf1.point_in_time_join_across_inputs_by_ts(entity_df, [tf2])

# Range reads keyed by integer timestamp (start, end).
ingestion_time_read = tf1.read_by_ingestion_ts(1612140982, 1612150982)
event_time_read = tf1.read_by_event_ts(1612140982, 1612150982)

# Range reads keyed by date string (start, end).
ingestion_date_read = tf1.read_by_ingestion_date("2021-01-02-03", "2021-01-02-04")
event_date_read = tf1.read_by_event_date("2021-01-02-03", "2021-01-02-04")