diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..a92ce47 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "svr" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures = { version = "0.3.28", features = ["std"] } +humantime-serde = "1.1.1" +serde = { version = "1.0.160", features = ["derive"] } +thiserror = "1.0.40" +tokio = { version = "1.28.0", features = ["macros", "rt-multi-thread", "fs", "io-util", "time"] } +toml = "0.7.3" +xflags = "0.3.1" +youtube_dl = { version = "0.8.0", features = ["tokio"] } diff --git a/README.md b/README.md new file mode 100644 index 0000000..35dc0ba --- /dev/null +++ b/README.md @@ -0,0 +1,35 @@ +# Stream VOD Recorder + +`svr` is a utility that monitors and downloads livestreams for any site with a youtube-dl extractor. + +Example `config.toml`: +```toml +output = "/media/livestreams" + +[[streams]] +subpath = "ThePrimeagen" +type = "youtube-dl" +url = "https://twitch.tv/ThePrimeagen" +frequency = "10m" + +[[streams]] +subpath = "LofiGirl" +type = "youtube-dl" +url = "https://www.youtube.com/@LofiGirl/live" +frequency = "2h" + +[[streams]] +subpath = "LofiGirl" +type = "youtube-dl" +url = "https://www.youtube.com/watch?v=jfKfPfyJRdk" +frequency = "2h" +``` + +## TODO +- Implement credential management for protected streams +- More configuration options for individual streams +- Handle server/stream interruptions (e.g. merging multiple stream files) +- Monitor playlists and other non-live content (monitor a channel???) +- Automatic deletion after e.g. a period of time +- Improved logging (tracing?) +- Save metadata somewhere diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..b2715b2 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +wrap_comments = true diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..6c5925f --- /dev/null +++ b/src/config.rs @@ -0,0 +1,92 @@ +use crate::Error; +use serde::{Deserialize, Serialize}; +use std::{path::PathBuf, time::Duration}; +use youtube_dl::{YoutubeDl, YoutubeDlOutput}; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Config { + pub youtube_dl_path: Option, + pub output: PathBuf, + pub streams: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(tag = "type")] +pub enum Stream { + #[serde( + rename = "youtube-dl", + alias = "yt-dl", + alias = "ytdl", + alias = "yt-dlp", + alias = "ytdlp" + )] + YoutubeDl { + url: String, + #[serde(with = "humantime_serde")] + frequency: Duration, + quality: Option, + // Relative to Config::output. If `None`, will be downloaded directly to Config::output + subpath: Option, + // Overrides Config::output, still respects subpath (becomes relative to new output) + output: Option, + }, +} + +impl Stream { + /// # Errors + /// Should only return an error on an unrecoverable error, things like + /// e.g. a stream not being live or a video not being found are + /// to be handled internally. + pub async fn watch(&self, config: &Config) -> Result<(), Error> { + match self { + Stream::YoutubeDl { + url, + frequency, + quality, + subpath, + output, + } => { + loop { + // TODO: Can this dl struct be reused? + let mut dl = YoutubeDl::new(url); + dl.download(true); + + if let Some(path) = &config.youtube_dl_path { + dl.youtube_dl_path(path); + } + + if let Some(quality) = quality { + dl.format(quality); + } + + let mut output_dir = output.clone().unwrap_or(config.output.clone()); + if let Some(subpath) = subpath { + output_dir = output_dir.join(subpath); + } + dl.output_directory(output_dir.to_string_lossy()); + + match dl.run_async().await { + Ok(YoutubeDlOutput::SingleVideo(video)) => { + println!("Successfully downloading video `{url:#?}`: {video:#?}"); + } + Ok(YoutubeDlOutput::Playlist(playlist)) => { + println!("Successfully downloading playlist `{url:#?}`: {playlist:#?}"); + } + Err(e) => { + let err = if let youtube_dl::Error::ExitCode { stderr, .. } = e { + stderr + } else if let youtube_dl::Error::Io(ioerr) = e { + ioerr.to_string() + } else { + e.to_string() + }; + eprintln!("Error with video at URL `{url:#?}`: {err}"); + } + } + + tokio::time::sleep(*frequency).await; + } + } + } + } +} diff --git a/src/flags.rs b/src/flags.rs new file mode 100644 index 0000000..dd833c6 --- /dev/null +++ b/src/flags.rs @@ -0,0 +1,8 @@ +use std::path::PathBuf; + +xflags::xflags! { + cmd svr { + optional -c, --config config_path: PathBuf + } +} + diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..7b069d9 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,53 @@ +#![feature(async_closure)] + +use config::Config; +use futures::future::join_all; +use std::sync::Arc; +use thiserror::Error; +use tokio::{ + fs, + io::{self, AsyncReadExt}, +}; + +pub mod config; +mod flags; + +const DEFAULT_CONFIG_PATH: &str = "./config.toml"; + +#[derive(Debug, Error)] +pub enum Error { + #[error("IoError: {0:#?}")] + IoError(#[from] io::Error), + #[error("Invalid config: {0}")] + InvalidConfig(#[from] toml::de::Error), + #[error("Invalid command line flags: {0:#?}")] + Arguments(#[from] xflags::Error), +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let flags = flags::Svr::from_env()?; + + let config_path = flags.config.unwrap_or_else(|| DEFAULT_CONFIG_PATH.into()); + + let mut config = String::new(); + fs::File::open(&config_path) + .await? + .read_to_string(&mut config) + .await?; + + let config: Arc = Arc::new(toml::from_str(&config)?); + + // Spawn a task for each stream to watch it + // This config.streams.clone seems unnecessary since we only need an immutable + // reference + join_all(config.streams.clone().into_iter().map(|stream| { + let config = Arc::clone(&config); + tokio::spawn(async move { + stream.watch(&config).await.unwrap(); + }) + })) + .await; + + Ok(()) +}