diff --git a/Cargo.toml b/Cargo.toml index b309509..16eb106 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,18 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[[bin]] # Bin to run the engine +name = "event" +path = "src/components/event.rs" + +[[bin]] # Bin to run the engine +name = "task" +path = "src/components/task.rs" + +[[bin]] # Bin to run the engine +name = "cli" +path = "src/cli.rs" + [dependencies] serde_json = { version = "1.0.104", features = ["preserve_order"] } clap = { version = "4.3.5", features = ["derive"] } @@ -24,4 +36,14 @@ diesel = { version = "2.1.0", features = ["chrono", "postgres"] } diesel_migrations = { version = "2.1.0", features = ["postgres"] } tracing = "0.1.37" prettytable-rs = "^0.10.0" -pnet = "0.34.0" \ No newline at end of file +pnet = "0.34.0" +tonic = "0.9.2" +prost = "0.11.9" +tokio = { version = "1.32", features = ["rt-multi-thread", "macros", "sync", "time"] } +# futures-util-preview = "0.2.2" +rand = "0.8.5" +async-stream = "0.3.5" +tokio-stream = "0.1.14" + +[build-dependencies] +tonic-build = "0.9" \ No newline at end of file diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..f3dd018 --- /dev/null +++ b/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/grpc.proto")?; + Ok(()) +} diff --git a/proto/grpc.proto b/proto/grpc.proto new file mode 100644 index 0000000..3f63234 --- /dev/null +++ b/proto/grpc.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; +package grpc; + +service OutputStreaming { + rpc StreamOutput(OutputChunk) returns ( stream Response); +} + +message OutputChunk { + string content = 1; +} + +message Response { + string message = 1; +} \ No newline at end of file diff --git a/src/cli.rs b/src/cli.rs index ad7d720..d47cc5a 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, Error as AnyError, Result}; +use anyhow::{anyhow, Error as AnyError, Ok, Result}; use clap::{Parser, Subcommand}; use diesel::{ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, SelectableHelper}; use dotenv::dotenv; @@ -9,8 +9,7 @@ use std::env; use std::fs::File; use std::process::Command; use tracing::field; -use workflow::engine::{create_new_engine_entry, handle_stop, run_event_process}; -use workflow::engine::{run_task_process, update_engine_status}; +use workflow::engine_utils::{create_new_engine_entry, handle_stop, update_engine_status}; use workflow::models::{Engine, EngineStatus, Event, Task}; use workflow::parser::process_yaml_file; use workflow::utils::establish_pg_connection; @@ -41,6 +40,10 @@ enum Commands { StartEventProcess { engine_uid: i32, }, + Logs { + #[clap(subcommand)] + subcommand: LogsSubcommands, + }, Stop {}, /// Adds workflow to the queue Add { @@ -96,6 +99,16 @@ enum ShowSubcommands { Engine { uid: i32 }, } +#[derive(Subcommand)] +enum LogsSubcommands { + // Lists all tasks + Task { uid: i32 }, + // Lists all events + Event { uid: i32 }, + // Lists all engines + // Engine { uid: i32 }, +} + #[derive(PartialEq)] enum ProcessType { Task, @@ -109,7 +122,7 @@ fn create_and_clear_log_file(file_path: &str) -> Result { } fn start_process( - subcommand_name: &str, + binary_name: &str, process_type: ProcessType, engine_uid: i32, ) -> Result<(), AnyError> { @@ -141,8 +154,8 @@ fn start_process( } let command = binding - .arg(subcommand_name) - .arg("--") + .arg("--bin") + .arg(binary_name) .arg(engine_uid.to_string()) .stdout(stdout) .stderr(stderr); @@ -151,7 +164,7 @@ fn start_process( Ok(()) } -pub fn cli() { +pub async fn cli() { let cli = Cli::parse(); match &cli.command { @@ -174,15 +187,22 @@ pub fn cli() { } Commands::StartEventProcess { engine_uid } => { println!("StartEventProcess"); - if let Err(e) = run_event_process(*engine_uid) { - println!("Failed to start event process, {}", e); - std::process::exit(1); - }; + // if let Err(e) = run_event_process(*engine_uid) { + // println!("Failed to start event process, {}", e); + // std::process::exit(1); + // }; } Commands::StartTaskProcess { engine_uid } => { println!("StartTaskProcess"); - if let Err(e) = run_task_process(*engine_uid) { - println!("Failed to start task process, {}", e); + // if let Err(e) = run_task_process(*engine_uid) { + // println!("Failed to start task process, {}", e); + // std::process::exit(1); + // }; + } + Commands::Logs { subcommand } => { + println!("Logs"); + if let Err(e) = process_log_command(subcommand).await { + println!("Failed to stop the engine, {}", e); std::process::exit(1); }; } @@ -229,6 +249,36 @@ pub fn cli() { std::process::exit(0); } +async fn process_log_command(subcommand: &LogsSubcommands) -> Result<(), AnyError> { + match subcommand { + LogsSubcommands::Task { uid } => show_log(uid.to_string()).await?, + LogsSubcommands::Event { uid } => show_log(uid.to_string()).await?, + }; + Ok(()) +} +pub mod grpc { + tonic::include_proto!("grpc"); +} +use grpc::output_streaming_client::OutputStreamingClient; +use grpc::{OutputChunk, Response as GrpcResponse}; + +use std::error::Error; +use tonic::transport::Channel; + +async fn show_log(server_id: String) -> Result<(), AnyError> { + let mut client = OutputStreamingClient::connect("http://[::1]:10000").await?; + + let mut stream = client + .stream_output(OutputChunk::default()) + .await? + .into_inner(); + + while let Some(log_message) = stream.message().await? { + println!("NOTE = {:?}", log_message); + } + Ok(()) +} + fn get_system_ip_address() -> Result { // Get a vector with all network interfaces found let all_interfaces = interfaces(); @@ -262,13 +312,13 @@ fn process_start_command() -> Result<(), AnyError> { )?; println!("created new engine entry with uid: {}", engine_uid); - if let Err(e) = start_process("start-event-process", ProcessType::Event, engine_uid) { + if let Err(e) = start_process("event", ProcessType::Event, engine_uid) { eprintln!("Failed to start Event process: {}", e); eprintln!("exiting..."); std::process::exit(1); } - if let Err(e) = start_process("start-task-process", ProcessType::Task, engine_uid) { + if let Err(e) = start_process("task", ProcessType::Task, engine_uid) { eprintln!("Failed to start Task process: {}", e); eprintln!("exiting..."); std::process::exit(1); @@ -416,6 +466,13 @@ fn list_items(items: Vec) -> Result<(), AnyError> { Ok(()) } +#[tokio::main] + +async fn main() -> Result<(), AnyError> { + cli().await; + Ok(()) +} + // fn is_redis_running() -> bool { // let redis_result = create_redis_connection(); // if let Err(e) = redis_result { diff --git a/src/engine/event.rs b/src/components/event.rs similarity index 63% rename from src/engine/event.rs rename to src/components/event.rs index 9d01718..bcbf3f5 100644 --- a/src/engine/event.rs +++ b/src/components/event.rs @@ -1,6 +1,3 @@ -use crate::models::{EventStatus, LightEvent, LightTask, ProcessStatus}; -use crate::schema; -use crate::utils::{establish_pg_connection, push_tasks_to_queue}; use anyhow::Error as AnyError; use diesel::prelude::*; use std::path::Path; @@ -8,13 +5,57 @@ use std::process::Command as ShellCommand; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use std::{str, thread}; +use std::{env, str, thread}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tonic::transport::Server; +use tonic::{Request, Response, Status, Streaming}; +use workflow::engine_utils::run_process; +use workflow::models::{EventStatus, LightEvent, LightTask, ProcessStatus}; +use workflow::schema; +use workflow::utils::{establish_pg_connection, push_tasks_to_queue}; + +pub mod grpc { + tonic::include_proto!("grpc"); +} +use grpc::output_streaming_server::{OutputStreaming, OutputStreamingServer}; +use grpc::{OutputChunk, Response as GrpcResponse}; + +#[derive(Debug)] +pub struct OutputStreamer { + // an stdout pipe steam + features: Arc>, +} + +#[tonic::async_trait] +impl OutputStreaming for OutputStreamer { + type StreamOutputStream = ReceiverStream>; + + async fn stream_output( + &self, + request: Request, + ) -> Result, Status> { + let (mut tx, rx) = mpsc::channel(4); + let features = self.features.clone(); + + // Spawn an async task to send the output data to the client + + tokio::spawn(async move { + for feature in &features[..] { + tx.send(Ok(feature.clone())).await.unwrap(); + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} pub fn poll_events(running: Arc, engine_uid: i32) -> Result<(), AnyError> { let mut event_uids: Vec = Vec::new(); let pg_conn = &mut establish_pg_connection(); - use crate::schema::engines::dsl::*; + use workflow::schema::engines::dsl::*; diesel::update(engines) .filter(uid.eq(engine_uid)) @@ -91,14 +132,14 @@ fn execute_event(event: LightEvent) -> Result<(), AnyError> { .expect("failed to execute process"); // if shell command return 0, then the event was triggered successfully - use crate::schema::events::dsl::*; + use workflow::schema::events::dsl::*; if output.status.code().unwrap() == 0 { diesel::update(events.find(event.uid)) .set(status.eq(EventStatus::Succeeded.to_string())) .execute(conn)?; { - use crate::schema::tasks::dsl::*; + use workflow::schema::tasks::dsl::*; let light_tasks: Vec = tasks .select(LightTask::as_select()) .filter(event_uid.eq(event.uid)) @@ -132,3 +173,37 @@ fn execute_event(event: LightEvent) -> Result<(), AnyError> { println!("##############################################"); Ok(()) } + +pub fn run_event_process(engine_uid: i32) -> Result<(), AnyError> { + run_process("Event", poll_events, engine_uid) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args: Vec = env::args().collect(); + println!("args: {:?}", args); + + let engine_uid = args[1].parse::().unwrap(); + println!("engine_uid: {}", engine_uid); + + tokio::spawn(async move { + if let Err(e) = run_event_process(engine_uid) { + println!("Failed to start event process, {}", e); + std::process::exit(1); + }; + }); + + let addr = "[::1]:10001".parse().unwrap(); + + let stream = OutputStreamer { + features: Arc::new(vec![GrpcResponse { + message: "Hello from event".into(), + }]), + }; + + let svc = OutputStreamingServer::new(stream); + + Server::builder().add_service(svc).serve(addr).await?; + + Ok(()) +} diff --git a/src/engine/task.rs b/src/components/task.rs similarity index 50% rename from src/engine/task.rs rename to src/components/task.rs index d51cfa9..3a381f3 100644 --- a/src/engine/task.rs +++ b/src/components/task.rs @@ -1,16 +1,62 @@ -use crate::models::{Engine, EngineStatus, LightTask, ProcessStatus, TaskStatus}; -use crate::utils::{self, create_redis_connection, establish_pg_connection}; use anyhow::Error as AnyError; use bincode::deserialize; use diesel::prelude::*; +use rand::seq::IteratorRandom; use rayon::ThreadPoolBuilder; use redis::{Commands as RedisCommand, FromRedisValue}; +use tonic::server::ServerStreamingService; +// use std::io::BufReader; use std::path::Path; -use std::process::Command as ShellCommand; +use std::pin::Pin; +use std::process::{ChildStdout, Command as ShellCommand, Stdio}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use std::{str, thread}; +use std::{env, str, thread}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tonic::transport::Server; +use tonic::{Request, Response, Status, Streaming}; +use workflow::engine_utils::run_process; +use workflow::models::{LightTask, ProcessStatus, TaskStatus}; +use workflow::utils::{self, create_redis_connection, establish_pg_connection}; + +pub mod grpc { + tonic::include_proto!("grpc"); +} +// use grpc::output_streaming_client::OutputStreamingClient; +use grpc::output_streaming_server::{OutputStreaming, OutputStreamingServer}; +use grpc::{OutputChunk, Response as GrpcResponse}; + +#[derive(Debug)] +pub struct OutputStreamer { + // an stdout pipe steam + features: Arc>, +} + +#[tonic::async_trait] +impl OutputStreaming for OutputStreamer { + type StreamOutputStream = ReceiverStream>; + + async fn stream_output( + &self, + request: Request, + ) -> Result, Status> { + let (mut tx, rx) = mpsc::channel(4); + let features = self.features.clone(); + + // Spawn an async task to send the output data to the client + + tokio::spawn(async move { + for feature in &features[..] { + tx.send(Ok(feature.clone())).await.unwrap(); + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} const THREAD_COUNT: usize = 4; @@ -19,7 +65,7 @@ pub fn queue_processor(running: Arc, engine_uid: i32) -> Result<(), let pg_conn = &mut establish_pg_connection(); let mut redis_con = create_redis_connection()?; - use crate::schema::engines::dsl::*; + use workflow::schema::engines::dsl::*; diesel::update(engines) .filter(uid.eq(engine_uid)) @@ -36,9 +82,10 @@ pub fn queue_processor(running: Arc, engine_uid: i32) -> Result<(), thread_pool.spawn(move || { let task: LightTask = deserialize(popped_value.as_bytes()).unwrap(); println!("Task: {}", task); - if let Err(e) = execute_task(task) { - println!("Failed to execute task {}", e); - }; + let future = execute_task(task); + // if let Err(e) = execute_task(task) { + // println!("Failed to execute task {}", e); + // }; }); } None => { @@ -83,13 +130,12 @@ pub fn queue_processor(running: Arc, engine_uid: i32) -> Result<(), Ok(()) } -fn execute_task(task: LightTask) -> Result<(), AnyError> { +async fn execute_task(task: LightTask) -> Result<(), AnyError> { println!("Task Executor"); - use crate::schema::tasks::dsl::*; + use workflow::schema::tasks::dsl::*; let conn = &mut establish_pg_connection(); - // todo , update task status to running - + // let mut client = OutputStreamingClient::connect("http://[::1]:50051").await?; diesel::update(tasks.find(task.uid)) .set(( status.eq(TaskStatus::Running.to_string()), @@ -103,12 +149,62 @@ fn execute_task(task: LightTask) -> Result<(), AnyError> { }; let path_dirname = Path::new(&task.path).parent().unwrap(); - let output = ShellCommand::new("bash") + let mut cmd = ShellCommand::new("bash") .arg(path_basename) .current_dir(path_dirname) - .output() + .stdout(Stdio::piped()) + .spawn() .expect("failed to execute process"); + let stdout_content = cmd + .stdout + .take() + .expect("Could not capture standard output"); + // let reader = tokio::io::BufReader::new(stdout_content.into()); + + // let mut client_stream = client + // .stream_output(Request::new(OutputChunk { + // content: "Tonic".into(), + // })) + // .await? + // .into_inner(); + + // tokio::spawn(async move { + // let mut buf = String::new(); + // let mut reader = reader; + // loop { + // buf.clear(); + // if reader.read_line(&mut buf).await.unwrap_or(0) == 0 { + // break; + // } + + // let request = Request::new(grpc::OutputChunk { + // content: buf.clone(), + // }); + + // if let Err(_) = client_stream.send(request).await { + // break; + // } + // } + // }); + + let output = cmd.wait_with_output().expect("Failed to wait for command"); + + println!( + "task id: {} , path: {}\nFinished executing with a status: {}", + task.uid, task.path, output.status + ); + + // let stdout_content = str::from_utf8(stdout_content)?; + + // for chunk in stdout_content.chars().collect::>().chunks(10) { + // let request = tonic::Request::new(mygrpc::OutputChunk { + // content: chunk.iter().collect(), + // }); + + // client.stream_output(request).await?; + // } + if output.status.code().unwrap() == 0 { diesel::update(tasks.find(task.uid)) .set(( @@ -147,3 +243,37 @@ fn execute_task(task: LightTask) -> Result<(), AnyError> { println!("##############################################"); Ok(()) } + +pub fn run_task_process(engine_uid: i32) -> Result<(), AnyError> { + run_process("Task", queue_processor, engine_uid) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args: Vec = env::args().collect(); + println!("args: {:?}", args); + + let engine_uid = args[1].parse::().unwrap(); + println!("engine_uid: {}", engine_uid); + + tokio::spawn(async move { + if let Err(e) = run_task_process(engine_uid) { + println!("Failed to start event process, {}", e); + std::process::exit(1); + }; + }); + + let addr = "[::1]:10000".parse().unwrap(); + + let stream = OutputStreamer { + features: Arc::new(vec![GrpcResponse { + message: "Hello from task".into(), + }]), + }; + + let svc = OutputStreamingServer::new(stream); + + Server::builder().add_service(svc).serve(addr).await?; + + Ok(()) +} diff --git a/src/engine.rs b/src/engine.rs index bbbaeff..570f1b0 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,86 +1,81 @@ -use crate::models::EngineStatus; -use crate::utils::establish_pg_connection; -use crate::{models, schema}; use anyhow::Error as AnyError; -use ctrlc::set_handler; -use diesel::PgConnection; -use std::str; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::env; +use workflow::components::event::poll_events; +use workflow::components::task::queue_processor; +use workflow::engine_utils::run_process; +// mod models; +// mod utils; use diesel::prelude::*; -use self::event::poll_events; -use self::task::queue_processor; +// I want to use the poll_events function from src/event.rs -mod event; -mod task; +use tonic::{transport::Server, Request, Response, Status}; -fn run_process(process_name: &str, process_fn: F, engine_uid: i32) -> Result<(), AnyError> -where - F: FnOnce(Arc, i32) -> Result<(), AnyError>, -{ - let running = Arc::new(AtomicBool::new(true)); - let r = running.clone(); +use grpc::greeter_server::{Greeter, GreeterServer}; +use grpc::{LogMessageRequest, LogMessageResponse}; - set_handler(move || { - r.store(false, Ordering::SeqCst); - }) - .expect("Error setting Ctrl-C handler"); - - if let Err(e) = process_fn(running, engine_uid) { - eprintln!("Failed to start {} process: {}", process_name, e); - eprintln!("exiting..."); - std::process::exit(1); - } - println!("{} process stopped correctly", process_name); - - Ok(()) +pub mod grpc { + tonic::include_proto!("grpc"); } -pub fn run_task_process(engine_uid: i32) -> Result<(), AnyError> { - run_process("Task", queue_processor, engine_uid) -} +// pub fn run_task_process(engine_uid: i32) -> Result<(), AnyError> { +// run_process("Task", queue_processor, engine_uid) +// } -pub fn run_event_process(engine_uid: i32) -> Result<(), AnyError> { - run_process("Event", poll_events, engine_uid) -} +// pub fn run_event_process(engine_uid: i32) -> Result<(), AnyError> { +// run_process("Event", poll_events, engine_uid) +// } -pub fn handle_stop() -> Result<(), AnyError> { - diesel::update(schema::engines::table) - .set(schema::engines::stop_signal.eq(true)) - .execute(&mut establish_pg_connection())?; - Ok(()) -} +#[derive(Debug, Default)] +pub struct MyGreeter {} -pub fn update_engine_status( - conn: &mut PgConnection, - engine_uid: i32, - engine_status: EngineStatus, -) -> Result<(), diesel::result::Error> { - use crate::schema::engines::dsl::*; +#[tonic::async_trait] +impl Greeter for MyGreeter { + async fn say_hello( + &self, + request: Request, + ) -> Result, Status> { + println!("Got a request: {:?}", request); - diesel::update(engines) - .filter(uid.eq(engine_uid)) - .set(status.eq(engine_status.to_string())) - .execute(conn)?; + let reply = grpc::LogMessageResponse { + message: format!("Hello {}!", request.into_inner().content).into(), + }; - Ok(()) + Ok(Response::new(reply)) + } } -pub fn create_new_engine_entry( - conn: &mut PgConnection, - name: &str, - ip_address: &str, -) -> Result { - use crate::schema::engines::table as engines; - use crate::schema::engines::uid as engine_uid; - - let new_engine = models::NewEngine { name, ip_address }; - - //insert and return uid - diesel::insert_into(engines) - .values(&new_engine) - .returning(engine_uid) - .get_result::(conn) +//main function that takes an argument +#[tokio::main] +async fn main() -> Result<(), Box> { + let args: Vec = env::args().collect(); + println!("args: {:?}", args); + + let engine_uid = args[1].parse::().unwrap(); + println!("engine_uid: {}", engine_uid); + + // tokio::spawn(async move { + // if let Err(e) = run_event_process(engine_uid) { + // println!("Failed to start event process, {}", e); + // std::process::exit(1); + // }; + // }); + + // tokio::spawn(async move { + // if let Err(e) = run_task_process(engine_uid) { + // println!("Failed to start event process, {}", e); + // std::process::exit(1); + // }; + // }); + + let addr = "[::1]:50051".parse()?; + let greeter = MyGreeter::default(); + + Server::builder() + .add_service(GreeterServer::new(greeter)) + .serve(addr) + .await?; + + Ok(()) } diff --git a/src/engine_utils.rs b/src/engine_utils.rs new file mode 100644 index 0000000..6af005c --- /dev/null +++ b/src/engine_utils.rs @@ -0,0 +1,84 @@ +use anyhow::Error as AnyError; +use ctrlc::set_handler; +use diesel::PgConnection; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::{env, str}; +// use workflow::components::event::poll_events; +// use workflow::components::task::queue_processor; +use crate::models::{EngineStatus, NewEngine}; +use crate::schema; +use crate::utils::establish_pg_connection; +// mod models; +// mod utils; + +use diesel::prelude::*; + +pub fn run_process(process_name: &str, process_fn: F, engine_uid: i32) -> Result<(), AnyError> +where + F: FnOnce(Arc, i32) -> Result<(), AnyError>, +{ + let running = Arc::new(AtomicBool::new(true)); + let r = running.clone(); + + set_handler(move || { + r.store(false, Ordering::SeqCst); + }) + .expect("Error setting Ctrl-C handler"); + + if let Err(e) = process_fn(running, engine_uid) { + eprintln!("Failed to start {} process: {}", process_name, e); + eprintln!("exiting..."); + std::process::exit(1); + } + println!("{} process stopped correctly", process_name); + + Ok(()) +} + +// pub fn run_task_process(engine_uid: i32) -> Result<(), AnyError> { +// run_process("Task", queue_processor, engine_uid) +// } + +// pub fn run_event_process(engine_uid: i32) -> Result<(), AnyError> { +// run_process("Event", poll_events, engine_uid) +// } + +pub fn handle_stop() -> Result<(), AnyError> { + diesel::update(schema::engines::table) + .set(schema::engines::stop_signal.eq(true)) + .execute(&mut establish_pg_connection())?; + Ok(()) +} + +pub fn update_engine_status( + conn: &mut PgConnection, + engine_uid: i32, + engine_status: EngineStatus, +) -> Result<(), diesel::result::Error> { + use crate::schema::engines::dsl::*; + + diesel::update(engines) + .filter(uid.eq(engine_uid)) + .set(status.eq(engine_status.to_string())) + .execute(conn)?; + + Ok(()) +} + +pub fn create_new_engine_entry( + conn: &mut PgConnection, + name: &str, + ip_address: &str, +) -> Result { + use crate::schema::engines::table as engines; + use crate::schema::engines::uid as engine_uid; + + let new_engine = NewEngine { name, ip_address }; + + //insert and return uid + diesel::insert_into(engines) + .values(&new_engine) + .returning(engine_uid) + .get_result::(conn) +} diff --git a/src/lib.rs b/src/lib.rs index 6379791..d349bed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,8 @@ -pub mod engine; +// pub mod components { +// pub mod event; +// pub mod task; +// } +pub mod engine_utils; pub mod models; pub mod parser; pub mod schema; diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index a9e1011..0000000 --- a/src/main.rs +++ /dev/null @@ -1,8 +0,0 @@ -use anyhow::Error as AnyError; - -mod cli; - -fn main() -> Result<(), AnyError> { - cli::cli(); - Ok(()) -} diff --git a/tests/workflows/log_stream/ping.sh b/tests/workflows/log_stream/ping.sh new file mode 100644 index 0000000..38d9020 --- /dev/null +++ b/tests/workflows/log_stream/ping.sh @@ -0,0 +1,3 @@ +#! /bin/sh +set -e +ping -c 1 google.com \ No newline at end of file diff --git a/tests/workflows/log_stream/tasks/stream.sh b/tests/workflows/log_stream/tasks/stream.sh new file mode 100644 index 0000000..54ae910 --- /dev/null +++ b/tests/workflows/log_stream/tasks/stream.sh @@ -0,0 +1,6 @@ +#! /bin/bash + +for second in $(seq 1 100); do + echo "Stream $second" + sleep 1 +done \ No newline at end of file diff --git a/tests/workflows/log_stream/workflow.yml b/tests/workflows/log_stream/workflow.yml new file mode 100644 index 0000000..6b0dcdd --- /dev/null +++ b/tests/workflows/log_stream/workflow.yml @@ -0,0 +1,7 @@ +name: simulation of log stream +description: simulation of log stream +events: + - name: Event2 + trigger: ./ping.sh + tasks: + - path: ./tasks/stream.sh