lxd

LXD Client


License
MIT
Install
pip install lxd==0.2.16

Documentation

Async python client for LXD REST API (currently under heavy development).

Usage

Installation

pip install lxd

Initialize client

import asyncio
from pathlib import Path

from yarl import URL

from lxd.client import lxd_client


async def main():
    client = lxd_client(
        URL('https://mylxd:8443/'),
        cert_path=Path('~/.config/lxc/client.crt'),
        key_path=Path('~/.config/lxc/client.key'),
        endpoint_cert_path=Path('~/.config/lxc/servercerts/mylxd.crt'),
    )

    await client.authenticate(
        cert_path=Path('~/.config/lxc/client.crt'),
        password='your-trust-password'
    )


asyncio.run(main())

Example usages

# Recursion 0 returns only links to objects,
# you can resolve them by awaiting
instance_links = await client.instances.list(recursion=0)
instance = await instance_links[0]

# Recursion 1 returns only some fields
instances = await client.instances.list(recursion=1)

# Recursion 2 returns all possible information
instances = await client.instances.list(recursion=2)

Change instance state

from lxd.entities.instances import InstanceAction

instances = await client.instances.list()
operation = await client.instances.update_state(
    instances[0].name, action=InstanceAction.STOP
)
await client.operations.wait(operation.id)  # wait as long as possible
await client.operations.wait(operation.id, timeout=30)  # 30s

Get event stream

async for event in client.server.get_events():
    # See Event object for more properties
    print(event.type)
    print(event.metadata)

Available Endpoints

Server

server.get

Get server environment and configuration.

# See lxd.entities.server.Server
info = await client.server.get()
print(info.config)
print(info.environment)

server.update_configuration

Update the entire server configuration.

await client.server.update_configuration({
    'core.https_address': '0.0.0.0:8443'
    'core.trust_password': 'very-strong-password'
})

server.update_configuration_subset

Update a subset of the server configuration.

await client.server.update_configuration_subset({
    'images.remote_cache_expiry': 2
})

server.get_resources

Gets the hardware information profile of the LXD server.

# See lxd.entities.server.ServerResources
server_resources = await client.server.get_resources()
print(server_resources.cpu)

server.get_events

Connect to event API using websocket.

# Listen all events
async for event in client.server.get_events():
    print(event.type)
    print(event.metadata)

# Listen to specific events
async for event in client.server.get_events(type='operation'):
    print(event.metadata.id)
    print(event.metadata.status)

Certificates

certificates.list

Returns a list of trusted certificates.

# See lxd.entities.certificates.Certificate
certs = await client.certificates.list()
print(certs[0].fingerprint)

If you pass recursion=0 parameter, lxd would return just references, which are represented in current module as lxd.entities.certificates.CertificateLink objects.

If you await such link object - you would get object itself (separate http request is performed for every await call).

cert_links = await client.certificates.list(recursion=0)
certs = await asyncio.gather(*cert_links)

certificates.get

Gets a specific certificate entry from the trust store by fingerprint.

from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.primitives import hashes

fprint = '97f267c0fe20fd013b6b4ba3f5440ea3e9361ce8568d41c633f28c620ab37ea0'
cert = await client.certificates.get(fprint)

cert_obj = load_pem_x509_certificate(cert.certificate.encode())
assert cert_obj.fingerprint(hashes.SHA256()).hex() == fprint

certificates.add

Adds a certificate to the trust store as trusted user (client certificate should be trusted).

from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa

private_key = rsa.generate_private_key(
    public_exponent=65537, key_size=2048, backend=default_backend()
)
subj = x509.Name([
    x509.NameAttribute(NameOID.COMMON_NAME, "alvassin@osx")
])

cert = x509.CertificateBuilder().subject_name(
    subj
).issuer_name(
    subj
).public_key(
    private_key.public_key()
).serial_number(
    x509.random_serial_number()
).not_valid_before(
    datetime.utcnow()
).not_valid_after(
    datetime.utcnow() + timedelta(days=365)
).sign(
    private_key=private_key,
    algorithm=hashes.SHA256(),
    backend=default_backend()
)

await client.certificates.add(
    cert.public_bytes(serialization.Encoding.PEM)
)

If password argument is specified, adds a certificate to the trust store as an untrusted user.

await client.certificates.add(
    cert.public_bytes(serialization.Encoding.PEM),
    password='your-trust-password'
)

certificates.update_configuration

Update the entire certificate configuration.

await client.certificates.update_configuration(
    '97f267c0fe20fd013b6b4ba3f5440ea3e9361ce8568d41c633f28c620ab37ea0',
    certificate='-----BEGIN CERTIFICATE-----\n...',
    name='new-name',
    projects=[],
    restricted=False,
    type='client'
)

certificates.update_configuration_subset

Update a subset of the certificate configuration.

await client.certificates.update_configuration_subset(
    '97f267c0fe20fd013b6b4ba3f5440ea3e9361ce8568d41c633f28c620ab37ea0',
    name='another-name'
)

certificates.remove

Removes the certificate from the trust store.

await client.certificates.remove(
    '97f267c0fe20fd013b6b4ba3f5440ea3e9361ce8568d41c633f28c620ab37ea0'
)

Instances

  • client.instances.list
  • client.instances.get
  • client.instances.create
  • client.instances.delete
  • client.instances.get_state
  • client.instances.update_state

Operations

  • client.operations.list
  • client.operations.get
  • client.operations.wait
  • client.operations.cancel

TODO