pythonic-toolbox

A toolbox of Pythonic utilities and tools.


Keywords
toolbox, python, python3, tools
License
Apache-2.0
Install
pip install pythonic-toolbox==1.1.39

Documentation

Pythonic toolbox

PyPI version License Supported Python versions Stability CodeQL Status Python3.6 Test Status SNYK Status

Table of Contents

README.md is auto-generated from the test files by the script tests/generate_readme_markdown.py.

DO NOT EDIT DIRECTLY! ;)

python3 tests/generate_readme_markdown.py

Introduction

A Python 3.6+ toolbox of useful utilities, functions, and decorators written in a Pythonic way, fully tested on Python 3.6 through 3.11.

Installation

pip3 install pythonic-toolbox --upgrade

Usage

decorators

ignore_unexpected_kwargs

import pytest
from pythonic_toolbox.decorators.common import ignore_unexpected_kwargs

# Following functions are named under Metasyntactic Variables, like:
# foobar, foo, bar, baz, qux, quux, quuz, corge,
# grault, garply, waldo, fred, plugh, xyzzy, thud

def foo(a, b=0, c=3):
    """Pack the three arguments into an ``(a, b, c)`` tuple.

    ``b`` defaults to 0 and ``c`` defaults to 3 when omitted.
    """
    packed = (a, b, c)
    return packed

dct = {'a': 1, 'b': 2, 'd': 4}
with pytest.raises(TypeError) as __:
    assert foo(**dct) == (1, 2, 3)

wrapped_foo = ignore_unexpected_kwargs(foo)
assert wrapped_foo(**dct) == (1, 2, 3)

assert wrapped_foo(0, 0, 0) == (0, 0, 0)
assert wrapped_foo(a=1, b=2, c=3) == (1, 2, 3)

@ignore_unexpected_kwargs
def bar(*args: int):
    """Add up all positional arguments, starting from 0."""
    total = 0
    for number in args:
        total = total + number
    return total

# should not change original behavior
assert bar(1, 2, 3) == 6
assert bar(1, 2, 3, unexpected='Gotcha') == 6
nums = [1, 2, 3]
assert bar(*nums, unexpected='Gotcha') == 6

@ignore_unexpected_kwargs
def qux(a, b, **kwargs):
    """Return ``(a, b, c, d)``, reading ``c`` and ``d`` from the extra keywords.

    ``c`` falls back to 3 and ``d`` falls back to 4 when not supplied.
    """
    # this function declares Parameter.VAR_KEYWORD itself, a.k.a. **kwargs
    c = kwargs.get('c', 3)
    d = kwargs.get('d', 4)
    return a, b, c, d

assert qux(**{'a': 1, 'b': 2, 'd': 4, 'e': 5}) == (1, 2, 3, 4)

class Person:
    """Simple record with ``name``, ``age`` and ``sex`` attributes.

    The constructor, the ``create`` classmethod and the ``greetings``
    staticmethod are each wrapped with ``ignore_unexpected_kwargs``.
    """

    @ignore_unexpected_kwargs
    def __init__(self, name, age, sex):
        self.name, self.age, self.sex = name, age, sex

    @classmethod
    @ignore_unexpected_kwargs
    def create(cls, name, age, sex):
        # alternate constructor: forwards the three fields positionally
        instance = cls(name, age, sex)
        return instance

    @staticmethod
    @ignore_unexpected_kwargs
    def greetings(name):
        message = f'Hello, I am {name}'
        return message

params = {
    'name': 'albert',
    'age': 34,
    'sex': 'male',
    'height': '170cm',
}
__ = Person(**params)
__ = Person('albert', 35, 'male', height='170cm')

# test cases for classmethod, staticmethod
__ = Person.create(**params)
assert Person.greetings(**params)

retry

import pytest

from pythonic_toolbox.decorators.common import retry

# use decorator without any arguments, using retry default params
@retry
def func_fail_first_time():
    """func_fail_first_time"""
    # The call counter is kept as an attribute on the function object itself.
    this = func_fail_first_time
    this.call_times = getattr(this, 'call_times', 0) + 1
    if this.call_times != 1:
        return 'ok'
    # only the very first invocation fails
    raise Exception('Fail when first called')

assert func_fail_first_time() == 'ok'
assert func_fail_first_time.call_times == 2
assert func_fail_first_time.__doc__ == 'func_fail_first_time'

@retry(tries=2, delay=0.1)  # use decorator with customized params
def func_fail_twice():
    """func_fail_twice"""
    # Count invocations on the function object; a missing attribute means
    # this is the first call.
    this = func_fail_twice
    try:
        this.call_times += 1
    except AttributeError:
        this.call_times = 1
    if this.call_times > 2:
        return 'ok'
    raise Exception('Fail when called first, second time')

assert func_fail_twice() == 'ok'
assert func_fail_twice.call_times == 3
assert func_fail_twice.__doc__ == 'func_fail_twice'

@retry(tries=2, delay=0.1)
def func_fail_three_times():
    """func_fail_three_times"""
    # The invocation tally is stored on the function object.
    this = func_fail_three_times
    previous_calls = getattr(this, 'call_times', 0)
    this.call_times = previous_calls + 1
    if this.call_times > 3:  # calls 1, 2 and 3 all fail
        return 'ok'
    raise Exception('Fail when called first, second, third time')

with pytest.raises(Exception) as exec_info:
    func_fail_three_times()
assert func_fail_three_times.call_times == 3
assert exec_info.value.args[0] == 'Fail when called first, second, third time'

def raw_func_fail_first_time():
    """func_fail_first_time"""
    # The call counter is an attribute on the function object; a missing
    # attribute means no call has happened yet.
    this = raw_func_fail_first_time
    try:
        this.call_times += 1
    except AttributeError:
        this.call_times = 1
    if this.call_times != 1:
        return 'ok'
    raise Exception('Fail when first called')

assert retry(raw_func_fail_first_time)() == 'ok'

# test cases when function has arguments, kwargs
@retry(tries=1, delay=0.1)
def func_fail_first_time_with_parameters(p1, p2):
    """func_fail_first_time"""
    # The call counter is kept as an attribute on the function object.
    this = func_fail_first_time_with_parameters
    this.call_times = getattr(this, 'call_times', 0) + 1
    first_call = this.call_times == 1
    if first_call:
        raise Exception('Fail when first called')
    return p1 + p2

assert func_fail_first_time_with_parameters(1, 2) == 3

def func_fail_first_time_with_parameters(p1, p2):
    """func_fail_first_time"""
    # Track invocations on the function object; the first one always fails.
    this = func_fail_first_time_with_parameters
    calls_so_far = getattr(this, 'call_times', 0)
    this.call_times = calls_so_far + 1
    if calls_so_far == 0:
        raise Exception('Fail when first called')
    total = p1 + p2
    return total

assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(1, 2) == 3
assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(p1=1, p2=2) == 3

import asyncio

@retry
async def async_func_fail_first_time():
    """async_func_fail_first_time"""
    # The call counter is kept as an attribute on the function object.
    this = async_func_fail_first_time
    try:
        this.call_times += 1
    except AttributeError:
        this.call_times = 1
    if this.call_times != 1:
        return 'ok'
    raise Exception('Fail when first called')

