refactored automation

This commit is contained in:
Natan Keddem
2023-11-23 18:44:29 -05:00
parent 0221780a19
commit 0148b23310
2 changed files with 142 additions and 159 deletions

View File

@@ -10,18 +10,18 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler # type: ignore
@dataclass(kw_only=True)
class Automation:
id: str
name: str
app: str
hosts: List[str]
host: str
command: str
schedule_mode: str
triggers: Dict[str, str]
options: Union[Dict[str, Any], None] = None
id: str = ""
name: str = ""
app: str = "remote"
hosts: List[str] = field(default_factory=list)
host: str = ""
command: str = ""
schedule_mode: str = ""
triggers: Dict[str, str] = field(default_factory=dict)
options: Dict[str, Any] = field(default_factory=dict)
pipe_success: bool = False
pipe_error: bool = False
timestamp: float = field(default_factory=time.time)
pipe_success: bool
pipe_error: bool
def to_dict(self) -> Dict[str, Any]:
return self.__dict__
@@ -30,12 +30,14 @@ class Automation:
@dataclass(kw_only=True)
class Zfs_Autobackup(Automation):
app: str = "zfs_autobackup"
execute_mode: str = "local"
prop: str
target_host: str
target_path: str
target_paths: List[str]
filesystems: Dict[str, Union[str, List[str], Dict[str, str]]]
prop: str = "autobackup:{name}"
target_host: str = ""
target_path: str = ""
target_paths: List[str] = field(default_factory=list)
parentchildren: List[str] = field(default_factory=list)
parent: List[str] = field(default_factory=list)
children: List[str] = field(default_factory=list)
exclude: List[str] = field(default_factory=list)
class _Scheduler:

View File

