Filter K8s Audit Logs
Abstract
This library provides a simple way to filter Kubernetes audit logs when, for some reason, you cannot apply
an audit policy directly in your cloud (e.g. in Yandex Cloud).
The library does not provide a service; it only gives you an easy way to filter audit logs in your Python script
through the AuditFilter class interface.
Installation
pip install k8s-audit-filter
Usage
You can easily modify your Python script to filter audit logs.
Just import the AuditFilter class, initialize it with your audit-policy.yaml file, and use its methods.
See an example of a modified script:
import json
import os
import boto3
import string
import random
from datetime import datetime
from k8s_audit_filter import AuditFilter  # import AuditFilter class


def get_random_alphanumeric_string(length):
    letters_and_digits = string.ascii_letters + string.digits
    result_str = ''.join(random.choice(letters_and_digits) for _ in range(length))
    return result_str


client = boto3.client(
    service_name='s3',
    endpoint_url='https://storage.yandexcloud.net',
    region_name='ru-central1'
)


def handler(event, context):
    for log_data in event['messages']:
        full_log = []
        for log_entry in log_data['details']['messages']:
            kubernetes_log = json.loads(log_entry['message'])
            full_log.append(json.dumps(kubernetes_log))
        audit_filter = AuditFilter('path/to/audit_policy.yaml')  # init AuditFilter class with path to audit policy file
        # filter audit logs: pass each line (not the whole list) to the filter
        filtered_log = [line for line in full_log if audit_filter.filter(line)]
        bucket_name = os.environ.get('BUCKET_NAME')
        object_key = 'AUDIT/' + os.environ.get('CLUSTER_ID') + '/' + datetime.now().strftime(
            '%Y-%m-%d-%H:%M:%S') + '-' + get_random_alphanumeric_string(5)
        object_value = '\n'.join(filtered_log)  # prepare data to load
        client.put_object(Bucket=bucket_name, Key=object_key, Body=object_value,
                          StorageClass='COLD')  # load data to cloud storage

You can also update your policy dynamically with the add_rule and remove_rule methods:
from k8s_audit_filter import AuditFilter

audit_filter = AuditFilter()  # init AuditFilter class with a blank audit policy
audit_filter.add_rule({'level': 'Metadata'})
audit_filter.filter({'level': 'Metadata'})  # returns True
audit_filter.remove_rule({'level': 'Metadata'})
audit_filter.filter({'level': 'Metadata'})  # returns False

Describing Audit Policy
The official Kubernetes documentation describes how to write audit policy rules: https://kubernetes.io/docs/reference/config-api/apiserver-audit.v1/
See an example of an audit policy:
apiVersion: audit.k8s.io/v1 # This is required.
kind: Policy
# Don't generate audit events for all requests in RequestReceived stage.
omitStages:
  - "RequestReceived"
rules:
  # Include audit log lines that contain the verb "get" and have level "Metadata"
  - level: Metadata
    verbs:
      - "get"
  # Exclude audit log lines that contain the verb "create"
  - level: None
    verbs:
      - "create"
Supported Rules
The library supports the following rules:
- level
- verbs
Please note that level is a required field for every rule and must have one of the following values:
- None: do not log events that match this rule
- Metadata: log line marked as "Metadata"
- Request: log line marked as "Request"
- RequestResponse: log line marked as "RequestResponse"
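To make the interplay of level and verbs more concrete, here is a minimal stand-alone sketch of first-match rule evaluation. It is not the library's implementation: the functions rule_matches and keep_event are hypothetical, and the semantics (the first matching rule decides, level None drops the event, an event that matches no rule is dropped) are assumptions inferred from the examples in this README.

```python
# Stand-alone illustration only; this is NOT the k8s-audit-filter implementation.
# Assumed semantics, inferred from the README examples:
#   - rules are checked in order and the first matching rule decides;
#   - a rule with level "None" drops the event;
#   - any other level keeps the event only if the event carries the same level;
#   - an event that matches no rule is dropped.

def rule_matches(rule, event):
    """Return True if the (hypothetical) rule applies to the event."""
    verbs = rule.get("verbs")
    if verbs and event.get("verb") not in verbs:
        return False  # the rule lists verbs and the event's verb is not one of them
    if rule["level"] == "None":
        return True  # exclusion rules match on verbs alone
    return event.get("level") == rule["level"]


def keep_event(rules, event):
    """Return True if the event should stay in the filtered log."""
    for rule in rules:
        if rule_matches(rule, event):
            return rule["level"] != "None"
    return False  # no rule matched


rules = [
    {"level": "Metadata", "verbs": ["get"]},
    {"level": "None", "verbs": ["create"]},
]

print(keep_event(rules, {"level": "Metadata", "verb": "get"}))     # True
print(keep_event(rules, {"level": "Metadata", "verb": "create"}))  # False
print(keep_event(rules, {"level": "Request", "verb": "get"}))      # False
```

The real AuditFilter may resolve overlapping rules differently, so treat this only as a mental model when writing a policy.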
Limitations
Currently, the library does not support the following rules:
- users: will be supported in the future
- userGroups: will be supported in the future
- nonResourceURLs: will be supported in the future
- resources: will be supported in the future
- namespaces: will be supported in the future
- omitStages: stages are not distinguished at the moment, so any string provided to AuditFilter.filter() is filtered regardless of its stage
- omitManagedFields
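Because omitStages is not applied, one possible workaround is to drop unwanted stages yourself before passing lines to the filter. The sketch below assumes each log line is a JSON-encoded Kubernetes audit event with a "stage" field; the helper drop_stages is hypothetical and not part of the library.

```python
import json


def drop_stages(lines, omit_stages):
    """Yield only the JSON audit lines whose "stage" is not in omit_stages.

    Hypothetical helper, not part of k8s-audit-filter.
    """
    omitted = set(omit_stages)
    for line in lines:
        event = json.loads(line)
        if event.get("stage") not in omitted:
            yield line


# Two sample events that differ only in their stage.
sample = [
    json.dumps({"stage": "RequestReceived", "verb": "get", "level": "Metadata"}),
    json.dumps({"stage": "ResponseComplete", "verb": "get", "level": "Metadata"}),
]

remaining = list(drop_stages(sample, ["RequestReceived"]))
print(len(remaining))  # 1
```

The remaining lines can then be passed to AuditFilter.filter() one by one, as in the usage example.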