Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Watch Live View . #123

Open
mbtolou opened this issue Feb 2, 2022 · 3 comments
Open

Support Watch Live View . #123

mbtolou opened this issue Feb 2, 2022 · 3 comments
Assignees
Labels
enhancement New feature or request

Comments

@mbtolou
Copy link

mbtolou commented Feb 2, 2022

Hi.

I implement code that open http port and monitor watch live view changes.
can you add this code to your library ,and return result in object ?

public class LiveViewWatcher : IDisposable
{
	private readonly string _downloadUrl;
	private readonly string _watchName;
	private HttpClient? _httpClient = null;

	public delegate void EvChangeLiveViewResult(string watchName, string data);

	public event EvChangeLiveViewResult OnChangeLiveViewResult = delegate { };

	public LiveViewWatcher(string ip_address, string user, string pwd, string watchName, int port = 8123, string? sessionId = null)
	{
		sessionId ??= Guid.NewGuid().ToString();
		_downloadUrl = $"http://{ip_address}:8123/?user={user}&password={pwd}&session_id={sessionId}&allow_experimental_live_view=1&query=WATCH%20{watchName}%20FORMAT%20JSONEachRow";
		_watchName = watchName;
	}

	public async Task StartWatch()
	{
		_httpClient = new HttpClient { Timeout = TimeSpan.FromDays(1) };

		using (var response = await _httpClient.GetAsync(_downloadUrl, HttpCompletionOption.ResponseHeadersRead))
			await DownloadFileFromHttpResponseMessage(response);
	}

	private async Task DownloadFileFromHttpResponseMessage(HttpResponseMessage response)
	{

		using (var contentStream = await response.Content.ReadAsStreamAsync())
			await ProcessContentStream(contentStream);
	}

	private async Task ProcessContentStream(Stream contentStream)
	{
		var totalBytesRead = 0L;
		var buffer = new byte[1024];

		var memResponse = new MemoryStream();
		do
		{
			var bytesRead = await contentStream.ReadAsync(buffer, 0, buffer.Length);
			if (bytesRead == 0)
			{
				System.Threading.Thread.Sleep(2000);
				continue;
			}

			await memResponse.WriteAsync(buffer, 0, bytesRead);
			totalBytesRead += bytesRead;

			if (bytesRead < buffer.Length && buffer[bytesRead - 1] == 10)
			{
				OnChangeLiveViewResult(_watchName, System.Text.Encoding.UTF8.GetString(memResponse.ToArray()));
				memResponse = new MemoryStream();
				totalBytesRead = 0;
				buffer = new byte[1024];
				continue;
			}

		}
		while (true);
	}

	public void Dispose()
	{
		_httpClient?.Dispose();
	}
}
var downloadMem = new LiveViewWatcher("192.168.5.10", "default", "alireza", "test.lv");
downloadMem.OnChangeLiveViewResult += TestFunc;
await downloadMem.StartWatch();

Console.ReadKey();

void TestFunc(string watchName, string stringResults)
{
	Console.WriteLine(stringResults);
}

output result is :

{"sum(a)":"1008","_version":"1"}

{"sum(a)":"1014","_version":"2"}

{"sum(a)":"1020","_version":"3"}

{"sum(a)":"1026","_version":"4"}

{"sum(a)":"1032","_version":"5"}

@mbtolou
Copy link
Author

mbtolou commented Feb 2, 2022

SET allow_experimental_live_view = 1;

DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;

CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;

WATCH test.lv LIMIT 0;

INSERT INTO test.mt VALUES (1),(2),(3);


WATCH test.lv LIMIT 0;

@DarkWanderer
Copy link
Owner

Hi,

Thanks for contribution, will definitely look into this

@DarkWanderer DarkWanderer self-assigned this Feb 3, 2022
@DarkWanderer DarkWanderer added the enhancement New feature or request label Feb 3, 2022
@mbtolou
Copy link
Author

mbtolou commented Jun 25, 2022

I have been using this code for some time and it works for me without any problems.
One of the features of this code is that it continues to work if the connection is disconnected and connected, and one of its advantages is that it is written with a socket.
Another feature is keeping the connection alive. Because of the interval.

@DarkWanderer

using System.IO;
using System.Net.Sockets;
using System.Text;
using SpanJson;


public class LiveViewWatcher : IDisposable
{

  public static LiveViewWatcher Watch(string watchName, string ip_address = null, string user = null, string pwd = null, int port = 8123, string sessionId = null, bool only_event = false) =>
   new LiveViewWatcher(watchName, ip_address, user, pwd, 8123, sessionId, only_event);

  public LiveViewWatcher(string watchName, string ip_address = null, string user = null, string pwd = null, int? port = null, string sessionId = null, bool only_event = false)
  {
    ip_address ??= MachineConstants.CLICKHOUSE_SERVER.IP;
    user ??= MachineConstants.CLICKHOUSE_SERVER.USER;
    pwd ??= MachineConstants.CLICKHOUSE_SERVER.PASSWORD;
    port ??= Convert.ToInt32(MachineConstants.CLICKHOUSE_SERVER.PORT);

    SessionId = sessionId ??= Guid.NewGuid().ToString();
    ServerIpAddres = ip_address;
    ServerPort = port.Value;
    Username = user;
    Password = pwd;

    WatchName = watchName;
    OnlyEvents = only_event;

  }

