Skip to content

Commit

Permalink
Merge branch 'release/0.54.0' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
ljupcovangelski committed Jul 6, 2023
2 parents 91dcafa + 68b19a0 commit a478a38
Show file tree
Hide file tree
Showing 193 changed files with 11,974 additions and 8,156 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Expand Up @@ -3,7 +3,7 @@
We love every form of contribution. By participating in this project, you
agree to abide to the `Airy` [code of conduct](/code_of_conduct.md).

Please read our [contributing guide](/docs/docs/guides/contributing.md) to
Please read our [contributing guide](/docs/docs/guides/contributing-to-airy.md) to
learn how to develop with the `Airy Core Platform` and what conventions we
follow.

Expand Down
6 changes: 3 additions & 3 deletions README.md
Expand Up @@ -69,7 +69,7 @@ real-time and historical data to wherever you need it.

Airy Core comes with all the components you need to stream historical and real-time data.

- 💬 Pre-built and easily configurable [connectors](https://airy.co/docs/core/sources/introduction)
- 💬 Pre-built and easily configurable [connectors](https://airy.co/docs/core/connectors/sources/introduction)

By ingesting all real-time events and continuously processing, aggregating and joining them in the stream, development time can be significantly reduced. Through integrations with pre-built and easily configured connectors, events are consumed from any source, including business systems such as ERP/CRM, conversational sources, third party APIs. Airy also comes with an SDK to build custom connectors to any source.

Expand All @@ -88,7 +88,7 @@ clients to receive near real-time updates about data flowing through the system.
A webhook integration server that allows its users to create actionable workflows (the webhook integration
exposes events users can "listen" to and react programmatically.)

- 💎[UI: From a control center to dashboards](https://airy.co/docs/core/apps/ui/introduction)
- 💎[UI](https://airy.co/docs/core/ui/overview) to access the data and the control center through a browser

No-code interfaces to manage and control Airy, your connectors and your streams.

Expand All @@ -97,7 +97,7 @@ No-code interfaces to manage and control Airy, your connectors and your streams.
We welcome (and love) every form of contribution! Good entry points to the
project are:

- Our [contributing guide](/docs/docs/guides/contributing.md)
- Our [contributing guide](/docs/docs/guides/contributing-to-airy.md)
- Issues with the tag
[gardening](https://github.com/airyhq/airy/issues?q=is%3Aissue+is%3Aopen+label%3Agardening)
- Issues with the tag [good first
Expand Down
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.53.0
0.54.0
1 change: 1 addition & 0 deletions backend/components/admin/BUILD
Expand Up @@ -25,6 +25,7 @@ app_deps = [
"//lib/java/spring/kafka/core:spring-kafka-core",
"//lib/java/spring/kafka/streams:spring-kafka-streams",
"//lib/java/tracking:tracking",
"@maven//:org_springframework_retry_spring_retry",
]

springboot(
Expand Down
@@ -0,0 +1,70 @@
package co.airy.core.admin;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.retry.annotation.Retryable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.HttpStatusCodeException;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

import javax.servlet.http.HttpServletRequest;
import java.net.URI;
import java.util.Enumeration;

@RestController
public class RegistryProxy {
private final String upstreamHost;
private final int upstreamPort;
private final RestTemplate restTemplate;

public RegistryProxy(@Value("${KAFKA_REST_UPSTREAM_HOST}") String upstreamHost,
@Value("${KAFKA_REST_UPSTREAM_PORT:80}") int upstreamPort, RestTemplate restTemplate) {
this.upstreamHost = upstreamHost;
this.upstreamPort = upstreamPort;
this.restTemplate = restTemplate;
}


@RequestMapping("/kafka/**")
public ResponseEntity<?> proxyRequest(@RequestBody(required = false) String body, HttpMethod method, HttpServletRequest request) {
return executeRequest(body, method, request);
}

@Retryable(exclude = {
HttpStatusCodeException.class}, include = Exception.class, maxAttempts = 3)
public ResponseEntity<?> executeRequest(String body,
HttpMethod method, HttpServletRequest request) {
String requestUrl = request.getRequestURI();

URI uri = UriComponentsBuilder.newInstance()
.scheme("http")
.host(upstreamHost)
.port(upstreamPort)
.path(requestUrl.replace("/kafka", ""))
.query(request.getQueryString())
.build(true).toUri();

HttpHeaders headers = new HttpHeaders();
Enumeration<String> headerNames = request.getHeaderNames();

while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
headers.set(headerName, request.getHeader(headerName));
}

HttpEntity<String> httpEntity = new HttpEntity<>(body, headers);
try {
return restTemplate.exchange(uri, method, httpEntity, String.class);
} catch (HttpStatusCodeException e) {
return ResponseEntity.status(e.getRawStatusCode())
.headers(e.getResponseHeaders())
.body(e.getResponseBodyAsString());
}
}
}
2 changes: 2 additions & 0 deletions backend/components/admin/src/test/resources/test.properties
Expand Up @@ -2,3 +2,5 @@ kafka.cleanup=true
kafka.commit-interval-ms=100
kubernetes.namespace=default
kubernetes.app=api-admin
KAFKA_REST_UPSTREAM_HOST=schema-registry
KAFKA_REST_UPSTREAM_PORT=8082
54 changes: 54 additions & 0 deletions backend/components/streams/BUILD
@@ -0,0 +1,54 @@
load("@com_github_airyhq_bazel_tools//lint:buildifier.bzl", "check_pkg")

# gazelle:prefix github.com/airyhq/airy/backend/components/streams
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
load("@io_bazel_rules_docker//go:image.bzl", "go_image")
load("//tools/build:container_release.bzl", "container_release")

go_library(
name = "streams_lib",
srcs = [
"auth.go",
"cors.go",
"main.go",
"streams_create.go",
"streams_delete.go",
"streams_info.go",
"streams_list.go",
],
importpath = "github.com/airyhq/airy/backend/components/streams",
visibility = ["//visibility:private"],
deps = [
"//lib/go/httpclient",
"//lib/go/payloads",
"@com_github_golang_jwt_jwt//:jwt",
"@com_github_gorilla_mux//:mux",
"@io_k8s_klog//:klog",
],
)

go_binary(
name = "streams",
out = "streams",
embed = [":streams_lib"],
visibility = ["//visibility:public"],
)

genrule(
name = "streams_bin_rule",
srcs = [":streams"],
outs = ["streams_bin"],
cmd = "cp $(SRCS) $@",
)

go_image(
name = "image",
embed = [":streams_lib"],
)

container_release(
registry = "ghcr.io/airyhq/api/components",
repository = "streams",
)

check_pkg(name = "buildifier")
108 changes: 108 additions & 0 deletions backend/components/streams/auth.go
@@ -0,0 +1,108 @@
package main

import (
"context"
"encoding/base64"
"log"
"net/http"
"regexp"
"strings"

"github.com/golang-jwt/jwt"
"k8s.io/klog"
)

type EnableAuthMiddleware struct {
pattern *regexp.Regexp
}

// MustNewAuthMiddleware Only paths that match the regexp pattern will be authenticated
func MustNewAuthMiddleware(pattern string) EnableAuthMiddleware {
r, err := regexp.Compile(pattern)
if err != nil {
log.Fatal(err)
}
return EnableAuthMiddleware{pattern: r}
}

func (a EnableAuthMiddleware) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if !a.pattern.MatchString(r.URL.Path) {
next.ServeHTTP(w, r)
return
}

// Auth middlewares attach a flag to the context indicating that authentication was successful
if val, ok := ctx.Value("auth").(bool); ok && val {
next.ServeHTTP(w, r)
} else {
http.Error(w, "Forbidden", http.StatusForbidden)
}
})
}

type SystemTokenMiddleware struct {
systemToken string
}

func NewSystemTokenMiddleware(systemToken string) SystemTokenMiddleware {
return SystemTokenMiddleware{systemToken: systemToken}
}

func (s SystemTokenMiddleware) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authPayload := r.Header.Get("Authorization")
authPayload = strings.TrimPrefix(authPayload, "Bearer ")

if authPayload == s.systemToken {
ctx := context.WithValue(r.Context(), "auth", true)
next.ServeHTTP(w, r.WithContext(ctx))
return
}
next.ServeHTTP(w, r)
})
}

