mirror of
https://github.com/openai/codex.git
synced 2026-05-05 05:42:33 +03:00
Compare commits
3 Commits
owen/perfo
...
dev/rasmus
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
99bcadd0b1 | ||
|
|
217256c348 | ||
|
|
961560b27d |
17
codex-rs/Cargo.lock
generated
17
codex-rs/Cargo.lock
generated
@@ -2698,6 +2698,23 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-feedback-log-sink"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"codex-state",
|
||||
"pretty_assertions",
|
||||
"prost 0.14.3",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic",
|
||||
"tonic-prost",
|
||||
"tonic-prost-build",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-file-search"
|
||||
version = "0.0.0"
|
||||
|
||||
@@ -14,6 +14,7 @@ members = [
|
||||
"apply-patch",
|
||||
"arg0",
|
||||
"feedback",
|
||||
"feedback-log-sink",
|
||||
"features",
|
||||
"install-context",
|
||||
"codex-backend-openapi-models",
|
||||
|
||||
6
codex-rs/feedback-log-sink/BUILD.bazel
Normal file
6
codex-rs/feedback-log-sink/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "feedback-log-sink",
|
||||
crate_name = "codex_feedback_log_sink",
|
||||
)
|
||||
33
codex-rs/feedback-log-sink/Cargo.toml
Normal file
33
codex-rs/feedback-log-sink/Cargo.toml
Normal file
@@ -0,0 +1,33 @@
|
||||
[package]
|
||||
name = "codex-feedback-log-sink"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lib]
|
||||
name = "codex_feedback_log_sink"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[[example]]
|
||||
name = "generate-proto"
|
||||
path = "examples/generate-proto.rs"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
codex-state = { workspace = true }
|
||||
prost = "0.14.3"
|
||||
tokio = { workspace = true, features = ["rt", "sync", "time"] }
|
||||
tonic = { workspace = true }
|
||||
tonic-prost = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
tokio-stream = { workspace = true, features = ["net"] }
|
||||
tonic = { workspace = true, features = ["router", "transport"] }
|
||||
tonic-prost-build = { version = "=0.14.3", default-features = false, features = ["transport"] }
|
||||
19
codex-rs/feedback-log-sink/examples/generate-proto.rs
Normal file
19
codex-rs/feedback-log-sink/examples/generate-proto.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let Some(proto_dir_arg) = std::env::args().nth(1) else {
|
||||
eprintln!("Usage: generate-proto <proto-dir>");
|
||||
std::process::exit(1);
|
||||
};
|
||||
|
||||
let proto_dir = PathBuf::from(proto_dir_arg);
|
||||
let proto_file = proto_dir.join("codex.feedback_log_sink.v1.proto");
|
||||
|
||||
tonic_prost_build::configure()
|
||||
.build_client(true)
|
||||
.build_server(true)
|
||||
.out_dir(&proto_dir)
|
||||
.compile_protos(&[proto_file], &[proto_dir])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
38
codex-rs/feedback-log-sink/scripts/generate-proto.sh
Executable file
38
codex-rs/feedback-log-sink/scripts/generate-proto.sh
Executable file
@@ -0,0 +1,38 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
repo_root="$(cd "$script_dir/../../.." && pwd)"
|
||||
proto_dir="$repo_root/codex-rs/feedback-log-sink/src/proto"
|
||||
generated="$proto_dir/codex.feedback_log_sink.v1.rs"
|
||||
tmpdir="$(mktemp -d)"
|
||||
|
||||
cleanup() {
|
||||
rm -rf "$tmpdir"
|
||||
}
|
||||
trap cleanup EXIT
|
||||
|
||||
(
|
||||
cd "$repo_root/codex-rs"
|
||||
CARGO_TARGET_DIR="$tmpdir/target" cargo run \
|
||||
-p codex-feedback-log-sink \
|
||||
--example generate-proto \
|
||||
-- "$proto_dir"
|
||||
)
|
||||
|
||||
if ! sed -n '2p' "$generated" | grep -q 'clippy::trivially_copy_pass_by_ref'; then
|
||||
{
|
||||
sed -n '1p' "$generated"
|
||||
printf '#![allow(clippy::trivially_copy_pass_by_ref)]\n'
|
||||
sed '1d' "$generated"
|
||||
} > "$tmpdir/generated.rs"
|
||||
mv "$tmpdir/generated.rs" "$generated"
|
||||
fi
|
||||
|
||||
rustfmt --edition 2024 "$generated"
|
||||
|
||||
awk '
|
||||
NR == 3 && previous ~ /clippy::trivially_copy_pass_by_ref/ && $0 != "" { print "" }
|
||||
{ print; previous = $0 }
|
||||
' "$generated" > "$tmpdir/formatted.rs"
|
||||
mv "$tmpdir/formatted.rs" "$generated"
|
||||
664
codex-rs/feedback-log-sink/src/lib.rs
Normal file
664
codex-rs/feedback-log-sink/src/lib.rs
Normal file
@@ -0,0 +1,664 @@
|
||||
//! Remote feedback log sink protocol helpers.
|
||||
|
||||
use std::future::Future;
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use codex_state::LogEntry;
|
||||
use codex_state::log_db::LogSinkQueueConfig;
|
||||
use codex_state::log_db::LogWriter;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tonic::transport::Channel;
|
||||
use tonic::transport::Endpoint;
|
||||
use tracing::Event;
|
||||
use tracing::field::Field;
|
||||
use tracing::field::Visit;
|
||||
use tracing::span::Attributes;
|
||||
use tracing::span::Id;
|
||||
use tracing::span::Record;
|
||||
use tracing_subscriber::Layer;
|
||||
use tracing_subscriber::field::RecordFields;
|
||||
use tracing_subscriber::fmt::FormatFields;
|
||||
use tracing_subscriber::fmt::FormattedFields;
|
||||
use tracing_subscriber::fmt::format::DefaultFields;
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[path = "proto/codex.feedback_log_sink.v1.rs"]
|
||||
pub mod proto;
|
||||
|
||||
use proto::feedback_log_sink_client::FeedbackLogSinkClient;
|
||||
|
||||
const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct GrpcFeedbackLogSinkConfig {
|
||||
pub endpoint: String,
|
||||
pub queue: LogSinkQueueConfig,
|
||||
pub rpc_timeout: Duration,
|
||||
pub source_process_uuid: Option<String>,
|
||||
}
|
||||
|
||||
impl GrpcFeedbackLogSinkConfig {
|
||||
pub fn new(endpoint: impl Into<String>) -> Self {
|
||||
Self {
|
||||
endpoint: endpoint.into(),
|
||||
queue: LogSinkQueueConfig::default(),
|
||||
rpc_timeout: DEFAULT_RPC_TIMEOUT,
|
||||
source_process_uuid: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn normalized(self) -> Self {
|
||||
Self {
|
||||
endpoint: self.endpoint,
|
||||
queue: self.queue,
|
||||
rpc_timeout: if self.rpc_timeout.is_zero() {
|
||||
DEFAULT_RPC_TIMEOUT
|
||||
} else {
|
||||
self.rpc_timeout
|
||||
},
|
||||
source_process_uuid: self.source_process_uuid,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct GrpcFeedbackLogSinkLayer {
|
||||
sender: mpsc::Sender<RemoteLogCommand>,
|
||||
process_uuid: String,
|
||||
}
|
||||
|
||||
impl GrpcFeedbackLogSinkLayer {
|
||||
pub fn start(config: GrpcFeedbackLogSinkConfig) -> Result<Self, tonic::transport::Error> {
|
||||
let config = config.normalized();
|
||||
let endpoint = Endpoint::from_shared(config.endpoint)?
|
||||
.connect_timeout(config.rpc_timeout)
|
||||
.timeout(config.rpc_timeout);
|
||||
let client = FeedbackLogSinkClient::new(endpoint.connect_lazy());
|
||||
let (sender, receiver) = mpsc::channel(config.queue.queue_capacity.max(1));
|
||||
let process_uuid = config
|
||||
.source_process_uuid
|
||||
.unwrap_or_else(|| current_process_log_uuid().to_string());
|
||||
tokio::spawn(run_grpc_sink(
|
||||
client,
|
||||
receiver,
|
||||
config.queue,
|
||||
config.rpc_timeout,
|
||||
process_uuid.clone(),
|
||||
));
|
||||
Ok(Self {
|
||||
sender,
|
||||
process_uuid,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn flush(&self) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
if self.sender.send(RemoteLogCommand::Flush(tx)).await.is_ok() {
|
||||
let _ = rx.await;
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send(&self, entry: LogEntry) {
|
||||
let _ = self
|
||||
.sender
|
||||
.try_send(RemoteLogCommand::Entry(Box::new(entry)));
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for GrpcFeedbackLogSinkLayer {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
sender: self.sender.clone(),
|
||||
process_uuid: self.process_uuid.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Layer<S> for GrpcFeedbackLogSinkLayer
|
||||
where
|
||||
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
fn on_new_span(
|
||||
&self,
|
||||
attrs: &Attributes<'_>,
|
||||
id: &Id,
|
||||
ctx: tracing_subscriber::layer::Context<'_, S>,
|
||||
) {
|
||||
let mut visitor = SpanFieldVisitor::default();
|
||||
attrs.record(&mut visitor);
|
||||
|
||||
if let Some(span) = ctx.span(id) {
|
||||
span.extensions_mut().insert(SpanLogContext {
|
||||
name: span.metadata().name().to_string(),
|
||||
formatted_fields: format_fields(attrs),
|
||||
thread_id: visitor.thread_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn on_record(
|
||||
&self,
|
||||
id: &Id,
|
||||
values: &Record<'_>,
|
||||
ctx: tracing_subscriber::layer::Context<'_, S>,
|
||||
) {
|
||||
let mut visitor = SpanFieldVisitor::default();
|
||||
values.record(&mut visitor);
|
||||
|
||||
if let Some(span) = ctx.span(id) {
|
||||
let mut extensions = span.extensions_mut();
|
||||
if let Some(log_context) = extensions.get_mut::<SpanLogContext>() {
|
||||
if let Some(thread_id) = visitor.thread_id {
|
||||
log_context.thread_id = Some(thread_id);
|
||||
}
|
||||
append_fields(&mut log_context.formatted_fields, values);
|
||||
} else {
|
||||
extensions.insert(SpanLogContext {
|
||||
name: span.metadata().name().to_string(),
|
||||
formatted_fields: format_fields(values),
|
||||
thread_id: visitor.thread_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_event(&self, event: &Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
|
||||
let metadata = event.metadata();
|
||||
let mut visitor = MessageVisitor::default();
|
||||
event.record(&mut visitor);
|
||||
let thread_id = visitor
|
||||
.thread_id
|
||||
.clone()
|
||||
.or_else(|| event_thread_id(event, &ctx));
|
||||
let feedback_log_body = format_feedback_log_body(event, &ctx);
|
||||
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_else(|_| Duration::from_secs(0));
|
||||
let entry = LogEntry {
|
||||
ts: now.as_secs() as i64,
|
||||
ts_nanos: now.subsec_nanos() as i64,
|
||||
level: metadata.level().as_str().to_string(),
|
||||
target: metadata.target().to_string(),
|
||||
message: visitor.message,
|
||||
feedback_log_body: Some(feedback_log_body),
|
||||
thread_id,
|
||||
process_uuid: Some(self.process_uuid.clone()),
|
||||
module_path: metadata.module_path().map(ToString::to_string),
|
||||
file: metadata.file().map(ToString::to_string),
|
||||
line: metadata.line().map(|line| line as i64),
|
||||
};
|
||||
|
||||
self.try_send(entry);
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> LogWriter<S> for GrpcFeedbackLogSinkLayer
|
||||
where
|
||||
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
fn flush(&self) -> impl Future<Output = ()> + Send + '_ {
|
||||
GrpcFeedbackLogSinkLayer::flush(self)
|
||||
}
|
||||
}
|
||||
|
||||
enum RemoteLogCommand {
|
||||
Entry(Box<LogEntry>),
|
||||
Flush(oneshot::Sender<()>),
|
||||
}
|
||||
|
||||
async fn run_grpc_sink(
|
||||
mut client: FeedbackLogSinkClient<Channel>,
|
||||
mut receiver: mpsc::Receiver<RemoteLogCommand>,
|
||||
config: LogSinkQueueConfig,
|
||||
rpc_timeout: Duration,
|
||||
source_process_uuid: String,
|
||||
) {
|
||||
let batch_size = config.batch_size.max(1);
|
||||
let flush_interval = if config.flush_interval.is_zero() {
|
||||
LogSinkQueueConfig::default().flush_interval
|
||||
} else {
|
||||
config.flush_interval
|
||||
};
|
||||
let mut buffer = Vec::with_capacity(batch_size);
|
||||
let mut ticker = tokio::time::interval(flush_interval);
|
||||
loop {
|
||||
tokio::select! {
|
||||
maybe_command = receiver.recv() => {
|
||||
match maybe_command {
|
||||
Some(RemoteLogCommand::Entry(entry)) => {
|
||||
buffer.push(*entry);
|
||||
if buffer.len() >= batch_size {
|
||||
flush_remote(&mut client, &mut buffer, &source_process_uuid, rpc_timeout).await;
|
||||
}
|
||||
}
|
||||
Some(RemoteLogCommand::Flush(reply)) => {
|
||||
flush_remote(&mut client, &mut buffer, &source_process_uuid, rpc_timeout).await;
|
||||
let _ = reply.send(());
|
||||
}
|
||||
None => {
|
||||
flush_remote(&mut client, &mut buffer, &source_process_uuid, rpc_timeout).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = ticker.tick() => {
|
||||
flush_remote(&mut client, &mut buffer, &source_process_uuid, rpc_timeout).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn flush_remote(
|
||||
client: &mut FeedbackLogSinkClient<Channel>,
|
||||
buffer: &mut Vec<LogEntry>,
|
||||
source_process_uuid: &str,
|
||||
rpc_timeout: Duration,
|
||||
) {
|
||||
if buffer.is_empty() {
|
||||
return;
|
||||
}
|
||||
let entries = buffer.split_off(0);
|
||||
let request = append_log_batch_request(entries, source_process_uuid.to_string());
|
||||
let _ = tokio::time::timeout(rpc_timeout, client.append_log_batch(request)).await;
|
||||
}
|
||||
|
||||
impl From<LogEntry> for proto::FeedbackLogEntry {
|
||||
fn from(entry: LogEntry) -> Self {
|
||||
Self {
|
||||
ts: entry.ts,
|
||||
ts_nanos: entry.ts_nanos,
|
||||
level: entry.level,
|
||||
target: entry.target,
|
||||
message: entry.message,
|
||||
feedback_log_body: entry.feedback_log_body,
|
||||
thread_id: entry.thread_id,
|
||||
process_uuid: entry.process_uuid,
|
||||
module_path: entry.module_path,
|
||||
file: entry.file,
|
||||
line: entry.line,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn append_log_batch_request(
|
||||
entries: Vec<LogEntry>,
|
||||
source_process_uuid: impl Into<String>,
|
||||
) -> proto::AppendLogBatchRequest {
|
||||
proto::AppendLogBatchRequest {
|
||||
entries: entries.into_iter().map(Into::into).collect(),
|
||||
source_process_uuid: source_process_uuid.into(),
|
||||
}
|
||||
}
|
||||
|
||||
fn current_process_log_uuid() -> &'static str {
|
||||
static PROCESS_LOG_UUID: OnceLock<String> = OnceLock::new();
|
||||
PROCESS_LOG_UUID.get_or_init(|| {
|
||||
let pid = std::process::id();
|
||||
let process_uuid = Uuid::new_v4();
|
||||
format!("pid:{pid}:{process_uuid}")
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SpanLogContext {
|
||||
name: String,
|
||||
formatted_fields: String,
|
||||
thread_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct SpanFieldVisitor {
|
||||
thread_id: Option<String>,
|
||||
}
|
||||
|
||||
impl SpanFieldVisitor {
|
||||
fn record_field(&mut self, field: &Field, value: String) {
|
||||
if field.name() == "thread_id" && self.thread_id.is_none() {
|
||||
self.thread_id = Some(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Visit for SpanFieldVisitor {
|
||||
fn record_i64(&mut self, field: &Field, value: i64) {
|
||||
self.record_field(field, value.to_string());
|
||||
}
|
||||
|
||||
fn record_u64(&mut self, field: &Field, value: u64) {
|
||||
self.record_field(field, value.to_string());
|
||||
}
|
||||
|
||||
fn record_bool(&mut self, field: &Field, value: bool) {
|
||||
self.record_field(field, value.to_string());
|
||||
}
|
||||
|
||||
fn record_f64(&mut self, field: &Field, value: f64) {
|
||||
self.record_field(field, value.to_string());
|
||||
}
|
||||
|
||||
fn record_str(&mut self, field: &Field, value: &str) {
|
||||
self.record_field(field, value.to_string());
|
||||
}
|
||||
|
||||
fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
|
||||
self.record_field(field, value.to_string());
|
||||
}
|
||||
|
||||
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
|
||||
self.record_field(field, format!("{value:?}"));
|
||||
}
|
||||
}
|
||||
|
||||
fn event_thread_id<S>(
|
||||
event: &Event<'_>,
|
||||
ctx: &tracing_subscriber::layer::Context<'_, S>,
|
||||
) -> Option<String>
|
||||
where
|
||||
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
let mut thread_id = None;
|
||||
if let Some(scope) = ctx.event_scope(event) {
|
||||
for span in scope.from_root() {
|
||||
let extensions = span.extensions();
|
||||
if let Some(log_context) = extensions.get::<SpanLogContext>()
|
||||
&& log_context.thread_id.is_some()
|
||||
{
|
||||
thread_id = log_context.thread_id.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
thread_id
|
||||
}
|
||||
|
||||
fn format_feedback_log_body<S>(
|
||||
event: &Event<'_>,
|
||||
ctx: &tracing_subscriber::layer::Context<'_, S>,
|
||||
) -> String
|
||||
where
|
||||
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
let mut feedback_log_body = String::new();
|
||||
if let Some(scope) = ctx.event_scope(event) {
|
||||
for span in scope.from_root() {
|
||||
let extensions = span.extensions();
|
||||
if let Some(log_context) = extensions.get::<SpanLogContext>() {
|
||||
feedback_log_body.push_str(&log_context.name);
|
||||
if !log_context.formatted_fields.is_empty() {
|
||||
feedback_log_body.push('{');
|
||||
feedback_log_body.push_str(&log_context.formatted_fields);
|
||||
feedback_log_body.push('}');
|
||||
}
|
||||
} else {
|
||||
feedback_log_body.push_str(span.metadata().name());
|
||||
}
|
||||
feedback_log_body.push(':');
|
||||
}
|
||||
if !feedback_log_body.is_empty() {
|
||||
feedback_log_body.push(' ');
|
||||
}
|
||||
}
|
||||
feedback_log_body.push_str(&format_fields(event));
|
||||
feedback_log_body
|
||||
}
|
||||
|
||||
fn format_fields<R>(fields: R) -> String
|
||||
where
|
||||
R: RecordFields,
|
||||
{
|
||||
let formatter = DefaultFields::default();
|
||||
let mut formatted = FormattedFields::<DefaultFields>::new(String::new());
|
||||
let _ = formatter.format_fields(formatted.as_writer(), fields);
|
||||
formatted.fields
|
||||
}
|
||||
|
||||
fn append_fields(fields: &mut String, values: &Record<'_>) {
|
||||
let formatter = DefaultFields::default();
|
||||
let mut formatted = FormattedFields::<DefaultFields>::new(std::mem::take(fields));
|
||||
let _ = formatter.add_fields(&mut formatted, values);
|
||||
*fields = formatted.fields;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct MessageVisitor {
|
||||
message: Option<String>,
|
||||
thread_id: Option<String>,
|
||||
}
|
||||
|
||||
impl MessageVisitor {
|
||||
fn record_field(&mut self, field: &Field, value: String) {
|
||||
if field.name() == "message" && self.message.is_none() {
|
||||
self.message = Some(value.clone());
|
||||
}
|
||||
if field.name() == "thread_id" && self.thread_id.is_none() {
|
||||
self.thread_id = Some(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Visit for MessageVisitor {
|
||||
fn record_i64(&mut self, field: &Field, value: i64) {
|
||||
self.record_field(field, value.to_string());
|
||||
}
|
||||
|
||||
fn record_u64(&mut self, field: &Field, value: u64) {
|
||||
self.record_field(field, value.to_string());
|
||||
}
|
||||
|
||||
fn record_bool(&mut self, field: &Field, value: bool) {
|
||||
self.record_field(field, value.to_string());
|
||||
}
|
||||
|
||||
fn record_f64(&mut self, field: &Field, value: f64) {
|
||||
self.record_field(field, value.to_string());
|
||||
}
|
||||
|
||||
fn record_str(&mut self, field: &Field, value: &str) {
|
||||
self.record_field(field, value.to_string());
|
||||
}
|
||||
|
||||
fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
|
||||
self.record_field(field, value.to_string());
|
||||
}
|
||||
|
||||
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
|
||||
self.record_field(field, format!("{value:?}"));
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_state::LogEntry;
|
||||
use codex_state::log_db::LogSinkQueueConfig;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio_stream::wrappers::TcpListenerStream;
|
||||
use tonic::Response;
|
||||
use tonic::Status;
|
||||
use tonic::transport::Server;
|
||||
use tracing_subscriber::Layer;
|
||||
use tracing_subscriber::filter::Targets;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
|
||||
use super::GrpcFeedbackLogSinkConfig;
|
||||
use super::GrpcFeedbackLogSinkLayer;
|
||||
use super::append_log_batch_request;
|
||||
use super::proto;
|
||||
use super::proto::feedback_log_sink_server::FeedbackLogSink;
|
||||
use super::proto::feedback_log_sink_server::FeedbackLogSinkServer;
|
||||
|
||||
#[test]
|
||||
fn log_entry_to_proto_preserves_all_fields() {
|
||||
let entry = populated_log_entry();
|
||||
|
||||
let actual = proto::FeedbackLogEntry::from(entry);
|
||||
|
||||
assert_eq!(actual, populated_feedback_log_entry());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn log_entry_to_proto_preserves_absent_optional_fields() {
|
||||
let entry = LogEntry {
|
||||
ts: 1700000000,
|
||||
ts_nanos: 42,
|
||||
level: "WARN".to_string(),
|
||||
target: "codex::target".to_string(),
|
||||
message: None,
|
||||
feedback_log_body: None,
|
||||
thread_id: None,
|
||||
process_uuid: None,
|
||||
module_path: None,
|
||||
file: None,
|
||||
line: None,
|
||||
};
|
||||
|
||||
let actual = proto::FeedbackLogEntry::from(entry);
|
||||
|
||||
assert_eq!(
|
||||
actual,
|
||||
proto::FeedbackLogEntry {
|
||||
ts: 1700000000,
|
||||
ts_nanos: 42,
|
||||
level: "WARN".to_string(),
|
||||
target: "codex::target".to_string(),
|
||||
message: None,
|
||||
feedback_log_body: None,
|
||||
thread_id: None,
|
||||
process_uuid: None,
|
||||
module_path: None,
|
||||
file: None,
|
||||
line: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn append_log_batch_request_sets_source_process_uuid_and_entries() {
|
||||
let actual = append_log_batch_request(vec![populated_log_entry()], "source-process");
|
||||
|
||||
assert_eq!(
|
||||
actual,
|
||||
proto::AppendLogBatchRequest {
|
||||
entries: vec![populated_feedback_log_entry()],
|
||||
source_process_uuid: "source-process".to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn grpc_layer_sends_tracing_events_on_flush() {
|
||||
let requests = Arc::new(Mutex::new(Vec::new()));
|
||||
let service = RecordingFeedbackLogSink {
|
||||
requests: Arc::clone(&requests),
|
||||
};
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("bind fake sink server");
|
||||
let endpoint = format!("http://{}", listener.local_addr().expect("local addr"));
|
||||
let server = tokio::spawn(
|
||||
Server::builder()
|
||||
.add_service(FeedbackLogSinkServer::new(service))
|
||||
.serve_with_incoming(TcpListenerStream::new(listener)),
|
||||
);
|
||||
let layer = GrpcFeedbackLogSinkLayer::start(GrpcFeedbackLogSinkConfig {
|
||||
endpoint,
|
||||
queue: LogSinkQueueConfig {
|
||||
queue_capacity: 8,
|
||||
batch_size: 8,
|
||||
flush_interval: Duration::from_secs(60),
|
||||
},
|
||||
rpc_timeout: Duration::from_secs(2),
|
||||
source_process_uuid: Some("source-process".to_string()),
|
||||
})
|
||||
.expect("start grpc feedback log sink");
|
||||
|
||||
let subscriber = tracing_subscriber::registry().with(
|
||||
layer
|
||||
.clone()
|
||||
.with_filter(Targets::new().with_default(tracing::Level::TRACE)),
|
||||
);
|
||||
let dispatch = tracing::Dispatch::new(subscriber);
|
||||
tracing::dispatcher::with_default(&dispatch, || {
|
||||
tracing::info_span!("remote-feedback-thread", thread_id = "thread-1", turn = 7)
|
||||
.in_scope(|| {
|
||||
tracing::info!(foo = 2, "remote-log");
|
||||
});
|
||||
});
|
||||
layer.flush().await;
|
||||
|
||||
let requests = requests.lock().expect("requests mutex poisoned");
|
||||
assert_eq!(requests.len(), 1);
|
||||
let request = &requests[0];
|
||||
assert_eq!(request.source_process_uuid, "source-process");
|
||||
assert_eq!(request.entries.len(), 1);
|
||||
let entry = &request.entries[0];
|
||||
assert_eq!(entry.level, "INFO");
|
||||
assert_eq!(entry.message.as_deref(), Some("remote-log"));
|
||||
assert_eq!(
|
||||
entry.feedback_log_body.as_deref(),
|
||||
Some("remote-feedback-thread{thread_id=\"thread-1\" turn=7}: remote-log foo=2")
|
||||
);
|
||||
assert_eq!(entry.thread_id.as_deref(), Some("thread-1"));
|
||||
assert_eq!(entry.process_uuid.as_deref(), Some("source-process"));
|
||||
|
||||
server.abort();
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct RecordingFeedbackLogSink {
|
||||
requests: Arc<Mutex<Vec<proto::AppendLogBatchRequest>>>,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl FeedbackLogSink for RecordingFeedbackLogSink {
|
||||
async fn append_log_batch(
|
||||
&self,
|
||||
request: tonic::Request<proto::AppendLogBatchRequest>,
|
||||
) -> Result<Response<proto::AppendLogBatchResponse>, Status> {
|
||||
self.requests
|
||||
.lock()
|
||||
.expect("requests mutex poisoned")
|
||||
.push(request.into_inner());
|
||||
Ok(Response::new(proto::AppendLogBatchResponse {}))
|
||||
}
|
||||
}
|
||||
|
||||
fn populated_log_entry() -> LogEntry {
|
||||
LogEntry {
|
||||
ts: 1700000000,
|
||||
ts_nanos: 123456789,
|
||||
level: "INFO".to_string(),
|
||||
target: "codex::feedback".to_string(),
|
||||
message: Some("captured message".to_string()),
|
||||
feedback_log_body: Some("structured body".to_string()),
|
||||
thread_id: Some("thread-1".to_string()),
|
||||
process_uuid: Some("process-entry".to_string()),
|
||||
module_path: Some("codex_state::log_db".to_string()),
|
||||
file: Some("state/src/log_db.rs".to_string()),
|
||||
line: Some(123),
|
||||
}
|
||||
}
|
||||
|
||||
fn populated_feedback_log_entry() -> proto::FeedbackLogEntry {
|
||||
proto::FeedbackLogEntry {
|
||||
ts: 1700000000,
|
||||
ts_nanos: 123456789,
|
||||
level: "INFO".to_string(),
|
||||
target: "codex::feedback".to_string(),
|
||||
message: Some("captured message".to_string()),
|
||||
feedback_log_body: Some("structured body".to_string()),
|
||||
thread_id: Some("thread-1".to_string()),
|
||||
process_uuid: Some("process-entry".to_string()),
|
||||
module_path: Some("codex_state::log_db".to_string()),
|
||||
file: Some("state/src/log_db.rs".to_string()),
|
||||
line: Some(123),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package codex.feedback_log_sink.v1;
|
||||
|
||||
service FeedbackLogSink {
|
||||
rpc AppendLogBatch(AppendLogBatchRequest) returns (AppendLogBatchResponse);
|
||||
}
|
||||
|
||||
message AppendLogBatchRequest {
|
||||
repeated FeedbackLogEntry entries = 1;
|
||||
string source_process_uuid = 2;
|
||||
}
|
||||
|
||||
message AppendLogBatchResponse {}
|
||||
|
||||
message FeedbackLogEntry {
|
||||
int64 ts = 1;
|
||||
int64 ts_nanos = 2;
|
||||
string level = 3;
|
||||
string target = 4;
|
||||
optional string message = 5;
|
||||
optional string feedback_log_body = 6;
|
||||
optional string thread_id = 7;
|
||||
optional string process_uuid = 8;
|
||||
optional string module_path = 9;
|
||||
optional string file = 10;
|
||||
optional int64 line = 11;
|
||||
}
|
||||
@@ -0,0 +1,315 @@
|
||||
// This file is @generated by prost-build.
|
||||
#![allow(clippy::trivially_copy_pass_by_ref)]
|
||||
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct AppendLogBatchRequest {
|
||||
#[prost(message, repeated, tag = "1")]
|
||||
pub entries: ::prost::alloc::vec::Vec<FeedbackLogEntry>,
|
||||
#[prost(string, tag = "2")]
|
||||
pub source_process_uuid: ::prost::alloc::string::String,
|
||||
}
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
|
||||
pub struct AppendLogBatchResponse {}
|
||||
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
|
||||
pub struct FeedbackLogEntry {
|
||||
#[prost(int64, tag = "1")]
|
||||
pub ts: i64,
|
||||
#[prost(int64, tag = "2")]
|
||||
pub ts_nanos: i64,
|
||||
#[prost(string, tag = "3")]
|
||||
pub level: ::prost::alloc::string::String,
|
||||
#[prost(string, tag = "4")]
|
||||
pub target: ::prost::alloc::string::String,
|
||||
#[prost(string, optional, tag = "5")]
|
||||
pub message: ::core::option::Option<::prost::alloc::string::String>,
|
||||
#[prost(string, optional, tag = "6")]
|
||||
pub feedback_log_body: ::core::option::Option<::prost::alloc::string::String>,
|
||||
#[prost(string, optional, tag = "7")]
|
||||
pub thread_id: ::core::option::Option<::prost::alloc::string::String>,
|
||||
#[prost(string, optional, tag = "8")]
|
||||
pub process_uuid: ::core::option::Option<::prost::alloc::string::String>,
|
||||
#[prost(string, optional, tag = "9")]
|
||||
pub module_path: ::core::option::Option<::prost::alloc::string::String>,
|
||||
#[prost(string, optional, tag = "10")]
|
||||
pub file: ::core::option::Option<::prost::alloc::string::String>,
|
||||
#[prost(int64, optional, tag = "11")]
|
||||
pub line: ::core::option::Option<i64>,
|
||||
}
|
||||
/// Generated client implementations.
|
||||
pub mod feedback_log_sink_client {
|
||||
#![allow(
|
||||
unused_variables,
|
||||
dead_code,
|
||||
missing_docs,
|
||||
clippy::wildcard_imports,
|
||||
clippy::let_unit_value
|
||||
)]
|
||||
use tonic::codegen::http::Uri;
|
||||
use tonic::codegen::*;
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FeedbackLogSinkClient<T> {
|
||||
inner: tonic::client::Grpc<T>,
|
||||
}
|
||||
impl FeedbackLogSinkClient<tonic::transport::Channel> {
|
||||
/// Attempt to create a new client by connecting to a given endpoint.
|
||||
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
|
||||
where
|
||||
D: TryInto<tonic::transport::Endpoint>,
|
||||
D::Error: Into<StdError>,
|
||||
{
|
||||
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
|
||||
Ok(Self::new(conn))
|
||||
}
|
||||
}
|
||||
impl<T> FeedbackLogSinkClient<T>
|
||||
where
|
||||
T: tonic::client::GrpcService<tonic::body::Body>,
|
||||
T::Error: Into<StdError>,
|
||||
T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
|
||||
<T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
|
||||
{
|
||||
pub fn new(inner: T) -> Self {
|
||||
let inner = tonic::client::Grpc::new(inner);
|
||||
Self { inner }
|
||||
}
|
||||
pub fn with_origin(inner: T, origin: Uri) -> Self {
|
||||
let inner = tonic::client::Grpc::with_origin(inner, origin);
|
||||
Self { inner }
|
||||
}
|
||||
pub fn with_interceptor<F>(
|
||||
inner: T,
|
||||
interceptor: F,
|
||||
) -> FeedbackLogSinkClient<InterceptedService<T, F>>
|
||||
where
|
||||
F: tonic::service::Interceptor,
|
||||
T::ResponseBody: Default,
|
||||
T: tonic::codegen::Service<
|
||||
http::Request<tonic::body::Body>,
|
||||
Response = http::Response<
|
||||
<T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
|
||||
>,
|
||||
>,
|
||||
<T as tonic::codegen::Service<http::Request<tonic::body::Body>>>::Error:
|
||||
Into<StdError> + std::marker::Send + std::marker::Sync,
|
||||
{
|
||||
FeedbackLogSinkClient::new(InterceptedService::new(inner, interceptor))
|
||||
}
|
||||
/// Compress requests with the given encoding.
|
||||
///
|
||||
/// This requires the server to support it otherwise it might respond with an
|
||||
/// error.
|
||||
#[must_use]
|
||||
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||
self.inner = self.inner.send_compressed(encoding);
|
||||
self
|
||||
}
|
||||
/// Enable decompressing responses.
|
||||
#[must_use]
|
||||
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||
self.inner = self.inner.accept_compressed(encoding);
|
||||
self
|
||||
}
|
||||
/// Limits the maximum size of a decoded message.
|
||||
///
|
||||
/// Default: `4MB`
|
||||
#[must_use]
|
||||
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
|
||||
self.inner = self.inner.max_decoding_message_size(limit);
|
||||
self
|
||||
}
|
||||
/// Limits the maximum size of an encoded message.
|
||||
///
|
||||
/// Default: `usize::MAX`
|
||||
#[must_use]
|
||||
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
|
||||
self.inner = self.inner.max_encoding_message_size(limit);
|
||||
self
|
||||
}
|
||||
pub async fn append_log_batch(
|
||||
&mut self,
|
||||
request: impl tonic::IntoRequest<super::AppendLogBatchRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::AppendLogBatchResponse>, tonic::Status>
|
||||
{
|
||||
self.inner.ready().await.map_err(|e| {
|
||||
tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
|
||||
})?;
|
||||
let codec = tonic_prost::ProstCodec::default();
|
||||
let path = http::uri::PathAndQuery::from_static(
|
||||
"/codex.feedback_log_sink.v1.FeedbackLogSink/AppendLogBatch",
|
||||
);
|
||||
let mut req = request.into_request();
|
||||
req.extensions_mut().insert(GrpcMethod::new(
|
||||
"codex.feedback_log_sink.v1.FeedbackLogSink",
|
||||
"AppendLogBatch",
|
||||
));
|
||||
self.inner.unary(req, path, codec).await
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Generated server implementations.
|
||||
pub mod feedback_log_sink_server {
|
||||
#![allow(
|
||||
unused_variables,
|
||||
dead_code,
|
||||
missing_docs,
|
||||
clippy::wildcard_imports,
|
||||
clippy::let_unit_value
|
||||
)]
|
||||
use tonic::codegen::*;
|
||||
/// Generated trait containing gRPC methods that should be implemented for use with FeedbackLogSinkServer.
|
||||
#[async_trait]
|
||||
pub trait FeedbackLogSink: std::marker::Send + std::marker::Sync + 'static {
|
||||
async fn append_log_batch(
|
||||
&self,
|
||||
request: tonic::Request<super::AppendLogBatchRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::AppendLogBatchResponse>, tonic::Status>;
|
||||
}
|
||||
#[derive(Debug)]
|
||||
pub struct FeedbackLogSinkServer<T> {
|
||||
inner: Arc<T>,
|
||||
accept_compression_encodings: EnabledCompressionEncodings,
|
||||
send_compression_encodings: EnabledCompressionEncodings,
|
||||
max_decoding_message_size: Option<usize>,
|
||||
max_encoding_message_size: Option<usize>,
|
||||
}
|
||||
impl<T> FeedbackLogSinkServer<T> {
|
||||
pub fn new(inner: T) -> Self {
|
||||
Self::from_arc(Arc::new(inner))
|
||||
}
|
||||
pub fn from_arc(inner: Arc<T>) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
accept_compression_encodings: Default::default(),
|
||||
send_compression_encodings: Default::default(),
|
||||
max_decoding_message_size: None,
|
||||
max_encoding_message_size: None,
|
||||
}
|
||||
}
|
||||
pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
|
||||
where
|
||||
F: tonic::service::Interceptor,
|
||||
{
|
||||
InterceptedService::new(Self::new(inner), interceptor)
|
||||
}
|
||||
/// Enable decompressing requests with the given encoding.
|
||||
#[must_use]
|
||||
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||
self.accept_compression_encodings.enable(encoding);
|
||||
self
|
||||
}
|
||||
/// Compress responses with the given encoding, if the client supports it.
|
||||
#[must_use]
|
||||
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||
self.send_compression_encodings.enable(encoding);
|
||||
self
|
||||
}
|
||||
/// Limits the maximum size of a decoded message.
|
||||
///
|
||||
/// Default: `4MB`
|
||||
#[must_use]
|
||||
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
|
||||
self.max_decoding_message_size = Some(limit);
|
||||
self
|
||||
}
|
||||
/// Limits the maximum size of an encoded message.
|
||||
///
|
||||
/// Default: `usize::MAX`
|
||||
#[must_use]
|
||||
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
|
||||
self.max_encoding_message_size = Some(limit);
|
||||
self
|
||||
}
|
||||
}
|
||||
impl<T, B> tonic::codegen::Service<http::Request<B>> for FeedbackLogSinkServer<T>
|
||||
where
|
||||
T: FeedbackLogSink,
|
||||
B: Body + std::marker::Send + 'static,
|
||||
B::Error: Into<StdError> + std::marker::Send + 'static,
|
||||
{
|
||||
type Response = http::Response<tonic::body::Body>;
|
||||
type Error = std::convert::Infallible;
|
||||
type Future = BoxFuture<Self::Response, Self::Error>;
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
_cx: &mut Context<'_>,
|
||||
) -> Poll<std::result::Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
fn call(&mut self, req: http::Request<B>) -> Self::Future {
|
||||
match req.uri().path() {
|
||||
"/codex.feedback_log_sink.v1.FeedbackLogSink/AppendLogBatch" => {
|
||||
#[allow(non_camel_case_types)]
|
||||
struct AppendLogBatchSvc<T: FeedbackLogSink>(pub Arc<T>);
|
||||
impl<T: FeedbackLogSink>
|
||||
tonic::server::UnaryService<super::AppendLogBatchRequest>
|
||||
for AppendLogBatchSvc<T>
|
||||
{
|
||||
type Response = super::AppendLogBatchResponse;
|
||||
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
|
||||
fn call(
|
||||
&mut self,
|
||||
request: tonic::Request<super::AppendLogBatchRequest>,
|
||||
) -> Self::Future {
|
||||
let inner = Arc::clone(&self.0);
|
||||
let fut = async move {
|
||||
<T as FeedbackLogSink>::append_log_batch(&inner, request).await
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
let accept_compression_encodings = self.accept_compression_encodings;
|
||||
let send_compression_encodings = self.send_compression_encodings;
|
||||
let max_decoding_message_size = self.max_decoding_message_size;
|
||||
let max_encoding_message_size = self.max_encoding_message_size;
|
||||
let inner = self.inner.clone();
|
||||
let fut = async move {
|
||||
let method = AppendLogBatchSvc(inner);
|
||||
let codec = tonic_prost::ProstCodec::default();
|
||||
let mut grpc = tonic::server::Grpc::new(codec)
|
||||
.apply_compression_config(
|
||||
accept_compression_encodings,
|
||||
send_compression_encodings,
|
||||
)
|
||||
.apply_max_message_size_config(
|
||||
max_decoding_message_size,
|
||||
max_encoding_message_size,
|
||||
);
|
||||
let res = grpc.unary(method, req).await;
|
||||
Ok(res)
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
_ => Box::pin(async move {
|
||||
let mut response = http::Response::new(tonic::body::Body::default());
|
||||
let headers = response.headers_mut();
|
||||
headers.insert(
|
||||
tonic::Status::GRPC_STATUS,
|
||||
(tonic::Code::Unimplemented as i32).into(),
|
||||
);
|
||||
headers.insert(
|
||||
http::header::CONTENT_TYPE,
|
||||
tonic::metadata::GRPC_CONTENT_TYPE,
|
||||
);
|
||||
Ok(response)
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<T> Clone for FeedbackLogSinkServer<T> {
|
||||
fn clone(&self) -> Self {
|
||||
let inner = self.inner.clone();
|
||||
Self {
|
||||
inner,
|
||||
accept_compression_encodings: self.accept_compression_encodings,
|
||||
send_compression_encodings: self.send_compression_encodings,
|
||||
max_decoding_message_size: self.max_decoding_message_size,
|
||||
max_encoding_message_size: self.max_encoding_message_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Generated gRPC service name
|
||||
pub const SERVICE_NAME: &str = "codex.feedback_log_sink.v1.FeedbackLogSink";
|
||||
impl<T> tonic::server::NamedService for FeedbackLogSinkServer<T> {
|
||||
const NAME: &'static str = SERVICE_NAME;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user