crossdeploy


crossdeploy is a declarative approach to manage MLOps resources.

Simply define the desired state of the resources, and the steps required to reach that state are executed automatically.

When resources are subsequently modified, they are automatically created, updated or deleted based on the dependencies inferred between them.

Under the hood, crossdeploy is a thin wrapper around provisioning engines such as Terraform or Pulumi (coming soon) to manage resources.

Why crossdeploy?

1. Productivity

Eliminate boilerplate code and the need to manually track dependencies between resources.

Boilerplate code refers to the snippets of utility code that must be applied to each type of resource for various tasks, including:

  • Checking if a resource exists before creating or updating it. Without these checks, simply re-running the same code results in duplicate resources.

  • Checking whether a change to an attribute requires the resource to be updated in place or destroyed and recreated.

  • Maintaining dependencies within the MLOps pipeline to check whether downstream resources need to be updated when upstream resources are modified (see the sketch after this list).
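
As a sketch of this third kind of check, the hypothetical imperative snippet below repoints every deployment that serves a replaced model, using the WML client's deployment update API. Here, old_model_id and new_model_id are assumed to be known, and the field paths are illustrative.

# imperative dependency tracking: when a model is replaced, every
# downstream deployment that serves it must be repointed by hand
for x in wml_client.deployments.get_details()["resources"]:
    if x["entity"]["asset"]["id"] == old_model_id:  # field path illustrative
        wml_client.deployments.update(
            x["metadata"]["id"],
            {wml_client.deployments.ConfigurationMetaNames.ASSET: {"id": new_model_id}},
        )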

In a typical machine learning project, resources such as models, deployments, service providers, subscriptions, monitor instances, payload records, etc., can range anywhere from tens to hundreds. Writing boilerplate code like the examples below, or manually managing state and dependencies, leads to low productivity and inefficiency.

For example, on IBM Cloud Pak for Data, compare the two approaches below for saving a model to a project, promoting it to a deployment space and creating a deployment.

Imperative approach

# save model to project
model_details = wml_client.repository.store_model(model, meta_props)
model_id = wml_client.repository.get_model_id(model_details)

# promote model to deployment space
promoted_model_id = wml_client.repository.promote_model(
    model_id=model_id,
    source_project_id=PROJECT_ID,
    target_space_id=SPACE_ID
)

# create deployment
deployment_details = wml_client.deployments.create(promoted_model_id, meta_props=meta_props)

Declarative approach

flow = CrossDeploy()

# define model attributes
model = flow.ibm.Model(
    name = MODEL_NAME,
    project_id = PROJECT_ID,
    type = "scikit-learn_1.1",
    software_spec = "runtime-22.2-py3.10",
    model_path = MODEL_PATH,
)

# define which deployment space to promote the model
promoted_model = flow.ibm.PromotedModel(model)(
    project_id = PROJECT_ID,
    space_id = SPACE_ID,
    asset_id = model.id,
)

# define deployment attributes
deployment = flow.ibm.Deployment(
    name = DEPLOYMENT_NAME,
    space_id = SPACE_ID,
    asset = promoted_model.id,
    online = True,
)

flow.apply()

In the first snippet (imperative), storing, promoting and deploying the model are performed explicitly, one step at a time.

In the second snippet (declarative), the code defines the desired state of a deployment, which depends on a promoted model, which in turn depends on a model in a project.

When a model is updated, re-running the imperative code results in duplicate resources, as there are no checks to determine whether the model already exists in the project or deployment space.

Boilerplate code like the snippet below, which guards against duplicates, introduces unnecessary overhead and becomes difficult to maintain as the number of resources increases.

# delete deployment if exists in deployment space
for x in wml_client.deployments.get_details()["resources"]:
    if x["metadata"]["name"] == DEPLOYMENT_NAME:
        wml_client.deployments.delete(x["metadata"]["id"])

# delete model if exists in deployment space
for x in wml_client.repository.get_model_details()["resources"]:
    if x["metadata"]["name"] == MODEL_NAME:
        wml_client.repository.delete(x["metadata"]["id"])

However, when the above code is run declaratively, the desired state specifies exactly one instance each of the model, promoted model and deployment. crossdeploy destroys the earlier version of the model and its promoted copy, creates a new model and a new promoted model, and updates the existing deployment's asset to the new promoted model.

An excerpt of the output looks like this:

Plan: 0 to add, 0 to change, 1 to destroy.
...
ibmcpd_model.crossdeploy_mortgagemodelrf2_82A12E72: Destroying... [id=65b21809-10a9-453c-8026-1991cb249498]
ibmcpd_model.crossdeploy_mortgagemodelrf2_82A12E72: Destruction complete after 3s

Apply complete! Resources: 0 added, 0 changed, 1 destroyed.
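
For completeness, here is a sketch of what triggers such a plan. The flow definition stays identical; only the model artifact changes (NEW_MODEL_PATH is a hypothetical path to the retrained model archive):

# same flow definition as before; only the artifact path changes
model = flow.ibm.Model(
    name = MODEL_NAME,
    project_id = PROJECT_ID,
    type = "scikit-learn_1.1",
    software_spec = "runtime-22.2-py3.10",
    model_path = NEW_MODEL_PATH,  # hypothetical: the retrained model archive
)

# crossdeploy diffs this desired state against the tracked state and plans
# the replacements and the deployment update automatically
flow.apply()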

2. Reusability

Adapt an existing template to a new use case without modifying a line of code.

In most cases, reusing an existing asset or template on a new use case results in rewriting the majority of the code due to changes in requirements and architecture.

crossdeploy abstracts the underlying operational processes so that users can focus only on what is relevant.

Consider the example below (simplified to demonstrate the concept; refer to the notebook examples for full details) of a simple template for monitoring the quality of a model on Watson OpenScale.

@dataclasses.dataclass
class SimpleModelMonitor:
    config: dict
    def __post_init__(self):
        flow = CrossDeploy()
        flow.ibm.Provider(url=CPD_URL, username=CPD_USERNAME, api_key=CPD_API_KEY)
        
        model_rf = flow.ibm.Model(...)
        
        promoted_model_rf = flow.ibm.PromotedModel(model_rf)(...)
        
        deployment = flow.ibm.Deployment(...)
        
        subscription = flow.ibm.Subscription(
            id = "subscription-1",
            name = SUBSCRIPTION_NAME,
            data_mart_id = DATA_MART_ID,
            service_provider_id = SERVICE_PROVIDER,
            asset = flow.ibm.SubscriptionAsset(...),
            deployment = flow.ibm.SubscriptionDeployment(...),
            asset_properties = flow.ibm.SubscriptionAssetProperties(...),
            training_data_schema = flow.ibm.get_training_data_schema(training_data_schema),
            training_data_reference = flow.ibm.SubscriptionTrainingDataReference(...),
            payload_file = export_payload_data(X),
        )

        quality_monitor = flow.ibm.MonitorInstance(
            id = "quality_monitor",
            data_mart_id = DATA_MART_ID,
            monitor_definition_id = "quality",
            subscription_id = subscription.id,
            parameters = flow.ibm.MonitorInstanceParameters(
                min_feedback_data_size = 100
            ),
            thresholds = [flow.ibm.MonitorInstanceThresholds(**config["config_quality_monitor"])],
        )

        flow.ibm.Record(
            id = "feedback_record",
            subscription_id = subscription.id,
            type = "feedback",
            file_path = export_payload_data(df),
            depends_on = [quality_monitor],
        )

        self.flow = flow

    def apply(self):
        self.flow.apply()

Assuming the existing configuration has already been defined and there is a new requirement to build a new model and change the quality monitor threshold,

config_model = {
    "model": new_model, # new sklearn model
}

config_quality_monitor = {
    "metric_id": "area_under_roc",
    "type": "lower_limit",
    "value": 0.9, # new quality monitor threshold
}

config = {
    "config_model": config_model,
    "config_quality_monitor": config_quality_monitor,
}

monitor_flow = SimpleModelMonitor(config)
monitor_flow.apply()

Running the above multiple times with changes to the configuration inputs for any resource results in the appropriate actions being taken.


Not every change warrants destroying and recreating a resource. Some changes only require an update, while others call for recreation.

When there is a significant change to a model, it makes sense to destroy and recreate it. However, if only the model name needs to change, destroying and recreating the model makes no sense; an update would suffice. Once again, this adds to the boilerplate code required to check which attributes of the underlying resource should be updated, and whether the resource should be destroyed and recreated.

This is especially crucial in areas like model monitoring or governance, where it is important to keep the lineage and history of the model. In an IBM Watson OpenScale subscription, when the threshold of a Quality Monitor needs to be adjusted, should the monitor be destroyed and recreated?

By destroying and recreating the monitor, the history of the monitor would be lost and valuable insights would not be available when reviewing the performance of the model. An update would be more appropriate as it preserves historical evaluation data. Using the user interface to update the threshold is an option but may not be feasible when dealing with hundreds of models in production. On the other hand, if there is a significant change in the underlying model, it may be reasonable to destroy and recreate the entire subscription.

As the number of resources in your project increases, it becomes increasingly tedious and error-prone to manually check whether each attribute of each resource requires a recreate or an update.

With a declarative approach, the state of each resource is tracked and its dependencies are implicitly inferred, so that the appropriate action, such as a recreate or an update, can be executed automatically, allowing users to focus on reusing the template.
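
As a sketch of this behavior, reusing the model block from the earlier examples: a hypothetical rename changes only the name attribute, so the planned action is an in-place update that preserves the model's history and lineage.

model = flow.ibm.Model(
    name = "mortgage-model-renamed",  # hypothetical new name; nothing else changes
    project_id = PROJECT_ID,
    type = "scikit-learn_1.1",
    software_spec = "runtime-22.2-py3.10",
    model_path = MODEL_PATH,
)

flow.apply()
# illustrative plan output:
# Plan: 0 to add, 1 to change, 0 to destroy.

Changing model_path instead, as in the earlier example, would force a destroy and recreate of the model and its downstream promoted model.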


3. Extendability

In some cases, using an existing template may not be sufficient as there might be new requirements, and additional resources are needed.

crossdeploy is designed to turn MLOps resources into building blocks, represented as Python objects. These building blocks can then be stacked and extended like Lego bricks.

What started out as serving a model using Watson Machine Learning,

flow = CrossDeploy()

# define provider
flow.ibm.Provider(url="xxx", username="xxx", api_key="xxx")

# define model attributes
model = flow.ibm.Model(
    name = MODEL_NAME,
    project_id = PROJECT_ID,
    type = "scikit-learn_1.1",
    software_spec = "runtime-22.2-py3.10",
    model_path = MODEL_PATH,
)

# define which deployment space to promote the model
promoted_model = flow.ibm.PromotedModel(model)(
    project_id = PROJECT_ID,
    space_id = SPACE_ID,
    asset_id = model.id,
)

# define deployment attributes
deployment = flow.ibm.Deployment(
    name = DEPLOYMENT_NAME,
    space_id = SPACE_ID,
    asset = promoted_model.id,
    online = True,
)

can be easily extended to model monitoring using OpenScale,

service_provider = flow.ibm.ServiceProvider(
    id = "service-provider-1",
    name = "WML - Dev",
    service_type = "watson_machine_learning",
    operational_space_id = "pre_production",
    deployment_space_id = SPACE_ID,
    cpd_user_name = CPD_USERNAME,
    cpd_api_key = CPD_API_KEY,
    cpd_url = CPD_URL,
)

subscription = flow.ibm.Subscription(
    id = "subscription-1",
    name = SUBSCRIPTION_NAME,
    data_mart_id = DATA_MART_ID,
    service_provider_id = service_provider.id,
    asset = flow.ibm.SubscriptionAsset(
        asset_id = promoted_model.id,
        asset_type = "model",
        input_data_type = "structured",
        problem_type = "binary",
        url = deployment.url,
    ),
    deployment = flow.ibm.SubscriptionDeployment(
        deployment_id = deployment.id,
        deployment_type = "online",
        deployment_url = deployment.url,
    ),
    asset_properties = flow.ibm.SubscriptionAssetProperties(
        categorical_fields = categorical_fields,
        feature_fields = feature_fields,
        label_column = label_column,
        prediction_field = "prediction",
        probability_fields = ["probability"],
    ),
    training_data_schema = flow.ibm.get_training_data_schema(training_data_schema),
    training_data_reference = flow.ibm.SubscriptionTrainingDataReference(...),
    payload_file = export_payload_data(X),
)

...

flow.ibm.MonitorInstance(
    id = "drift_monitor",
    data_mart_id = DATA_MART_ID,
    monitor_definition_id = "drift",
    subscription_id = subscription.id,
    parameters = flow.ibm.MonitorInstanceParameters(
        min_samples = 100,
        drift_threshold = 0.1,
        train_drift_model = True,
        enable_model_drift = True,
        enable_data_drift = True,
    )
)

flow.ibm.MonitorInstance(
    id = "fairness_monitor",
    data_mart_id = DATA_MART_ID,
    monitor_definition_id = "fairness",
    subscription_id = subscription.id,
    parameters = flow.ibm.MonitorInstanceParameters(
        favourable_class = ["NO"],
        unfavourable_class = ["YES"],
        min_records = 100,
        features = [
            flow.ibm.MonitorInstanceParametersFeatures(
                feature = "AppliedOnline", 
                majority = ["YES"], 
                minority = ["NO"], 
                threshold = 0.95
            )
        ]
    ),
    thresholds = [
        flow.ibm.MonitorInstanceThresholds(
            metric_id = "fairness_value",
            type = "lower_limit",
            value = 90.0,
        ),
    ],
)

flow.ibm.MonitorInstance(
    id = "explainability_monitor",
    data_mart_id = DATA_MART_ID,
    monitor_definition_id = "explainability",
    subscription_id = subscription.id,
    parameters = flow.ibm.MonitorInstanceParameters(
        enabled = True
    )
)

flow.ibm.MonitorInstance(
    id = "mrm_monitor",
    data_mart_id = DATA_MART_ID,
    monitor_definition_id = "mrm",
    subscription_id = subscription.id,
)

flow.apply()

Refer to more examples here.


4. Consistency

Defining desired states and abstracting the underlying operational processes provides a consistent approach to delivering templates.

Some features, such as OpenScale Batch Support or OpenScale Headless Monitors, may not be used as frequently as their counterparts. This sometimes results in fewer examples and tutorials for these features.

crossdeploy treats these features the same as the others, and can be stacked as building blocks like earlier examples.

For example, by swapping out the model and service_provider blocks for pyspark_model and iae_service_provider blocks, with some minor configuration changes, the rest of the flow remains the same.

pyspark_model = flow.ibm.Model(
    name = PYSPARK_MODEL_NAME,
    project_id = PROJECT_ID,
    type = "mllib_3.3",
    software_spec = "spark-mllib_3.3",
    model_path = PYSPARK_MODEL_PATH,
)

iae_service_provider = flow.ibm.ServiceProvider(
    id = "service-provider-iae",
    name = "IBM Analytics Engine - Dev",
    service_type = "iae",
    operational_space_id = "production",
    ...
)

5. Ecosystem

Establishing a common interface for resources across multiple cloud providers

One of the advantages of using Terraform as a provisioning engine is its rich ecosystem. This could be a competitive advantage for Watson OpenScale as it has the ability to monitor models from other cloud providers.

This is especially useful for users who are on other cloud providers and would like to quickly get Watson OpenScale up and running.

In the example below, a scikit-learn model is deployed on AWS SageMaker and monitored on Watson OpenScale.

flow = CrossDeploy()

flow.ibm.Provider(url=CPD_URL, username=CPD_USERNAME, api_key=CPD_API_KEY)

flow.aws.Provider(access_key=AWS_ACCESS_KEY_ID, secret_key=AWS_SECRET_ACCESS_KEY, region="us-west-1")

current = flow.aws.DataAwsCallerIdentity(id_="current")

image = flow.aws.DataAwsSagemakerPrebuiltEcrImage(id_="image", repository_name="sagemaker-scikit-learn", image_tag="1.0-1")

model = flow.aws.SagemakerModel(
    id_="model",
    name="pipeline",
    execution_role_arn = f"arn:aws:iam::{current.account_id}:role/SageMakerRole",
    container = [
        flow.aws.SagemakerModelContainer(
            image = image.registry_path,
            model_data_url = "s3://sagemaker-us-west-1-xxx/preprocessor.tar.gz",
            environment = {
                "SAGEMAKER_SUBMIT_DIRECTORY": "s3://sagemaker-us-west-1-xxx/preprocessor.tar.gz",
                "SAGEMAKER_PROGRAM": "preprocessor.py"
            }
        ),
        flow.aws.SagemakerModelContainer(
            image = image.registry_path,
            model_data_url = "s3://sagemaker-us-west-1-xxx/model.tar.gz",
            environment = {
                "SAGEMAKER_SUBMIT_DIRECTORY": "s3://sagemaker-us-west-1-xxx/model.tar.gz",
                "SAGEMAKER_PROGRAM": "inference.py"
            }
        )
    ],
)

endpoint_config = flow.aws.SagemakerEndpointConfiguration(
    scope=flow.stack,
    id_="endpoint_config",
    name="pipeline-endpoint",
    production_variants = [
        flow.aws.SagemakerEndpointConfigurationProductionVariants(
            variant_name = "AllTraffic",
            model_name = model.name,
            initial_instance_count = 1,
            instance_type = "ml.t2.medium",
        )
    ],
)

endpoint = flow.aws.SagemakerEndpoint(
    scope=flow.stack,
    id_="endpoint",
    name="pipeline-endpoint",
    endpoint_config_name = endpoint_config.name,
)

service_provider = flow.ibm.ServiceProvider(
    id = "service-provider-1",
    name = "AWS - Dev",
    service_type = "amazon_sagemaker",
    operational_space_id = "pre_production",
    aws_access_key_id = AWS_ACCESS_KEY_ID,
    aws_secret_access_key = AWS_SECRET_ACCESS_KEY,
    aws_region = "us-west-1",
)

sagemaker_model = flow.ibm.DataIbmcpdSpAssets(
    id = "sagemaker_model",
    service_provider_id = service_provider.id
)

subscription = flow.ibm.Subscription(
    id = "subscription-1",
    name = SUBSCRIPTION_NAME,
    data_mart_id = DATA_MART_ID,
    service_provider_id = service_provider.id,
    asset = flow.ibm.SubscriptionAsset(
        asset_id = "${" + get_sp_asset(sagemaker_model) + ".asset_id}",
        asset_type = "model",
        input_data_type = "structured",
        problem_type = "binary",
        url = "${" + get_sp_asset(sagemaker_model) + ".deployment_rn}",
    ),
    deployment = flow.ibm.SubscriptionDeployment(
        deployment_id = "${" + get_sp_asset(sagemaker_model) + ".deployment_id}",
        deployment_type = "online",
        deployment_url = "pipeline-endpoint",
    ),
    asset_properties = flow.ibm.SubscriptionAssetProperties(
        categorical_fields = categorical_fields,
        feature_fields = feature_fields,
        label_column = label_column,
        prediction_field = "predicted_label",
        probability_fields = ["score"],
    ),
    training_data_schema = flow.ibm.get_training_data_schema(training_data_schema),
    training_data_reference = flow.ibm.SubscriptionTrainingDataReference(...),
    payload_file = "payload.json",
)


quality_monitor = flow.ibm.MonitorInstance(
    id = "quality_monitor",
    data_mart_id = DATA_MART_ID,
    monitor_definition_id = "quality",
    subscription_id = subscription.id,
    parameters = flow.ibm.MonitorInstanceParameters(
        min_feedback_data_size = 100
    ),
    thresholds = [
        flow.ibm.MonitorInstanceThresholds(
            metric_id = "area_under_roc",
            type = "lower_limit",
            value = 0.9,
        ),
    ],
)

flow.ibm.Record(
    id = "feedback_record",
    subscription_id = subscription.id,
    type = "feedback",
    file_path = "feedback.json",
    depends_on = [quality_monitor],
)

flow.ibm.MonitorInstance(
    id = "drift_monitor",
    data_mart_id = DATA_MART_ID,
    monitor_definition_id = "drift",
    subscription_id = subscription.id,
    parameters = flow.ibm.MonitorInstanceParameters(
        min_samples = 100,
        drift_threshold = 0.1,
        train_drift_model = False,
        enable_model_drift = True,
        enable_data_drift = True,
    ),
    drift_archive_path = "drift_detection_model.tar.gz",
)

flow.ibm.MonitorInstance(
    id = "fairness_monitor",
    data_mart_id = DATA_MART_ID,
    monitor_definition_id = "fairness",
    subscription_id = subscription.id,
    parameters = flow.ibm.MonitorInstanceParameters(
        favourable_class = ["NO"],
        unfavourable_class = ["YES"],
        min_records = 100,
        features = [
            flow.ibm.MonitorInstanceParametersFeatures(
                feature = "AppliedOnline", 
                majority = ["YES"], 
                minority = ["NO"], 
                threshold = 0.95
            )
        ]
    ),
    thresholds = [
        flow.ibm.MonitorInstanceThresholds(
            metric_id = "fairness_value",
            type = "lower_limit",
            value = 80.0,
        ),
    ],
)

flow.ibm.MonitorInstance(
    id = "explainability_monitor",
    data_mart_id = DATA_MART_ID,
    monitor_definition_id = "explainability",
    subscription_id = subscription.id,
    parameters = flow.ibm.MonitorInstanceParameters(
        enabled = True
    )
)

flow.ibm.MonitorInstance(
    id = "mrm_monitor",
    data_mart_id = DATA_MART_ID,
    monitor_definition_id = "mrm",
    subscription_id = subscription.id,
)

flow.apply()

The value of crossdeploy is amplified further here, as managing resources across multiple clouds is even more challenging.


How is this different from using my Python utility scripts?

As the complexity of your project increases, the number of resources naturally increases too.

A simple project that started with model deployment can evolve into complex pipelines that include model monitoring, AI governance, data observability, etc., and the number of resources can easily range from tens to hundreds.

  1. It is almost impossible to write boilerplate code to check and manage whether each resource and its attributes need to be recreated or updated.

  2. It is challenging to manage the state and track the lineage and dependencies between the resources.

  3. Maintaining these utility scripts requires additional effort. In cases where each individual keeps their own version, this can introduce inconsistencies and errors across the organization.

crossdeploy is designed to handle all of the above challenges in a streamlined and efficient way.

Flexible execution: code based, Command Line Interface (CLI) based, or both

  1. Command Line Interface (CLI) based using Terraform or Pulumi (coming soon)

Standard way of using Terraform, with a custom IBM Cloud Pak for Data provider.

# main.tf
terraform {
  required_providers {
    ibmcpd = {
      source = "randyphoa/ibmcpd"
    }
  }
}
provider "ibmcpd" {
  url = "xxx"
  username = "xxx"
  api_key = "xxx"
}
resource "ibmcpd_model" "mortgage_model" {
  name = "mortgage-model"
  type = "scikit-learn_1.1"
  software_spec = "runtime-22.2-py3.10"
  project_id = "764ffa76-8fb0-4042-a67b-cfba2ad8085b"
  model_path = "model.tar.gz"
}
# from command line
terraform apply

  2. Code based (Typescript, Python, Java, C# and Go)

# using Python
flow = CrossDeploy()

model = flow.ibm.Model(
    name = "mortgage-model",
    project_id = "764ffa76-8fb0-4042-a67b-cfba2ad8085b",
    type = "scikit-learn_1.1",
    software_spec = "runtime-22.2-py3.10",
    model_path = "model.tar.gz",
)

flow.apply()

  3. Build using code, run using CLI

# using Python
flow = CrossDeploy()

model = flow.ibm.Model(
    name = "mortgage-model",
    project_id = "764ffa76-8fb0-4042-a67b-cfba2ad8085b",
    type = "scikit-learn_1.1",
    software_spec = "runtime-22.2-py3.10",
    model_path = "model.tar.gz",
)

flow.synth()
# from command line
terraform apply

All three approaches above produce exactly the same result; the code-based flows synthesize the same Terraform configuration that the CLI consumes.

Installing

crossdeploy is a polyglot library and supports Typescript, Python, Java, C# and Go, all from the same codebase. This is done by leveraging CDK for Terraform and AWS Cloud Development Kit.

A node runtime is required; refer to this link for more details on specific node versions.

On IBM Cloud Pak for Data (on-premise) and IBM Cloud Pak for Data as a Service, node is pre-installed for Jupyter Notebooks and JupyterLab.

Simply run the command below to install crossdeploy.

pip install crossdeploy

Currently, it is only available for Linux operating systems. Mac and Windows versions will be released shortly.

Getting started