-mod task;
+mod messages;
mod template;
mod queue_actor;
mod task_actor;
use ::simplelog::{Config, LevelFilter, SimpleLogger};
use crate::queue_actor::QueueActor;
+use crate::task_actor::TaskActor;
use crate::web::init_web_server;
fn main() {
let sys = System::new("webhook-server");
- let queue_actor = SyncArbiter::start(1, move || QueueActor::default());
+ let task_actor = SyncArbiter::start(8, move || TaskActor{queue_actor: None});
+ let queue_actor = QueueActor{task_actor: task_actor.clone(), own_addr: None};
let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
- init_web_server(queue_actor);
+ init_web_server(queue_actor.start());
sys.run();
}
--- /dev/null
+use ::actix::prelude::*;
+use ::std::collections::HashMap;
+
+use crate::queue_actor::QueueActor;
+
+#[derive(Message)]
+pub struct NewTask {
+ pub id: String,
+ pub parameters: HashMap<String, String>,
+ pub command: String,
+}
+
+
+#[derive(Message)]
+pub struct StartTask {
+ pub command: String,
+ pub cwd: String,
+ pub queue_actor: Addr<QueueActor>,
+}
use ::actix::prelude::*;
use ::log::info;
-use crate::task::NewTask;
+use crate::messages::*;
+use crate::task_actor::TaskActor;
+
+
+pub struct QueueActor {
+ pub task_actor: Addr<TaskActor>,
+ pub own_addr: Option<Addr<Self>>
+}
-#[derive(Default)]
-pub struct QueueActor;
impl Actor for QueueActor {
- type Context = SyncContext<Self>;
+ type Context = Context<Self>;
- fn started(&mut self, _: &mut SyncContext<Self>) {
- info!("Background task actor started up")
+ fn started(&mut self, context: &mut Self::Context) {
+ self.own_addr = Some(context.address());
+ info!("Queue management actor started up");
}
}
impl Handler<NewTask> for QueueActor {
type Result = ();
- fn handle(&mut self, task: NewTask, context: &mut SyncContext<Self>) {
- info!("Got new Task: {}", task.id);
+ fn handle(&mut self, new_task: NewTask, context: &mut Self::Context) {
+ info!("Got new Task: {}", new_task.id);
+
+ self.dispatch_task(new_task);
+ }
+}
+
+
+impl QueueActor {
+ fn dispatch_task(&mut self, new_task: NewTask) {
+ let addr = self.own_addr.as_ref().unwrap().clone();
+
+ let start_task = StartTask {
+ command: new_task.command,
+ cwd: "/".to_string(),
+ queue_actor: addr,
+ };
+
+ self.task_actor.do_send(start_task);
}
}
+++ /dev/null
-use ::actix::prelude::*;
-use std::collections::HashMap;
-
-#[derive(Message)]
-pub struct NewTask {
- pub id: String,
- pub parameters: HashMap<String, String>,
-}
use ::actix::prelude::*;
use ::log::info;
-use crate::task::NewTask;
+use crate::messages::*;
+use crate::queue_actor::QueueActor;
-#[derive(Default)]
-pub struct TaskActor;
+pub struct TaskActor {
+ pub queue_actor: Option<Addr<QueueActor>>
+}
impl Actor for TaskActor {
type Context = SyncContext<Self>;
+
+ fn started(&mut self, context: &mut Self::Context) {}
}
-impl Handler<NewTask> for TaskActor {
+impl Handler<StartTask> for TaskActor {
type Result = ();
- fn handle(&mut self, task: NewTask, context: &mut SyncContext<Self>) {
- info!("Got new Task: {}", task.id);
+ fn handle(&mut self, task: StartTask, context: &mut Self::Context) {
+ info!("Starting Task: {}", task.command);
}
}
pub fn verify_template_parameters(
template: String,
params: &HashMap<String, String>,
-) -> Result<(), HttpResponse> {
+) -> Result<String, HttpResponse> {
info!("Got parameters: {:?}", params);
// Create a new handlebar instance and enable strict mode to prevent missing or malformed arguments
let mut handlebars = Handlebars::new();
}
Ok(result) => {
info!("Template renders properly: {}", result);
- Ok(())
+ Ok(result)
}
}
}
use ::log::info;
use crate::queue_actor::QueueActor;
-use crate::task::NewTask;
+use crate::messages::NewTask;
use crate::template::verify_template_parameters;
info!("");
info!("Incoming webhook for \"{}\":", webhook_id);
- verify_template_parameters("This is a test {{rofl}}".to_string(), ¶ms)?;
+ let command = verify_template_parameters("This is a test {{rofl}}".to_string(), ¶ms)?;
// Create a new task with the checked parameters and webhook id
let new_task = NewTask {
id: webhook_id,
parameters: params,
+ command: command,
};
// Send the task to the actor managing the queue
Ok(HttpResponse::Ok().finish())
}
+/// Initialize the web server
+/// Move the address of the queue actor inside the AppState for further dispatch
+/// of tasks to the actor
pub fn init_web_server(queue_actor: Addr<QueueActor>) {
HttpServer::new(move || {
App::new()