@retry(delay=0.1)
async def async_func_fail_first_time2():
    """async_func_fail_first_time2"""
    # Invocations are tallied on the function object; only the first fails.
    this = async_func_fail_first_time2
    this.call_times = getattr(this, 'call_times', 0) + 1
    if this.call_times != 1:
        return 'ok'
    raise Exception('Fail when first called')

async def async_main():
    """Await both coroutine functions and check result, call count and docstring."""
    cases = (
        (async_func_fail_first_time, 'async_func_fail_first_time'),
        (async_func_fail_first_time2, 'async_func_fail_first_time2'),
    )
    for coro_func, expected_doc in cases:
        assert await coro_func() == 'ok'
        assert coro_func.call_times == 2
        assert coro_func.__doc__ == expected_doc

loop = asyncio.get_event_loop()
if loop.is_closed():
    loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(async_main())
finally:
    loop.close()

import random
# Module-level counter shared by every always_fail_func() invocation.
fail_count = 0

@retry(delay=0.1)
async def always_fail_func():
    """Record the attempt, sleep for a random sub-second interval, then raise ValueError."""
    # fail_count is defined at module level, so it must be rebound with
    # ``global``; ``nonlocal`` is a SyntaxError outside of a nested function.
    global fail_count
    fail_count += 1
    await asyncio.sleep(random.random())
    raise ValueError()

async def async_main_for_always_fail():
    """Run three always-failing calls concurrently and check how often the body ran."""
    tasks = [always_fail_func() for _ in range(3)]
    # return_exceptions=True collects the raised errors as results instead of propagating them
    results = await asyncio.gather(*tasks, return_exceptions=True)
    assert all(isinstance(e, ValueError) for e in results)
    # fail_count is only read here, so no global declaration is needed
    assert fail_count == 2 * 3  # each func run twice, three func calls

loop = asyncio.get_event_loop()
if loop.is_closed():
    loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(async_main_for_always_fail())
finally:
    loop.close()

deque_utils

deque_pop_any

from collections import deque

import pytest
from pythonic_toolbox.utils.deque_utils import deque_pop_any

queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=1) == 2
assert queue == deque([1, 3, 4, 5])

# edge case: same as deque.popleft()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=0) == 1
assert queue == deque([2, 3, 4, 5])

# edge case: same as deque.pop(), which removes from the right end
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=len(queue) - 1) == 5
assert queue == deque([1, 2, 3, 4])

queue = deque([1, 2, 3, 4, 5])
with pytest.raises(IndexError) as exec_info:
    deque_pop_any(queue, idx=102)

# edge case: pop from empty deque
queue = deque()
with pytest.raises(IndexError) as exec_info:
    deque_pop_any(queue, idx=0)
assert exec_info.value.args[0] == 'pop from empty deque'

deque_split

import pytest

from collections import deque

from pythonic_toolbox.utils.deque_utils import deque_split

queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=3)
assert queue1 == deque([1, 2, 3])
assert queue2 == deque([4, 5])

queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=0)
assert queue1 == deque([])
assert queue2 == deque([1, 2, 3, 4, 5])

queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=100)
assert queue1 == deque([1, 2, 3, 4, 5])
assert queue2 == deque([])

with pytest.raises(ValueError) as exec_info:
    deque_split(deque([1, 2, 3, 4, 5]), -1)
assert exec_info.value.args[0] == 'num must be integer: 0 <= num <= sys.maxsize'

dict_utils

DictObj

from copy import deepcopy

import pytest
from pythonic_toolbox.utils.dict_utils import DictObj

naive_dct = {
    'key1': 'val1',
    'key2': 'val2',
}

obj = DictObj(naive_dct)

# test basic functional methods like dict
assert len(obj) == 2
assert bool(obj) is True
# same behavior like ordinary dict according to the python version (FILO for popitem for 3.6+)
assert obj.popitem() == ('key2', 'val2')
assert obj.popitem() == ('key1', 'val1')
with pytest.raises(KeyError) as __:
    obj.popitem()

# a key can be treated like an attribute
# an attribute can be treated like a key
obj.key3 = 'val3'
assert obj.pop('key3') == 'val3'
with pytest.raises(KeyError) as __:
    obj.pop('key4')
obj.key5 = 'val5'
del obj.key5
with pytest.raises(KeyError) as __:
    obj.pop('key5')
with pytest.raises(AttributeError) as __:
    del obj.key5

# test deepcopy
obj = DictObj({'languages': ['Chinese', 'English']})
copied_obj = deepcopy(obj)
assert copied_obj == obj
copied_obj.languages = obj.languages + ['Japanese']
assert obj.languages == ['Chinese', 'English']
assert copied_obj.languages == ['Chinese', 'English', 'Japanese']
assert copied_obj != obj

person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}

person = DictObj(person_dct)
assert DictObj(person_dct) == DictObj(person_dct)
assert person.to_dict() == person_dct
assert set(person.keys()) == {'name', 'age', 'sex', 'languages'}
assert hasattr(person, 'name') is True
assert person.name == 'Albert'
assert person['name'] == 'Albert'
person.languages.append('Japanese')
assert person.languages == ['Chinese', 'English', 'Japanese']

person.height = '170'
assert person['height'] == '170'
assert 'height' in person
assert 'height' in person.keys()
assert hasattr(person, 'height') is True
del person['height']
assert 'height' not in person
assert 'height' not in person.keys()
person['height'] = '170cm'

person.update({'weight': '50'})
weight_val = person.pop('weight')
assert weight_val == '50'
person.update(DictObj({'weight': '50kg'}))
assert person.weight == '50kg'

expected = {
    'name': 'Albert', 'age': '34', 'sex': 'Male',
    'languages': ['Chinese', 'English', 'Japanese'],  # appended new language
    'height': '170cm',  # new added attribute
    'weight': '50kg',  # new added attribute
}
assert person.to_dict() == expected

repr_expected: str = ("{'name': 'Albert', 'age': '34', 'sex': 'Male', "
                      "'languages': ['Chinese', 'English', 'Japanese'],"
                      " 'height': '170cm', 'weight': '50kg'}")
assert repr(person) == repr_expected

# nested structure will be detected, and changed to DictObj
chessboard_data = {
    'position': [
        [{'name': 'knight'}, {'name': 'pawn'}],
        [{'name': 'pawn'}, {'name': 'queen'}],
    ]
}
chessboard_obj = DictObj(chessboard_data)
# test comparing instances of DictObj
assert DictObj(chessboard_data) == DictObj(chessboard_data)
assert isinstance(chessboard_obj.position, list)
assert len(chessboard_obj.position) == 2
assert isinstance(chessboard_obj.position[0][0], DictObj)
assert chessboard_obj.position[0][0].name == 'knight'
assert chessboard_obj.position[1][1].name == 'queen'

# edge case empty DictObj
empty_dict_obj = DictObj({})
assert len(empty_dict_obj) == 0
assert bool(empty_dict_obj) is False

obj_dict = DictObj({'data': 'oops'})
assert obj_dict.data == 'oops'