type JwtMiddleware struct {
jwtSecret []byte
}

func NewJwtMiddleware(jwtSecret string) JwtMiddleware {
data, err := base64.StdEncoding.DecodeString(jwtSecret)
if err != nil {
klog.Fatal("failed to base64 decode jwt secret: ", err)
}

return JwtMiddleware{jwtSecret: data}
}

func (j JwtMiddleware) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authPayload := r.Header.Get("Authorization")
authPayload = strings.TrimPrefix(authPayload, "Bearer ")
if authPayload == "" {
authPayload = getAuthCookie(r)
}

token, err := jwt.Parse(authPayload, func(token *jwt.Token) (interface{}, error) {
return j.jwtSecret, nil
})

if err != nil || !token.Valid {
next.ServeHTTP(w, r)
return
}

ctx := context.WithValue(r.Context(), "auth", true)
next.ServeHTTP(w, r.WithContext(ctx))
})
}

func getAuthCookie(r *http.Request) string {
cookie, err := r.Cookie("airy_auth_token")
if err != nil {
return ""
}
return cookie.Value
}
49 changes: 49 additions & 0 deletions backend/components/streams/cors.go
@@ -0,0 +1,49 @@
package main

import (
"net/http"
"strings"
)

type CORS struct {
allowedOrigins map[string]struct{}
}

func NewCORSMiddleware(allowedOrigins string) CORS {
cors := CORS{allowedOrigins: make(map[string]struct{})}

for _, origin := range strings.Split(allowedOrigins, ",") {
cors.allowedOrigins[origin] = struct{}{}
}

return cors
}

func (c *CORS) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

origin := r.Header.Get("Origin")
_, allowed := c.allowedOrigins[origin]

if !allowed && r.Method == "OPTIONS" {
w.WriteHeader(http.StatusForbidden)
return
}

if allowed {
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Allow-Methods", "POST, GET")
w.Header().Set(
"Access-Control-Allow-Headers",
"Accept, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, X-Requested-With, X-XSRF-Token",
)
}

if r.Method == "OPTIONS" {
return
}

next.ServeHTTP(w, r)
})
}
3 changes: 3 additions & 0 deletions backend/components/streams/helm/BUILD
@@ -0,0 +1,3 @@
load("//tools/build:helm.bzl", "helm_ruleset_core_version")

helm_ruleset_core_version()
5 changes: 5 additions & 0 deletions backend/components/streams/helm/Chart.yaml
@@ -0,0 +1,5 @@
apiVersion: v2
appVersion: "1.0"
description: A Helm chart for the streams backend
name: api-streams
version: 0.1
10 changes: 10 additions & 0 deletions backend/components/streams/helm/templates/configmap.yaml
@@ -0,0 +1,10 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: "{{ .Values.name }}"
labels:
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: "{{ .Values.name }}"
annotations:
core.airy.co/enabled: "{{ .Values.enabled }}"

0 comments on commit a478a38

Please sign in to comment.