  private readonly string WatchName;
  private readonly bool OnlyEvents;

  public int KeepAliveInterval { get; private set; } = 240;
  public delegate void EvLiveViewResult(LiveViewWatcher watchObj, List<Dictionary<string, string>> data);
  public delegate void EvLiveViewJsonResult(LiveViewWatcher watchObj, string data);
  public event EvLiveViewResult OnLiveViewResult = null;
  public event EvLiveViewJsonResult OnLiveViewJsonResult = null;


  private void ProcessDataBuffer(byte[] bytes)
  {
    var json_data = "";
    var all_content = "";
    StringBuilder str_builder = new StringBuilder();

    try
    {
      all_content = bytes.ToUTF8();

      var splited_chunk = all_content.Split("\r\n", StringSplitOptions.RemoveEmptyEntries).Where(c => c.Length > 6).ToArray();

      foreach (var ch_line in splited_chunk)
        str_builder.Append(ch_line);

      all_content = str_builder.ToString();
      str_builder = new StringBuilder();

      var lines = all_content.Split("\n", StringSplitOptions.RemoveEmptyEntries).Where(c => c.Length > 6).ToArray();

      str_builder.Append("[");

      foreach (var line in lines)
      {

        if (line[0] != '{' || line.StartsWith("{\"progress\":"))
          continue;

        str_builder.Append(line.Substring(7, line.Length - 8));
        str_builder.Append(",");
      }

      str_builder = str_builder.Remove(str_builder.Length - 1, 1);
      str_builder.Append("]");

      json_data = str_builder.ToString();
      if (json_data.Length < 2)
        return;

      Task.Run(() =>
      {
        OnLiveViewJsonResult?.Invoke(this, json_data);
        OnLiveViewResult?.Invoke(this, JsonSerializer.Generic.Utf16.Deserialize<List<Dictionary<string, string>>>(json_data));
      });

    }
    catch (Exception expMsg)
    {
      Console.WriteLine(expMsg.StackTrace);
      // System.IO.File.WriteAllText($"live_data_j_{DateTime.Now.ToFileTime()}.txt", json_data);
      // System.IO.File.WriteAllText($"live_data_c_{DateTime.Now.ToFileTime()}.txt", all_content);
    }
  }


  #region Scoket Methods

  public string ServerIpAddres { get; }
  public int ServerPort { get; } = 8123;
  public string Password { get; }
  public string Username { get; }
  public string SessionId { get; set; }
  public bool IsDisposed { get; private set; } = false;

  public TcpClient socket;


  public async Task StartWatch()
  {
    try
    {
      SessionId = Guid.NewGuid().ToString();
      string url = $"http://{ServerIpAddres}:{ServerPort}/?user={Username}&password={Password}&session_id={SessionId}&allow_experimental_live_view=1&live_view_heartbeat_interval=120&query=WATCH%20{WatchName}{(OnlyEvents ? "%20EVENTS" : "")}%20FORMAT%20JSONEachRowWithProgress";
      var request = $"GET {url} HTTP/1.1\r\nHost: {ServerIpAddres}\r\nContent-Length: 0\r\nConnection: Keep-Alive\r\nKeep-Alive: timeout=3600, max=100\r\n\r\n";

      socket = new TcpClient(ServerIpAddres, ServerPort);
      socket.ReceiveBufferSize = 4096 * 2;
      socket.SendBufferSize = 4096 * 2;
      socket.ReceiveTimeout = 30000;
      socket.SendTimeout = 30000;
      using var stream = socket.GetStream();

      await stream.WriteAsync(request.ToBytes());
      await stream.FlushAsync();

      int byte_read = 0;
      MemoryStream LiveViewBuffer = new MemoryStream();
      DateTime lastMessageTime = DateTime.Now;
      do
      {
        byte[] bytes = new byte[1024 * 32];
        byte_read = await stream.ReadAsync(bytes, 0, bytes.Length);

        if (byte_read <= 0)
        {
          if ((DateTime.Now - lastMessageTime).TotalSeconds > KeepAliveInterval)
            throw new Exception($"[{WatchName}] [{DateTime.Now.ToMiniStr()}] [{lastMessageTime.ToMiniStr()}] http session is killed!");

          await stream.WriteAsync(new byte[0]);
          System.Threading.Thread.Sleep(2000);
          continue;
        }

        lastMessageTime = DateTime.Now;

        LiveViewBuffer.Write(bytes, 0, byte_read);

        if (byte_read >= 3 && bytes[byte_read - 3] == 10 && bytes[byte_read - 2] == 13 && bytes[byte_read - 1] == 10)
        {
          LiveViewBuffer.Flush();
          ProcessDataBuffer(LiveViewBuffer.ToArray());
          LiveViewBuffer = new MemoryStream();
          continue;
        }
      } while (true);

    }
    catch (Exception expMsg)
    {
      Console.WriteLine(expMsg.Message);
      if (!IsDisposed)
      {
        ClearSocket();
        System.Threading.Thread.Sleep(20000);
        _ = StartWatch();
      }
    }
  }


  private void ClearSocket()
  {
    if (socket != null)
    {
      try
      {
        if (socket.Connected)
          socket?.Close();

        socket?.Dispose();
      }
      catch
      {
        socket = null;
      }
    }
  }


  #endregion

  public void Dispose() => IsDisposed = true;

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants