volcengine-sdk-rec

SDK for Volcengine.


License: Apache-2.0

Install:

    pip install volcengine-sdk-rec==1.3.1

Documentation

Volcengine data/predict API SDK, Python version.

import uuid
from datetime import datetime, timedelta

from byteair import ClientBuilder, Client
from byteair.protocol.volcengine_byteair_pb2 import *
from core import Region, Option, NetException, BizException, metrics

# Required: tenant ID.
TENANT_ID = "xxx"
# Required: application ID.
APPLICATION_ID = "xxx"
# Required: access key (AK). Obtain it from the Volcengine console:
# [Volcengine Console] -> [Personal Information] -> [Key Management].
AK = "xxx"
# Required: secret key (SK). Obtain it from the same console page as the AK:
# [Volcengine Console] -> [Personal Information] -> [Key Management].
SK = "xxx"

# Shared client used by every example function below, configured with the
# credentials above and the Region.AIR_CN region.
client: Client = (
    ClientBuilder()
    .tenant_id(TENANT_ID)
    .application_id(APPLICATION_ID)
    .ak(AK)
    .sk(SK)
    .region(Region.AIR_CN)
    .build()
)
# Initialize metrics reporting. Recommended, so that Volcengine can
# troubleshoot issues on their side.
metrics.init(())


def write():
    """Upload a small batch of example records to the "item" topic.

    Calls ``client.write_data`` with stage "pre_sync" and a fixed data date,
    then prints the outcome. ``NetException`` and ``BizException`` raised by
    the call are reported on stdout rather than propagated.
    """
    # Test data only; real calls must respect each field's type and format.
    data_list = [
        {
            "id": "item_id1",
            "title": "test_title1",
            "status": 0,
            "brand": "volcengine",
            "pub_time": 1583641807,
            "current_price": 1.1,
        },
        {
            "id": "item_id2",
            "title": "test_title2",
            "status": 1,
            "brand": "volcengine",
            "pub_time": 1583641503,
            "current_price": 2.2,
        },
    ]
    # topic is an enumerated value; see the API documentation.
    topic = "item"
    # Options for transferring day-level data.
    opts = (
        # Stage: pre-sync ("pre_sync"), history sync ("history_sync"),
        # incremental daily sync ("incremental_sync_daily"),
        # incremental real-time sync ("incremental_sync_streaming").
        Option.with_stage("pre_sync"),
        # Required: the date the data was produced. Replace with the real date.
        Option.with_data_date(datetime(year=2022, month=1, day=1)),
        Option.with_timeout(timedelta(milliseconds=1000)),
        Option.with_request_id(str(uuid.uuid1())),
    )
    try:
        write_response = client.write_data(data_list, topic, *opts)
    except (NetException, BizException) as e:
        # Catch NetException as well as BizException, matching the other
        # examples in this file, so a network failure is reported, not raised.
        print("[write] occur err, msg: %s" % e)
        return
    if not write_response.status.success:
        print("[write] failure")
        return
    print("[write] success")


def done():
    """Call ``client.done`` for the listed dates of the "item" topic.

    Presumably this signals that uploads for those dates are complete;
    confirm against the API documentation. Prints the outcome and reports
    ``NetException`` / ``BizException`` on stdout instead of propagating them.
    """
    date_list = [datetime(year=2022, month=1, day=1)]
    # topic is an enumerated value; see the API documentation.
    topic = "item"
    opts = (
        # Stage: pre-sync ("pre_sync"), history sync ("history_sync"),
        # incremental daily sync ("incremental_sync_daily"),
        # incremental real-time sync ("incremental_sync_streaming").
        Option.with_stage("pre_sync"),
        Option.with_timeout(timedelta(milliseconds=1000)),
        Option.with_request_id(str(uuid.uuid1())),
    )
    try:
        done_response = client.done(date_list, topic, *opts)
    except (NetException, BizException) as e:
        # Catch NetException as well as BizException, matching the other
        # examples in this file, so a network failure is reported, not raised.
        print("[done] occur err, msg: %s" % e)
        return
    if not done_response.status.success:
        print("[done] failure")
        return
    print("[done] success")


def predict():
    """Send an example predict request and print whether it succeeded.

    Builds a ``PredictRequest`` for user "uid1" with a context (spm, extra
    info, features and filters) and two candidate items, then calls
    ``client.predict`` for the "default" scene. ``NetException`` and
    ``BizException`` are reported on stdout instead of propagating.
    """
    # Build the predict request body.
    predict_request = PredictRequest()
    predict_request.user.uid = 'uid1'

    context = predict_request.context
    context.spm = "1$##$2$##$3$##$4"
    context.extra["extra_key"] = "extra_value"

    # stringFeature is map-like: assigning the same key twice would silently
    # overwrite the first value, so each example feature uses its own key.
    feature = context.feature
    feature.stringFeature["key1"] = "value1"
    feature.stringFeature["key2"] = "value2"
    feature.stringArrayFeature["array_key"].values.append("array_value1")
    feature.stringArrayFeature["array_key"].values.append("array_value2")

    # Named context_filter to avoid shadowing the built-in ``filter``.
    context_filter = context.filter
    context_filter.stringFilter["key"] = "value"
    context_filter.stringArrayFilter["array_key"].values.append("array_value1")
    context_filter.stringArrayFilter["array_key"].values.append("array_value2")

    for item_id in ("item_id1", "item_id2"):
        predict_request.candidateItems.add().id = item_id

    opts = (
        Option.with_request_id(str(uuid.uuid1())),
        Option.with_scene("default"),
        Option.with_timeout(timedelta(milliseconds=1000)),
        # NOTE(review): presumably enables spm-based routing on the server;
        # confirm against the API documentation.
        Option.with_headers({"Enable-Spm-Route": "true"}),
    )
    try:
        rsp = client.predict(predict_request, *opts)
    except (NetException, BizException) as e:
        print("[predict] occur error, msg: %s" % e)
        return
    if not rsp.success:
        print("[predict] failure")
        return
    print("[predict] success")


def callback():
    """Send an example callback request and print whether it succeeded.

    The request refers back to an earlier predict request (its request id,
    uid, scene and item list). Each item carries a position and an ``extra``
    JSON string. ``NetException`` and ``BizException`` are reported on stdout
    instead of propagating.
    """
    # Build the callback request body.
    callback_request = CallbackRequest()
    # Request id of the corresponding predict request.
    callback_request.predict_request_id = "xxx"
    # uid of the corresponding predict request.
    callback_request.uid = "uid1"
    # scene of the corresponding predict request.
    callback_request.scene = "default"
    # Items of the corresponding predict request.
    callback_item1 = callback_request.items.add()
    callback_item1.id = "item_id1"
    callback_item1.pos = "position1"
    callback_item1.extra = '{"reason":"exposure"}'
    callback_item2 = callback_request.items.add()
    callback_item2.id = "item_id2"
    callback_item2.pos = "position2"
    callback_item2.extra = '{"reason":"filter"}'
    # Callback request context. ``context`` is a message-typed protobuf field,
    # and assigning a message to it directly raises AttributeError, so the
    # prepared context is copied in with CopyFrom instead.
    callback_context = CallbackContext()
    callback_context.spm = "1$##$2$##$3$##$4"
    callback_request.context.CopyFrom(callback_context)
    opts = (
        Option.with_request_id(str(uuid.uuid1())),
        Option.with_timeout(timedelta(milliseconds=1000)),
    )
    try:
        rsp = client.callback(callback_request, *opts)
    except (NetException, BizException) as e:
        print("[callback] occur error, msg: %s" % e)
        return
    if not rsp.success:
        print("[callback] failure")
        return
    print("[callback] success")