Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Customizable Performance Report #166

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

EvanLai88
Copy link

Customizable Performance Report

Description

  1. Add received, dropped_by_interface from struct nf_stat to NFStreamer's performance report
  2. Add a new class PerformanceStats in utils.py and make update_performances() a method of it.

Customize performance report by inheriting PerformanceStats and override update_performances()

Related #147

Type of change

  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

How Has This Been Tested?

Test Configuration:

  • OS version: Ubuntu 22.04.2 LTS
  • Python version: 3.11.0rc1
  • Hardware: Proxmox VE virtual machine on i3-8350k, 16GB ram.

Example code:

from nfstream import NFStreamer
from nfstream.utils import PerformanceStats


class CustomStats(PerformanceStats):
    """Performance reporter that prints one aggregated summary line per tick."""

    def update_performances(self):
        """Sum every counter across all meters and print the totals.

        ``debt`` is the number of received packets not yet accounted for by
        any of the other counters.
        """
        totals = {
            key: 0
            for key in ("received", "dropped", "dropped_if", "ignored", "processed")
        }
        # self.performances holds one counter-dict per meter process.
        for meter in self.performances:
            for key in totals:
                totals[key] += meter[key].value

        debt = totals["received"] - (
            totals["dropped"]
            + totals["dropped_if"]
            + totals["ignored"]
            + totals["processed"]
        )
        print(
            "received: {}, drop: {}, drop_if: {}, ignored: {}, processed: {}, debt: {}".format(
                totals["received"],
                totals["dropped"],
                totals["dropped_if"],
                totals["ignored"],
                totals["processed"],
                debt,
            )
        )


def main():
    """Stream flows from interface ens18 using the custom performance reporter."""
    # For the default JSON performance log, drop the performance_stats kwarg:
    # NFStreamer(source="ens18", idle_timeout=5, n_meters=2, performance_report=1)
    streamer = NFStreamer(
        source="ens18",
        idle_timeout=5,
        n_meters=2,
        performance_report=1,
        performance_stats=CustomStats,
    )

    # Drain the flow iterator; the performance report is printed as a side effect.
    for _ in streamer:
        pass


if __name__ == "__main__":
    main()

default log output:

{"flows_expired": 0, "packets_received": 0, "packets_processed": 0, "packets_ignored": 0, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 0]}
{"flows_expired": 0, "packets_received": 8, "packets_processed": 1, "packets_ignored": 0, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 1]}
{"flows_expired": 0, "packets_received": 14, "packets_processed": 10, "packets_ignored": 0, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 10]}
{"flows_expired": 0, "packets_received": 36, "packets_processed": 15, "packets_ignored": 0, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 15]}
{"flows_expired": 0, "packets_received": 69, "packets_processed": 43, "packets_ignored": 0, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 43]}
{"flows_expired": 0, "packets_received": 104, "packets_processed": 102, "packets_ignored": 0, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 102]}
{"flows_expired": 0, "packets_received": 109, "packets_processed": 108, "packets_ignored": 0, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 108]}
{"flows_expired": 0, "packets_received": 114, "packets_processed": 113, "packets_ignored": 0, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 113]}
{"flows_expired": 0, "packets_received": 116, "packets_processed": 115, "packets_ignored": 0, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 115]}
{"flows_expired": 0, "packets_received": 124, "packets_processed": 119, "packets_ignored": 0, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 119]}
{"flows_expired": 0, "packets_received": 132, "packets_processed": 129, "packets_ignored": 0, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 129]}
{"flows_expired": 0, "packets_received": 136, "packets_processed": 135, "packets_ignored": 0, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 135]}
{"flows_expired": 0, "packets_received": 141, "packets_processed": 139, "packets_ignored": 1, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 139]}
{"flows_expired": 0, "packets_received": 143, "packets_processed": 141, "packets_ignored": 1, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 141]}
{"flows_expired": 0, "packets_received": 151, "packets_processed": 143, "packets_ignored": 1, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 143]}
{"flows_expired": 0, "packets_received": 155, "packets_processed": 151, "packets_ignored": 1, "packets_dropped_filtered_by_kernel": 0, "packets_dropped_filtered_by_interface": 0, "meters_packets_processing_balance": [0, 151]}
...

custom log output:

received: 0, drop: 0, drop_if: 0, ignored: 0, processed: 0, debt: 0
received: 1, drop: 0, drop_if: 0, ignored: 0, processed: 1, debt: 0
received: 5, drop: 0, drop_if: 0, ignored: 0, processed: 5, debt: 0
received: 8, drop: 0, drop_if: 0, ignored: 0, processed: 7, debt: 1
received: 14, drop: 0, drop_if: 0, ignored: 0, processed: 13, debt: 1
received: 20, drop: 0, drop_if: 0, ignored: 0, processed: 19, debt: 1
received: 22, drop: 0, drop_if: 0, ignored: 0, processed: 21, debt: 1
received: 24, drop: 0, drop_if: 0, ignored: 0, processed: 23, debt: 1
received: 26, drop: 0, drop_if: 0, ignored: 0, processed: 25, debt: 1
received: 33, drop: 0, drop_if: 0, ignored: 0, processed: 31, debt: 2
received: 38, drop: 0, drop_if: 0, ignored: 0, processed: 37, debt: 1
...

Checklist:

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • Any dependent changes have been merged and published in downstream modules
  • I have checked my code and corrected any misspellings

Add 2 performance stats from libpcap to NFStreamer, and wrap them in a new
class `PerformanceStats`
@EvanLai88
Copy link
Author

EvanLai88 commented Mar 9, 2023

Currently, my implementation stores each performance stat in a dictionary.

Edit: just noticed that meter_workflow() updates perf every second.

After a little bit of testing, using a class instead of a dictionary makes it easier for people to implement custom stats (hints from class members vs. dictionary keys) with very little performance impact.

btw, are there better names for class PerformanceStats, and class Perf ?

class
class Perf:
    """One meter's packet counters, backed by shared-memory values.

    Every counter is a ``context.Value("I", 0)`` (C unsigned int), so a meter
    process and the main process can both read and write it. The properties
    below expose the plain int held by each shared Value.
    """

    def __init__(self, context) -> None:
        self._received = context.Value("I", 0)
        self._dropped = context.Value("I", 0)
        self._dropped_if = context.Value("I", 0)
        self._processed = context.Value("I", 0)
        self._ignored = context.Value("I", 0)

    # Helper used only while building the class body; deleted afterwards so it
    # never appears as an attribute on instances.
    def _counter(attr):
        def read(self):
            return getattr(self, attr).value

        def write(self, value):
            getattr(self, attr).value = value

        return property(read, write)

    received = _counter("_received")
    dropped = _counter("_dropped")
    dropped_if = _counter("_dropped_if")
    processed = _counter("_processed")
    ignored = _counter("_ignored")

    del _counter


class PerformanceStats:
    """Aggregates one Perf counter set per meter process.

    Supports index access: ``stats[i]`` returns meter *i*'s counters.
    ``is_linux`` and ``flows_count`` are stored as-is for the streamer
    (their semantics are defined by the caller).
    """

    def __init__(self, n_meters, context, is_linux, flows_count) -> None:
        self.is_linux = is_linux
        self.flows_count = flows_count
        self.perfs = []
        for _ in range(n_meters):
            self.perfs.append(Perf(context))

    def __getitem__(self, idx) -> Perf:
        return self.perfs[idx]
test code
from time import time_ns
from multiprocessing import get_context
from nfstream.utils import RepeatedTimer


class Perf:
    """Shared-memory packet counters for a single meter (benchmark copy).

    Each field is a ``context.Value("I", 0)`` — a C unsigned int living in
    shared memory so writer and reader processes see the same number. The
    generated properties read and write the underlying ``.value``.
    """

    def __init__(self, context) -> None:
        self._received = context.Value("I", 0)
        self._dropped = context.Value("I", 0)
        self._dropped_if = context.Value("I", 0)
        self._processed = context.Value("I", 0)
        self._ignored = context.Value("I", 0)

    # Class-body helper to generate one get/set property per shared counter;
    # removed after use so it is not part of the public surface.
    def _shared_counter(field):
        def getter(self):
            return getattr(self, field).value

        def setter(self, new_value):
            getattr(self, field).value = new_value

        return property(getter, setter)

    received = _shared_counter("_received")
    dropped = _shared_counter("_dropped")
    dropped_if = _shared_counter("_dropped_if")
    processed = _shared_counter("_processed")
    ignored = _shared_counter("_ignored")

    del _shared_counter


class PerformanceStatsWithClass:
    """Benchmark variant: one Perf counter object per meter process.

    ``stats[i]`` yields meter *i*'s Perf instance.
    """

    def __init__(self, n_meters, context, is_linux, flows_count) -> None:
        self.is_linux = is_linux
        self.flows_count = flows_count
        self.perfs = []
        for _ in range(n_meters):
            self.perfs.append(Perf(context))

    def __getitem__(self, idx) -> Perf:
        return self.perfs[idx]