# params validation
invalid_key_dct = {
    1: '1',
}

# test when dict's key is not str
with pytest.raises(ValueError) as __:
    __ = DictObj(invalid_key_dct)

complicated_key_dct = {
    '1abc': 'Gotcha',  # '1abc' is not valid identifier for Python, so obj.1abc will cause SyntaxError
    'class': 'MyClass',  # 'class' is keyword in Python, so obj.class will cause SyntaxError
}

obj_dict = DictObj(complicated_key_dct)
assert obj_dict['1abc'] == 'Gotcha'
assert getattr(obj_dict, '1abc') == 'Gotcha'
# you can access '1abc' as attribute by adding prefix '_'
assert obj_dict._1abc == 'Gotcha'
del obj_dict._1abc

assert obj_dict['class'] == 'MyClass'
assert getattr(obj_dict, 'class') == 'MyClass'
# you can access 'class' as attribute by adding prefix '_'
assert obj_dict._class == 'MyClass'

# test re-assign new value for 'class'
obj_dict._class = 'MyClass2'
assert obj_dict._class == 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
assert getattr(obj_dict, 'class') == 'MyClass2'
del obj_dict._class

# if you assign new attributes (_2x, _try), DictObj stores them under exactly the names given
# this is by design; you're not encouraged to mess up keys
obj_dict._2x = 'NewAttr'
assert obj_dict._2x == 'NewAttr'
assert obj_dict['_2x'] == 'NewAttr'
with pytest.raises(KeyError):
    __ = obj_dict['2x']
with pytest.raises(AttributeError):
    __ = getattr(obj_dict, '2x')

obj_dict._try = 'NewAttr'
assert obj_dict._try == 'NewAttr'
assert obj_dict['_try'] == 'NewAttr'
# '_try' was stored verbatim, so the un-prefixed keyword 'try' must not resolve
# (mirrors the '_2x' / '2x' checks above; looking up the value 'NewAttr' would pass trivially)
with pytest.raises(KeyError):
    __ = obj_dict['try']
with pytest.raises(AttributeError):
    __ = getattr(obj_dict, 'try')

# Demo for messing up key 'class'
# delete and re-assign _class
complicated_key_dct = {
    'class': 'MyClass',  # 'class' is keyword in Python, so obj.class will cause SyntaxError
}
obj_dict = DictObj(complicated_key_dct)

assert obj_dict['class'] == 'MyClass'
obj_dict._class = 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
del obj_dict._class

# obj_dict has no knowledge about 'class' or '_class'
# so '_class' is a brand-new attribute, and will be stored as '_class'
obj_dict._class = 'MyClass3'
with pytest.raises(KeyError):
    # Oops!!! by-design
    # 'class' cannot be accessed as key anymore,
    # because we store '_class' as key as other valid keys behave
    assert obj_dict['class'] == 'MyClass3'
assert obj_dict['_class'] == 'MyClass3'

# thread safe testing
import sys
from threading import Thread
from pythonic_toolbox.decorators.decorator_utils import method_synchronized

class MyObjDict(DictObj):
    """DictObj subclass that adds a synchronized increment of its ``cnt`` entry."""

    # implement a thread-safe method to increase the value of cnt
    @method_synchronized
    def increase_cnt_by_n(self, n):
        # read-modify-write of ``cnt``; the decorator is what guards it against interleaving
        self.cnt += n

def increase_cnt_by_100(dict_obj):
    """Call ``dict_obj.increase_cnt_by_n(1)`` one hundred times."""
    remaining = 100
    while remaining > 0:
        dict_obj.increase_cnt_by_n(1)
        remaining -= 1

sw_interval = sys.getswitchinterval()
try:
    sys.setswitchinterval(0.0001)
    my_dict_obj = MyObjDict({'cnt': 0})
    threads = [Thread(target=increase_cnt_by_100, args=(my_dict_obj,)) for _ in range(100)]
    [t.start() for t in threads]
    [t.join() for t in threads]
    assert my_dict_obj.cnt == 10000
finally:
    sys.setswitchinterval(sw_interval)

# test copy/deepcopy of DictObj
import copy

person = DictObj({'name': 'albert', 'age': 33})
team = DictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader

deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader

FinalDictObj

from typing import cast

import pytest
from pythonic_toolbox.utils.dict_utils import FinalDictObj

person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}

fixed_person = FinalDictObj(person_dct)
assert fixed_person.name == 'Albert'

# FINAL means once initialized, you cannot change the key/attribute anymore
with pytest.raises(RuntimeError) as exec_info:
    fixed_person.name = 'Steve'
expected_error_str = 'Cannot modify attribute/item in an already initialized FinalDictObj'
assert exec_info.value.args[0] == expected_error_str

with pytest.raises(RuntimeError) as __:
    fixed_person.popitem()

with pytest.raises(RuntimeError) as __:
    fixed_person.pop('name')

assert isinstance(fixed_person.languages, tuple)
with pytest.raises(AttributeError) as exec_info:
    # list values are changed into tuple to avoid being modified
    cast(list, fixed_person.languages).append('Japanese')
expected_error_str = "'tuple' object has no attribute 'append'"
assert exec_info.value.args[0] == expected_error_str
assert fixed_person.to_dict() == person_dct

# nested structure will be detected, and changed to FinalDictObj
chessboard_data = {
    'position': [
        [{'name': 'knight'}, {'name': 'pawn'}],
        [{'name': 'pawn'}, {'name': 'queen'}],
    ]
}
chessboard_obj = FinalDictObj(chessboard_data)
# test comparing instances of FinalDictObj
assert FinalDictObj(chessboard_data) == FinalDictObj(chessboard_data)
assert isinstance(chessboard_obj.position, tuple)
assert isinstance(chessboard_obj.position[0][0], FinalDictObj)
assert chessboard_obj.position[1][1].name == 'queen'
with pytest.raises(RuntimeError) as __:
    chessboard_obj.position[1][1].name = 'knight'

# test for keyword/non-identifier key as attribute
final_obj_dict = FinalDictObj({
    'class': 'MyClass',  # 'class' is keyword in Python, so obj.class will cause SyntaxError
})
assert final_obj_dict['class'] == 'MyClass'
assert getattr(final_obj_dict, 'class') == 'MyClass'
assert final_obj_dict._class == 'MyClass'

# test copy/deepcopy of FinalDictObj
import copy
person = FinalDictObj({'name': 'albert', 'age': 33})
team = FinalDictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader
assert team.leader == shallow_copy_of_team.leader

deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader

RangeKeyDict

import pytest
from pythonic_toolbox.utils.dict_utils import RangeKeyDict

# test normal case
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
    (float('-inf'), 0): 'Negative',
    (0, 60): 'F',  # 0 <= val < 60
    (60, 70): 'D',  # 60 <= val < 70
    (70, 80): 'C',  # 70 <= val < 80
    (80, 90): 'B',  # 80 <= val < 90
    (90, 100): 'A',  # 90 <= val < 100
    100: 'A+',  # val == 100
})

