From 470cd3f4144f7938bd0d6858f14156f2c2379581 Mon Sep 17 00:00:00 2001
From: MickayG
Date: Thu, 13 Jul 2017 22:47:21 +0100
Subject: [PATCH 1/2] Support for aggregating partial messages prior to writing to Kafka

---
 driver.go      | 26 ++++++++++++++++++++++++--
 driver_test.go | 25 +++++++++++++++++++++++++
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/driver.go b/driver.go
index 0d340495..c184ca36 100644
--- a/driver.go
+++ b/driver.go
@@ -20,6 +20,7 @@ import (
 	"encoding/json"
 	"strconv"
 	"math"
+	"strings"
 )
 
 // An mapped version of logger.Message where Line is a String, not a byte array
@@ -161,12 +162,14 @@ func writeLogsToKafka(lf *logPair, topic string, keyStrategy KeyStrategy, tag st
 	dec := protoio.NewUint32DelimitedReader(lf.stream, binary.BigEndian, 1e6)
 	defer dec.Close()
 	var buf logdriver.LogEntry
+
+	var partialBuffer string
 	for {
 		// Check if there are any Kafka errors thus far
 		select {
 		case kafkaErr := <- lf.producer.Errors():
 			// In the event of an error, continue to attempt to write messages
-			logrus.Error("error recieved from Kafka", kafkaErr)
+			logrus.Error("error received from Kafka", kafkaErr)
 		default:
 			//No errors, continue
 		}
@@ -180,8 +183,27 @@ func writeLogsToKafka(lf *logPair, topic string, keyStrategy KeyStrategy, tag st
 			dec = protoio.NewUint32DelimitedReader(lf.stream, binary.BigEndian, 1e6)
 		}
 
+		lineAsString := string(buf.Line)
+
+		// In the event that the message is partial, we attempt to aggregate if possible
+		select {
+		case buf.Partial:
+		case len(partialBuffer) > 0 && !strings.HasSuffix(lineAsString, "\n"):
+			{
+				partialBuffer += lineAsString
+				continue
+			}
+		case len(partialBuffer) > 0 && strings.HasSuffix(lineAsString, "\n"):
+			{
+				// Add the remaining line to the buffer and reset the line to be the total buffer size
+				partialBuffer += lineAsString
+				lineAsString = partialBuffer
+				partialBuffer = ""
+			}
+		}
+
 		var msg LogMessage
-		msg.Line = string(buf.Line)
+		msg.Line = lineAsString
 		msg.Source = buf.Source
 		msg.Partial = buf.Partial
 		msg.Timestamp = time.Unix(0, buf.TimeNano)
diff --git a/driver_test.go b/driver_test.go
index d4005eed..5f4c88dd 100644
--- a/driver_test.go
+++ b/driver_test.go
@@ -63,6 +63,24 @@ func TestConsumesMultipleLogMessagesFromDocker(t *testing.T) {
 	assertLineMatch(t, "delta", <-producer.Successes())
 }
 
+func TestAggregatesPartialLogMessagesFromDocker(t *testing.T) {
+	producer := NewProducer(t)
+	defer producer.Close()
+
+	stream := createBufferForLogMessages([]logdriver.LogEntry{
+		newPartialLogEntry("alpha"),
+		newLogEntry("beta"),
+		newLogEntry("charlie\n"),
+	})
+
+	lf := createLogPair(producer, stream)
+
+	producer.ExpectInputAndSucceed()
+	writeLogsToKafka(&lf, "topic", KEY_BY_TIMESTAMP, TAG)
+
+	assertLineMatch(t, "alphabetacharlie", <-producer.Successes())
+}
+
 func TestJsonIncludesContainerInformation(t *testing.T) {
 	expectedContainerId := "containerid1"
 	expectedContainerName := "containername1"
@@ -440,3 +458,10 @@ func newLogEntry(line string) logdriver.LogEntry {
 	le.TimeNano = time.Now().UnixNano()
 	return le
 }
+
+func newPartialLogEntry(line string) logdriver.LogEntry {
+	le := newLogEntry(line)
+	le.Partial = true
+	return le
+}
+

From 94e50f1d7294bb83d7140a10107c98531ab005cc Mon Sep 17 00:00:00 2001
From: MickayG
Date: Thu, 13 Jul 2017 23:42:01 +0100
Subject: [PATCH 2/2] Support for aggregating partial messages prior to writing to Kafka

---
 driver.go      | 23 ++++++++++++-----------
 driver_test.go |  2 +-
 2 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/driver.go b/driver.go
index c184ca36..dc3ebdc3 100644
--- a/driver.go
+++ b/driver.go
@@ -180,26 +180,27 @@ func writeLogsToKafka(lf *logPair, topic string, keyStrategy KeyStrategy, tag st
 				lf.stream.Close()
 				return
 			}
+			logrus.WithField("id", lf.info.ContainerID).WithError(err).Error("error whilst reading from log")
 			dec = protoio.NewUint32DelimitedReader(lf.stream, binary.BigEndian, 1e6)
 		}
 
 		lineAsString := string(buf.Line)
 
 		// In the event that the message is partial, we attempt to aggregate if possible
-		select {
+		switch {
 		case buf.Partial:
+			logrus.WithField("line", lineAsString).WithField("buffer", partialBuffer).Debug("Received partial message, start reconstruction")
+			fallthrough
 		case len(partialBuffer) > 0 && !strings.HasSuffix(lineAsString, "\n"):
-			{
-				partialBuffer += lineAsString
-				continue
-			}
+			logrus.WithField("line", lineAsString).WithField("buffer", partialBuffer).Debug("Received partial message, continue reconstruction")
+			partialBuffer += lineAsString
+			continue
 		case len(partialBuffer) > 0 && strings.HasSuffix(lineAsString, "\n"):
-			{
-				// Add the remaining line to the buffer and reset the line to be the total buffer size
-				partialBuffer += lineAsString
-				lineAsString = partialBuffer
-				partialBuffer = ""
-			}
+			logrus.WithField("line", lineAsString).WithField("buffer", partialBuffer).Debug("Received partial message, finish reconstruction")
+			// Add the remaining line to the buffer and reset the line to be the total buffer size
+			partialBuffer += lineAsString
+			lineAsString = partialBuffer
+			partialBuffer = ""
 		}
 
 		var msg LogMessage
diff --git a/driver_test.go b/driver_test.go
index 5f4c88dd..ec67df06 100644
--- a/driver_test.go
+++ b/driver_test.go
@@ -78,7 +78,7 @@ func TestAggregatesPartialLogMessagesFromDocker(t *testing.T) {
 	producer.ExpectInputAndSucceed()
 	writeLogsToKafka(&lf, "topic", KEY_BY_TIMESTAMP, TAG)
 
-	assertLineMatch(t, "alphabetacharlie", <-producer.Successes())
+	assertLineMatch(t, "alphabetacharlie\n", <-producer.Successes())
 }
 
 func TestJsonIncludesContainerInformation(t *testing.T) {
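
For reference, below is a minimal standalone sketch (not part of the patches above) of the aggregation approach the second patch settles on: fragments flagged as partial are buffered until a newline-terminated fragment arrives, at which point the reconstructed line is emitted. The entry struct and the aggregate helper are hypothetical names introduced only for illustration and are not part of the driver.

package main

import (
	"fmt"
	"strings"
)

// entry mirrors the two logdriver.LogEntry fields the aggregation cares about.
type entry struct {
	Line    string
	Partial bool
}

// aggregate buffers partial fragments and only emits a line once it is complete.
func aggregate(entries []entry) []string {
	var out []string
	var partialBuffer string
	for _, e := range entries {
		line := e.Line
		switch {
		case e.Partial:
			// Partial fragment: fall through and keep buffering.
			fallthrough
		case len(partialBuffer) > 0 && !strings.HasSuffix(line, "\n"):
			// Reconstruction in progress and still no terminating newline: keep buffering.
			partialBuffer += line
			continue
		case len(partialBuffer) > 0 && strings.HasSuffix(line, "\n"):
			// Final fragment: flush the whole reconstructed line.
			partialBuffer += line
			line = partialBuffer
			partialBuffer = ""
		}
		out = append(out, line)
	}
	return out
}

func main() {
	fmt.Printf("%q\n", aggregate([]entry{
		{Line: "alpha", Partial: true},
		{Line: "beta"},
		{Line: "charlie\n"},
		{Line: "delta\n"},
	}))
	// Prints: ["alphabetacharlie\n" "delta\n"]
}

As in the second patch, a fragment flagged Partial is always buffered via fallthrough, regardless of whether a reconstruction is already in progress, which is what allows the "alpha"/"beta"/"charlie\n" sequence in the test to come out as a single message.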