Skip to content

Commit

Permalink
finagle-memcached: Remove Compression Toggle
Browse files Browse the repository at this point in the history
Remove the toggle for enabling and disabling Lz4 compression as the compression has been tested on few services

Differential Revision: https://phabricator.twitter.biz/D1137993
  • Loading branch information
ctutika authored and jenkins committed Apr 18, 2024
1 parent a166c47 commit f9391d9
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 162 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
{
"toggles": [
{
"id": "com.twitter.finagle.filter.CompressingMemcached",
"description": "Enable compressing filter for memcached values",
"fraction": 1.0
}
]
"toggles": []
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import com.twitter.finagle.memcached.protocol.StorageCommand
import com.twitter.finagle.memcached.protocol.Value
import com.twitter.finagle.memcached.protocol.Values
import com.twitter.finagle.memcached.protocol.ValuesAndErrors
import com.twitter.finagle.server.ServerInfo
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.toggle.Toggle
import com.twitter.io.Buf
import com.twitter.util.Future
import com.twitter.util.Return
Expand Down Expand Up @@ -59,14 +57,10 @@ private[finagle] final class CompressingMemcachedFilter(
private final val compressionFactory =
CompressionProvider(compressionScheme, statsReceiver)

private val toggle: Toggle = Toggles("com.twitter.finagle.filter.CompressingMemcached")

private val serverInfo = ServerInfo()

override def apply(command: Command, service: Service[Command, Response]): Future[Response] = {
command match {
case storageCommand: StorageCommand =>
if (compressionScheme == Uncompressed || !toggle.isEnabled(serverInfo.id.hashCode)) {
if (compressionScheme == Uncompressed) {
service(storageCommand)
} else { service(compress(storageCommand)) }
case nonStorageCommand: NonStorageCommand =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class MemcachedCompressingClientTest extends AnyFunSuite with BeforeAndAfter {
val clientName = "test_client"
val Timeout: Duration = 15.seconds
val stats = new InMemoryStatsReceiver
private val useCompressionFilerToggleKey = "com.twitter.finagle.filter.CompressingMemcached"

def awaitResult[T](awaitable: Awaitable[T]): T = Await.result(awaitable, Timeout)

Expand All @@ -49,108 +48,53 @@ class MemcachedCompressingClientTest extends AnyFunSuite with BeforeAndAfter {

test("withCompressionScheme Lz4 and toggled on") {

com.twitter.finagle.toggle.flag.overrides.let(useCompressionFilerToggleKey, 1) {
val server = new InProcessMemcached(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
val address = Address(server.start().boundAddress.asInstanceOf[InetSocketAddress])

val client = Memcached.client
.configured(param.KeyHasher(KeyHasher.FNV1_32))
.connectionsPerEndpoint(1)
.withStatsReceiver(stats)
.withCompressionScheme(Lz4)
.newRichClient(Name.bound(address), clientName)

awaitResult(client.set("foobar", alwaysCompressedData)) // will be compressed
awaitResult(client.set("baz", neverCompressedData)) // won't be compressed

val alwaysCompressedServiceResponse: Response =
awaitResult[Response](server.service(Gets(Seq(Buf.Utf8("foobar")))))
val neverCompressedServiceResponse = awaitResult(server.service(Gets(Seq(Buf.Utf8("baz")))))

val alwaysCompressedServiceData = getResponseBuf(alwaysCompressedServiceResponse).head
val neverCompressedServiceData = getResponseBuf(neverCompressedServiceResponse).head

val results = awaitResult(
client.gets(Seq("foobar", "baz"))
).flatMap {
case (key, (value1, _)) =>
Map((key, value1))
}

val deletedResult = awaitResult {
for {
_ <- client.delete("foobar")
_ <- client.delete("baz")
r <- client.gets(Seq("foobar", "baz"))
} yield r
}

assert(results("foobar") === alwaysCompressedData)
assert(results("baz") === neverCompressedData)
assert(deletedResult.isEmpty)
assert(alwaysCompressedData.length > alwaysCompressedServiceData.length)
assert(neverCompressedData.length == neverCompressedServiceData.length)

assert(stats.counters(Seq(clientName, "lz4", "decompression", "attempted")) == 1)
assert(stats.counters(Seq(clientName, "lz4", "compression", "attempted")) == 1)
assert(stats.counters(Seq(clientName, "lz4", "compression", "skipped")) == 1)

client.close()
server.stop()
val server = new InProcessMemcached(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
val address = Address(server.start().boundAddress.asInstanceOf[InetSocketAddress])

val client = Memcached.client
.configured(param.KeyHasher(KeyHasher.FNV1_32))
.connectionsPerEndpoint(1)
.withStatsReceiver(stats)
.withCompressionScheme(Lz4)
.newRichClient(Name.bound(address), clientName)

awaitResult(client.set("foobar", alwaysCompressedData)) // will be compressed
awaitResult(client.set("baz", neverCompressedData)) // won't be compressed

val alwaysCompressedServiceResponse: Response =
awaitResult[Response](server.service(Gets(Seq(Buf.Utf8("foobar")))))
val neverCompressedServiceResponse = awaitResult(server.service(Gets(Seq(Buf.Utf8("baz")))))

val alwaysCompressedServiceData = getResponseBuf(alwaysCompressedServiceResponse).head
val neverCompressedServiceData = getResponseBuf(neverCompressedServiceResponse).head

val results = awaitResult(
client.gets(Seq("foobar", "baz"))
).flatMap {
case (key, (value1, _)) =>
Map((key, value1))
}
}

test("withCompressionScheme Lz4 and toggled off") {

com.twitter.finagle.toggle.flag.overrides.let(useCompressionFilerToggleKey, 0) {
val server = new InProcessMemcached(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
val address = Address(server.start().boundAddress.asInstanceOf[InetSocketAddress])

val client = Memcached.client
.configured(param.KeyHasher(KeyHasher.FNV1_32))
.connectionsPerEndpoint(1)
.withStatsReceiver(stats)
.withCompressionScheme(Lz4)
.newRichClient(Name.bound(address), clientName)

awaitResult(client.set("foobar", alwaysCompressedData)) // will be compressed
awaitResult(client.set("baz", neverCompressedData)) // won't be compressed

val alwaysCompressedServiceResponse: Response =
awaitResult[Response](server.service(Gets(Seq(Buf.Utf8("foobar")))))
val neverCompressedServiceResponse = awaitResult(server.service(Gets(Seq(Buf.Utf8("baz")))))

val alwaysCompressedServiceData = getResponseBuf(alwaysCompressedServiceResponse).head
val neverCompressedServiceData = getResponseBuf(neverCompressedServiceResponse).head

val results = awaitResult(
client.gets(Seq("foobar", "baz"))
).flatMap {
case (key, (value1, _)) =>
Map((key, value1))
}

val deletedResult = awaitResult {
for {
_ <- client.delete("foobar")
_ <- client.delete("baz")
r <- client.gets(Seq("foobar", "baz"))
} yield r
}

assert(results("foobar") === alwaysCompressedData)
assert(results("baz") === neverCompressedData)
assert(deletedResult.isEmpty)
assert(alwaysCompressedData.length == alwaysCompressedServiceData.length)
assert(neverCompressedData.length == neverCompressedServiceData.length)

assert(stats.counters(Seq(clientName, "lz4", "decompression", "attempted")) == 0)
assert(stats.counters(Seq(clientName, "lz4", "compression", "attempted")) == 0)
assert(stats.counters(Seq(clientName, "lz4", "compression", "skipped")) == 0)

client.close()
server.stop()
val deletedResult = awaitResult {
for {
_ <- client.delete("foobar")
_ <- client.delete("baz")
r <- client.gets(Seq("foobar", "baz"))
} yield r
}

assert(results("foobar") === alwaysCompressedData)
assert(results("baz") === neverCompressedData)
assert(deletedResult.isEmpty)
assert(alwaysCompressedData.length > alwaysCompressedServiceData.length)
assert(neverCompressedData.length == neverCompressedServiceData.length)

assert(stats.counters(Seq(clientName, "lz4", "decompression", "attempted")) == 1)
assert(stats.counters(Seq(clientName, "lz4", "compression", "attempted")) == 1)
assert(stats.counters(Seq(clientName, "lz4", "compression", "skipped")) == 1)

client.close()
server.stop()
}

test("withCompressionScheme Uncompressed") {
Expand Down Expand Up @@ -207,51 +151,49 @@ class MemcachedCompressingClientTest extends AnyFunSuite with BeforeAndAfter {

test("Clients with different compression schemes can decompress each other's values") {

com.twitter.finagle.toggle.flag.overrides.let(useCompressionFilerToggleKey, 1) {
val server = new InProcessMemcached(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
val address = Address(server.start().boundAddress.asInstanceOf[InetSocketAddress])

val compressionClient = Memcached.client
.configured(param.KeyHasher(KeyHasher.FNV1_32))
.connectionsPerEndpoint(1)
.withStatsReceiver(stats)
.withCompressionScheme(Lz4)
.newRichClient(Name.bound(address), clientName)

val uncompressedClient = Memcached.client
.configured(param.KeyHasher(KeyHasher.FNV1_32))
.connectionsPerEndpoint(1)
.withStatsReceiver(stats)
.newRichClient(Name.bound(address), clientName)

awaitResult(uncompressedClient.set("foobar", alwaysCompressedData))
awaitResult(uncompressedClient.set("baz", neverCompressedData))

awaitResult(compressionClient.set("foo", alwaysCompressedData))
awaitResult(compressionClient.set("bazbar", neverCompressedData))

val compressionClientResults = awaitResult(
compressionClient.gets(Seq("foobar", "baz"))
).flatMap {
case (key, (value1, _)) =>
Map((key, value1))
}

val clientResults = awaitResult(
uncompressedClient.gets(Seq("foo", "bazbar"))
).flatMap {
case (key, (value1, _)) =>
Map((key, value1))
}

assert(compressionClientResults("foobar") === alwaysCompressedData)
assert(compressionClientResults("baz") === neverCompressedData)
assert(clientResults("foo") === alwaysCompressedData)
assert(clientResults("bazbar") === neverCompressedData)

uncompressedClient.close()
compressionClient.close()
server.stop()
val server = new InProcessMemcached(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
val address = Address(server.start().boundAddress.asInstanceOf[InetSocketAddress])

val compressionClient = Memcached.client
.configured(param.KeyHasher(KeyHasher.FNV1_32))
.connectionsPerEndpoint(1)
.withStatsReceiver(stats)
.withCompressionScheme(Lz4)
.newRichClient(Name.bound(address), clientName)

val uncompressedClient = Memcached.client
.configured(param.KeyHasher(KeyHasher.FNV1_32))
.connectionsPerEndpoint(1)
.withStatsReceiver(stats)
.newRichClient(Name.bound(address), clientName)

awaitResult(uncompressedClient.set("foobar", alwaysCompressedData))
awaitResult(uncompressedClient.set("baz", neverCompressedData))

awaitResult(compressionClient.set("foo", alwaysCompressedData))
awaitResult(compressionClient.set("bazbar", neverCompressedData))

val compressionClientResults = awaitResult(
compressionClient.gets(Seq("foobar", "baz"))
).flatMap {
case (key, (value1, _)) =>
Map((key, value1))
}

val clientResults = awaitResult(
uncompressedClient.gets(Seq("foo", "bazbar"))
).flatMap {
case (key, (value1, _)) =>
Map((key, value1))
}

assert(compressionClientResults("foobar") === alwaysCompressedData)
assert(compressionClientResults("baz") === neverCompressedData)
assert(clientResults("foo") === alwaysCompressedData)
assert(clientResults("bazbar") === neverCompressedData)

uncompressedClient.close()
compressionClient.close()
server.stop()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class CompressingMemcachedFilterTest
with ScalaCheckPropertyChecks {

val TimeOut = 15.seconds
private val useCompressionFilerToggleKey = "com.twitter.finagle.filter.CompressingMemcached"

private def awaitResult[T](awaitable: Awaitable[T]): T = Await.result(awaitable, TimeOut)

Expand All @@ -43,7 +42,6 @@ class CompressingMemcachedFilterTest
stats.clear()
}

com.twitter.finagle.toggle.flag.overrides.let(useCompressionFilerToggleKey, 1) {

test("Add is correctly processed and compressed") {

Expand Down Expand Up @@ -323,5 +321,4 @@ class CompressingMemcachedFilterTest
}
}
}
}
}

0 comments on commit f9391d9

Please sign in to comment.