# Big O of querying is O(log n), n is the number of ranges, due to using bisect inside
assert range_key_dict[-1] == 'Negative'
assert range_key_dict[0] == 'F'
assert range_key_dict[55] == 'F'
assert range_key_dict[60] == 'D'
assert range_key_dict[75] == 'C'
assert range_key_dict[85] == 'B'
assert range_key_dict[95] == 'A'
assert range_key_dict[100] == 'A+'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict['95']  # when key is not comparable with other integer keys
assert exec_info.value.args[0] == "KeyError: '95' is not comparable with other keys"

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[150]
assert exec_info.value.args[0] == 'KeyError: 150'

assert range_key_dict.get(150, 'N/A') == 'N/A'

# test comparison with other RangeKeyDict
assert RangeKeyDict({(0, 10): '1'}) == RangeKeyDict({(0, 10): '1'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 10): '2'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 1000): '1'})

with pytest.raises(ValueError):
    # [1, 1) is not a valid range
    # there's no value x satisfy 1 <= x < 1
    RangeKeyDict({(1, 1): '1'})

with pytest.raises(ValueError):
    # [1, -1) is not a valid range
    RangeKeyDict({(1, -1): '1'})

# validate input keys types and detect range overlaps(segment intersect)
with pytest.raises(ValueError) as exec_info:
    RangeKeyDict({
        (0, 10): 'val-between-0-and-10',
        (0, 5): 'val-between-0-and-5'
    })
expected_error_msg = ("Duplicated left boundary key 0 detected: "
                      "(0, 10): 'val-between-0-and-10', (0, 5): 'val-between-0-and-5'")
assert exec_info.value.args[0] == expected_error_msg

with pytest.raises(ValueError) as exec_info:
    RangeKeyDict({
        (0, 10): 'val-between-0-and-10',
        (5, 15): 'val-between-5-and-15'
    })
expected_error_msg = ("Overlap detected: "
                      "(0, 10): 'val-between-0-and-10', (5, 15): 'val-between-5-and-15'")
assert exec_info.value.args[0] == expected_error_msg

# test RangeKeyDict with no continuous ranges
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
    (0, 60): 'F',  # 0 <= val < 60
    (70, 80): 'C',  # 70 <= val < 80
})

assert range_key_dict[10] == 'F'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[-100]
assert exec_info.value.args[0] == 'KeyError: -100'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[65]
assert exec_info.value.args[0] == 'KeyError: 65'

with pytest.raises(KeyError) as exec_info:
    _ = range_key_dict[100]
assert exec_info.value.args[0] == 'KeyError: 100'

from functools import total_ordering

@total_ordering
class Age:
    """Numeric age wrapper that is ordered, compared and hashed by its value.

    ``total_ordering`` derives ``<``, ``>`` and ``>=`` from ``__le__`` together
    with ``__eq__``, so both must be defined consistently.
    """

    def __init__(self, val: float):
        """Store ``val``; raise ValueError unless it is an int or float."""
        if not isinstance(val, (int, float)):
            raise ValueError('Invalid age value')
        self.val = val

    def __eq__(self, other):
        # Without a value-based __eq__ the default identity comparison is used,
        # which makes the derived ``Age(2) < Age(2)`` evaluate to True.
        if not isinstance(other, Age):
            return NotImplemented
        return self.val == other.val

    def __le__(self, other):
        return self.val <= other.val

    def __repr__(self):
        return f'Age({repr(self.val)})'

    def __hash__(self):
        # kept in step with __eq__: equal values hash equally
        return hash(self.val)

age_categories_map: RangeKeyDict[Age, str] = RangeKeyDict({
    (Age(0), Age(2)): 'Baby',
    (Age(2), Age(15)): 'Children',
    (Age(15), Age(25)): 'Youth',
    (Age(25), Age(65)): 'Adults',
    (Age(65), Age(123)): 'Seniors',
})

assert age_categories_map[Age(0.5)] == 'Baby'
assert age_categories_map[Age(12)] == 'Children'
assert age_categories_map[Age(20)] == 'Youth'
assert age_categories_map[Age(35)] == 'Adults'
assert age_categories_map[Age(70)] == 'Seniors'

StrKeyIdDict

import pytest
from pythonic_toolbox.utils.dict_utils import StrKeyIdDict

data = {1: 'a', 2: 'b', '3': 'c'}
my_dict = StrKeyIdDict(data)

# usage: value can be accessed by id (str: int-like/uuid-like/whatever) or id (int)
assert my_dict['1'] == my_dict[1] == 'a'
assert my_dict.keys() == {'1', '2', '3'}  # all keys are str type
my_dict['4'] = 'd'
assert my_dict['4'] == 'd'
my_dict[4] = 'd'
assert my_dict['4'] == 'd'
my_dict.update({4: 'd'})
assert my_dict['4'] == 'd'

# test comparing instances of the class
assert StrKeyIdDict(data) == StrKeyIdDict(data)
assert StrKeyIdDict(data) != StrKeyIdDict(dict(data, **{'4': 'd'}))
assert StrKeyIdDict(data) == {'1': 'a', '2': 'b', '3': 'c'}
assert StrKeyIdDict(data) != {'1': 'a', '2': 'b', '3': 'd'}
assert StrKeyIdDict(data) != {1: 'a', 2: 'b', 3: 'c'}  # StrKeyIdDict assumes all keys are strings

# test delete key
del my_dict[4]
assert my_dict.keys() == {'1', '2', '3'}  # '4' is not in the dict anymore

# assign value to an arbitrary string key that is not in the dict
my_dict.update({'some-uuid': 'something'})
assert my_dict['some-uuid'] == 'something'

with pytest.raises(TypeError):
    # key '1', 1 both stands for key '1',
    # so we get duplicated keys when initializing instance, oops!
    my_dict = StrKeyIdDict({'1': 'a', 1: 'A'})

assert my_dict.get(1) == 'a'
assert my_dict.get('NotExistKey') is None
assert my_dict.get('NotExistKey', 'NotExistValue') == 'NotExistValue'

# test edge cases
assert StrKeyIdDict() == {}

# test shallow copy
my_dict[5] = ['e1', 'e2', 'e3']
copy_dict = my_dict.copy()
copy_dict[1] = 'A'
assert my_dict[1] == 'a'
my_dict['5'].append('e4')
assert copy_dict['5'] == ['e1', 'e2', 'e3', 'e4']

# test deep copy
from copy import deepcopy

copy_dict = deepcopy(my_dict)
my_dict[5].append('e5')
assert my_dict['5'] == ['e1', 'e2', 'e3', 'e4', 'e5']
assert copy_dict[5] == ['e1', 'e2', 'e3', 'e4']

# test constructor
my_dict = StrKeyIdDict(uuid1='a', uuid2='b')
assert my_dict['uuid1'] == 'a'

# test constructor (from keys)
my_dict = StrKeyIdDict.fromkeys([1, 2, 3], None)
assert my_dict == {'1': None, '2': None, '3': None}
# test update and overwrite
my_dict.update(StrKeyIdDict({1: 'a', 2: 'b', 3: 'c', 4: 'd'}))
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}

