1use alloc::string::String;
7
8use crate::event::{EventSink, FabricEvent};
9
/// Zero-sized event sink that serializes [`FabricEvent`]s as JSON text.
///
/// Formatting is available on its own through `JsonEventSink::format_event`;
/// the `EventSink` implementation prints the formatted line to standard output.
pub struct JsonEventSink;
36
impl JsonEventSink {
    /// Renders `event` as a single JSON object string.
    ///
    /// The object is assembled from three pieces, in this order: the fragment
    /// returned by `timestamp_json()` (which either carries its own trailing
    /// comma or is empty), the `"layer"` field taken from `event_layer()`, and
    /// the event-specific fields produced by `format_event_body()`.
    pub fn format_event(event: &FabricEvent) -> String {
        let ts = timestamp_json();
        let layer = event_layer(event);
        let body = Self::format_event_body(event);
        // `{{` and `}}` are literal braces; `ts` supplies its own separator.
        format!(r#"{{{ts}"layer":"{layer}",{body}}}"#)
    }

    /// Builds the comma-separated `"key":value` fields for `event`, without the
    /// surrounding braces.
    ///
    /// Every fragment starts with `"event_type"`. Values wrapped in
    /// `escape_json` are escaped before being placed between quotes; all other
    /// values are written with their `Display` output as-is. For variants that
    /// carry a `trace_id`, the fragment from `trace_id_json` is appended last
    /// and is empty when the id is `None`.
    ///
    /// The match has no wildcard arm, so a new `FabricEvent` variant must be
    /// given a format here before the crate compiles.
    fn format_event_body(event: &FabricEvent) -> String {
        match event {
            FabricEvent::LeaseAcquired {
                resource_type,
                lease_id,
                node,
                bytes,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                format!(
                    r#""event_type":"lease_acquired","resource_type":"{}","lease_id":{},"node":"{}","bytes":{}{}"#,
                    resource_type,
                    lease_id,
                    escape_json(node),
                    bytes,
                    tid
                )
            }
            FabricEvent::LeaseDropped {
                resource_type,
                lease_id,
                node,
            } => {
                format!(
                    r#""event_type":"lease_dropped","resource_type":"{}","lease_id":{},"node":"{}""#,
                    resource_type,
                    lease_id,
                    escape_json(node)
                )
            }
            FabricEvent::LeaseExpired {
                resource_type,
                lease_id,
                node,
            } => {
                format!(
                    r#""event_type":"lease_expired","resource_type":"{}","lease_id":{},"node":"{}""#,
                    resource_type,
                    lease_id,
                    escape_json(node)
                )
            }
            FabricEvent::OpCompleted {
                op_type,
                duration_us,
                bytes,
            } => {
                format!(
                    r#""event_type":"op_completed","op_type":"{}","duration_us":{},"bytes":{}"#,
                    op_type, duration_us, bytes
                )
            }
            FabricEvent::OpFailed { op_type, error } => {
                format!(
                    r#""event_type":"op_failed","op_type":"{}","error":"{}""#,
                    op_type,
                    escape_json(error)
                )
            }
            FabricEvent::RewriteStarted { plan_id } => {
                format!(r#""event_type":"rewrite_started","plan_id":{}"#, plan_id)
            }
            FabricEvent::RewriteCompleted { plan_id, phase } => {
                format!(
                    r#""event_type":"rewrite_completed","plan_id":{},"phase":"{}""#,
                    plan_id, phase
                )
            }
            FabricEvent::ServiceRegistered { name, version } => {
                format!(
                    r#""event_type":"service_registered","name":"{}","version":"{}""#,
                    escape_json(name),
                    escape_json(version)
                )
            }
            FabricEvent::ServiceDeregistered { name } => {
                format!(
                    r#""event_type":"service_deregistered","name":"{}""#,
                    escape_json(name)
                )
            }
            FabricEvent::MessagePublished { topic, bytes } => {
                format!(
                    r#""event_type":"message_published","topic":"{}","bytes":{}"#,
                    escape_json(topic),
                    bytes
                )
            }
            FabricEvent::MessageConsumed { topic, group } => {
                format!(
                    r#""event_type":"message_consumed","topic":"{}","group":"{}""#,
                    escape_json(topic),
                    escape_json(group)
                )
            }
            FabricEvent::ObjectStored { bucket, key, bytes } => {
                format!(
                    r#""event_type":"object_stored","bucket":"{}","key":"{}","bytes":{}"#,
                    escape_json(bucket),
                    escape_json(key),
                    bytes
                )
            }
            FabricEvent::ObjectRetrieved { bucket, key, bytes } => {
                format!(
                    r#""event_type":"object_retrieved","bucket":"{}","key":"{}","bytes":{}"#,
                    escape_json(bucket),
                    escape_json(key),
                    bytes
                )
            }
            FabricEvent::LeaseRevoked {
                resource_type,
                lease_id,
                node,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                format!(
                    r#""event_type":"lease_revoked","resource_type":"{}","lease_id":{},"node":"{}"{}"#,
                    resource_type,
                    lease_id,
                    escape_json(node),
                    tid
                )
            }
            FabricEvent::LeaseFenced {
                resource_type,
                lease_id,
                node,
                reason,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                format!(
                    r#""event_type":"lease_fenced","resource_type":"{}","lease_id":{},"node":"{}","reason":"{}"{}"#,
                    resource_type,
                    lease_id,
                    escape_json(node),
                    escape_json(reason),
                    tid
                )
            }
            FabricEvent::TeardownFailed {
                resource_type,
                lease_id,
                node,
                error,
            } => {
                format!(
                    r#""event_type":"teardown_failed","resource_type":"{}","lease_id":{},"node":"{}","error":"{}""#,
                    resource_type,
                    lease_id,
                    escape_json(node),
                    escape_json(error)
                )
            }
            FabricEvent::AuthFailed {
                node,
                reason,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                format!(
                    r#""event_type":"auth_failed","node":"{}","reason":"{}"{}"#,
                    escape_json(node),
                    escape_json(reason),
                    tid
                )
            }
            FabricEvent::ReplayRejected {
                node,
                nonce,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                format!(
                    r#""event_type":"replay_rejected","node":"{}","nonce":{}{}"#,
                    escape_json(node),
                    nonce,
                    tid
                )
            }
            FabricEvent::TokenValidationFailed { node, reason } => {
                format!(
                    r#""event_type":"token_validation_failed","node":"{}","reason":"{}""#,
                    escape_json(node),
                    escape_json(reason)
                )
            }
            FabricEvent::ListenerAcquired {
                port,
                node,
                lease_id,
            } => {
                format!(
                    r#""event_type":"listener_acquired","port":{},"node":"{}","lease_id":{}"#,
                    port,
                    escape_json(node),
                    lease_id
                )
            }
            FabricEvent::ListenerRevoked {
                port,
                node,
                lease_id,
            } => {
                format!(
                    r#""event_type":"listener_revoked","port":{},"node":"{}","lease_id":{}"#,
                    port,
                    escape_json(node),
                    lease_id
                )
            }
            FabricEvent::ListenerFenced {
                port,
                node,
                lease_id,
                reason,
            } => {
                format!(
                    r#""event_type":"listener_fenced","port":{},"node":"{}","lease_id":{},"reason":"{}""#,
                    port,
                    escape_json(node),
                    lease_id,
                    escape_json(reason)
                )
            }
            FabricEvent::SessionAccepted {
                listener_port,
                session_id,
                node,
            } => {
                format!(
                    r#""event_type":"session_accepted","listener_port":{},"session_id":{},"node":"{}""#,
                    listener_port,
                    session_id,
                    escape_json(node)
                )
            }
            FabricEvent::SessionClosed {
                listener_port,
                session_id,
                node,
            } => {
                format!(
                    r#""event_type":"session_closed","listener_port":{},"session_id":{},"node":"{}""#,
                    listener_port,
                    session_id,
                    escape_json(node)
                )
            }
            FabricEvent::SessionDrained {
                listener_port,
                sessions_drained,
                node,
            } => {
                format!(
                    r#""event_type":"session_drained","listener_port":{},"sessions_drained":{},"node":"{}""#,
                    listener_port,
                    sessions_drained,
                    escape_json(node)
                )
            }
            FabricEvent::ServiceDeployed {
                name,
                instance_count,
            } => {
                format!(
                    r#""event_type":"service_deployed","name":"{}","instance_count":{}"#,
                    escape_json(name),
                    instance_count
                )
            }
            FabricEvent::ServiceInstanceStateChanged {
                name,
                instance_id,
                state,
            } => {
                format!(
                    r#""event_type":"service_instance_state_changed","name":"{}","instance_id":{},"state":"{}""#,
                    escape_json(name),
                    instance_id,
                    escape_json(state)
                )
            }
            FabricEvent::ServiceCutoverStarted { name } => {
                format!(
                    r#""event_type":"service_cutover_started","name":"{}""#,
                    escape_json(name)
                )
            }
            FabricEvent::ServiceCutoverCompleted { name } => {
                format!(
                    r#""event_type":"service_cutover_completed","name":"{}""#,
                    escape_json(name)
                )
            }
            FabricEvent::ServiceFailoverTriggered { name, reason } => {
                format!(
                    r#""event_type":"service_failover_triggered","name":"{}","reason":"{}""#,
                    escape_json(name),
                    escape_json(reason)
                )
            }
            FabricEvent::ServiceFailoverCompleted { name } => {
                format!(
                    r#""event_type":"service_failover_completed","name":"{}""#,
                    escape_json(name)
                )
            }
            FabricEvent::ServiceIngressFenced { name, instance_id } => {
                format!(
                    r#""event_type":"service_ingress_fenced","name":"{}","instance_id":{}"#,
                    escape_json(name),
                    instance_id
                )
            }
            FabricEvent::ServiceUndeployed { name } => {
                format!(
                    r#""event_type":"service_undeployed","name":"{}""#,
                    escape_json(name)
                )
            }
            FabricEvent::TaskletSubmitted {
                tasklet_id,
                node,
                runtime_type,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                // `tasklet_id` is captured inline by name; the remaining `{}`
                // placeholders consume the positional arguments in order.
                format!(
                    r#""event_type":"tasklet_submitted","tasklet_id":{tasklet_id},"node":"{}","runtime_type":"{}"{}"#,
                    escape_json(node),
                    escape_json(runtime_type),
                    tid
                )
            }
            FabricEvent::TaskletCompleted {
                tasklet_id,
                status,
                duration_us,
                output_bytes,
                runtime_type,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                // `status` is emitted without surrounding quotes.
                format!(
                    r#""event_type":"tasklet_completed","tasklet_id":{tasklet_id},"status":{status},"duration_us":{duration_us},"output_bytes":{output_bytes},"runtime_type":"{}"{}"#,
                    escape_json(runtime_type),
                    tid
                )
            }
            FabricEvent::TaskletFailed {
                tasklet_id,
                status,
                duration_us,
                reason,
                runtime_type,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                format!(
                    r#""event_type":"tasklet_failed","tasklet_id":{tasklet_id},"status":{status},"duration_us":{duration_us},"reason":"{}","runtime_type":"{}"{}"#,
                    escape_json(reason),
                    escape_json(runtime_type),
                    tid
                )
            }
            FabricEvent::SecurityProfileActive { mode } => {
                format!(
                    r#""event_type":"security_profile_active","mode":"{}""#,
                    escape_json(mode)
                )
            }
            FabricEvent::AdmissionApproved {
                tenant_id,
                node,
                resource_type,
                bytes,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                format!(
                    r#""event_type":"admission_approved","tenant_id":"{}","node":"{}","resource_type":"{}","bytes":{}{}"#,
                    escape_json(tenant_id),
                    escape_json(node),
                    escape_json(resource_type),
                    bytes,
                    tid
                )
            }
            FabricEvent::AdmissionDenied {
                tenant_id,
                resource_type,
                reason,
                quota_violation,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                let quota = quota_violation_json(quota_violation);
                // Two optional trailing fragments: quota violation, then trace id.
                format!(
                    r#""event_type":"admission_denied","tenant_id":"{}","resource_type":"{}","reason":"{}"{}{}"#,
                    escape_json(tenant_id),
                    escape_json(resource_type),
                    escape_json(reason),
                    quota,
                    tid
                )
            }
            FabricEvent::PlacementDecision {
                tenant_id,
                node,
                strategy,
                score,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                format!(
                    r#""event_type":"placement_decision","tenant_id":"{}","node":"{}","strategy":"{}","score":{}{}"#,
                    escape_json(tenant_id),
                    escape_json(node),
                    escape_json(strategy),
                    score,
                    tid
                )
            }
            FabricEvent::PreemptionTriggered {
                victim_lease_id,
                victim_tenant,
                preemptor_tenant,
                node,
                reason,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                // `reason` is rendered through `as_str()` and is not passed
                // through `escape_json`.
                format!(
                    r#""event_type":"preemption_triggered","victim_lease_id":{},"victim_tenant":"{}","preemptor_tenant":"{}","node":"{}","reason":"{}"{}"#,
                    victim_lease_id,
                    escape_json(victim_tenant),
                    escape_json(preemptor_tenant),
                    escape_json(node),
                    reason.as_str(),
                    tid
                )
            }
            FabricEvent::CrossStateDisagreementResolved {
                kind,
                protocol,
                lease_id,
                authority,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                // `kind` and `protocol` are interpolated without escaping;
                // only `authority` goes through `escape_json`.
                format!(
                    r#""event_type":"cross_state_disagreement_resolved","kind":"{}","protocol":"{}","lease_id":{},"authority":"{}"{}"#,
                    kind,
                    protocol,
                    lease_id,
                    escape_json(authority),
                    tid
                )
            }
            FabricEvent::QuotaExceeded {
                tenant_id,
                resource_type,
                requested,
                limit,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                format!(
                    r#""event_type":"quota_exceeded","tenant_id":"{}","resource_type":"{}","requested":{},"limit":{}{}"#,
                    escape_json(tenant_id),
                    escape_json(resource_type),
                    requested,
                    limit,
                    tid
                )
            }
            FabricEvent::TokenMinted {
                tenant_id,
                node,
                ttl_secs,
                trace_id,
            } => {
                let tid = trace_id_json(trace_id);
                format!(
                    r#""event_type":"token_minted","tenant_id":"{}","node":"{}","ttl_secs":{}{}"#,
                    escape_json(tenant_id),
                    escape_json(node),
                    ttl_secs,
                    tid
                )
            }
            FabricEvent::ProcessStarted {
                pid,
                scenario,
                nodes,
            } => {
                // `pid` and `nodes` are emitted as quoted, escaped strings.
                format!(
                    r#""event_type":"process_started","pid":"{}","scenario":"{}","nodes":"{}""#,
                    escape_json(pid),
                    escape_json(scenario),
                    escape_json(nodes)
                )
            }
            FabricEvent::ProcessHeartbeat { pid } => {
                format!(
                    r#""event_type":"process_heartbeat","pid":"{}""#,
                    escape_json(pid)
                )
            }
            FabricEvent::ProcessCompleted {
                pid,
                exit_code,
                duration_secs,
            } => {
                format!(
                    r#""event_type":"process_completed","pid":"{}","exit_code":{},"duration_secs":{}"#,
                    escape_json(pid),
                    exit_code,
                    duration_secs
                )
            }
            FabricEvent::SchedulerElectionLost { reason, epoch } => {
                format!(
                    r#""event_type":"scheduler_election_lost","reason":"{}","epoch":{}"#,
                    escape_json(reason),
                    epoch
                )
            }
            FabricEvent::SchedulerPromotionFailed { reason, epoch } => {
                format!(
                    r#""event_type":"scheduler_promotion_failed","reason":"{}","epoch":{}"#,
                    escape_json(reason),
                    epoch
                )
            }
            FabricEvent::SchedulerStaleLeaderDetected {
                local_epoch,
                node_epoch,
            } => {
                format!(
                    r#""event_type":"scheduler_stale_leader_detected","local_epoch":{},"node_epoch":{}"#,
                    local_epoch, node_epoch
                )
            }
            FabricEvent::SchedulerPromoted {
                epoch,
                nodes_fenced,
                latency_ms,
            } => {
                format!(
                    r#""event_type":"scheduler_promoted","epoch":{},"nodes_fenced":{},"latency_ms":{}"#,
                    epoch, nodes_fenced, latency_ms
                )
            }
        }
    }
}
612
613impl EventSink for JsonEventSink {
614 fn emit(&self, event: &FabricEvent) {
615 println!("{}", Self::format_event(event));
616 }
617}
618
619pub fn event_layer(event: &FabricEvent) -> &'static str {
627 match event {
628 FabricEvent::LeaseAcquired { .. }
629 | FabricEvent::LeaseDropped { .. }
630 | FabricEvent::LeaseExpired { .. }
631 | FabricEvent::LeaseRevoked { .. }
632 | FabricEvent::LeaseFenced { .. }
633 | FabricEvent::TeardownFailed { .. }
634 | FabricEvent::AuthFailed { .. }
635 | FabricEvent::ReplayRejected { .. }
636 | FabricEvent::TokenValidationFailed { .. }
637 | FabricEvent::SecurityProfileActive { .. }
638 | FabricEvent::OpCompleted { .. }
639 | FabricEvent::OpFailed { .. } => "fabricbios",
640
641 FabricEvent::ServiceRegistered { .. }
642 | FabricEvent::ServiceDeregistered { .. }
643 | FabricEvent::ServiceDeployed { .. }
644 | FabricEvent::ServiceInstanceStateChanged { .. }
645 | FabricEvent::ServiceCutoverStarted { .. }
646 | FabricEvent::ServiceCutoverCompleted { .. }
647 | FabricEvent::ServiceFailoverTriggered { .. }
648 | FabricEvent::ServiceFailoverCompleted { .. }
649 | FabricEvent::ServiceIngressFenced { .. }
650 | FabricEvent::ServiceUndeployed { .. }
651 | FabricEvent::TaskletSubmitted { .. }
652 | FabricEvent::TaskletCompleted { .. }
653 | FabricEvent::TaskletFailed { .. }
654 | FabricEvent::RewriteStarted { .. }
655 | FabricEvent::RewriteCompleted { .. }
656 | FabricEvent::MessagePublished { .. }
657 | FabricEvent::MessageConsumed { .. }
658 | FabricEvent::ObjectStored { .. }
659 | FabricEvent::ObjectRetrieved { .. }
660 | FabricEvent::AdmissionApproved { .. }
661 | FabricEvent::AdmissionDenied { .. }
662 | FabricEvent::PlacementDecision { .. }
663 | FabricEvent::PreemptionTriggered { .. }
664 | FabricEvent::CrossStateDisagreementResolved { .. }
665 | FabricEvent::QuotaExceeded { .. }
666 | FabricEvent::TokenMinted { .. }
667 | FabricEvent::ListenerAcquired { .. }
668 | FabricEvent::ListenerRevoked { .. }
669 | FabricEvent::ListenerFenced { .. }
670 | FabricEvent::SessionAccepted { .. }
671 | FabricEvent::SessionClosed { .. }
672 | FabricEvent::SessionDrained { .. }
673 | FabricEvent::ProcessStarted { .. }
674 | FabricEvent::ProcessHeartbeat { .. }
675 | FabricEvent::ProcessCompleted { .. }
676 | FabricEvent::SchedulerElectionLost { .. }
677 | FabricEvent::SchedulerPromotionFailed { .. }
678 | FabricEvent::SchedulerStaleLeaderDetected { .. }
679 | FabricEvent::SchedulerPromoted { .. } => "grafos",
680 }
681}
682
/// Produces the `"timestamp":"YYYY-MM-DDTHH:MM:SSZ",` fragment (trailing comma
/// included) for the current system time, at whole-second resolution.
///
/// A clock reading earlier than the Unix epoch is treated as the epoch itself.
#[cfg(feature = "std")]
fn timestamp_json() -> String {
    use std::time::SystemTime;

    let epoch_secs = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();

    // Split into whole days since the epoch and the second within that day.
    let day_number = epoch_secs / 86400;
    let sec_of_day = epoch_secs % 86400;
    let hh = sec_of_day / 3600;
    let mm = (sec_of_day % 3600) / 60;
    let ss = sec_of_day % 60;

    // Convert the day count to (year, month, day). The arithmetic works in
    // 146097-day (400-year) eras and counts months starting from March.
    let shifted = day_number as i64 + 719468;
    let era = (if shifted >= 0 { shifted } else { shifted - 146096 }) / 146097;
    let day_of_era = (shifted - era * 146097) as u64;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146096) / 365;
    let march_based_year = year_of_era as i64 + era * 400;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let month_index = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * month_index + 2) / 5 + 1;
    let month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };
    // January and February fall in the next calendar year of the March-based count.
    let year = if month <= 2 {
        march_based_year + 1
    } else {
        march_based_year
    };

    format!(
        r#""timestamp":"{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z","#,
        year, month, day, hh, mm, ss
    )
}

