@@ -100,7 +100,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
 	if !cfg.CopyPrivateID.IsZero() {
 		urlSuffix = "?copyId=" + cfg.CopyPrivateID.String()
 	}
-	l := &Logger{
+	logger := &Logger{
 		privateID:   cfg.PrivateID,
 		stderr:      cfg.Stderr,
 		stderrLevel: int64(cfg.StderrLevel),
@@ -124,19 +124,19 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
 	}
 	if cfg.Bus != nil {
-		l.eventClient = cfg.Bus.Client("logtail.Logger")
+		logger.eventClient = cfg.Bus.Client("logtail.Logger")
 
 		// Subscribe to change deltas from NetMon to detect when the network comes up.
-		eventbus.SubscribeFunc(l.eventClient, l.onChangeDelta)
+		eventbus.SubscribeFunc(logger.eventClient, logger.onChangeDelta)
 	}
-	l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
-	l.compressLogs = cfg.CompressLogs
+	logger.SetSockstatsLabel(sockstats.LabelLogtailLogger)
+	logger.compressLogs = cfg.CompressLogs
 
 	ctx, cancel := context.WithCancel(context.Background())
-	l.uploadCancel = cancel
+	logger.uploadCancel = cancel
 
-	go l.uploading(ctx)
-	l.Write([]byte("logtail started"))
-	return l
+	go logger.uploading(ctx)
+	logger.Write([]byte("logtail started"))
+	return logger
 }
 
 // Logger writes logs, splitting them as configured between local
@@ -190,27 +190,27 @@ func (p *atomicSocktatsLabel) Store(label sockstats.Label) { p.p.Store(uint32(la
 // SetVerbosityLevel controls the verbosity level that should be
 // written to stderr. 0 is the default (not verbose). Levels 1 or higher
 // are increasingly verbose.
-func (l *Logger) SetVerbosityLevel(level int) {
-	atomic.StoreInt64(&l.stderrLevel, int64(level))
+func (lg *Logger) SetVerbosityLevel(level int) {
+	atomic.StoreInt64(&lg.stderrLevel, int64(level))
 }
 
 // SetNetMon sets the network monitor.
 //
 // It should not be changed concurrently with log writes and should
 // only be set once.
-func (l *Logger) SetNetMon(lm *netmon.Monitor) {
-	l.netMonitor = lm
+func (lg *Logger) SetNetMon(lm *netmon.Monitor) {
+	lg.netMonitor = lm
 }
 
 // SetSockstatsLabel sets the label used in sockstat logs to identify network traffic from this logger.
-func (l *Logger) SetSockstatsLabel(label sockstats.Label) {
-	l.sockstatsLabel.Store(label)
+func (lg *Logger) SetSockstatsLabel(label sockstats.Label) {
+	lg.sockstatsLabel.Store(label)
 }
 
 // PrivateID returns the logger's private log ID.
 //
 // It exists for internal use only.
-func (l *Logger) PrivateID() logid.PrivateID { return l.privateID }
+func (lg *Logger) PrivateID() logid.PrivateID { return lg.privateID }
 
 // Shutdown gracefully shuts down the logger while completing any
 // remaining uploads.
@@ -218,33 +218,33 @@ func (l *Logger) PrivateID() logid.PrivateID { return l.privateID }
 // It will block, continuing to try and upload unless the passed
 // context object interrupts it by being done.
 // If the shutdown is interrupted, an error is returned.
-func (l *Logger) Shutdown(ctx context.Context) error {
+func (lg *Logger) Shutdown(ctx context.Context) error {
 	done := make(chan struct{})
 	go func() {
 		select {
 		case <-ctx.Done():
-			l.uploadCancel()
-			<-l.shutdownDone
-		case <-l.shutdownDone:
+			lg.uploadCancel()
+			<-lg.shutdownDone
+		case <-lg.shutdownDone:
 		}
 		close(done)
-		l.httpc.CloseIdleConnections()
+		lg.httpc.CloseIdleConnections()
 	}()
 
-	if l.eventClient != nil {
-		l.eventClient.Close()
+	if lg.eventClient != nil {
+		lg.eventClient.Close()
 	}
 
-	l.shutdownStartMu.Lock()
+	lg.shutdownStartMu.Lock()
 	select {
-	case <-l.shutdownStart:
-		l.shutdownStartMu.Unlock()
+	case <-lg.shutdownStart:
+		lg.shutdownStartMu.Unlock()
 		return nil
 	default:
 	}
-	close(l.shutdownStart)
-	l.shutdownStartMu.Unlock()
+	close(lg.shutdownStart)
+	lg.shutdownStartMu.Unlock()
 
-	io.WriteString(l, "logger closing down\n")
+	io.WriteString(lg, "logger closing down\n")
 	<-done
 	return nil
@@ -254,8 +254,8 @@ func (l *Logger) Shutdown(ctx context.Context) error {
 // process, and any associated goroutines.
 //
 // Deprecated: use Shutdown
-func (l *Logger) Close() {
-	l.Shutdown(context.Background())
+func (lg *Logger) Close() {
+	lg.Shutdown(context.Background())
 }
 
 // drainBlock is called by drainPending when there are no logs to drain.
@@ -265,11 +265,11 @@ func (l *Logger) Close() {
 //
 // If the caller specified FlushInterface, drainWake is only sent to
 // periodically.
-func (l *Logger) drainBlock() (shuttingDown bool) {
+func (lg *Logger) drainBlock() (shuttingDown bool) {
 	select {
-	case <-l.shutdownStart:
+	case <-lg.shutdownStart:
 		return true
-	case <-l.drainWake:
+	case <-lg.drainWake:
 	}
 	return false
 }
@@ -277,20 +277,20 @@ func (l *Logger) drainBlock() (shuttingDown bool) {
 // drainPending drains and encodes a batch of logs from the buffer for upload.
 // If no logs are available, drainPending blocks until logs are available.
 // The returned buffer is only valid until the next call to drainPending.
-func (l *Logger) drainPending() (b []byte) {
-	b = l.drainBuf[:0]
+func (lg *Logger) drainPending() (b []byte) {
+	b = lg.drainBuf[:0]
 	b = append(b, '[')
 	defer func() {
 		b = bytes.TrimRight(b, ",")
 		b = append(b, ']')
-		l.drainBuf = b
+		lg.drainBuf = b
 		if len(b) <= len("[]") {
 			b = nil
 		}
 	}()
 
-	maxLen := cmp.Or(l.maxUploadSize, maxSize)
-	if l.lowMem {
+	maxLen := cmp.Or(lg.maxUploadSize, maxSize)
+	if lg.lowMem {
 		// When operating in a low memory environment, it is better to upload
 		// in multiple operations than it is to allocate a large body and OOM.
 		// Even if maxLen is less than maxSize, we can still upload an entry
@@ -298,13 +298,13 @@ func (l *Logger) drainPending() (b []byte) {
 		maxLen /= lowMemRatio
 	}
 	for len(b) < maxLen {
-		line, err := l.buffer.TryReadLine()
+		line, err := lg.buffer.TryReadLine()
 		switch {
 		case err == io.EOF:
 			return b
 		case err != nil:
 			b = append(b, '{')
-			b = l.appendMetadata(b, false, true, 0, 0, "reading ringbuffer: " + err.Error(), nil, 0)
+			b = lg.appendMetadata(b, false, true, 0, 0, "reading ringbuffer: " + err.Error(), nil, 0)
 			b = bytes.TrimRight(b, ",")
 			b = append(b, '}')
 			return b
@@ -318,10 +318,10 @@ func (l *Logger) drainPending() (b []byte) {
 			// in our buffer from a previous large write, let it go.
 			if cap(b) > bufferSize {
 				b = bytes.Clone(b)
-				l.drainBuf = b
+				lg.drainBuf = b
 			}
-			if shuttingDown := l.drainBlock(); shuttingDown {
+			if shuttingDown := lg.drainBlock(); shuttingDown {
 				return b
 			}
 			continue
@@ -338,18 +338,18 @@ func (l *Logger) drainPending() (b []byte) {
 		default:
 			// This is probably a log added to stderr by filch
 			// outside of the logtail logger. Encode it.
-			if !l.explainedRaw {
-				fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
-				fmt.Fprintf(l.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n")
-				fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
-				fmt.Fprintf(l.stderr, "RAW-STDERR:\n")
-				l.explainedRaw = true
+			if !lg.explainedRaw {
+				fmt.Fprintf(lg.stderr, "RAW-STDERR: ***\n")
+				fmt.Fprintf(lg.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n")
+				fmt.Fprintf(lg.stderr, "RAW-STDERR: ***\n")
+				fmt.Fprintf(lg.stderr, "RAW-STDERR:\n")
+				lg.explainedRaw = true
 			}
-			fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b)
+			fmt.Fprintf(lg.stderr, "RAW-STDERR: %s", b)
 			// Do not add a client time, as it could be really old.
 			// Do not include instance key or ID either,
 			// since this came from a different instance.
-			b = l.appendText(b, line, true, 0, 0, 0)
+			b = lg.appendText(b, line, true, 0, 0, 0)
 		}
 		b = append(b, ',')
 	}
@@ -357,14 +357,14 @@ func (l *Logger) drainPending() (b []byte) {
 }
 
 // This is the goroutine that repeatedly uploads logs in the background.
-func (l *Logger) uploading(ctx context.Context) {
-	defer close(l.shutdownDone)
+func (lg *Logger) uploading(ctx context.Context) {
+	defer close(lg.shutdownDone)
 
 	for {
-		body := l.drainPending()
+		body := lg.drainPending()
 		origlen := -1 // sentinel value: uncompressed
 		// Don't attempt to compress tiny bodies; not worth the CPU cycles.
-		if l.compressLogs && len(body) > 256 {
+		if lg.compressLogs && len(body) > 256 {
 			zbody := zstdframe.AppendEncode(nil, body,
 				zstdframe.FastestCompression, zstdframe.LowMemory(true))
@@ -381,20 +381,20 @@ func (l *Logger) uploading(ctx context.Context) {
 		var numFailures int
 		var firstFailure time.Time
 		for len(body) > 0 && ctx.Err() == nil {
-			retryAfter, err := l.upload(ctx, body, origlen)
+			retryAfter, err := lg.upload(ctx, body, origlen)
 			if err != nil {
 				numFailures++
-				firstFailure = l.clock.Now()
+				firstFailure = lg.clock.Now()
 
-				if !l.internetUp() {
-					fmt.Fprintf(l.stderr, "logtail: internet down; waiting\n")
-					l.awaitInternetUp(ctx)
+				if !lg.internetUp() {
+					fmt.Fprintf(lg.stderr, "logtail: internet down; waiting\n")
+					lg.awaitInternetUp(ctx)
 					continue
 				}
 
 				// Only print the same message once.
 				if currError := err.Error(); lastError != currError {
-					fmt.Fprintf(l.stderr, "logtail: upload: %v\n", err)
+					fmt.Fprintf(lg.stderr, "logtail: upload: %v\n", err)
 					lastError = currError
 				}
@@ -407,55 +407,55 @@ func (l *Logger) uploading(ctx context.Context) {
 			} else {
 				// Only print a success message after recovery.
 				if numFailures > 0 {
-					fmt.Fprintf(l.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, l.clock.Since(firstFailure).Round(time.Second))
+					fmt.Fprintf(lg.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, lg.clock.Since(firstFailure).Round(time.Second))
 				}
 				break
 			}
 		}
 		select {
-		case <-l.shutdownStart:
+		case <-lg.shutdownStart:
 			return
 		default:
 		}
 	}
 }
 
-func (l *Logger) internetUp() bool {
+func (lg *Logger) internetUp() bool {
 	select {
-	case <-l.networkIsUp.Ready():
+	case <-lg.networkIsUp.Ready():
 		return true
 	default:
-		if l.netMonitor == nil {
+		if lg.netMonitor == nil {
 			return true // No way to tell, so assume it is.
 		}
-		return l.netMonitor.InterfaceState().AnyInterfaceUp()
+		return lg.netMonitor.InterfaceState().AnyInterfaceUp()
 	}
 }
 
 // onChangeDelta is an eventbus subscriber function that handles
 // [netmon.ChangeDelta] events to detect whether the Internet is expected to be
 // reachable.
-func (l *Logger) onChangeDelta(delta *netmon.ChangeDelta) {
+func (lg *Logger) onChangeDelta(delta *netmon.ChangeDelta) {
 	if delta.New.AnyInterfaceUp() {
-		fmt.Fprintf(l.stderr, "logtail: internet back up\n")
-		l.networkIsUp.Set()
+		fmt.Fprintf(lg.stderr, "logtail: internet back up\n")
+		lg.networkIsUp.Set()
 	} else {
-		fmt.Fprintf(l.stderr, "logtail: network changed, but is not up\n")
-		l.networkIsUp.Reset()
+		fmt.Fprintf(lg.stderr, "logtail: network changed, but is not up\n")
+		lg.networkIsUp.Reset()
 	}
 }
 
-func (l *Logger) awaitInternetUp(ctx context.Context) {
-	if l.eventClient != nil {
+func (lg *Logger) awaitInternetUp(ctx context.Context) {
+	if lg.eventClient != nil {
 		select {
-		case <-l.networkIsUp.Ready():
+		case <-lg.networkIsUp.Ready():
 		case <-ctx.Done():
 		}
 		return
 	}
 
 	upc := make(chan bool, 1)
-	defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
+	defer lg.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
 		if delta.New.AnyInterfaceUp() {
 			select {
 			case upc <- true:
@@ -463,12 +463,12 @@ func (l *Logger) awaitInternetUp(ctx context.Context) {
 			}
 		}
 	})()
-	if l.internetUp() {
+	if lg.internetUp() {
 		return
 	}
 	select {
 	case <-upc:
-		fmt.Fprintf(l.stderr, "logtail: internet back up\n")
+		fmt.Fprintf(lg.stderr, "logtail: internet back up\n")
 	case <-ctx.Done():
 	}
 }
@@ -476,13 +476,13 @@ func (l *Logger) awaitInternetUp(ctx context.Context) {
 // upload uploads body to the log server.
 // origlen indicates the pre-compression body length.
 // origlen of -1 indicates that the body is not compressed.
-func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAfter time.Duration, err error) {
+func (lg *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAfter time.Duration, err error) {
 	const maxUploadTime = 45 * time.Second
-	ctx = sockstats.WithSockStats(ctx, l.sockstatsLabel.Load(), l.Logf)
+	ctx = sockstats.WithSockStats(ctx, lg.sockstatsLabel.Load(), lg.Logf)
 	ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
 	defer cancel()
 
-	req, err := http.NewRequestWithContext(ctx, "POST", l.url, bytes.NewReader(body))
+	req, err := http.NewRequestWithContext(ctx, "POST", lg.url, bytes.NewReader(body))
 	if err != nil {
 		// I know of no conditions under which this could fail.
 		// Report it very loudly.
@@ -513,8 +513,8 @@ func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAft
 		compressedNote = "compressed"
 	}
 
-	l.httpDoCalls.Add(1)
-	resp, err := l.httpc.Do(req)
+	lg.httpDoCalls.Add(1)
+	resp, err := lg.httpc.Do(req)
 	if err != nil {
 		return 0, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
 	}
@@ -533,16 +533,16 @@ func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAft
 //
 // TODO(bradfitz): this apparently just returns nil, as of tailscale/corp@9c2ec35.
 // Finish cleaning this up.
-func (l *Logger) Flush() error {
+func (lg *Logger) Flush() error {
 	return nil
 }
 
 // StartFlush starts a log upload, if anything is pending.
 //
 // If l is nil, StartFlush is a no-op.
-func (l *Logger) StartFlush() {
-	if l != nil {
-		l.tryDrainWake()
+func (lg *Logger) StartFlush() {
+	if lg != nil {
+		lg.tryDrainWake()
 	}
 }
@@ -558,41 +558,41 @@ var debugWakesAndUploads = envknob.RegisterBool("TS_DEBUG_LOGTAIL_WAKES")
 // tryDrainWake tries to send to lg.drainWake, to cause an uploading wakeup.
 // It does not block.
-func (l *Logger) tryDrainWake() {
-	l.flushPending.Store(false)
+func (lg *Logger) tryDrainWake() {
+	lg.flushPending.Store(false)
 	if debugWakesAndUploads() {
 		// Using println instead of log.Printf here to avoid recursing back into
 		// ourselves.
-		println("logtail: try drain wake, numHTTP:", l.httpDoCalls.Load())
+		println("logtail: try drain wake, numHTTP:", lg.httpDoCalls.Load())
 	}
 	select {
-	case l.drainWake <- struct{}{}:
+	case lg.drainWake <- struct{}{}:
 	default:
 	}
 }
 
-func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
+func (lg *Logger) sendLocked(jsonBlob []byte) (int, error) {
 	tapSend(jsonBlob)
 	if logtailDisabled.Load() {
 		return len(jsonBlob), nil
 	}
 
-	n, err := l.buffer.Write(jsonBlob)
+	n, err := lg.buffer.Write(jsonBlob)
 
 	flushDelay := defaultFlushDelay
-	if l.flushDelayFn != nil {
-		flushDelay = l.flushDelayFn()
+	if lg.flushDelayFn != nil {
+		flushDelay = lg.flushDelayFn()
 	}
 	if flushDelay > 0 {
-		if l.flushPending.CompareAndSwap(false, true) {
-			if l.flushTimer == nil {
-				l.flushTimer = l.clock.AfterFunc(flushDelay, l.tryDrainWake)
+		if lg.flushPending.CompareAndSwap(false, true) {
+			if lg.flushTimer == nil {
+				lg.flushTimer = lg.clock.AfterFunc(flushDelay, lg.tryDrainWake)
 			} else {
-				l.flushTimer.Reset(flushDelay)
+				lg.flushTimer.Reset(flushDelay)
 			}
 		}
 	} else {
-		l.tryDrainWake()
+		lg.tryDrainWake()
 	}
 	return n, err
 }
@@ -600,13 +600,13 @@ func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
 // appendMetadata appends optional "logtail", "metrics", and "v" JSON members.
 // This assumes dst is already within a JSON object.
 // Each member is comma-terminated.
-func (l *Logger) appendMetadata(dst []byte, skipClientTime, skipMetrics bool, procID uint32, procSequence uint64, errDetail string, errData jsontext.Value, level int) []byte {
+func (lg *Logger) appendMetadata(dst []byte, skipClientTime, skipMetrics bool, procID uint32, procSequence uint64, errDetail string, errData jsontext.Value, level int) []byte {
 	// Append optional logtail metadata.
 	if !skipClientTime || procID != 0 || procSequence != 0 || errDetail != "" || errData != nil {
 		dst = append(dst, `"logtail": {`...)
 		if !skipClientTime {
 			dst = append(dst, `"client_time":"`...)
-			dst = l.clock.Now().UTC().AppendFormat(dst, time.RFC3339Nano)
+			dst = lg.clock.Now().UTC().AppendFormat(dst, time.RFC3339Nano)
 			dst = append(dst, '"', ',')
 		}
 		if procID != 0 {
@@ -639,8 +639,8 @@ func (l *Logger) appendMetadata(dst []byte, skipClientTime, skipMetrics bool, pr
 	}
 
 	// Append optional metrics metadata.
-	if !skipMetrics && l.metricsDelta != nil {
-		if d := l.metricsDelta(); d != "" {
+	if !skipMetrics && lg.metricsDelta != nil {
+		if d := lg.metricsDelta(); d != "" {
 			dst = append(dst, `"metrics":"`...)
 			dst = append(dst, d...)
 			dst = append(dst, '"', ',')
@@ -660,10 +660,10 @@ func (l *Logger) appendMetadata(dst []byte, skipClientTime, skipMetrics bool, pr
 }
 
 // appendText appends a raw text message in the Tailscale JSON log entry format.
-func (l *Logger) appendText(dst, src []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
+func (lg *Logger) appendText(dst, src []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
 	dst = slices.Grow(dst, len(src))
 	dst = append(dst, '{')
-	dst = l.appendMetadata(dst, skipClientTime, false, procID, procSequence, "", nil, level)
+	dst = lg.appendMetadata(dst, skipClientTime, false, procID, procSequence, "", nil, level)
 	if len(src) == 0 {
 		dst = bytes.TrimRight(dst, ",")
 		return append(dst, "}\n"...)
@@ -672,7 +672,7 @@ func (l *Logger) appendText(dst, src []byte, skipClientTime bool, procID uint32,
 	// Append the text string, which may be truncated.
 	// Invalid UTF-8 will be mangled with the Unicode replacement character.
 	max := maxTextSize
-	if l.lowMem {
+	if lg.lowMem {
 		max /= lowMemRatio
 	}
 	dst = append(dst, `"text": `...)
// appendTextOrJSONLocked appends a raw text message or a raw JSON object
// appendTextOrJSONLocked appends a raw text message or a raw JSON object
// in the Tailscale JSON log format.
// in the Tailscale JSON log format.
func ( l * Logger ) appendTextOrJSONLocked ( dst , src [ ] byte , level int ) [ ] byte {
func ( l g * Logger ) appendTextOrJSONLocked ( dst , src [ ] byte , level int ) [ ] byte {
if l . includeProcSequence {
if l g . includeProcSequence {
l . procSequence ++
l g . procSequence ++
}
}
if len ( src ) == 0 || src [ 0 ] != '{' {
if len ( src ) == 0 || src [ 0 ] != '{' {
return l . appendText ( dst , src , l . skipClientTime , l . procID , l . procSequence , level )
return l g . appendText ( dst , src , l g . skipClientTime , l g . procID , l g . procSequence , level )
}
}
// Check whether the input is a valid JSON object and
// Check whether the input is a valid JSON object and
@ -714,11 +714,11 @@ func (l *Logger) appendTextOrJSONLocked(dst, src []byte, level int) []byte {
// However, bytes.NewBuffer normally allocates unless
// However, bytes.NewBuffer normally allocates unless
// we immediately shallow copy it into a pre-allocated Buffer struct.
// we immediately shallow copy it into a pre-allocated Buffer struct.
// See https://go.dev/issue/67004.
// See https://go.dev/issue/67004.
l . bytesBuf = * bytes . NewBuffer ( src )
l g . bytesBuf = * bytes . NewBuffer ( src )
defer func ( ) { l . bytesBuf = bytes . Buffer { } } ( ) // avoid pinning src
defer func ( ) { l g . bytesBuf = bytes . Buffer { } } ( ) // avoid pinning src
dec := & l . jsonDec
dec := & l g . jsonDec
dec . Reset ( & l . bytesBuf )
dec . Reset ( & l g . bytesBuf )
if tok , err := dec . ReadToken ( ) ; tok . Kind ( ) != '{' || err != nil {
if tok , err := dec . ReadToken ( ) ; tok . Kind ( ) != '{' || err != nil {
return false
return false
}
}
@ -750,7 +750,7 @@ func (l *Logger) appendTextOrJSONLocked(dst, src []byte, level int) []byte {
// Treat invalid JSON as a raw text message.
// Treat invalid JSON as a raw text message.
if ! validJSON {
if ! validJSON {
return l . appendText ( dst , src , l . skipClientTime , l . procID , l . procSequence , level )
return l g . appendText ( dst , src , l g . skipClientTime , l g . procID , l g . procSequence , level )
}
}
// Check whether the JSON payload is too large.
// Check whether the JSON payload is too large.
@ -758,13 +758,13 @@ func (l *Logger) appendTextOrJSONLocked(dst, src []byte, level int) []byte {
// That's okay as the Tailscale log service limit is actually 2*maxSize.
// That's okay as the Tailscale log service limit is actually 2*maxSize.
// However, so long as logging applications aim to target the maxSize limit,
// However, so long as logging applications aim to target the maxSize limit,
// there should be no trouble eventually uploading logs.
// there should be no trouble eventually uploading logs.
maxLen := cmp . Or ( l . maxUploadSize , maxSize )
maxLen := cmp . Or ( l g . maxUploadSize , maxSize )
if len ( src ) > maxLen {
if len ( src ) > maxLen {
errDetail := fmt . Sprintf ( "entry too large: %d bytes" , len ( src ) )
errDetail := fmt . Sprintf ( "entry too large: %d bytes" , len ( src ) )
errData := appendTruncatedString ( nil , src , maxLen / len ( ` \uffff ` ) ) // escaping could increase size
errData := appendTruncatedString ( nil , src , maxLen / len ( ` \uffff ` ) ) // escaping could increase size
dst = append ( dst , '{' )
dst = append ( dst , '{' )
dst = l . appendMetadata ( dst , l . skipClientTime , true , l . procID , l . procSequence , errDetail , errData , level )
dst = l g . appendMetadata ( dst , l g . skipClientTime , true , l g . procID , l g . procSequence , errDetail , errData , level )
dst = bytes . TrimRight ( dst , "," )
dst = bytes . TrimRight ( dst , "," )
return append ( dst , "}\n" ... )
return append ( dst , "}\n" ... )
}
}
@ -781,7 +781,7 @@ func (l *Logger) appendTextOrJSONLocked(dst, src []byte, level int) []byte {
}
}
dst = slices . Grow ( dst , len ( src ) )
dst = slices . Grow ( dst , len ( src ) )
dst = append ( dst , '{' )
dst = append ( dst , '{' )
dst = l . appendMetadata ( dst , l . skipClientTime , true , l . procID , l . procSequence , errDetail , errData , level )
dst = l g . appendMetadata ( dst , l g . skipClientTime , true , l g . procID , l g . procSequence , errDetail , errData , level )
if logtailValLength > 0 {
if logtailValLength > 0 {
// Exclude original logtail member from the message.
// Exclude original logtail member from the message.
dst = appendWithoutNewline ( dst , src [ len ( "{" ) : logtailKeyOffset ] )
dst = appendWithoutNewline ( dst , src [ len ( "{" ) : logtailKeyOffset ] )
@@ -808,8 +808,8 @@ func appendWithoutNewline(dst, src []byte) []byte {
 }
 
 // Logf logs to l using the provided fmt-style format and optional arguments.
-func (l *Logger) Logf(format string, args ...any) {
-	fmt.Fprintf(l, format, args...)
+func (lg *Logger) Logf(format string, args ...any) {
+	fmt.Fprintf(lg, format, args...)
 }
 
 // Write logs an encoded JSON blob.
@@ -818,29 +818,29 @@ func (l *Logger) Logf(format string, args ...any) {
 // then contents is fit into a JSON blob and written.
 //
 // This is intended as an interface for the stdlib "log" package.
-func (l *Logger) Write(buf []byte) (int, error) {
+func (lg *Logger) Write(buf []byte) (int, error) {
 	if len(buf) == 0 {
 		return 0, nil
 	}
 	inLen := len(buf) // length as provided to us, before modifications to downstream writers
 
 	level, buf := parseAndRemoveLogLevel(buf)
-	if l.stderr != nil && l.stderr != io.Discard && int64(level) <= atomic.LoadInt64(&l.stderrLevel) {
+	if lg.stderr != nil && lg.stderr != io.Discard && int64(level) <= atomic.LoadInt64(&lg.stderrLevel) {
 		if buf[len(buf)-1] == '\n' {
-			l.stderr.Write(buf)
+			lg.stderr.Write(buf)
 		} else {
 			// The log package always line-terminates logs,
 			// so this is an uncommon path.
 			withNL := append(buf[:len(buf):len(buf)], '\n')
-			l.stderr.Write(withNL)
+			lg.stderr.Write(withNL)
 		}
 	}
 
-	l.writeLock.Lock()
-	defer l.writeLock.Unlock()
+	lg.writeLock.Lock()
+	defer lg.writeLock.Unlock()
 
-	b := l.appendTextOrJSONLocked(l.writeBuf[:0], buf, level)
-	_, err := l.sendLocked(b)
+	b := lg.appendTextOrJSONLocked(lg.writeBuf[:0], buf, level)
+	_, err := lg.sendLocked(b)
 	return inLen, err
 }
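
Reviewer note (not part of the diff): a minimal usage sketch of the exported API touched by this rename (NewLogger, SetVerbosityLevel, Logf, Shutdown), to show that callers are unaffected since only the constructor local and the method receivers change. The Config fields beyond those visible in the hunks (Collection) and logid.NewPrivateID are assumptions about the surrounding packages, and the collection name is hypothetical.

package main

import (
	"context"
	"log"
	"os"

	"tailscale.com/logtail"
	"tailscale.com/types/logid"
)

func main() {
	// Illustrative only: a real caller would persist the private log ID
	// instead of generating a fresh one on every run.
	id, err := logid.NewPrivateID()
	if err != nil {
		log.Fatal(err)
	}

	logger := logtail.NewLogger(logtail.Config{
		Collection:   "example.log.tailscale.io", // hypothetical collection name
		PrivateID:    id,
		Stderr:       os.Stderr,
		CompressLogs: true,
	}, log.Printf)
	defer logger.Shutdown(context.Background())

	logger.SetVerbosityLevel(1)             // mirror verbose logs to stderr
	logger.Logf("hello from %s", "logtail") // queued and uploaded in the background
}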
}