redis-replica-manager

Group membership, sharding, replication and request routing manager relying on Redis for coordination.

TL;DR

This library allows building distributed applications where partitioning of work is important, replication is desired and the cost of moving a work partition from site to site is high.

It achieves its goal by partitioning work into slots and assigning each site (cluster node) a set of slots to be responsible for.

A slot can be assigned to more than one site (replication scenario), yet only one site will be designated a primary for each slot.

The differentiation between a primary and a secondary site role for a slot is determined by the application.

Assignment of slots to sites is determined by Rendezvous hashing (see below for more details).

Guarantees

High Availability

The library allows replication of slots to more than one site.

A site is not allowed to relinquish responsibility for a slot before the slot has been migrated to enough replicas. The only exception to this rule is when a site is determined to be faulty.
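
The relinquish rule can be pictured as a simple count of replicas held elsewhere. The following is a minimal sketch, not the library's implementation: `canRelinquish`, its parameters and the in-memory holder list are hypothetical names introduced here for illustration, and the faulty-site exception is left out.

```go
package main

import "fmt"

// canRelinquish reports whether site may give up a slot.
// holders lists the sites currently serving the slot and minReplicas is the
// number of copies that must remain on other sites. Both names are
// illustrative assumptions, not part of the library's API.
func canRelinquish(holders []string, site string, minReplicas int) bool {
	others := 0
	for _, h := range holders {
		if h != site {
			others++
		}
	}
	return others >= minReplicas
}

func main() {
	// site1 is the only holder, so it must keep the slot.
	fmt.Println(canRelinquish([]string{"site1"}, "site1", 1))
	// site2 already holds a replica, so site1 may release its copy.
	fmt.Println(canRelinquish([]string{"site1", "site2"}, "site1", 1))
}
```

In the Example at the end of this document, the equivalent decision is delegated to the cluster through `manager.RequestRemoveSlot()`.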

Minimum Partition Migration

Removing a site from the cluster redistributes the slots that site was responsible for among the remaining sites.

When a site is added to the cluster, slots which this site is now responsible for will move from other sites to the newly available site.
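
To illustrate why so few slots move, here is a small sketch that assigns a single owner per slot by highest hash weight and lists the slots whose owner changes when the site list changes. It assumes FNV-1a as the hash and invents the helper names `weight`, `owner` and `migrations`; the library's actual hash and internals may differ.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// weight scores a (site, slot) pair. FNV-1a is an illustrative choice.
func weight(site string, slot uint32) uint64 {
	h := fnv.New64a()
	fmt.Fprintf(h, "%s/%d", site, slot)
	return h.Sum64()
}

// owner returns the highest-weight site for slot, or "" when sites is empty.
// Ties are broken by site name so the result is deterministic.
func owner(sites []string, slot uint32) string {
	best, bestW := "", uint64(0)
	for _, s := range sites {
		w := weight(s, slot)
		if best == "" || w > bestW || (w == bestW && s < best) {
			best, bestW = s, w
		}
	}
	return best
}

// move records a slot whose owner differs between two cluster layouts.
type move struct {
	slot     uint32
	from, to string
}

// migrations lists every slot that changes owner when the site list changes.
func migrations(before, after []string, totalSlots uint32) []move {
	var moves []move
	for slot := uint32(0); slot < totalSlots; slot++ {
		a, b := owner(before, slot), owner(after, slot)
		if a != b {
			moves = append(moves, move{slot, a, b})
		}
	}
	return moves
}

func main() {
	three := []string{"site1", "site2", "site3"}
	four := []string{"site1", "site2", "site3", "site4"}

	joined := migrations(three, four, 512)
	fmt.Printf("site4 joins: %d of 512 slots move, all of them to site4\n", len(joined))

	left := migrations(three, []string{"site1", "site3"}, 512)
	fmt.Printf("site2 leaves: %d of 512 slots move, all of them from site2\n", len(left))
}
```

A slot changes owner only when the new site scores highest for it, or when its previous owner disappears, so slots never shuffle between sites that are unaffected by the change.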

Application Control

Migration of slots between sites is requested by the library, and executed by the application.

The library makes no assumptions about whether a migration has completed.

The application determines when migration has completed, and notifies the library of that fact.

Faulty Sites Detection

A heartbeat and a watchdog timer guarantee that faulting sites are removed from the cluster.

Faulty Slot Handler Detection

If the application detects that a handler for a slot has failed, it should notify the cluster by calling LocalSiteManager.RemoveFailedSlot(context.Background(), slotId). This call immediately and unconditionally removes the slot from the local site, triggers failover to one of the secondary replicas, and eventually triggers a retry to add the slot back to the local site.

Dynamic Routing

The library maintains a routing table that maps each slot to the list of sites serving it. The table is used to route requests for a slot to the right site.
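
As a rough picture of such a table, the sketch below maps an object to a slot and a slot to its sites. Everything in it is an assumption made for illustration: the `routeTable` type, the use of FNV-1a modulo the slot count, and the convention that the first site in a list is the primary are not taken from the library.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// slotForObject maps an object ID to a slot in [0, totalSlots).
// FNV-1a modulo the slot count is an assumption of this sketch.
// totalSlots must be greater than zero.
func slotForObject(objectID string, totalSlots uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(objectID))
	return h.Sum32() % totalSlots
}

// routeTable maps a slot to the sites serving it.
// Treating the first entry as the primary is an assumption of this sketch.
type routeTable map[uint32][]string

// primary returns the primary site for slot, or false when no site serves it.
func (t routeTable) primary(slot uint32) (string, bool) {
	sites := t[slot]
	if len(sites) == 0 {
		return "", false
	}
	return sites[0], true
}

func main() {
	const totalSlots = 512

	slot := slotForObject("abcdefg", totalSlots)
	table := routeTable{slot: {"site2", "site1"}}

	site, ok := table.primary(slot)
	fmt.Println(slot, table[slot], site, ok)
}
```

In the library itself, the corresponding lookups are the `GetSlotForObject`, `GetSlotRouteTable` and `GetSlotPrimarySiteRoute` calls shown in the Example.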

Rendezvous hashing

Rendezvous or highest random weight (HRW) hashing is an algorithm that allows clients to achieve distributed agreement on a set of k options out of a possible set of n options. A typical application is when clients need to agree on which sites (or proxies) objects are assigned to.

Rendezvous hashing is both much simpler and more general than consistent hashing, which becomes a special case (for k=1) of rendezvous hashing.

More information: https://en.wikipedia.org/wiki/Rendezvous_hashing
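
A minimal sketch of the idea, assuming FNV-1a as the hash and a hypothetical `topSites` helper (neither is confirmed to match the library's internals): every site is scored against the slot, and the k highest-scoring sites are chosen.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// score returns a pseudo-random weight for a (site, slot) pair.
// FNV-1a is an illustrative choice, not necessarily the library's hash.
func score(site string, slot uint32) uint64 {
	h := fnv.New64a()
	fmt.Fprintf(h, "%s/%d", site, slot)
	return h.Sum64()
}

// topSites returns the k sites with the highest score for slot,
// best first. Ties are broken by site name so the order is deterministic.
func topSites(sites []string, slot uint32, k int) []string {
	ranked := append([]string(nil), sites...) // copy, keep the input intact
	sort.Slice(ranked, func(i, j int) bool {
		si, sj := score(ranked[i], slot), score(ranked[j], slot)
		if si != sj {
			return si > sj
		}
		return ranked[i] < ranked[j]
	})
	if k > len(ranked) {
		k = len(ranked)
	}
	return ranked[:k]
}

func main() {
	sites := []string{"site1", "site2", "site3"}

	// Every client that knows the same site list computes the same answer,
	// without talking to the other clients.
	fmt.Println(topSites(sites, 497, 2))
}
```

Because the ranking depends only on the site names and the slot number, clients agree on the result regardless of the order in which they learned about the sites.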

Example

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
	redisReplicaManager "github.com/zavitax/redis-replica-manager-go"
)

var redisOptions = &redis.Options{
	Addr:     "127.0.0.1:6379",
	Password: "",
	DB:       0,
}

func createReplicaManagerOptions(
	testId string,
	siteId string,
) *redisReplicaManager.ReplicaManagerOptions {
	result := &redisReplicaManager.ReplicaManagerOptions{
		RedisOptions:   redisOptions,
		SiteTimeout:    time.Second * 5,
		RedisKeyPrefix: fmt.Sprintf("{test-redis-replica-manager}::%v", testId),
		SiteID:         siteId,
	}

	return result
}

func createReplicaManagerClient(options *redisReplicaManager.ReplicaManagerOptions) (redisReplicaManager.ReplicaManagerClient, error) {
	return redisReplicaManager.NewRedisReplicaManagerClient(context.TODO(), options)
}

func main() {
	ctx := context.Background()

	options1 := createReplicaManagerOptions("main", "site1")

	client1, err := createReplicaManagerClient(options1)
	if err != nil {
		panic(err) // cannot continue without a working Redis-backed client
	}

	balancerOptions := &redisReplicaManager.ReplicaBalancerOptions{
		TotalSlotsCount:   512,
		SlotReplicaCount:  1,
		MinimumSitesCount: 1,
	}

	balancer1, err := redisReplicaManager.NewReplicaBalancer(ctx, balancerOptions)
	if err != nil {
		panic(err) // balancer options were rejected
	}

	manager1, _ := redisReplicaManager.NewLocalSiteManager(ctx, &redisReplicaManager.ClusterNodeManagerOptions{
		ReplicaManagerClient: client1,
		ReplicaBalancer:      balancer1,
		RefreshInterval:      time.Second * 15,
		NotifyMissingSlotsHandler: func(ctx context.Context, manager redisReplicaManager.LocalSiteManager, slots *[]uint32) error {
			fmt.Printf("m1: missing slots to be added to local site: %v\n", len(*slots))

			for _, slotId := range *slots {
				// Perform necessary operations to be able to fully serve requests for `slotId`
			
				// Notify the cluster that the slot was added to the local site.
				// This should happen only when the slot is completely ready to be served.
				// Calling `manager.RequestAddSlot()` tells the site manager "I am now ready to serve all requests for `slotId`".
				manager.RequestAddSlot(ctx, slotId)
			}

			return nil
		},
		NotifyRedundantSlotsHandler: func(ctx context.Context, manager redisReplicaManager.LocalSiteManager, slots *[]uint32) error {
			fmt.Printf("m1: redundant slots to be removed from local site: %v\n", len(*slots))

			for _, slotId := range *slots {
				// Ask the cluster manager if we are allowed to remove a redundant slot
				// (if it satisfies minimum replica count on other sites)
				if allowed, _ := manager.RequestRemoveSlot(ctx, slotId); allowed {
					// Slot has been approved for removal by the cluster (and has been removed from the routing table)

					// Only after the cluster approved our request to remove the slot,
					// we can release resources which were allocated to serve requests for `slotId`
					fmt.Printf("m1: allowed to remove slot from local site: %v\n", allowed)
				}
			}

			return nil
		},
		NotifyPrimarySlotsChangedHandler: func(ctx context.Context, manager redisReplicaManager.LocalSiteManager) error {
			slots, _ := manager.GetAllSlotsLocalNodeIsPrimaryFor(ctx)

			fmt.Printf("m1: primary slots changed: %v\n", len(*slots))

			return nil
		},
	})

	slots1, _ := manager1.GetSlotIdentifiers(ctx)

	fmt.Printf("manager1 slots count: %v\n", len(*slots1))

	fmt.Printf("m1: sites for slot 1: %v\n", manager1.GetSlotRouteTable(ctx, 1))
	fmt.Printf("m1: sites for slot 497: %v\n", manager1.GetSlotRouteTable(ctx, 497))

	fmt.Printf("m1: primary site for slot 1: %v\n", manager1.GetSlotPrimarySiteRoute(ctx, 1))
	fmt.Printf("m1: primary site for slot 497: %v\n", manager1.GetSlotPrimarySiteRoute(ctx, 497))

	fmt.Printf("m1: slot for object abcdefg: %v\n", manager1.GetSlotForObject("abcdefg"))

	manager1.Close()
}