/// Without the `std` feature no clock is read: the fragment is empty, so the
/// emitted JSON object carries no `"timestamp"` field.
#[cfg(not(feature = "std"))]
fn timestamp_json() -> String {
    String::default()
}
727
728fn trace_id_json(trace_id: &Option<String>) -> String {
732 match trace_id {
733 Some(tid) => format!(r#","trace_id":"{}""#, escape_json(tid)),
734 None => String::new(),
735 }
736}
737
738fn quota_violation_json(violation: &Option<grafos_core::QuotaViolation>) -> String {
739 match violation {
740 Some(v) => format!(r#","quota_violation":"{}""#, v.as_str()),
741 None => String::new(),
742 }
743}
744
/// Escapes `s` so it can be placed between double quotes in a JSON document.
///
/// `"` and `\` are backslash-escaped; newline, carriage return and tab use
/// their short forms (`\n`, `\r`, `\t`); every other character below U+0020
/// is written as a lowercase `\u00XX` escape. All remaining characters,
/// including non-ASCII ones, are copied through unchanged.
fn escape_json(s: &str) -> String {
    use core::fmt::Write as _;

    // At least `s.len()` bytes are needed; escapes only ever grow the output.
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if c < '\x20' => {
                // Append in place instead of building a temporary `String`
                // with `format!`. Writing into a `String` cannot fail, so the
                // `fmt::Result` is deliberately discarded.
                let _ = write!(out, "\\u{:04x}", c as u32);
            }
            c => out.push(c),
        }
    }
    out
}
763
// Unit tests for the JSON rendering. Assertions use substring checks
// (`contains`) rather than whole-string equality, so they do not depend on the
// timestamp fragment at the start of each object.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{OpType, ResourceType, RewritePhase};

    /// `LeaseAcquired` renders its type tag, resource type, id, node and size.
    #[test]
    fn json_lease_acquired() {
        let event = FabricEvent::LeaseAcquired {
            resource_type: ResourceType::Mem,
            lease_id: 42,
            node: "10.10.0.11".into(),
            bytes: 8192,
            trace_id: None,
        };
        let json = JsonEventSink::format_event(&event);
        assert!(json.contains(r#""event_type":"lease_acquired""#));
        assert!(json.contains(r#""resource_type":"mem""#));
        assert!(json.contains(r#""lease_id":42"#));
        assert!(json.contains(r#""node":"10.10.0.11""#));
        assert!(json.contains(r#""bytes":8192"#));
    }

    /// `LeaseDropped` renders its type tag and the `block` resource type.
    #[test]
    fn json_lease_dropped() {
        let event = FabricEvent::LeaseDropped {
            resource_type: ResourceType::Block,
            lease_id: 7,
            node: "node-a".into(),
        };
        let json = JsonEventSink::format_event(&event);
        assert!(json.contains(r#""event_type":"lease_dropped""#));
        assert!(json.contains(r#""resource_type":"block""#));
    }

    /// `LeaseExpired` renders its type tag and the `gpu` resource type.
    #[test]
    fn json_lease_expired() {
        let event = FabricEvent::LeaseExpired {
            resource_type: ResourceType::Gpu,
            lease_id: 99,
            node: "gpu-node".into(),
        };
        let json = JsonEventSink::format_event(&event);
        assert!(json.contains(r#""event_type":"lease_expired""#));
        assert!(json.contains(r#""resource_type":"gpu""#));
    }

    /// `OpCompleted` renders the op type as a string and the counters as bare numbers.
    #[test]
    fn json_op_completed() {
        let event = FabricEvent::OpCompleted {
            op_type: OpType::Write,
            duration_us: 1234,
            bytes: 512,
        };
        let json = JsonEventSink::format_event(&event);
        assert!(json.contains(r#""event_type":"op_completed""#));
        assert!(json.contains(r#""op_type":"write""#));
        assert!(json.contains(r#""duration_us":1234"#));
        assert!(json.contains(r#""bytes":512"#));
    }

    /// `OpFailed` carries the error text as a quoted string.
    #[test]
    fn json_op_failed() {
        let event = FabricEvent::OpFailed {
            op_type: OpType::Read,
            error: "connection refused".into(),
        };
        let json = JsonEventSink::format_event(&event);
        assert!(json.contains(r#""event_type":"op_failed""#));
        assert!(json.contains(r#""error":"connection refused""#));
    }

    /// `RewriteStarted` renders the plan id as a bare number.
    #[test]
    fn json_rewrite_started() {
        let event = FabricEvent::RewriteStarted { plan_id: 100 };
        let json = JsonEventSink::format_event(&event);
        assert!(json.contains(r#""event_type":"rewrite_started""#));
        assert!(json.contains(r#""plan_id":100"#));
    }

    /// `RewriteCompleted` renders the phase as a lowercase string.
    #[test]
    fn json_rewrite_completed() {
        let event = FabricEvent::RewriteCompleted {
            plan_id: 100,
            phase: RewritePhase::Cutover,
        };
        let json = JsonEventSink::format_event(&event);
        assert!(json.contains(r#""event_type":"rewrite_completed""#));
        assert!(json.contains(r#""phase":"cutover""#));
    }

    /// Newline, tab, quote and backslash in a string field come out escaped.
    #[test]
    fn json_escape_special_chars() {
        let event = FabricEvent::OpFailed {
            op_type: OpType::Read,
            error: "line1\nline2\ttab\"quote\\back".into(),
        };
        let json = JsonEventSink::format_event(&event);
        assert!(json.contains(r#"line1\nline2\ttab\"quote\\back"#));
    }

    /// Control characters without a short escape use the `\u00XX` form.
    #[test]
    fn escape_json_control_chars() {
        let escaped = escape_json("\x00\x01\x1f");
        assert_eq!(escaped, "\\u0000\\u0001\\u001f");
    }

    /// The preemption reason enum is rendered as its snake_case wire string.
    #[test]
    fn preemption_triggered_emits_typed_reason() {
        let event = FabricEvent::PreemptionTriggered {
            victim_lease_id: 42,
            victim_tenant: "tenant-a".into(),
            preemptor_tenant: "tenant-b".into(),
            node: "node-1".into(),
            reason: grafos_core::PreemptionReason::PriorityPreemption,
            trace_id: None,
        };
        let json = JsonEventSink::format_event(&event);
        assert!(
            json.contains(r#""reason":"priority_preemption""#),
            "expected typed reason field, got: {json}"
        );
        assert!(json.contains(r#""event_type":"preemption_triggered""#));
        assert!(json.contains(r#""victim_lease_id":42"#));
        assert!(json.contains(r#""victim_tenant":"tenant-a""#));
    }

    /// A present quota violation is rendered as a `quota_violation` string field.
    #[test]
    fn admission_denied_emits_typed_quota_violation() {
        let event = FabricEvent::AdmissionDenied {
            tenant_id: "tenant-a".into(),
            resource_type: "mem".into(),
            reason: "quota denied".into(),
            quota_violation: Some(grafos_core::QuotaViolation::MemBytesExceeded {
                limit: 4096,
                used: 4096,
                requested: 512,
            }),
            trace_id: None,
        };
        let json = JsonEventSink::format_event(&event);
        assert!(json.contains(r#""event_type":"admission_denied""#));
        assert!(json.contains(r#""quota_violation":"mem_bytes_exceeded""#));
    }

    /// Pins the wire string of each `PreemptionReason` variant listed below.
    #[test]
    fn preemption_reason_each_variant_renders_stably() {
        for (reason, expected) in [
            (
                grafos_core::PreemptionReason::PriorityPreemption,
                "priority_preemption",
            ),
            (
                grafos_core::PreemptionReason::QuotaRebalance,
                "quota_rebalance",
            ),
            (
                grafos_core::PreemptionReason::BurstCreditExhausted,
                "burst_credit_exhausted",
            ),
            (
                grafos_core::PreemptionReason::BudgetExhausted,
                "budget_exhausted",
            ),
            (
                grafos_core::PreemptionReason::CostCapEviction,
                "cost_cap_eviction",
            ),
            (
                grafos_core::PreemptionReason::OperatorDrain,
                "operator_drain",
            ),
            (
                grafos_core::PreemptionReason::OperatorMigProfileChange,
                "operator_mig_profile_change",
            ),
            (
                grafos_core::PreemptionReason::MaintenanceWindow,
                "maintenance_window",
            ),
            (
                grafos_core::PreemptionReason::PolicyViolationRecovery,
                "policy_violation_recovery",
            ),
        ] {
            let event = FabricEvent::PreemptionTriggered {
                victim_lease_id: 1,
                victim_tenant: "v".into(),
                preemptor_tenant: "p".into(),
                node: "n".into(),
                reason,
                trace_id: None,
            };
            let json = JsonEventSink::format_event(&event);
            let needle = format!(r#""reason":"{expected}""#);
            assert!(
                json.contains(&needle),
                "reason {reason:?} missing expected wire form {needle:?} — got: {json}"
            );
        }
    }

    /// `kind` and `protocol` appear as their own string fields, and the lease
    /// id is rendered in decimal (0xCAFE == 51966).
    #[test]
    fn cross_state_disagreement_resolved_emits_typed_kind_and_protocol() {
        let event = FabricEvent::CrossStateDisagreementResolved {
            kind: "lease_free_not_found",
            protocol: "idempotent_success",
            lease_id: 0xCAFE,
            authority: "0xdead00".into(),
            trace_id: None,
        };
        let json = JsonEventSink::format_event(&event);
        assert!(
            json.contains(r#""event_type":"cross_state_disagreement_resolved""#),
            "missing typed event_type; got: {json}"
        );
        assert!(
            json.contains(r#""kind":"lease_free_not_found""#),
            "missing typed kind; got: {json}"
        );
        assert!(
            json.contains(r#""protocol":"idempotent_success""#),
            "missing typed protocol; got: {json}"
        );
        assert!(
            json.contains(r#""lease_id":51966"#),
            "missing lease_id (decoded 0xCAFE); got: {json}"
        );
    }

    /// Two events that differ only in `kind` are expected to serialize differently.
    // NOTE(review): with the `std` feature the output also embeds the current
    // second, so `assert_ne!` would hold across a clock tick even if `kind`
    // were dropped from the output — confirm this is acceptable.
    #[test]
    fn cross_state_disagreement_resolved_combinations_are_byte_distinguishable() {
        let a = JsonEventSink::format_event(&FabricEvent::CrossStateDisagreementResolved {
            kind: "lease_free_not_found",
            protocol: "idempotent_success",
            lease_id: 1,
            authority: "x".into(),
            trace_id: None,
        });
        let b = JsonEventSink::format_event(&FabricEvent::CrossStateDisagreementResolved {
            kind: "lease_free_not_active",
            protocol: "idempotent_success",
            lease_id: 1,
            authority: "x".into(),
            trace_id: None,
        });
        assert_ne!(a, b, "different `kind` values must produce distinct JSON");
    }
}