my_dict = StrKeyIdDict([(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')])
assert my_dict['1'] == my_dict[1] == 'a'

# reassign StrKeyIdDict instance to another StrKeyIdDict instance
my_dict = StrKeyIdDict(my_dict)
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
assert dict(my_dict) == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}

# test case when "key" is "data", which is a reserved keyword inside StrKeyIdDict
my_dict = StrKeyIdDict({'data': 'data_value', '1': 'a'})
assert my_dict['data'] == 'data_value'
assert my_dict['1'] == 'a'
# delete key 'data', should not affect other keys
del my_dict['data']
assert my_dict['1'] == 'a'

collect_leaves

from pythonic_toolbox.utils.dict_utils import collect_leaves

# a nested dict-like struct
my_dict = {
    'node_1': {
        'node_1_1': {
            'node_1_1_1': 'A',
        },
        'node_1_2': {
            'node_1_2_1': 'B',
            'node_1_2_2': 'C',
            'node_1_2_3': None,
        },
        'node_1_3': [  # dict list
            {
                'node_1_3_1_1': 'D',
                'node_1_3_1_2': 'E',
            },
            {
                'node_1_3_2_1': 'FF',
                'node_1_3_2_2': 'GG',
            }
        ]
    }}

expected = ['A', 'B', 'C', None, 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict) == expected

expected = ['A', 'B', 'C', 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf) == expected

assert collect_leaves(my_dict, keypath_pred=lambda kp: len(kp) == 1) == []

expected = ['B', 'C']
assert collect_leaves(my_dict, keypath_pred=lambda kp: kp[-1] in {'node_1_2_1', 'node_1_2_2'}) == expected

expected = ['C']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf == 'C') == expected
assert collect_leaves(my_dict,
                      keypath_pred=lambda kp: kp[-1] == 'node_1_2_2',
                      leaf_pred=lambda lf: lf == 'C') == expected

assert collect_leaves(my_dict,
                      keypath_pred=lambda kp: kp[-1] == 'node_1_1_1',
                      leaf_pred=lambda lf: lf == 'C') == []

expected = ['D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict,
                      keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3') == expected

expected = ['FF', 'GG']
assert collect_leaves(my_dict,
                      keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3',
                      leaf_pred=lambda lf: isinstance(lf, str) and len(lf) == 2) == expected

# edge cases
assert collect_leaves([]) == []
assert collect_leaves({}) == []
assert collect_leaves(None) == []

dict_until

from pythonic_toolbox.utils.dict_utils import dict_until

data = {'full_name': 'Albert Lee', 'pen_name': None}
assert dict_until(data, keys=['name', 'full_name']) == 'Albert Lee'
assert dict_until(data, keys=['full_name', 'name']) == 'Albert Lee'
assert dict_until(data, keys=['name', 'english_name']) is None
assert dict_until(data, keys=['name', 'english_name'], default='anonymous') == 'anonymous'
# test when pen_name is set None on purpose
assert dict_until(data, keys=['pen_name'], default='anonymous') is None
# test when value with None value is not acceptable
assert dict_until(data, keys=['pen_name'], terminate=lambda x: x is not None, default='anonymous') == 'anonymous'

select_list_of_dicts

from pythonic_toolbox.utils.dict_utils import select_list_of_dicts

dict_lst = [
    {'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    # another Peter Parker from multiverse
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    # age unknown for Carol Danvers, no age field
    {'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
    {'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'},
]

assert select_list_of_dicts(dict_lst, look_like={'name': 'Peter Parker'}) == [
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'}]

assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}) == [
    {'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
    {'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'}]

assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}, keys=['name']) == [
    {'name': 'Carol Danvers'}, {'name': 'Natasha Romanoff'}]

# duplicated dicts are kept by default; pass unique=True to de-duplicate the returned list
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age']) == [
    {'name': 'Tony Stark', 'age': 49},
    {'name': 'Peter Parker', 'age': 16},
    {'name': 'Peter Parker', 'age': 16},
]

assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age'], unique=True) == [
    {'name': 'Tony Stark', 'age': 49},
    {'name': 'Peter Parker', 'age': 16}]

# dict keys are ordered as the keys passed-in
assert list(select_list_of_dicts(dict_lst, keys=['name', 'age'], unique=True)[0].keys()) == ['name', 'age']
assert list(select_list_of_dicts(dict_lst, keys=['age', 'name'], unique=True)[0].keys()) == ['age', 'name']

# locate Captain Marvel, with default val for missing key
assert select_list_of_dicts(dict_lst,
                            look_like={'alias': 'Captain Marvel'},
                            keys=['name', 'sex', 'age', 'alias'],
                            val_for_missing_key='Unknown')[0]['age'] == 'Unknown'

# edge cases, get the original dict
assert select_list_of_dicts([]) == []
assert select_list_of_dicts(dict_lst) == dict_lst

# new list of dicts is returned, leaving the original list of dicts untouched
black_widow = select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]
black_widow['age'] += 1
assert black_widow['age'] == 36
# we don't modify the original dict data, Natasha is always 35 years old
assert select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]['age'] == 35

# preds provide more flexibility, filter the ones with age info
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0])) == 4
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0], unique=True)) == 3

# combine look_like and preds parameters
expected = [{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
                            preds=[lambda d: 'age' in d, lambda d: d['age'] > 20]) == expected

# empty list is returned if no dict matches the criteria
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
                            preds=[lambda d: 'sex' in d and d['sex'] == 'female']) == []

unique_list_of_dicts

from pythonic_toolbox.utils.dict_utils import unique_list_of_dicts

dict_lst = [
    {'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    # Peter Parkers from multiverse in same age.
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
    # test same dict, but the order of dict is different
    {'name': 'Peter Parker', 'sex': 'male', 'alias': 'Spider Man', 'age': 16},
]

# Only one Peter Parker is kept, because all of the duplicates hold exactly the same data.
assert unique_list_of_dicts(dict_lst) == [
    {'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
    {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
]

# edge cases
assert unique_list_of_dicts([]) == []

walk_leaves

from pythonic_toolbox.utils.dict_utils import walk_leaves

# Nested input: the leaves are the values 1, 2 and 'N/A'.
data = {
    'k1': {
        'k1_1': 1,
        'k1_2': 2,
    },
    'k2': 'N/A',  # stands for not available
}

# Same structure with every int leaf doubled; the string leaf is unchanged.
expected = {
    'k1': {
        'k1_1': 2,
        'k1_2': 4,
    },
    'k2': 'N/A',  # stands for not available
}
assert walk_leaves(data) == data  # no transform function provided, just a deepcopy
# with trans_fun the transformed structure is returned; data itself is not modified here
# (otherwise the in-place doubling below could not end up equal to expected)
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected

# if inplace is set to True, data is changed in place and nothing (None) is returned
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected

# a list of dicts is walked the same way
data = [{'name': 'lml', 'age': 33}, {'name': 'albert', 'age': 18}]
expected = [{'name': 'lml', 'age': 66}, {'name': 'albert', 'age': 36}]
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected

# edge cases: None and empty containers come back as they are; with inplace=True the result is always None
assert walk_leaves(None) is None
assert walk_leaves([]) == []
assert walk_leaves({}) == {}
assert walk_leaves(None, inplace=True) is None
assert walk_leaves([], inplace=True) is None
assert walk_leaves({}, inplace=True) is None

functional_utils

filter_multi

from pythonic_toolbox.utils.functional_utils import lfilter_multi, filter_multi
from collections.abc import Iterable

def is_even(x):
    """Tell whether *x* leaves no remainder when divided by 2."""
    remainder = x % 2
    return remainder == 0

def is_divisible_by_5(x):
    """Tell whether *x* is an exact multiple of 5."""
    remainder = x % 5
    return remainder == 0

# select the numbers that are divisible by both 2 and 5 (an item must pass every predicate)
assert lfilter_multi([is_even, is_divisible_by_5], range(1, 30)) == [10, 20]
assert lfilter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20]) == [10, 20]

from itertools import count, takewhile
# if you want to pass an iterator to lfilter_multi, make sure the iterator ends/breaks,
# Note: a bare count(start=0, step=2) generates numbers like 0, 2, 4, 6, ... (it never ends)
even_numbers_less_equal_than_50 = takewhile(lambda x: x <= 50, count(start=0, step=2))
expected = [0, 10, 20, 30, 40, 50]
assert lfilter_multi([is_even, is_divisible_by_5], even_numbers_less_equal_than_50) == expected

# filter_multi does not build a list, it returns a built-in filter object
num_iterator = filter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20])
assert type(num_iterator) is filter
assert isinstance(num_iterator, Iterable)
expected = [10, 20]
for idx, value in enumerate(num_iterator):
    assert value == expected[idx]

# when the items are infinite, choose filter_multi instead of lfilter_multi
expected = [0, 10, 20, 30, 40, 50]
for idx, value in enumerate(filter_multi([is_even, is_divisible_by_5], count(start=0, step=1))):
    if value > 50:
        # the source never ends, so the consumer has to stop the loop itself
        break
    else:
        assert value == expected[idx]

list_utils

filter_allowable

from pythonic_toolbox.utils.list_utils import filter_allowable

# Candidates used by every check below: three fruits, three vegetables, three meats.
fruits = ['apple', 'banana', 'orange']
vegetables = ['carrot', 'potato', 'tomato']
meats = ['beef', 'chicken', 'fish']

foods = fruits + vegetables + meats

# no lists, or empty lists: every candidate passes
assert list(filter_allowable(foods)) == foods
assert list(filter_allowable(foods, allow_list=[], block_list=[])) == foods
# a non-empty allow_list keeps only the candidates listed in it
assert list(filter_allowable(foods, allow_list=['apple', 'banana', 'blueberry'])) == ['apple', 'banana']
# block_list removes the candidates listed in it
assert list(filter_allowable(foods, allow_list=[], block_list=foods)) == []
assert list(filter_allowable(foods, block_list=meats)) == fruits + vegetables
assert list(filter_allowable(foods, allow_list=['apple'], block_list=[])) == ['apple']
# an item that is both allowed and blocked is dropped
assert list(filter_allowable(foods, allow_list=['apple'], block_list=['apple'])) == []
assert list(filter_allowable(foods + ['blueberry'], allow_list=[], block_list=foods)) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=[], block_list=[])) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=[], block_list=['apple', 'banana'])) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=['orange'], block_list=['apple', 'banana'])) == []

# test cases with parameter key: here the first letter of each candidate is matched against allow_list
assert list(filter_allowable(foods, allow_list=['a', 'b'], key=lambda x: x[0])) == ['apple', 'banana', 'beef']

# test some basic cases: missing, None or empty candidates give an empty result
assert list(filter_allowable()) == []
assert list(filter_allowable(candidates=None)) == []
assert list(filter_allowable(candidates=[])) == []
assert list(filter_allowable(candidates=[], allow_list=[], block_list=[])) == []

sort_with_custom_orders

from operator import itemgetter
from typing import List

import pytest
from pythonic_toolbox.utils.list_utils import sort_with_custom_orders

# basic usage: items named in prefix_orders come first (in that order), the rest follow sorted
values = ['branch2', 'branch1', 'branch3', 'master', 'release']
expected = ['master', 'release', 'branch1', 'branch2', 'branch3']
assert sort_with_custom_orders(values, prefix_orders=['master', 'release']) == expected
# reverse=True gives exactly the reversed result
assert sort_with_custom_orders(values, prefix_orders=['master', 'release'], reverse=True) == expected[::-1]

# prefix entries that do not occur in values (8 and 7) have no effect; duplicates in values are kept
values = [1, 2, 3, 9, 9]
expected = [9, 9, 1, 2, 3]
assert sort_with_custom_orders(values, prefix_orders=[9, 8, 7]) == expected

# suffix_orders pins items to the end
values = [1, 2, 3, 9]
expected = [9, 2, 3, 1]
assert sort_with_custom_orders(values, prefix_orders=[9], suffix_orders=[1]) == expected

# empty input always gives an empty list
assert sort_with_custom_orders([]) == []
assert sort_with_custom_orders([], prefix_orders=[], suffix_orders=[]) == []
assert sort_with_custom_orders([], prefix_orders=['master']) == []

# tests for unhashable values
values = [[2, 2], [1, 1], [3, 3], [6, 0]]
assert sort_with_custom_orders(values, prefix_orders=[[3, 3]]) == [[3, 3], [1, 1], [2, 2], [6, 0]]
# if "key" is provided, items are sorted in order of key(item)
# items in prefix_orders/suffix_orders don't need to correspond one-to-one with the items to sort
# sum([6]) == sum([3, 3]) == sum([6, 0])
assert sort_with_custom_orders(values, prefix_orders=[[6]], key=sum) == [[3, 3], [6, 0], [1, 1], [2, 2]]

# tests for list of dicts
values = [{2: 2}, {1: 1}, {1: 2}]
assert sort_with_custom_orders(values, prefix_orders=[{2: 2}],
                               key=lambda data: sum(data.values())) == [{2: 2}, {1: 2}, {1: 1}]

branch_info: List[dict] = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'release', 'commit_id': 'v1.1'}]
# Assume that we prefer choosing branch in order: release > master > others (develop, hotfix etc.)
res = sort_with_custom_orders(branch_info,
                              prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
                              key=itemgetter('branch'))
expected = [{'branch': 'release', 'commit_id': 'v1.1'}, {'branch': 'master', 'commit_id': 'v1.2'}]
assert res == expected

# no 'release' branch present: 'master' comes first, the unlisted 'develop' follows
branch_info = [{'branch': 'develop', 'commit_id': 'v1.3'}, {'branch': 'master', 'commit_id': 'v1.2'}]
res = sort_with_custom_orders(branch_info,
                              prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
                              key=itemgetter('branch'))
expected = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'develop', 'commit_id': 'v1.3'}]
assert res == expected

# tests for exceptions
# the same value must not appear in both prefix_orders and suffix_orders
with pytest.raises(ValueError) as exec_info:
    sort_with_custom_orders([1, 2, 3], prefix_orders=[3], suffix_orders=[3])
assert exec_info.value.args[0] == 'prefix and suffix contains same value'

# prefix_orders itself must not contain duplicates
with pytest.raises(ValueError) as exec_info:
    sort_with_custom_orders([1, 2, 3], prefix_orders=[1, 1])
assert exec_info.value.args[0] == 'prefix_orders contains duplicated values'

# tests for class
class Person:
    """Test fixture: a person that is ordered, compared and hashed by age.

    ``id`` and ``name`` are only carried along for display; two persons of the
    same age compare equal and therefore also share the same hash value.
    """

    def __init__(self, id, name, age):
        self.id = id
        self.name = name
        self.age = age

    def __lt__(self, other: 'Person'):
        # Let Python raise its usual TypeError for comparisons with foreign types.
        if not isinstance(other, Person):
            return NotImplemented
        return self.age < other.age

    def __eq__(self, other: 'Person'):
        # A non-Person is simply "not equal" instead of failing with AttributeError.
        if not isinstance(other, Person):
            return NotImplemented
        return self.age == other.age

    def __hash__(self):
        # Must agree with __eq__: objects that compare equal (same age) have to
        # hash equally, otherwise sets and dicts treat them inconsistently.
        return hash(self.age)

    def __str__(self):
        return f'Person({self.id}, {self.name}, {self.age})'

    def __repr__(self):
        return str(self)

# Three persons with distinct ages: Alice (26) < Albert (28) < Menglong (33).
Albert = Person(1, 'Albert', 28)
Alice = Person(2, 'Alice', 26)
Menglong = Person(3, 'Menglong', 33)

# without custom orders the result follows Person.__lt__, i.e. ascending age
persons = [Albert, Alice, Menglong]
expected = [Alice, Albert, Menglong]
assert sort_with_custom_orders(persons) == expected

# Menglong is pinned to the front; the extra 40-year-old Person is not among persons
expected = [Menglong, Alice, Albert]
assert sort_with_custom_orders(persons, prefix_orders=[Menglong, Person(4, 'Anyone', 40)]) == expected

unpack_list

import pytest
from pythonic_toolbox.utils.list_utils import unpack_list

# more items than target_num: only the first target_num items are returned
first, second, third = unpack_list(['a', 'b', 'c', 'd'], target_num=3)
assert first == 'a' and second == 'b' and third == 'c'

# fewer items than target_num: the missing positions are filled with default
first, second, third = unpack_list(['a', 'b'], target_num=3, default=None)
assert first == 'a' and second == 'b' and third is None

first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert first == 1 and second == 2 and third is None

# nothing to take from: every position gets the default
first, second, third = unpack_list([], target_num=3, default=0)
assert first == second == third == 0

# the result can itself be star-unpacked
first, second, *rest = unpack_list(['a', 'b', 'c'], target_num=4, default='x')
assert first == 'a' and second == 'b' and rest == ['c', 'x']

# test case for type range (same check as the range example above)
first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert first == 1 and second == 2 and third is None

def fib():
    """Yield the Fibonacci numbers 0, 1, 1, 2, 3, 5, ... without ever stopping."""
    current, following = 0, 1
    while True:
        yield current
        current, following = following, current + following

# test case for type generator
fib_generator = fib()  # generates data like [0, 1, 1, 2, 3, 5, 8, 13, 21 ...]
first, second, third, *rest = unpack_list(fib_generator, target_num=6)
assert first == 0 and second == 1 and third == 1
assert rest == [2, 3, 5]
# the generator was advanced by exactly six items, so the next call continues with 8 and 13
seventh, eighth = unpack_list(fib_generator, target_num=2)
assert seventh == 8 and eighth == 13

# test edge case, nothing to unpack
empty = unpack_list([], target_num=0, default=None)
assert empty == []

# empty source with target_num=2: a list holding the default twice
res = unpack_list([], target_num=2, default=None)
assert res == [None, None]

# target_num=0 always gives an empty list, whatever the source is
empty = unpack_list(['a', 'b'], target_num=0, default=None)
assert empty == []

empty = unpack_list(range(0, 0), target_num=0)
assert empty == []

empty = unpack_list(iter([]), target_num=0, default=None)
assert empty == []

with pytest.raises(ValueError):
    # target_num=2 returns only two values, so unpacking them into three names fails with
    # ValueError: not enough values to unpack (expected 3, got 2)
    first, second, third = unpack_list([1, 2], target_num=2)

until

from itertools import count

from pythonic_toolbox.utils.list_utils import until

# basic usage: the first item that satisfies the condition is returned
counter = count(1, 2)  # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, lambda x: x > 10) == 11

# no item satisfies the condition: the default is returned
assert until([1, 2, 3], lambda x: x > 10, default=11) == 11

# test case for when there's no default value and no item in the iterable satisfies the condition
assert until([1, 2, 3], lambda x: x > 10) is None

# edge cases
assert until([], default=3) == 3  # nothing provided, return default
assert until(None, lambda x: x > 10, default=11) == 11

# test case for when no item of the counter satisfies the condition:
# the following code would run forever, so it is commented out
# counter = count(1, 2)  # generator of odd numbers: 1, 3, 5, 7 ...
# assert until(counter, lambda x: x % 2 == 0) is None

# test case for when max_iter_num is provided, only iterate the counter for max_iter_num times
counter = count(1, 2)  # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, lambda x: x % 2 == 0, default=None, max_iter_num=100) is None

# the first match (5) is the fifth item, so it is only found when max_iter_num >= 5
numbers = [1, 2, 3, 4, 5, 6]
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=1) is None
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=4) is None
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=5) == 5
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=100) == 5

string_utils

substitute_string_template_dict

from unittest.mock import patch, PropertyMock

import pytest
from pythonic_toolbox.utils.string_utils import substitute_string_template_dict, CycleError

# simple usage
# both the $variable and the ${variable} notation are supported in the string templates
str_template_dict = {
    'greeting': 'Good Morning, Everyone!',
    'first_name': 'Albert',
    'last_name': 'Lee',
    'full_name': '$first_name $last_name',
    'age': 34,
    'speech': '$greeting, I am $full_name, a ${age}-year-old programmer, very glad to meet you!'
}
# values may refer to other keys of the same dict; 'speech' uses 'full_name', which is itself a template
output_dict = substitute_string_template_dict(str_template_dict)
assert output_dict['full_name'] == 'Albert Lee'
expected_speech = 'Good Morning, Everyone!, I am Albert Lee, a 34-year-old programmer, very glad to meet you!'
assert output_dict['speech'] == expected_speech

# complex usage, with dynamic values and several value-providing holders
# ($title1..3, $current_time_str and $greeting are not keys of this dict)
str_template_dict = {
    'first_name': 'Daenerys',
    'last_name': 'Targaryen',
    'nick_name': 'Dany',
    'full_name': '$first_name $last_name',
    'speech': "$nick_name: I'm $full_name ($title1, $title2, $title3), it's $current_time_str, $greeting!",
}

# plain dict holder: supplies title1 and title2 only
variables_dict = {'title1': 'Queen of Meereen',
                  'title2': 'Mother of Dragons'}

class DynamicVariables:
    """Value holder whose attribute is computed each time it is read."""

    @property
    def current_time_str(self):
        """Current time of day rendered as ``HH:MM:SS``."""
        from datetime import datetime as clock
        return clock.now().strftime("%H:%M:%S")

class DefaultUnknownTitle:
    """
    Fallback holder: reading an attribute whose name starts with ``title`` and
    ends in a digit (title1, title2, ...) always gives 'UnknownTitle'.
    Every other attribute goes through the regular lookup.
    """

    def __getattribute__(self, item):
        looks_like_title = (
            isinstance(item, str)
            and item.startswith('title')
            and item[-1:].isdigit()
        )
        if not looks_like_title:
            return super().__getattribute__(item)
        return 'UnknownTitle'

# title3 is in neither variables_dict nor DynamicVariables, so it is answered by DefaultUnknownTitle
expected_speech = ("Dany: I'm Daenerys Targaryen (Queen of Meereen, Mother of Dragons, UnknownTitle), "
                   "it's 08:00:00, good morning everyone!")

# using mock to make DynamicVariables().current_time_str always return 08:00:00
with patch.object(DynamicVariables, 'current_time_str', return_value='08:00:00', new_callable=PropertyMock):
    # holders are passed positionally (a dict and two objects); greeting comes in as a keyword argument
    output_dict = substitute_string_template_dict(str_template_dict, variables_dict, DynamicVariables(),
                                                  DefaultUnknownTitle(),
                                                  greeting='good morning everyone')
    assert output_dict['speech'] == expected_speech

# edge cases
assert substitute_string_template_dict({}) == {}

# cycle detection
str_template_dict = {
    'variable_a': 'Hello $variable_b',  # variable_a depends on variable_b
    'variable_b': 'Hello $variable_a',  # variable_b depends on variable_a, it's a cycle!
}

# templates that depend on each other cannot be resolved, a CycleError is raised
with pytest.raises(CycleError) as exec_info:
    substitute_string_template_dict(str_template_dict)

context

SkipContext

import itertools

import pytest
from pythonic_toolbox.utils.context_utils import SkipContext

# Usage: define a class that inherits from SkipContext
# and decides whether the code block should be skipped
class MyWorkStation(SkipContext):
    """SkipContext that is initialised with ``skip=True`` on Saturday and Sunday."""

    def __init__(self, week_day: str):
        weekends = {'saturday', 'sunday'}
        working_days = {'monday', 'tuesday', 'wednesday', 'thursday', 'friday'}
        day = week_day.lower()  # day names are matched case-insensitively

        if day not in working_days | weekends:
            raise ValueError(f'Invalid weekday {week_day}')

        super().__init__(skip=day in weekends)

seven_week_days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
logged_opening_days = []
total_working_hours = 0

for cur_week_day in seven_week_days:
    # MyWorkStation will skip the code block when encountering weekends
    with MyWorkStation(week_day=cur_week_day):
        # log this working day
        logged_opening_days.append(cur_week_day)
        # accumulate working hours, 8 hours on each working day
        total_working_hours += 8

# only working days are logged
assert logged_opening_days == ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday']
assert total_working_hours == 8 * 5

# test basic SkipContext
# the counter shows whether a block ran: each executed next() consumes one value
count_iterator = itertools.count(start=0, step=1)

flg_skip = True
with SkipContext(skip=flg_skip):
    # if skip = True, all code inside the context is skipped (not executed)
    next(count_iterator)  # this will not be executed
    assert sum([1, 1]) == 3
    raise Exception('Codes will not be executed')

assert next(count_iterator) == 0  # check previous context is skipped

flg_skip = False
with SkipContext(skip=flg_skip):
    # code is executed as normal, if skip = False
    next(count_iterator)  # generate value 1
    assert sum([1, 1]) == 2

assert next(count_iterator) == 2  # check previous context is executed

with pytest.raises(Exception) as exec_info:
    with SkipContext(skip=False):
        # if skip = False, this SkipContext is transparent,
        # an exception raised inside propagates as normal
        raise Exception('MyError')
assert exec_info.value.args[0] == 'MyError'

# another example: ensure that only one job, the one that acquires the lock, runs the +1 increase

from multiprocessing import Manager, Pool
import time

from pythonic_toolbox.utils.context_utils import SkipContext


def plain_cronjob_increase(ns, lock, max_wait=0.5, hold_time=1):
    """Increase ``ns.cnt`` by one, unless getting the lock took too long.

    The job blocks until it owns ``lock``. If the wait was shorter than
    ``max_wait`` seconds it increments the counter and keeps the lock for another
    ``hold_time`` seconds; otherwise it leaves the counter alone.

    :param ns: object with a numeric ``cnt`` attribute (e.g. a manager Namespace)
    :param lock: lock usable as a context manager
    :param max_wait: longest acceptable wait for the lock, in seconds
    :param hold_time: seconds to keep the lock after incrementing
    :return: the value of ``ns.cnt`` read after the lock has been released
    """
    # A monotonic clock cannot jump when the system time is adjusted, so the
    # measured wait is never negative or inflated by a clock change.
    start = time.monotonic()
    with lock:
        waited = time.monotonic() - start
        if waited < max_wait:
            ns.cnt += 1
            # holding the lock makes a job started at the same time wait hold_time seconds
            time.sleep(hold_time)
    return ns.cnt


class PreemptiveLockContext(SkipContext):
    """SkipContext that asks for ``lock`` in its constructor, waiting at most 0.5 seconds.

    When the lock could not be obtained, the parent is initialised with ``skip=True``.
    """

    def __init__(self, lock):
        self.start_time = time.perf_counter()
        self.lock = lock
        # give up after half a second; acquired records whether we own the lock
        self.acquired = self.lock.acquire(timeout=0.5)
        super().__init__(skip=not self.acquired)

    def __exit__(self, type, value, traceback):
        if self.acquired:
            # keep the lock for one more second before handing it back
            time.sleep(1)
            self.lock.release()
        if type is not None:
            # True (suppress) only for subclasses of SkipContentException,
            # False (propagate) for every other exception
            return issubclass(type, self.SkipContentException)
        return None


def cronjob_increase(ns, lock):
    """Increase ``ns.cnt`` by one inside a PreemptiveLockContext and return ``ns.cnt``.

    The returned value is read after the context has been left, so it is also
    returned when the increment itself was skipped.
    """
    # for a job that cannot acquire the lock within the timeout,
    # this context block is skipped, quite simple
    with PreemptiveLockContext(lock):
        ns.cnt += 1
    return ns.cnt



# Manager() starts a server process and Pool(2) starts two worker processes.
# The with-statement shuts both down once the checks are done, or as soon as
# one of them fails, instead of leaving the processes behind.
with Manager() as manager, Pool(2) as pool:
    lock = manager.Lock()
    ns = manager.Namespace()

    # two plain jobs compete for the lock: both report 1, the counter was increased once
    ns.cnt = 0
    processes = [pool.apply_async(plain_cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
    result = [p.get() for p in processes]
    assert result == [1, 1]
    assert ns.cnt == 1

    # reset global cnt=0 and repeat the check with the SkipContext based job
    ns.cnt = 0
    processes = [pool.apply_async(cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
    result = [p.get() for p in processes]
    assert result == [1, 1]
    assert ns.cnt == 1