Skip to content

Commit

Permalink
[tracing] reduce locking section for accumulating span collector
Browse files Browse the repository at this point in the history
  • Loading branch information
rmannibucau committed Nov 21, 2023
1 parent 7012fa5 commit 107dcd4
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,21 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import static java.util.stream.Collectors.toList;

/**
* Creates a collector of span which triggers a flush when the buffer reaches its max size.
* You can combine it with a scheduled flushing if you need to but the scheduler handling is out of scope of this class.
* <p>
* You will generally set a {@code onFlush} callback to actually push somewhere the spans - by default nothing is done.
*/
public class AccumulatingSpanCollector implements Consumer<Span>, AutoCloseable {
private final Map<Span, Span> buffer = new ConcurrentHashMap<>();
private final Buffer<Span> buffer = new Buffer<>();
private final int bufferSize;
private Consumer<Collection<Span>> onFlush;
private volatile boolean closed = true;
private final ReentrantLock lock = new ReentrantLock();

public AccumulatingSpanCollector() {
this(4096);
Expand Down Expand Up @@ -65,32 +63,49 @@ public void accept(final Span span) {
return;
}

buffer.put(span, span);
buffer.add(span);

// prefer to flush after to ensure we flush on event to not have a pattern encouraging to have staled entries
// note: it can lead to not strictly respecting the buffer size, it is fine
if (buffer.size() > bufferSize) {
synchronized (this) {
Collection<Span> spans = List.of();
lock.lock();
try {
if (buffer.size() > bufferSize) {
flush();
spans = buffer.drain();
}
} finally {
lock.unlock();
}
if (!spans.isEmpty()) {
onFlush.accept(buffer.drain());
}
}
}

public void flush() {
if (onFlush == null) {
return;
if (onFlush != null) {
lock.lock();
try {
onFlush.accept(buffer.drain());
} finally {
lock.unlock();
}
}

final var spans = buffer.keySet().stream().limit(bufferSize).collect(toList());
onFlush.accept(spans);
spans.forEach(buffer::remove);
}

@Override
public void close() {
closed = true;
flush();
if (onFlush != null) {
lock.lock();
try {
while (buffer.size() > 0) {
onFlush.accept(buffer.drain());
}
} finally {
lock.unlock();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2021-2023 - Yupiik SAS - https://www.yupiik.com
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.yupiik.uship.tracing.collector;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Buffer<T> {
private final Map<T, T> buffer = new ConcurrentHashMap<>();
private final AtomicInteger size = new AtomicInteger();

public void add(final T item) {
buffer.put(item, item);
size.incrementAndGet();
}

public int size() {
return size.get(); // faster than ConcurrentHashMap#size
}

// always called from a single thread
public Collection<T> drain() {
final var all = new ArrayList<>(buffer.keySet());
size.addAndGet(-all.size());
buffer.clear();
return all;
}
}

0 comments on commit 107dcd4

Please sign in to comment.