@@ -28,6 +28,22 @@ logger = logging.getLogger(__name__)
job_handlers: Dict[str, Union[cli.Cli, ssh.Ssh]] = {}
def automation(raw: Union[str, Job]) -> Union[scheduler.Automation, scheduler.Zfs_Autobackup, None]:
json_data = json.dumps({})
if isinstance(raw, str):
json_data = raw
elif isinstance(raw, Job):
if "data" in raw.kwargs:
json_data = raw.kwargs["data"]
else:
return None
raw_data = json.loads(json_data)
if raw_data["app"] == "zfs_autobackup":
return scheduler.Zfs_Autobackup(**raw_data)
else:
return scheduler.Automation(**raw_data)
def populate_job_handler(app: str, job_id: str, host: str):
tab = Tab(host=None, spinner=None)
if job_id not in job_handlers:
@@ -43,46 +59,43 @@ class AutomationTemplate(string.Template):
async def automation_job(**kwargs) -> None:
if "data" in kwargs:
jd = json.loads(kwargs["data"])
command = AutomationTemplate(jd["command"])
auto = automation(kwargs["data"])
if auto is not None:
command = AutomationTemplate(auto.command)
tab = Tab(host=None, spinner=None)
if jd["app"] == "zfs_autobackup":
d = scheduler.Zfs_Autobackup(**jd)
populate_job_handler(app=d.app, job_id=d.id, host=d.host)
if job_handlers[d.id].is_busy is False:
result = await job_handlers[d.id].execute(command.safe_substitute(name=d.name, host=d.host))
result.name = d.host
if auto.app == "zfs_autobackup":
populate_job_handler(app=auto.app, job_id=auto.id, host=auto.host)
if job_handlers[auto.id].is_busy is False:
result = await job_handlers[auto.id].execute(command.safe_substitute(name=auto.name, host=auto.host))
result.name = auto.host
result.status = "success" if result.return_code == 0 else "error"
if d.pipe_success is True and result.status == "success":
if auto.pipe_success is True and result.status == "success":
tab.pipe_result(result=result)
if d.pipe_error is True and result.status != "success":
if auto.pipe_error is True and result.status != "success":
tab.pipe_result(result=result)
tab.add_history(result=result)
else:
logger.warning("Job Skipped!")
elif jd["app"] == "remote":
d = scheduler.Automation(**jd)
populate_job_handler(app=d.app, job_id=d.id, host=d.host)
if job_handlers[d.id].is_busy is False:
result = await job_handlers[d.id].execute(command.safe_substitute(name=d.name, host=d.host))
result.name = d.host
if d.pipe_success is True and result.status == "success":
elif auto.app == "remote":
populate_job_handler(app=auto.app, job_id=auto.id, host=auto.host)
if job_handlers[auto.id].is_busy is False:
result = await job_handlers[auto.id].execute(command.safe_substitute(name=auto.name, host=auto.host))
result.name = auto.host
if auto.pipe_success is True and result.status == "success":
tab.pipe_result(result=result)
if d.pipe_error is True and result.status != "success":
if auto.pipe_error is True and result.status != "success":
tab.pipe_result(result=result)
tab.add_history(result=result)
else:
logger.warning("Job Skipped!")
elif jd["app"] == "local":
d = scheduler.Automation(**jd)
populate_job_handler(app=d.app, job_id=d.id, host=d.host)
if job_handlers[d.id].is_busy is False:
result = await job_handlers[d.id].execute(command.safe_substitute(name=d.name, host=d.host))
result.name = d.host
if d.pipe_success is True and result.status == "success":
elif auto.app == "local":
populate_job_handler(app=auto.app, job_id=auto.id, host=auto.host)
if job_handlers[auto.id].is_busy is False:
result = await job_handlers[auto.id].execute(command.safe_substitute(name=auto.name, host=auto.host))
result.name = auto.host
if auto.pipe_success is True and result.status == "success":
tab.pipe_result(result=result)
if d.pipe_error is True and result.status != "success":
if auto.pipe_error is True and result.status != "success":
tab.pipe_result(result=result)
tab.add_history(result=result)
else:
@@ -98,7 +111,7 @@ class Automation(Tab):
self.picked_options: Dict[str, str] = {}
self.triggers: Dict[str, str] = {}
self.picked_triggers: Dict[str, str] = {}
self.job_data: Dict[str, str] = {}
self.auto: Union[scheduler.Automation, scheduler.Zfs_Autobackup]
self.job_names: List[str] = []
self.default_options: Dict[str, str] = {}
self.build_command: Callable
@@ -124,6 +137,11 @@ class Automation(Tab):
self.triggers_scroll: ui.scroll_area
self.trigger_controls: Dict[str, str] = {}
self.hosts: el.DSelect
self.prop: el.DInput
self.parentchildren: el.DSelect
self.parent: el.DSelect
self.children: el.DSelect
self.exclude: el.DSelect
super().__init__(spinner, host)
def _build(self) -> None:
@@ -136,7 +154,6 @@ class Automation(Tab):
el.SmButton("Create", on_click=self._create_automation)
el.SmButton("Remove", on_click=self._remove_automation)
el.SmButton("Edit", on_click=self._edit_automation)
# el.SmButton("Duplicate", on_click=self._duplicate_automation)
el.SmButton("Run Now", on_click=self._run_automation)
with ui.row().classes("items-center"):
el.SmButton(text="Refresh", on_click=self._update_automations)
@@ -196,11 +213,10 @@ class Automation(Tab):
job_id = f"{job_data.args['data']['name']}@{self.host}"
for job in self.scheduler.scheduler.get_jobs():
if job.id == job_id:
if "data" in job.kwargs:
jd = json.loads(job.kwargs["data"])
populate_job_handler(app=jd["app"], job_id=job.id, host=self.host)
break
auto = automation(job)
if auto is not None and auto.id == job_id:
populate_job_handler(app=auto.app, job_id=auto.id, host=self.host)
break
async def run():
for job in self.scheduler.scheduler.get_jobs():
@@ -240,17 +256,9 @@ class Automation(Tab):
next_run = job.next_run_time.timestamp()
else:
next_run = "NA"
if "data" in job.kwargs:
jd = json.loads(job.kwargs["data"])
if self.host == jd["host"]:
self._automations.append(
{
"name": job.id.split("@")[0],
"command": jd["command"],
"next_run": next_run,
"status": "",
}
)
auto = automation(job)
if auto is not None and auto.host == self.host:
self._automations.append({"name": auto.name, "command": auto.command, "next_run": next_run, "status": ""})
self._grid.update()
async def _remove_automation(self) -> None:
@@ -279,29 +287,6 @@ class Automation(Tab):
job.modify(next_run_time=datetime.now())
self._set_selection()
async def _duplicate_automation(self) -> None:
rows = await self._grid.get_selected_rows()
if len(rows) == 1:
with ui.dialog() as dialog, el.Card():
with el.DBody():
with el.WColumn():
host = el.DSelect(self._zfs_hosts, value=self.host, label="Host", with_input=True)
with el.WRow():
el.DButton("Duplicate", on_click=lambda: dialog.submit("duplicate"))
result = await dialog
if result == "confirm":
for job in self.scheduler.scheduler.get_jobs():
if job.id == rows[0]["name"]:
self.scheduler.scheduler.add_job(
automation_job,
trigger=build_triggers(),
kwargs={"data": json.dumps(auto.to_dict())},
id=self.auto_name.value.lower(),
coalesce=True,
max_instances=1,
replace_existing=True,
)
async def _edit_automation(self) -> None:
self._set_selection(mode="single")
result = await SelectionConfirm(container=self._confirm, label=">EDIT<")
@@ -329,16 +314,16 @@ class Automation(Tab):
self.picked_options = {}
self.triggers = {}
self.picked_triggers = {}
self.job_data = {}
jobs = self.scheduler.scheduler.get_jobs()
self.job_names = []
self.auto = scheduler.Automation(host=self.host, hosts=[self.host])
job = None
for job in jobs:
j = job.id.split("@")[0]
self.job_names.append(j)
if name == j:
job = self.scheduler.scheduler.get_job(job.id)
self.job_data.update(json.loads(job.kwargs["data"]))
auto = automation(job)
if auto is not None:
self.job_names.append(auto.name)
if auto.name == name:
self.auto = auto
def validate_name(n: str):
if len(n) > 0 and n.islower() and "@" not in n and (n not in self.job_names or name != ""):
@@ -381,18 +366,15 @@ class Automation(Tab):
def option_changed(e):
self.current_help.text = self.options[e.value]["description"]
async def zab_controls() -> None:
async def zab_controls(auto: scheduler.Zfs_Autobackup) -> None:
filesystems = await self.zfs.filesystems
if isinstance(self.job_data.get("filesystems", {}), dict):
self.fs = self.job_data.get(
"filesystems",
{"all": {}, "values": {}, "parent": [], "children": [], "parentchildren": [], "exclude": []},
)
else:
self.fs = {"all": {}, "values": {}, "parent": [], "children": [], "parentchildren": [], "exclude": []}
if not self.fs["all"]:
for fs in filesystems.data:
self.fs["all"][fs] = ""
parent: List[str] = []
children: List[str] = []
parentchildren: List[str] = []
exclude: List[str] = []
all_fs: Dict[str, str] = {}
for fs in filesystems.data:
all_fs[fs] = ""
async def target_host_selected() -> None:
if self.target_host.value != "":
@@ -427,24 +409,24 @@ class Automation(Tab):
self.command.value = base
def all_fs_to_lists():
self.fs["parentchildren"].clear()
self.fs["parent"].clear()
self.fs["children"].clear()
self.fs["exclude"].clear()
for fs, v in self.fs["all"].items():
parentchildren.clear()
parent.clear()
children.clear()
exclude.clear()
for fs, v in all_fs.items():
if v == "":
self.fs["parentchildren"].append(fs)
self.fs["parent"].append(fs)
self.fs["children"].append(fs)
self.fs["exclude"].append(fs)
parentchildren.append(fs)
parent.append(fs)
children.append(fs)
exclude.append(fs)
elif v == "true":
self.fs["parentchildren"].append(fs)
parentchildren.append(fs)
elif v == "parent":
self.fs["parent"].append(fs)
parent.append(fs)
elif v == "child":
self.fs["children"].append(fs)
children.append(fs)
elif v == "false":
self.fs["exclude"].append(fs)
exclude.append(fs)
def cull_fs_list(e: events.GenericEventArguments, value: str = "false") -> None:
if e.sender != self.parentchildren:
@@ -455,11 +437,11 @@ class Automation(Tab):
self.children.disable()
if e.sender != self.exclude:
self.exclude.disable()
for fs, v in self.fs["all"].items():
for fs, v in all_fs.items():
if v == value:
self.fs["all"][fs] = ""
all_fs[fs] = ""
for fs in e.sender.value:
self.fs["all"][fs] = value
all_fs[fs] = value
all_fs_to_lists()
self.parentchildren.enable()
self.parent.enable()
@@ -489,7 +471,7 @@ class Automation(Tab):
"ssh-config": self.zfs.config_path,
}
else:
self.default_options = self.job_data["options"]
self.default_options = auto.options
self.options = zab.options
self.build_command = build_command
filesystems = await self.zfs.filesystems
@@ -501,37 +483,37 @@ class Automation(Tab):
row.tailwind.width("[860px]").justify_content("center")
with ui.column() as col:
col.tailwind.height("full").width("[420px]")
self.prop = el.DInput(label="Property", value="autobackup:{name}", on_change=build_command, validation=validate_prop)
self.prop = el.DInput(label="Property", value=auto.prop, on_change=build_command, validation=validate_prop)
self.app_em.append(self.prop)
self.target_host = el.DSelect(target_host, label="Target Host", on_change=target_host_selected)
self.target_paths = [""]
self.target_path = el.DSelect(self.target_paths, value="", label="Target Path", new_value_mode="add-unique", on_change=build_command)
self.hosts = el.DSelect(source_hosts, label="Source Host(s)", multiple=True, with_input=True)
self.hosts = el.DSelect(source_hosts, label="Source Host(s)", value=auto.hosts, multiple=True, with_input=True)
all_fs_to_lists()
with ui.scroll_area().classes("col"):
self.parentchildren = el.DSelect(
self.fs["parentchildren"],
parentchildren,
label="Source Parent And Children",
with_input=True,
multiple=True,
on_change=lambda e: cull_fs_list(e, "true"),
)
self.parent = el.DSelect(
self.fs["parent"],
parent,
label="Source Parent Only",
with_input=True,
multiple=True,
on_change=lambda e: cull_fs_list(e, "parent"),
)
self.children = el.DSelect(
self.fs["children"],
children,
label="Source Children Only",
with_input=True,
multiple=True,
on_change=lambda e: cull_fs_list(e, "child"),
)
self.exclude = el.DSelect(
self.fs["exclude"],
exclude,
label="Exclude",
with_input=True,
multiple=True,
@@ -540,10 +522,14 @@ class Automation(Tab):
with ui.column() as col:
col.tailwind.height("full").width("[420px]")
options_controls()
self.parentchildren.value = auto.parentchildren
self.parent.value = auto.parent
self.children.value = auto.children
self.exclude.value = auto.exclude
self.previous_prop = auto.prop
if name != "":
self.prop.value = self.job_data.get("prop", "autobackup:{name}")
self.target_host.value = self.job_data.get("target_host", "")
target_path = self.job_data.get("target_path", "")
self.target_host.value = auto.target_host
target_path = auto.target_path
tries = 0
while target_path not in self.target_path.options and tries < 20:
await asyncio.sleep(0.1)
@@ -551,11 +537,6 @@ class Automation(Tab):
if target_path not in self.target_paths:
self.target_paths.append(target_path)
self.target_path.value = target_path
self.parentchildren.value = self.fs["values"].get("parentchildren", None)
self.parent.value = self.fs["values"].get("parent", None)
self.children.value = self.fs["values"].get("children", None)
self.exclude.value = self.fs["values"].get("exclude", None)
self.hosts.value = self.job_data.get("hosts", [self.host])
else:
self.hosts.value = [self.host]
@@ -659,7 +640,7 @@ class Automation(Tab):
if name == "":
self.default_triggers = {"id": {"type": "Cron", "value": ""}}
else:
self.default_triggers = self.job_data["triggers"]
self.default_triggers = self.auto.triggers
with ui.row() as row:
row.tailwind(tw_rows)
self.current_trigger = el.FSelect(["Cron", "Interval"], value="Cron", label="Trigger", with_input=True)
@@ -683,7 +664,10 @@ class Automation(Tab):
if self.app.value is not None:
with options_col:
if self.app.value == "zfs_autobackup":
await zab_controls()
if isinstance(self.auto, scheduler.Zfs_Autobackup):
await zab_controls(self.auto)
else:
await zab_controls(scheduler.Zfs_Autobackup(host=self.host, hosts=[self.host]))
if self.app.value == "local":
local_controls()
if self.app.value == "remote":
@@ -692,20 +676,15 @@ class Automation(Tab):
self.stepper.next()
def local_controls():
command_input = el.DInput("Command").bind_value_to(self.command, "value")
if name != "":
command_input.value = self.job_data["command"]
el.DInput("Command", value=self.auto.command).bind_value_to(self.command, "value")
def remote_controls():
command_input = el.DInput("Command").bind_value_to(self.command, "value")
self.hosts = el.DSelect(self._zfs_hosts, value=self.host, label="Hosts", with_input=True, multiple=True)
command_input = el.DInput("Command", value=self.auto.command).bind_value_to(self.command, "value")
self.hosts = el.DSelect(self._zfs_hosts, value=self.auto.hosts, label="Hosts", with_input=True, multiple=True)
self.save.bind_enabled_from(self.hosts, "value", backward=lambda x: len(x) > 0)
if name != "":
command_input.value = self.job_data["command"]
self.hosts.value = self.job_data["hosts"]
def string_to_interval(string: str):
interval = string.split(":", 4)
def to_interval(value: str):
interval = value.split(":", 4)
interval = interval + ["0"] * (5 - len(interval))
return IntervalTrigger(weeks=int(interval[0]), days=int(interval[1]), hours=int(interval[2]), minutes=int(interval[3]), seconds=int(interval[4]))
@@ -716,7 +695,7 @@ class Automation(Tab):
if "Cron" == value["type"]:
triggers.append(CronTrigger().from_crontab(value["value"]))
elif "Interval" == value["type"]:
triggers.append(string_to_interval(value["value"]))
triggers.append(to_interval(value["value"]))
return combine(triggers)
def validate_hosts(e):
@@ -736,11 +715,11 @@ class Automation(Tab):
col.tailwind.height("full").width("[420px]")
self.auto_name = el.DInput(label="Name", value=" ", validation=validate_name)
with el.WRow():
self.pipe_success = el.DCheckbox("Pipe Success", value=self.job_data.get("pipe_success", False))
self.pipe_error = el.DCheckbox("Pipe Error", value=self.job_data.get("pipe_error", False))
self.pipe_success = el.DCheckbox("Pipe Success", value=self.auto.pipe_success)
self.pipe_error = el.DCheckbox("Pipe Error", value=self.auto.pipe_error)
self.schedule_em = el.ErrorAggregator(self.auto_name)
if name != "":
self.app = el.DInput(label="Application", value=self.job_data["app"]).props("readonly")
self.app = el.DInput(label="Application", value=self.auto.app).props("readonly")
else:
self.app = el.DSelect(
["zfs_autobackup", "local", "remote"],
@@ -773,10 +752,9 @@ class Automation(Tab):
self.auto_name.value = name
if name != "":
self.auto_name.props("readonly")
self.schedule_mode.value = self.job_data["schedule_mode"]
self.schedule_mode.value = self.auto.schedule_mode
result = await automation_dialog
if result == "save":
auto: Union[scheduler.Automation, scheduler.Zfs_Autobackup]
auto_name = self.auto_name.value.lower()
if hasattr(self, "hosts"):
hosts = self.hosts.value
@@ -784,11 +762,16 @@ class Automation(Tab):
hosts = [self.host]
if self.app.value == "zfs_autobackup":
for job in jobs:
j = job.id.split("@")[0]
if j == auto_name:
existing_auto = automation(job)
if existing_auto is not None and existing_auto.name == auto_name:
self.scheduler.scheduler.remove_job(job.id)
for host in hosts:
auto_id = f"{auto_name}@{host}"
if self.previous_prop != "":
command = AutomationTemplate(self.previous_prop)
prop = command.safe_substitute(name=auto_name, host=host)
await self._remove_prop_from_all_fs(host=host, prop=prop)
command = AutomationTemplate(self.prop.value)
prop = command.safe_substitute(name=auto_name, host=host)
await self._remove_prop_from_all_fs(host=host, prop=prop)
@@ -796,11 +779,6 @@ class Automation(Tab):
await self._add_prop_to_fs(host=host, prop=prop, value="parent", filesystems=self.parent.value)
await self._add_prop_to_fs(host=host, prop=prop, value="child", filesystems=self.children.value)
await self._add_prop_to_fs(host=host, prop=prop, value="false", filesystems=self.exclude.value)
self.fs["values"] = {}
self.fs["values"]["parentchildren"] = self.parentchildren.value
self.fs["values"]["parent"] = self.parent.value
self.fs["values"]["children"] = self.children.value
self.fs["values"]["exclude"] = self.exclude.value
auto = scheduler.Zfs_Autobackup(
id=auto_id,
name=auto_name,
@@ -813,10 +791,13 @@ class Automation(Tab):
target_host=self.target_host.value,
target_path=self.target_path.value,
target_paths=self.target_path.options,
filesystems=self.fs,
pipe_success=self.pipe_success.value,
pipe_error=self.pipe_error.value,
prop=self.prop.value,
parentchildren=self.parentchildren.value,
parent=self.parent.value,
children=self.children.value,
exclude=self.exclude.value,
)
self.scheduler.scheduler.add_job(
automation_job,