ztunnel logs with multiple nodes (#7339)
* Visualize ztunnel logs when there are multiple nodes
josunect committed May 10, 2024
1 parent 792b516 commit 7cd66f0
Showing 3 changed files with 139 additions and 124 deletions.
239 changes: 127 additions & 112 deletions business/workloads.go
@@ -571,7 +571,7 @@ func parseLogLine(line string, isProxy bool, engardeParser *parser.Parser) *LogEntry
return &entry
}

func parseZtunnelLine(line string) *LogEntry {
func parseZtunnelLine(line, name string) *LogEntry {
entry := LogEntry{
Message: "",
Timestamp: "",
@@ -589,14 +589,14 @@ func parseZtunnelLine(line string) *LogEntry {

if len(msgSplit) < 5 {
log.Debugf("Error splitting log line [%s]", line)
entry.Message = line
entry.Message = fmt.Sprintf("[%s] %s", name, line)
return &entry
}

entry.Message = msgSplit[4]
entry.Message = fmt.Sprintf("[%s] %s", name, msgSplit[4])
if entry.Message == "" {
log.Debugf("Skipping empty log line [%s]", line)
entry.Message = line
entry.Message = fmt.Sprintf("[%s] %s", name, line)
return &entry
}
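
For illustration, a minimal runnable sketch of the prefixing this hunk introduces. The pod name below is hypothetical, and prefixWithPod only mirrors the new fmt.Sprintf call; it is not a helper from the commit:

package main

import "fmt"

// prefixWithPod mirrors parseZtunnelLine's new behavior: every message,
// including the fallback and empty-message cases, is tagged with the
// originating ztunnel pod so lines from different nodes stay distinguishable.
func prefixWithPod(name, msg string) string {
	return fmt.Sprintf("[%s] %s", name, msg)
}

func main() {
	// Hypothetical pod name; real names come from the ztunnel pods in the cache.
	fmt.Println(prefixWithPod("ztunnel-n2xf6", "src.workload=productpage-v1 dst.workload=details-v1 ..."))
	// Prints: [ztunnel-n2xf6] src.workload=productpage-v1 dst.workload=details-v1 ...
}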

@@ -2072,7 +2072,7 @@ func (in *WorkloadService) GetWorkloadAppName(ctx context.Context, cluster, name
// streamParsedLogs fetches logs from a container in a pod, parses and decorates each log line with some metadata (if possible) and
// sends the processed lines to the client in JSON format. Results are sent as processing is performed, so in case of any error when
// doing processing the JSON document will be truncated.
func (in *WorkloadService) streamParsedLogs(cluster, namespace, name string, opts *LogOptions, w http.ResponseWriter) error {
func (in *WorkloadService) streamParsedLogs(cluster, namespace string, names []string, opts *LogOptions, w http.ResponseWriter) error {
userClient, ok := in.userClients[cluster]
if !ok {
return fmt.Errorf("user client for cluster [%s] not found", cluster)
@@ -2088,130 +2088,143 @@ func (in *WorkloadService) streamParsedLogs(cluster, namespace, name string, opt
// discard the logs after sinceTime+duration
isBounded := opts.Duration != nil

logsReader, err := userClient.StreamPodLogs(namespace, name, &k8sOpts)
if err != nil {
return err
}

defer func() {
e := logsReader.Close()
if e != nil {
log.Errorf("Error when closing the connection streaming logs of a pod: %s", e.Error())
firstEntry := true
firstWritter := true
for i, name := range names {
logsReader, err := userClient.StreamPodLogs(namespace, name, &k8sOpts)
if err != nil {
return err
}
}()

bufferedReader := bufio.NewReader(logsReader)

var startTime *time.Time
var endTime *time.Time
if k8sOpts.SinceTime != nil {
startTime = &k8sOpts.SinceTime.Time
if isBounded {
end := startTime.Add(*opts.Duration)
endTime = &end
}
}
defer func() {
e := logsReader.Close()
if e != nil {
log.Errorf("Error when closing the connection streaming logs of a pod: %s", e.Error())
}
}()

// To avoid high memory usage, the JSON will be written
// to the HTTP Response as it's received from the cluster API.
// That is, each log line is parsed, decorated with Kiali's metadata,
// marshalled to JSON and immediately written to the HTTP Response.
// This means that it is needed to push HTTP headers and start writing
// the response body right now and any errors at the middle of the log
// processing can no longer be informed to the client. So, starting
// these lines, the best we can do if some error happens is to simply
// log the error and stop/truncate the response, which will have the
// effect of sending an incomplete JSON document that the browser will fail
// to parse. Hopefully, the client/UI can catch the parsing error and
// properly show an error message about the failure retrieving logs.
w.Header().Set("Content-Type", "application/json")
_, writeErr := w.Write([]byte("{\"entries\":[")) // This starts the JSON document
if writeErr != nil {
return writeErr
}
bufferedReader := bufio.NewReader(logsReader)

firstEntry := true
line, readErr := bufferedReader.ReadString('\n')
linesWritten := 0
for ; readErr == nil || (readErr == io.EOF && len(line) > 0); line, readErr = bufferedReader.ReadString('\n') {
// Abort if we already reached the requested max-lines limit
if opts.MaxLines != nil && linesWritten >= *opts.MaxLines {
break
var startTime *time.Time
var endTime *time.Time
if k8sOpts.SinceTime != nil {
startTime = &k8sOpts.SinceTime.Time
if isBounded {
end := startTime.Add(*opts.Duration)
endTime = &end
}
}

var entry *LogEntry
if opts.LogType == models.LogTypeZtunnel {
entry = parseZtunnelLine(line)
} else {
entry = parseLogLine(line, opts.LogType == models.LogTypeProxy, engardeParser)
var writeErr error

if firstWritter {
// To avoid high memory usage, the JSON will be written
// to the HTTP Response as it's received from the cluster API.
// That is, each log line is parsed, decorated with Kiali's metadata,
// marshalled to JSON and immediately written to the HTTP Response.
// This means that it is needed to push HTTP headers and start writing
// the response body right now and any errors at the middle of the log
// processing can no longer be informed to the client. So, starting
// these lines, the best we can do if some error happens is to simply
// log the error and stop/truncate the response, which will have the
// effect of sending an incomplete JSON document that the browser will fail
// to parse. Hopefully, the client/UI can catch the parsing error and
// properly show an error message about the failure retrieving logs.
w.Header().Set("Content-Type", "application/json")
_, writeErr = w.Write([]byte("{\"entries\":[")) // This starts the JSON document
if writeErr != nil {
return writeErr
}
firstWritter = false
}

if entry == nil {
continue
}
line, readErr := bufferedReader.ReadString('\n')
linesWritten := 0
for ; readErr == nil || (readErr == io.EOF && len(line) > 0); line, readErr = bufferedReader.ReadString('\n') {
// Abort if we already reached the requested max-lines limit
if opts.MaxLines != nil && linesWritten >= *opts.MaxLines {
break
}

if opts.LogType == models.LogTypeZtunnel && !filterMatches(entry.Message, opts.filter) {
continue
}
var entry *LogEntry
if opts.LogType == models.LogTypeZtunnel {
entry = parseZtunnelLine(line, name)
} else {
entry = parseLogLine(line, opts.LogType == models.LogTypeProxy, engardeParser)
}

// If we are past the requested time window then stop processing
if startTime == nil {
startTime = &entry.OriginalTime
}
if entry == nil {
continue
}

if isBounded {
if endTime == nil {
end := entry.OriginalTime.Add(*opts.Duration)
endTime = &end
if opts.LogType == models.LogTypeZtunnel && !filterMatches(entry.Message, opts.filter) {
continue
}

if entry.OriginalTime.After(*endTime) {
break
// If we are past the requested time window then stop processing
if startTime == nil {
startTime = &entry.OriginalTime
}
}

// Send to client the processed log line
if isBounded {
if endTime == nil {
end := entry.OriginalTime.Add(*opts.Duration)
endTime = &end
}

response, err := json.Marshal(entry)
if err != nil {
// Remember that since the HTTP Response body is already being sent,
// it is not possible to change the response code. So, log the error
// and terminate early the response.
log.Errorf("Error when marshalling JSON while streaming pod logs: %s", err.Error())
return nil
}
if entry.OriginalTime.After(*endTime) {
break
}
}

if firstEntry {
firstEntry = false
} else {
_, writeErr = w.Write([]byte{','})
if writeErr != nil {
// Send to client the processed log line

response, err := json.Marshal(entry)
if err != nil {
// Remember that since the HTTP Response body is already being sent,
// it is not possible to change the response code. So, log the error
// and terminate early the response.
log.Errorf("Error when writing log entries separator: %s", writeErr.Error())
log.Errorf("Error when marshalling JSON while streaming pod logs: %s", err.Error())
return nil
}
}

_, writeErr = w.Write(response)
if writeErr != nil {
log.Errorf("Error when writing a processed log entry while streaming pod logs: %s", writeErr.Error())
return nil
}
if firstEntry {
firstEntry = false
} else {
_, writeErr = w.Write([]byte{','})
if writeErr != nil {
// Remember that since the HTTP Response body is already being sent,
// it is not possible to change the response code. So, log the error
// and terminate early the response.
log.Errorf("Error when writing log entries separator: %s", writeErr.Error())
return nil
}
}

linesWritten++
}
_, writeErr = w.Write(response)
if writeErr != nil {
log.Errorf("Error when writing a processed log entry while streaming pod logs: %s", writeErr.Error())
return nil
}

if readErr == nil && opts.MaxLines != nil && linesWritten >= *opts.MaxLines {
// End the JSON document, setting the max-lines truncated flag
_, writeErr = w.Write([]byte("], \"linesTruncated\": true}"))
} else {
// End the JSON document
_, writeErr = w.Write([]byte("]}"))
}
if writeErr != nil {
log.Errorf("Error when writing the outro of the JSON document while streaming pod logs: %s", err.Error())
linesWritten++
}
if readErr == nil && opts.MaxLines != nil && linesWritten >= *opts.MaxLines {
// End the JSON document, setting the max-lines truncated flag
_, writeErr = w.Write([]byte("], \"linesTruncated\": true}"))
if writeErr != nil {
log.Errorf("Error when writing the outro of the JSON document while streaming pod logs: %s", err.Error())
}
break
} else {
if i == len(names)-1 {
// End the JSON document
_, writeErr = w.Write([]byte("]}"))
if writeErr != nil {
log.Errorf("Error when writing the outro of the JSON document while streaming pod logs: %s", err.Error())
}
}
}
}

return nil
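
A rough consumption sketch of the rewritten streaming loop, assuming the package's existing test setup: svc (*WorkloadService), opts (*LogOptions with the ztunnel log type) and the imports "encoding/json", "net/http/httptest" and "testing" stand in for wiring that lives outside this diff, and StreamPodLogs is the public entry point shown in the next hunk. The point it illustrates: the loop now opens the JSON document once, appends entries from every ztunnel pod, and only closes the document after the last pod, so the client still decodes a single document; a truncated body — the failure mode the comment above describes — surfaces as a decode error.

func TestStreamZtunnelLogsSketch(t *testing.T) {
	w := httptest.NewRecorder()
	// The workload name and namespace are used to build the src/dst filters for ztunnel logs.
	if err := svc.StreamPodLogs("cluster-default", "bookinfo", "productpage-v1", opts, w); err != nil {
		t.Fatalf("streaming failed before the response body was started: %v", err)
	}

	var doc struct {
		Entries        []LogEntry `json:"entries"`
		LinesTruncated bool       `json:"linesTruncated"`
	}
	// A truncated or malformed body shows up here as a JSON decode error.
	if err := json.Unmarshal(w.Body.Bytes(), &doc); err != nil {
		t.Fatalf("response was truncated or malformed: %v", err)
	}
	for _, e := range doc.Entries {
		t.Log(e.Message) // each message carries its ztunnel pod name prefix
	}
}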
Expand All @@ -2220,26 +2233,28 @@ func (in *WorkloadService) streamParsedLogs(cluster, namespace, name string, opt
// StreamPodLogs streams pod logs to an HTTP Response given the provided options
func (in *WorkloadService) StreamPodLogs(cluster, namespace, name string, opts *LogOptions, w http.ResponseWriter) error {

names := []string{}
if opts.LogType == models.LogTypeZtunnel {
// First, get ztunnel namespace and containers
pods := in.cache.GetZtunnelPods(cluster)
// This is needed for the K8S client
opts.PodLogOptions.Container = models.IstioProxy
// The ztunnel line should include the pod and the namespace
fs := filterOpts{
destWk: fmt.Sprintf("dst.workload=\"%s\"", name),
destNs: fmt.Sprintf("dst.namespace=\"%s\"", namespace),
srcWk: fmt.Sprintf("src.workload=\"%s\"", name),
srcNs: fmt.Sprintf("src.namespace=\"%s\"", namespace),
destWk: fmt.Sprintf("dst.workload=%s", name),
destNs: fmt.Sprintf("dst.namespace=%s", namespace),
srcWk: fmt.Sprintf("src.workload=%s", name),
srcNs: fmt.Sprintf("src.namespace=%s", namespace),
}
opts.filter = fs
var streamErr error
for _, pod := range pods {
streamErr = in.streamParsedLogs(cluster, pod.Namespace, pod.Name, opts, w)
names = append(names, pod.Name)
}
return streamErr
// They should be all in the same ns
return in.streamParsedLogs(cluster, pods[0].Namespace, names, opts, w)
}
return in.streamParsedLogs(cluster, namespace, name, opts, w)
names = append(names, name)
return in.streamParsedLogs(cluster, namespace, names, opts, w)
}

// AND filter
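
The filter patterns above drop their escaped quotes because ztunnel emits the workload and namespace values unquoted, which the updated test expectation below also reflects. As a purely hypothetical sketch of the kind of AND matching the // AND filter helper referenced here might perform — the real filterMatches implementation is outside this diff — a line would be kept when both patterns of either the destination pair or the source pair appear (assumes the "strings" import):

// matchesSketch is an assumption-labeled stand-in, not the commit's filterMatches:
// keep a ztunnel line when it names either the destination workload+namespace
// or the source workload+namespace being asked about.
func matchesSketch(message string, f filterOpts) bool {
	dst := strings.Contains(message, f.destWk) && strings.Contains(message, f.destNs)
	src := strings.Contains(message, f.srcWk) && strings.Contains(message, f.srcNs)
	return dst || src
}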
2 changes: 1 addition & 1 deletion business/workloads_test.go
@@ -735,7 +735,7 @@ func TestGetZtunnelPodLogsProxy(t *testing.T) {
require.Equal(1, len(podLogs.Entries))
entry := podLogs.Entries[0]

assert.Equal("src.addr=10.244.0.16:51748 src.workload=\"productpage-v1-87d54dd59-fzflt\" src.namespace=\"bookinfo\" src.identity=\"spiffe://cluster.local/ns/bookinfo/sa/bookinfo-productpage\" dst.addr=10.244.0.11:15008 dst.service=\"details.bookinfo.svc.cluster.local\" dst.workload=\"details-v1-cf74bb974-wg44w\" dst.namespace=\"bookinfo\" dst.identity=\"spiffe://cluster.local/ns/bookinfo/sa/bookinfo-details\" direction=\"outbound\" bytes_sent=200 bytes_recv=358 duration=\"1ms\"\n", entry.Message)
assert.Equal("[ztunnel] src.addr=10.244.0.16:51748 src.workload=productpage-v1-87d54dd59-fzflt src.namespace=bookinfo src.identity=\"spiffe://cluster.local/ns/bookinfo/sa/bookinfo-productpage\" dst.addr=10.244.0.11:15008 dst.service=details.bookinfo.svc.cluster.local dst.workload=details-v1-cf74bb974-wg44w dst.namespace=bookinfo dst.identity=\"spiffe://cluster.local/ns/bookinfo/sa/bookinfo-details\" direction=\"outbound\" bytes_sent=200 bytes_recv=358 duration=\"1ms\"\n", entry.Message)
assert.Equal("2024-04-12 10:31:51.078", entry.Timestamp)
assert.NotNil(entry.AccessLog)
assert.Equal("358", entry.AccessLog.BytesReceived)
