grafos_observe/file_socket_sink.rs
1//! File + Unix socket event sink.
2//!
3//! [`FileAndSocketSink`] writes JSON-lines to an append-only log file and,
4//! optionally, to a connected Unix stream socket client. Enabled by the
5//! `file-socket` feature flag (requires `std`).
6
7use std::io::{self, Write};
8use std::net::Shutdown;
9use std::os::unix::net::{UnixListener, UnixStream};
10use std::path::Path;
11use std::sync::{Arc, Mutex};
12
13use crate::event::{EventSink, FabricEvent};
14use crate::json_log::JsonEventSink;
15
16/// An [`EventSink`] that writes JSON-lines to a log file and a Unix socket.
17///
18/// The log file is opened in append mode. A background thread accepts
19/// connections on a Unix domain socket; at most one client is served at a time.
20/// If no client is connected, socket writes are silently skipped.
21///
22/// # Examples
23///
24/// ```no_run
25/// use grafos_observe::file_socket_sink::FileAndSocketSink;
26/// use std::path::Path;
27///
28/// let sink = FileAndSocketSink::new(
29/// Path::new("/tmp/fabric-events.jsonl"),
30/// Path::new("/tmp/fabric-events.sock"),
31/// ).expect("failed to create sink");
32/// ```
33pub struct FileAndSocketSink {
34 log_file: Mutex<std::fs::File>,
35 client: Arc<Mutex<Option<UnixStream>>>,
36 // Hold the join handle so the background thread lives as long as the sink.
37 _accept_thread: std::thread::JoinHandle<()>,
38}
39
40impl FileAndSocketSink {
41 /// Create a new sink writing to `log_path` (append-only) and listening on
42 /// `socket_path` for a single Unix stream client at a time.
43 ///
44 /// The socket file is removed if it already exists before binding.
45 pub fn new(log_path: &Path, socket_path: &Path) -> io::Result<Self> {
46 let log_file = std::fs::OpenOptions::new()
47 .create(true)
48 .append(true)
49 .open(log_path)?;
50
51 // Remove stale socket file if present.
52 if socket_path.exists() {
53 std::fs::remove_file(socket_path)?;
54 }
55 let listener = UnixListener::bind(socket_path)?;
56
57 let client: Arc<Mutex<Option<UnixStream>>> = Arc::new(Mutex::new(None));
58 let client_bg = Arc::clone(&client);
59
60 let accept_thread = std::thread::Builder::new()
61 .name("fabric-event-sock".into())
62 .spawn(move || {
63 // Accept loop: replace the current client on each new connection.
64 for stream in listener.incoming() {
65 match stream {
66 Ok(new_stream) => {
67 // Shut down the previous client, if any.
68 if let Ok(mut guard) = client_bg.lock() {
69 if let Some(old) = guard.take() {
70 let _ = old.shutdown(Shutdown::Both);
71 }
72 *guard = Some(new_stream);
73 }
74 }
75 Err(_) => {
76 // Listener error (e.g. socket removed) — exit accept loop.
77 break;
78 }
79 }
80 }
81 })?;
82
83 Ok(Self {
84 log_file: Mutex::new(log_file),
85 client,
86 _accept_thread: accept_thread,
87 })
88 }
89}
90
91impl EventSink for FileAndSocketSink {
92 fn emit(&self, event: &FabricEvent) {
93 let mut line = JsonEventSink::format_event(event);
94 line.push('\n');
95
96 // Write to log file (append mode).
97 if let Ok(mut f) = self.log_file.lock() {
98 if let Err(e) = f.write_all(line.as_bytes()) {
99 eprintln!("[grafos-observe] file write failed: {e}");
100 }
101 }
102
103 // Write to connected Unix socket client, if any.
104 if let Ok(mut guard) = self.client.lock() {
105 if let Some(ref mut stream) = *guard {
106 if stream.write_all(line.as_bytes()).is_err() {
107 // Broken pipe or client disconnected — drop the reference.
108 *guard = None;
109 }
110 }
111 }
112 }
113}