mirror of
https://github.com/natankeddem/bale.git
synced 2026-05-03 14:12:54 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 94fba0b925 | |||
| 8572ad766b | |||
| 8a2922262e |
+11
-4
@@ -34,6 +34,7 @@ class Cli:
|
||||
self.stderr: List[str] = []
|
||||
self._terminate: asyncio.Event = asyncio.Event()
|
||||
self._busy: bool = False
|
||||
self._truncated: bool = False
|
||||
self.prefix_line: str = ""
|
||||
self._stdout_terminals: List[Terminal] = []
|
||||
self._stderr_terminals: List[Terminal] = []
|
||||
@@ -70,8 +71,11 @@ class Cli:
|
||||
else:
|
||||
break
|
||||
|
||||
async def _controller(self, process: Process) -> None:
|
||||
async def _controller(self, process: Process, max_output_lines) -> None:
|
||||
while process.returncode is None:
|
||||
if max_output_lines > 0 and len(self.stderr) + len(self.stdout) > max_output_lines:
|
||||
self._truncated = True
|
||||
process.terminate()
|
||||
if self._terminate.is_set():
|
||||
process.terminate()
|
||||
try:
|
||||
@@ -83,7 +87,7 @@ class Cli:
|
||||
def terminate(self) -> None:
|
||||
self._terminate.set()
|
||||
|
||||
async def execute(self, command: str) -> Result:
|
||||
async def execute(self, command: str, max_output_lines: int = 0) -> Result:
|
||||
self._busy = True
|
||||
c = shlex.split(command, posix=False)
|
||||
try:
|
||||
@@ -92,13 +96,14 @@ class Cli:
|
||||
self.stdout.clear()
|
||||
self.stderr.clear()
|
||||
self._terminate.clear()
|
||||
self._truncated = False
|
||||
terminated = False
|
||||
now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
|
||||
self.prefix_line = f"<{now}> {command}\n"
|
||||
for terminal in self._stdout_terminals:
|
||||
terminal.call_terminal_method("write", "\n" + self.prefix_line)
|
||||
await asyncio.gather(
|
||||
self._controller(process=process),
|
||||
self._controller(process=process, max_output_lines=max_output_lines),
|
||||
self._read_stdout(stream=process.stdout),
|
||||
self._read_stderr(stream=process.stderr),
|
||||
)
|
||||
@@ -110,7 +115,9 @@ class Cli:
|
||||
finally:
|
||||
self._terminate.clear()
|
||||
self._busy = False
|
||||
return Result(command=command, return_code=process.returncode, stdout_lines=self.stdout.copy(), stderr_lines=self.stderr.copy(), terminated=terminated)
|
||||
return Result(
|
||||
command=command, return_code=process.returncode, stdout_lines=self.stdout.copy(), stderr_lines=self.stderr.copy(), terminated=terminated, truncated=self._truncated
|
||||
)
|
||||
|
||||
async def shell(self, command: str) -> Result:
|
||||
self._busy = True
|
||||
|
||||
@@ -90,10 +90,10 @@ class Ssh(Cli):
|
||||
del self._config[self.host]
|
||||
self.write_config()
|
||||
|
||||
async def execute(self, command: str) -> Result:
|
||||
async def execute(self, command: str, max_output_lines: int = 0) -> Result:
|
||||
self._base_cmd = f"{'' if self.use_key else f'sshpass -p {self.password} '} ssh -F {self._config_path} {self.host}"
|
||||
self._full_cmd = f"{self._base_cmd} {command}"
|
||||
return await super().execute(self._full_cmd)
|
||||
return await super().execute(self._full_cmd, max_output_lines)
|
||||
|
||||
async def send_key(self) -> Result:
|
||||
await get_public_key(self._raw_path)
|
||||
|
||||
+20
-12
@@ -3,7 +3,7 @@ from pathlib import Path
|
||||
import stat
|
||||
from datetime import datetime
|
||||
import uuid
|
||||
from nicegui import app, events, ui
|
||||
from nicegui import app, background_tasks, events, ui
|
||||
from fastapi.responses import StreamingResponse
|
||||
import asyncssh
|
||||
from bale import elements as el
|
||||
@@ -87,7 +87,7 @@ class SshFileBrowse(ui.dialog):
|
||||
row.tailwind.height("[40px]")
|
||||
el.DButton("Download", on_click=self._start_download)
|
||||
ui.button("Exit", on_click=lambda: self.submit("exit"))
|
||||
await self._update_grid()
|
||||
await self._update_handler()
|
||||
|
||||
async def _connect(self) -> Tuple[asyncssh.SSHClientConnection, asyncssh.SFTPClient]:
|
||||
ssh = await asyncssh.connect(self._zfs.hostname, username=self._zfs.username, client_keys=[self._zfs.key_path])
|
||||
@@ -137,7 +137,7 @@ class SshFileBrowse(ui.dialog):
|
||||
"permissions": attributes.permissions,
|
||||
}
|
||||
|
||||
async def _update_grid(self) -> None:
|
||||
async def _update_handler(self) -> None:
|
||||
self._grid.call_api_method("showLoadingOverlay")
|
||||
if self._ssh is None or self._sftp is None:
|
||||
self._ssh, self._sftp = await self._connect()
|
||||
@@ -165,7 +165,7 @@ class SshFileBrowse(ui.dialog):
|
||||
async def _handle_double_click(self, e: events.GenericEventArguments) -> None:
|
||||
self.path = e.args["data"]["path"]
|
||||
if e.args["data"]["type"] == "directory":
|
||||
await self._update_grid()
|
||||
await self._update_handler()
|
||||
else:
|
||||
await self._start_download(e)
|
||||
|
||||
@@ -226,8 +226,10 @@ class SshFileFind(SshFileBrowse):
|
||||
with el.DBody(height="fit", width="[90vw]"):
|
||||
with el.WColumn().classes("col"):
|
||||
filesystems = await self._zfs.filesystems
|
||||
self._filesystem = el.DSelect(list(filesystems.data.keys()), label="filesystem", with_input=True, on_change=self._update_grid)
|
||||
self._pattern = el.DInput("Pattern", on_change=self._update_grid)
|
||||
self._filesystem = el.DSelect(list(filesystems.data.keys()), label="filesystem", with_input=True, on_change=self._update_handler)
|
||||
with el.WRow():
|
||||
self._pattern = ui.input("Pattern").classes("col").on("keydown.enter", handler=self._update_handler)
|
||||
el.LgButton(icon="search", on_click=self._update_handler)
|
||||
self._grid = ui.aggrid(
|
||||
{
|
||||
"defaultColDef": {"flex": 1, "sortable": True, "suppressMovable": True, "sortingOrder": ["asc", "desc"]},
|
||||
@@ -264,15 +266,21 @@ class SshFileFind(SshFileBrowse):
|
||||
row.tailwind.height("[40px]")
|
||||
el.DButton("Download", on_click=self._start_download)
|
||||
ui.button("Exit", on_click=lambda: self.submit("exit"))
|
||||
await self._update_grid()
|
||||
self._grid.call_api_method("hideOverlay")
|
||||
|
||||
async def _update_grid(self) -> None:
|
||||
self._grid.call_api_method("showLoadingOverlay")
|
||||
if self._filesystem is not None:
|
||||
async def _update_handler(self) -> None:
|
||||
if len(self._pattern.value) > 0 and self._filesystem is not None:
|
||||
self._grid.call_api_method("showLoadingOverlay")
|
||||
self._filesystem.props("readonly")
|
||||
self._pattern.props("readonly")
|
||||
files = await self._zfs.find_files_in_snapshots(filesystem=self._filesystem.value, pattern=self._pattern.value)
|
||||
self._grid.options["rowData"] = files.data
|
||||
self._grid.update()
|
||||
self._grid.call_api_method("hideOverlay")
|
||||
if files.truncated is True:
|
||||
el.notify("Too many files found, truncating list.", type="warning")
|
||||
self._grid.update()
|
||||
self._filesystem.props(remove="readonly")
|
||||
self._pattern.props(remove="readonly")
|
||||
self._grid.call_api_method("hideOverlay")
|
||||
|
||||
async def _handle_double_click(self, e: events.GenericEventArguments) -> None:
|
||||
await self._start_download(e)
|
||||
|
||||
+25
-24
@@ -81,7 +81,7 @@ class Zfs:
|
||||
command = command if len(command) < 160 else command[:160] + "..."
|
||||
el.notify(command)
|
||||
|
||||
async def execute(self, command: str, notify: bool = True) -> Result:
|
||||
async def execute(self, command: str, max_output_lines: int = 0, notify: bool = True) -> Result:
|
||||
if notify:
|
||||
self.notify(command)
|
||||
return Result(command=command)
|
||||
@@ -166,27 +166,28 @@ class Zfs:
|
||||
return result
|
||||
|
||||
async def find_files_in_snapshots(self, filesystem: str, pattern: str) -> Result:
|
||||
filesystems = await self.filesystems
|
||||
if filesystem in filesystems.data.keys():
|
||||
if "mountpoint" in filesystems.data[filesystem]:
|
||||
command = f"find {filesystems.data[filesystem]['mountpoint']}/.zfs/snapshot -type f -name '{pattern}' -printf '%h\t%f\t%s\t%T@\n'"
|
||||
result = await self.execute(command=command, notify=False)
|
||||
files = []
|
||||
for line in result.stdout_lines:
|
||||
matches = re.match(
|
||||
"^(?P<location>[^\t]+)\t(?P<name>[^\t]+)\t(?P<bytes>[^\t]+)\t(?P<modified_timestamp>[^\n]+)",
|
||||
line,
|
||||
)
|
||||
if matches is not None:
|
||||
md = matches.groupdict()
|
||||
md["path"] = f"{md['location']}/{md['name']}"
|
||||
md["bytes"] = int(md["bytes"])
|
||||
md["size"] = format_bytes(md["bytes"])
|
||||
md["modified_datetime"] = datetime.fromtimestamp(float(md["modified_timestamp"])).strftime("%Y/%m/%d %H:%M:%S")
|
||||
md["modified_timestamp"] = float(md["modified_timestamp"])
|
||||
files.append(md)
|
||||
result.data = files
|
||||
return result
|
||||
try:
|
||||
filesystems = await self.filesystems
|
||||
command = f"find {filesystems.data[filesystem]['mountpoint']}/.zfs/snapshot -type f -name '{pattern}' -printf '%h\t%f\t%s\t%T@\n'"
|
||||
result = await self.execute(command=command, notify=False, max_output_lines=1000)
|
||||
files = []
|
||||
for line in result.stdout_lines:
|
||||
matches = re.match(
|
||||
"^(?P<location>[^\t]+)\t(?P<name>[^\t]+)\t(?P<bytes>[^\t]+)\t(?P<modified_timestamp>[^\n]+)",
|
||||
line,
|
||||
)
|
||||
if matches is not None:
|
||||
md = matches.groupdict()
|
||||
md["path"] = f"{md['location']}/{md['name']}"
|
||||
md["bytes"] = int(md["bytes"])
|
||||
md["size"] = format_bytes(md["bytes"])
|
||||
md["modified_datetime"] = datetime.fromtimestamp(float(md["modified_timestamp"])).strftime("%Y/%m/%d %H:%M:%S")
|
||||
md["modified_timestamp"] = float(md["modified_timestamp"])
|
||||
files.append(md)
|
||||
result.data = files
|
||||
return result
|
||||
except KeyError:
|
||||
pass
|
||||
return Result()
|
||||
|
||||
@property
|
||||
@@ -244,10 +245,10 @@ class Ssh(ssh.Ssh, Zfs):
|
||||
def notify(self, command: str):
|
||||
super().notify(f"<{self.host}> {command}")
|
||||
|
||||
async def execute(self, command: str, notify: bool = True) -> Result:
|
||||
async def execute(self, command: str, max_output_lines: int = 0, notify: bool = True) -> Result:
|
||||
if notify:
|
||||
self.notify(command)
|
||||
result = await super().execute(command)
|
||||
result = await super().execute(command, max_output_lines)
|
||||
if result.stderr != "":
|
||||
el.notify(result.stderr, type="negative")
|
||||
result.name = self.host
|
||||
|
||||
@@ -13,6 +13,7 @@ class Result:
|
||||
stdout_lines: List[str] = field(default_factory=list)
|
||||
stderr_lines: List[str] = field(default_factory=list)
|
||||
terminated: bool = False
|
||||
truncated: bool = False
|
||||
data: Any = None
|
||||
trace: str = ""
|
||||
cached: bool = False
|
||||
|
||||
+23
-4
@@ -1,7 +1,8 @@
|
||||
from datetime import datetime
|
||||
import json
|
||||
from . import SelectionConfirm, Tab
|
||||
from nicegui import ui, events
|
||||
import httpx
|
||||
from . import SelectionConfirm, Tab
|
||||
from bale import elements as el
|
||||
from bale.result import Result
|
||||
from bale.interfaces import zfs
|
||||
@@ -98,6 +99,20 @@ class History(Tab):
|
||||
http[status]["data"] = e.content["json"]["data"]
|
||||
http[status]["headers"] = e.content["json"]["headers"]
|
||||
|
||||
def test(status):
|
||||
try:
|
||||
url = http[status]["url"]
|
||||
data = self.process_pipe_data(result=Result(name=self.host, command="TEST COMMAND", status=status), data=http[status]["data"])
|
||||
headers = http[status]["headers"]
|
||||
post = httpx.post(url=url, json=data, headers=headers)
|
||||
print(post.status_code)
|
||||
if post.status_code == 200:
|
||||
el.notify("Test successful!", type="positive")
|
||||
else:
|
||||
el.notify(f"Test failed with status code {post.status_code}!", type="negative")
|
||||
except:
|
||||
el.notify("Test failed!", type="negative")
|
||||
|
||||
def show_controls(status):
|
||||
if status not in http:
|
||||
http[status] = {}
|
||||
@@ -115,7 +130,7 @@ class History(Tab):
|
||||
"topic": "mytopic",
|
||||
"tags": ["turtle"],
|
||||
"title": "Successful Automation Run for {name}",
|
||||
"message": "{stdout}",
|
||||
"message": "{command}",
|
||||
},
|
||||
)
|
||||
editor.properties["content"]["json"]["headers"] = self.get_pipe_status("http", status).get("headers", {"Authorization": "Bearer tk_..."})
|
||||
@@ -134,11 +149,15 @@ class History(Tab):
|
||||
with ui.step("On Success"):
|
||||
with el.WColumn().classes("col justify-start"):
|
||||
show_controls(status="success")
|
||||
el.LgButton("NEXT", on_click=lambda _: stepper.next())
|
||||
with el.WRow():
|
||||
el.LgButton("TEST", on_click=lambda _: test(status="success"))
|
||||
el.LgButton("NEXT", on_click=lambda _: stepper.next())
|
||||
with ui.step("On Error"):
|
||||
with el.WColumn().classes("col justify-start"):
|
||||
show_controls(status="error")
|
||||
el.DButton("SAVE", on_click=lambda: host_dialog.submit("save"))
|
||||
with el.WRow():
|
||||
el.LgButton("TEST", on_click=lambda _: test(status="error"))
|
||||
el.DButton("SAVE", on_click=lambda: host_dialog.submit("save"))
|
||||
|
||||
result = await host_dialog
|
||||
if result == "save":
|
||||
|
||||
+6
-3
@@ -111,9 +111,12 @@ class Manage(Tab):
|
||||
result = await SelectionConfirm(container=self._confirm, label=">BROWSE<")
|
||||
if result == "confirm":
|
||||
rows = await self._grid.get_selected_rows()
|
||||
filesystems = await self.zfs.filesystems
|
||||
mount_path = filesystems.data[rows[0]["filesystem"]]["mountpoint"]
|
||||
await sshdl.SshFileBrowse(zfs=self.zfs, path=f"{mount_path}/.zfs/snapshot/{rows[0]['name']}")
|
||||
try:
|
||||
filesystems = await self.zfs.filesystems
|
||||
mount_path = filesystems.data[rows[0]["filesystem"]]["mountpoint"]
|
||||
await sshdl.SshFileBrowse(zfs=self.zfs, path=f"{mount_path}/.zfs/snapshot/{rows[0]['name']}")
|
||||
except KeyError:
|
||||
el.notify(f"Unable to browse {rows[0]['filesystem']}", type="warning")
|
||||
self._set_selection()
|
||||
|
||||
async def _find(self) -> None:
|
||||
|
||||
Reference in New Issue
Block a user