class PerformanceStatsWithDict:
    """Benchmark variant: per-meter counters kept in plain dicts.

    ``stats[i]`` yields meter *i*'s dict of shared ``Value("I", 0)`` counters.
    """

    # Counter names created for every meter, in reporting order.
    _COUNTER_KEYS = ("received", "dropped", "dropped_if", "processed", "ignored")

    def __init__(self, n_meters, context, is_linux, flows_count) -> None:
        self.is_linux = is_linux
        self.flows_count = flows_count
        self.performances = [
            {key: context.Value("I", 0) for key in self._COUNTER_KEYS}
            for _ in range(n_meters)
        ]

    def __getitem__(self, idx) -> dict:
        return self.performances[idx]


def update_perf(perf: Perf | dict):
    """Writer workload: apply 1,000,000 rounds of increments to *perf*.

    Accepts either a Perf instance or the dict-based counter set. The loop
    index is printed every round (it is part of the measured load); the final
    totals are printed once at the end.
    """
    counter_deltas = (
        ("received", 1),
        ("dropped", 2),
        ("dropped_if", 3),
        ("processed", 4),
        ("ignored", 5),
    )
    if isinstance(perf, dict):
        for round_no in range(1_000_000):
            print(round_no, end="\r")
            for name, delta in counter_deltas:
                perf[name].value += delta
        print(*(perf[name].value for name, _ in counter_deltas))
    elif isinstance(perf, Perf):
        for round_no in range(1_000_000):
            print(round_no, end="\r")
            for name, delta in counter_deltas:
                setattr(perf, name, getattr(perf, name) + delta)
        print(*(getattr(perf, name) for name, _ in counter_deltas))


def watch(perfs: list[Perf | dict]):
    """Reader workload: fetch every counter of every meter exactly once.

    Simulates the reporter polling the shared counters while the writer
    processes are updating them; the values read are discarded.
    """
    names = ("received", "dropped", "dropped_if", "processed", "ignored")
    if isinstance(perfs[0], dict):
        for entry in perfs:
            for name in names:
                _ = entry[name].value
    elif isinstance(perfs[0], Perf):
        for entry in perfs:
            for name in names:
                _ = getattr(entry, name)


def time_func(func):
    """Decorator that prints how long the wrapped benchmark took, in seconds.

    The wrapped function is called unchanged with its single ``n_meters``
    argument; wall-clock duration is printed after it returns.
    """
    import functools  # local import keeps this snippet drop-in without touching file imports

    # functools.wraps preserves the benchmark's __name__/__doc__ (the original
    # decorator — misspelled "wapper" — clobbered them).
    @functools.wraps(func)
    def wrapper(n_meters):
        start = time_ns()
        func(n_meters)
        # time_ns() returns integer nanoseconds; divide by 1e9 to report seconds.
        print(f"{(time_ns() - start) / (10 ** 9)} seconds")

    return wrapper


@time_func
def test_class(n_meters):
    """Benchmark the class-based counters with n_meters writer processes."""
    print("class")
    ctx = get_context()
    stats = PerformanceStatsWithClass(n_meters, ctx, True, 0)
    workers = [
        ctx.Process(target=update_perf, args=(stats[i],)) for i in range(n_meters)
    ]
    # Poll all counters every millisecond while the writers hammer them.
    reader = RepeatedTimer(0.001, watch, stats)
    for worker in workers:
        worker.start()

    for worker in workers:
        if worker.is_alive():
            worker.join()
    reader.stop()


@time_func
def test_dict(n_meters):
    """Benchmark the dict-based counters with n_meters writer processes."""
    print("dict")
    ctx = get_context()
    stats = PerformanceStatsWithDict(n_meters, ctx, True, 0)
    workers = [
        ctx.Process(target=update_perf, args=(stats[i],)) for i in range(n_meters)
    ]

    # Poll all counters every millisecond while the writers hammer them.
    reader = RepeatedTimer(0.001, watch, stats)

    for worker in workers:
        worker.start()

    for worker in workers:
        if worker.is_alive():
            worker.join()
    reader.stop()


if __name__ == "__main__":
    # Run both benchmark variants with four writer processes each.
    for benchmark in (test_class, test_dict):
        benchmark(4)

writing to context.Value 1,000,000 times while reading from it every 0.001 seconds

class
1000000 2000000 3000000 4000000 5000000
1000000 2000000 3000000 4000000 5000000
1000000 2000000 3000000 4000000 5000000
1000000 2000000 3000000 4000000 5000000
11.975047249 seconds
dict
1000000 2000000 3000000 4000000 5000000
1000000 2000000 3000000 4000000 5000000
1000000 2000000 3000000 4000000 5000000
1000000 2000000 3000000 4000000 5000000
11.891510533 seconds

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant