Skip to content

Commit

Permalink
no need to do message passing for run and run_once when we can use as…
Browse files Browse the repository at this point in the history
…ync lua functions
  • Loading branch information
fishman committed Oct 12, 2024
1 parent b8fab3a commit 762a3a7
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 116 deletions.
117 changes: 80 additions & 37 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,26 @@ use env_logger::{Builder, Env};
use inotify::{EventMask, Inotify, WatchMask};
use log::{debug, error, info};
use mlua::{AnyUserDataExt, Function, Lua, UserData, UserDataMethods};
use std::collections::HashMap;
use std::fs::{self, File};
use std::io::Write;
use std::{
collections::HashMap,
env,
fs::{self, File},
io::Write,
path::Path,
process::Stdio,
sync::{Arc, Mutex},
};
use tokio::sync::mpsc;
use utils::Runner;
use sysinfo::{ProcessExt, System, SystemExt};
use tokio::{process::Command, sync::mpsc, task::JoinHandle};
use uuid::Uuid;
use wayland_client::backend::ReadEventsGuard;
use wayland_client::protocol::{wl_output, wl_registry, wl_seat};
use wayland_client::{Connection, Dispatch, EventQueue, QueueHandle};
use wayland_protocols::wp::idle_inhibit::zv1::client::zwp_idle_inhibitor_v1;
use wayland_client::{
backend::ReadEventsGuard,
protocol::{wl_output, wl_registry, wl_seat},
Connection, Dispatch, EventQueue, QueueHandle,
};
use wayland_protocols::{
ext::idle_notify::v1::client::{ext_idle_notification_v1, ext_idle_notifier_v1},
wp::idle_inhibit::zv1::client::zwp_idle_inhibitor_v1,
xdg::activation::v1::client::{xdg_activation_token_v1, xdg_activation_v1},
};
use wayland_protocols_wlr::gamma_control::v1::client::{
Expand Down Expand Up @@ -85,6 +89,7 @@ struct MyLuaFunctions {
idle_notifier: Option<ext_idle_notifier_v1::ExtIdleNotifierV1>,
tx: mpsc::Sender<Request>,
notification_list: NotificationListHandle,
tasks: Mutex<HashMap<String, JoinHandle<anyhow::Result<()>>>>,
//gamma_control: Option<zwlr_gamma_control_v1::ZwlrGammaControlV1>,
}

Expand Down Expand Up @@ -183,19 +188,69 @@ impl UserData for MyLuaFunctions {
Ok(())
},
);
methods.add_method("run", |_lua, this, command: String| {
let tx = this.tx.clone();
std::thread::spawn(move || {
tx.blocking_send(Request::Run(command.to_string())).unwrap();
});

async fn run(cmd: String) -> JoinHandle<Result<(), anyhow::Error>> {
let (cmd, args) = utils::get_args(cmd.clone());

tokio::spawn(async move {
match Command::new(&cmd)
.env(
"WAYLAND_DISPLAY",
env::var("WAYLAND_DISPLAY").unwrap_or_default(),
)
.env(
"DBUS_SESSION_BUS_ADDRESS",
env::var("DBUS_SESSION_BUS_ADDRESS").unwrap_or_default(),
)
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.args(args)
.spawn()
{
Ok(mut child) => match child.wait().await {
Ok(status) => {
info!("Command {} completed with status: {:?}", cmd, status);
Ok(())
}
Err(e) => {
error!("{} process failed to run: {}", cmd, e);
Err(anyhow::Error::msg(format!("Failed to run command: {}", e)))
}
},
Err(e) => {
error!("Failed to spawn {} process: {}", cmd, e);
Err(anyhow::Error::msg(format!(
"Failed to spawn process: {}",
e
)))
}
}
})
}

methods.add_async_method("run", |_lua, _this, command: String| async move {
debug!("run function called {}", command.clone());
let _handle = run(command);
Ok(())
});
methods.add_method("run_once", |_lua, this, command: String| {
let tx = this.tx.clone();
std::thread::spawn(move || {
tx.blocking_send(Request::RunOnce(command.to_string()))
.unwrap();
});

methods.add_async_method("run_once", |_lua, _this, command: String| async move {
debug!("run_once function called {}", command.clone());
let s = System::new_all();
let (cmd_name, _) = utils::get_args(command.clone());

// Check if the process is already running
let is_running = s
.processes_by_exact_name(&cmd_name)
.any(|p| p.name() == cmd_name);

if !is_running {
//let mut tasks = this.tasks.lock();
//if !tasks.contains_key(&cmd) {
let _handle = run(command.clone()).await;
//tasks.insert(cmd_name, handle);
//}
}
Ok(())
});
}
Expand Down Expand Up @@ -259,19 +314,17 @@ pub async fn filewatcher_run(config_path: &Path, tx: mpsc::Sender<Request>) -> a

let mut buffer = [0; 1024];

let _ = tokio::task::spawn_blocking(move || loop {
let _spawn_blocking = tokio::task::spawn_blocking(move || loop {
let events = inotify
.read_events_blocking(&mut buffer)
.expect("Failed to read inotify events");

debug!("Received events");
for event in events {
debug!("File modified: {:?}", event.name);
if event.mask.contains(EventMask::MODIFY) {
if !event.mask.contains(EventMask::ISDIR) {
debug!("File modified: {:?}", event.name);
tx.blocking_send(Request::Reset).unwrap();
}
if event.mask.contains(EventMask::MODIFY) && !event.mask.contains(EventMask::ISDIR) {
debug!("File modified: {:?}", event.name);
tx.blocking_send(Request::Reset).unwrap();
}
}
});
Expand All @@ -284,7 +337,6 @@ async fn process_command(
rx: &mut mpsc::Receiver<Request>,
shared_map: NotificationListHandle,
dbus_handlers: CallbackListHandle,
runner: &Runner,
) -> anyhow::Result<()> {
while let Some(event) = rx.recv().await {
match event {
Expand Down Expand Up @@ -322,14 +374,6 @@ async fn process_command(
}
}
}
Request::Run(cmd) => {
debug!("Running command: {}", cmd);
let _ = runner.run(cmd).await;
}
Request::RunOnce(cmd) => {
debug!("Running command once: {}", cmd);
let _ = runner.run_once(cmd).await;
}
Request::OnBattery(state) => {
let lua = lua.lock().unwrap();
let globals = lua.globals();
Expand Down Expand Up @@ -359,7 +403,6 @@ async fn main() -> anyhow::Result<()> {
let shared_map = Arc::new(Mutex::new(map));
let lua = Arc::new(Mutex::new(Lua::new()));
let dbus_handlers = Arc::new(Mutex::new(HashMap::new()));
let runner = Runner::new();
//let joystick_handler = Arc::new(TokioMutex::new(JoystickHandler::new()));
//let _ = tokio::spawn(JoystickHandler::run(joystick_handler.clone())).await;
//let _ = tokio::spawn(JoystickHandler::udev_handler_run(joystick_handler.clone())).await;
Expand All @@ -384,7 +427,6 @@ async fn main() -> anyhow::Result<()> {
&mut rx,
shared_map.clone(),
dbus_handlers.clone(),
&runner
),
)?;
// .await
Expand Down Expand Up @@ -418,6 +460,7 @@ fn lua_init(state: &mut State) -> anyhow::Result<()> {
qh: state.qh.clone(),
notification_list: state.notification_list.clone(),
tx: state.tx.clone(),
tasks: Mutex::new(HashMap::new()),
};

let globals = lua.globals();
Expand Down
2 changes: 0 additions & 2 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,5 @@ pub enum Request {
LuaReload,
LuaMethod(String),
Reset,
Run(String),
RunOnce(String),
OnBattery(bool),
}
79 changes: 2 additions & 77 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use futures::lock::Mutex;
use log::info;
use std::{collections::HashMap, env, path::PathBuf, process::Stdio};
use sysinfo::{ProcessExt, System, SystemExt};
use tokio::{process::Command, task::JoinHandle};
use std::path::PathBuf;
use xdg::BaseDirectories;

use super::config;
Expand All @@ -20,79 +16,8 @@ pub fn xdg_config_path(filename: Option<String>) -> std::io::Result<PathBuf> {
match filename {
Some(filename) => {
let config_path: PathBuf = xdg_dirs.place_config_file(filename)?;
return Ok(config_path);
Ok(config_path)
}
None => Ok(xdg_dirs.get_config_home()),
}
}

#[derive(Debug)]
pub struct Runner {
tasks: Mutex<HashMap<String, JoinHandle<anyhow::Result<()>>>>,
}

impl Runner {
pub fn new() -> Self {
Self {
tasks: Mutex::new(HashMap::new()),
}
}

pub async fn run(&self, cmd: String) -> JoinHandle<Result<(), anyhow::Error>> {
info!("cmd: {}", cmd);
//TODO: get_args executed twice
let (cmd, args) = get_args(cmd);

tokio::spawn(async move {
let mut child = Command::new(&cmd)
.env(
"WAYLAND_DISPLAY",
env::var("WAYLAND_DISPLAY").unwrap_or_default(),
)
.env(
"DBUS_SESSION_BUS_ADDRESS",
env::var("DBUS_SESSION_BUS_ADDRESS").unwrap_or_default(),
)
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.args(args)
.spawn()
.map_err(|e| {
anyhow::Error::msg(format!("Failed to spawn {} process: {}", cmd, e))
})?;

let status = child
.wait()
.await
.map_err(|e| anyhow::Error::msg(format!("{} process failed to run: {}", cmd, e)))?;

//{
// let mut tasks = self.tasks.lock().await;
// tasks.remove(&cmd);
//}
info!("Command {} completed with status: {:?}", cmd, status);

Ok(())
})
}

pub async fn run_once(&self, cmd: String) -> anyhow::Result<(), Box<dyn std::error::Error>> {
let s = System::new_all();
//TODO: get_args executed twice
let (cmd_name, _) = get_args(cmd.clone());

// Check if the process is already running
let is_running = s
.processes_by_exact_name(&cmd_name)
.any(|p| p.name() == cmd_name);

if !is_running {
let mut tasks = self.tasks.lock().await;
//if !tasks.contains_key(&cmd) {
let _handle = self.run(cmd.clone()).await;
//tasks.insert(cmd_name, handle);
//}
}
Ok(())
}
}

0 comments on commit 762a3a7

Please # to comment.