Skip to content

Commit

Permalink
Cloud Storage Pathing (Azure Blob Storage) (#947)
Browse files Browse the repository at this point in the history
Co-authored-by: ahmadnazeri <42309296+ahmadnazeri@users.noreply.github.com>
  • Loading branch information
2 people authored and sdreyer committed Aug 17, 2023
1 parent 9b30842 commit a28ff65
Show file tree
Hide file tree
Showing 13 changed files with 519 additions and 77 deletions.
4 changes: 3 additions & 1 deletion client/src/featureform/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def dataframe(
source: Union[SourceRegistrar, LocalSource, SubscriptableTransformation, str],
variant: Union[str, None] = None,
limit=NO_RECORD_LIMIT,
asynchronous=False,
):
"""
Compute a dataframe from a registered source or transformation
Expand All @@ -62,6 +63,7 @@ def dataframe(
source (Union[SourceRegistrar, LocalSource, SubscriptableTransformation, str]): The source or transformation to compute the dataframe from
variant (str): The source variant; defaults to a Docker-style random name and is ignored if source argument is not a string
limit (int): The maximum number of records to return; defaults to NO_RECORD_LIMIT
asynchronous (bool): @param asynchronous: Flag to determine whether the client should wait for resources to be in either a READY or FAILED state before returning. Defaults to False to ensure that newly registered resources are in a READY state prior to serving them as dataframes.
**Example:**
```py title="definitions.py"
Expand All @@ -70,7 +72,7 @@ def dataframe(
avg_user_transaction_df = transactions_df.groupby("CustomerID")["TransactionAmount"].mean()
```
"""
self.apply()
self.apply(asynchronous=asynchronous)
if isinstance(
source, (SourceRegistrar, LocalSource, SubscriptableTransformation)
):
Expand Down
2 changes: 1 addition & 1 deletion client/src/featureform/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -3835,7 +3835,7 @@ def apply(self, asynchronous=True):
"""
Apply all definitions, creating and retrieving all specified resources.
@param asynchronous: Wait for all resources to be ready before returning.
@param asynchronous: Flag to determine whether the client should wait for resources to be in either a READY or FAILED state before returning. Defaults to True to avoid blocking the client.
"""

print(f"Applying Run: {get_run()}")
Expand Down
19 changes: 16 additions & 3 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,9 @@ func (c *Coordinator) runDFTransformationJob(transformSource *metadata.SourceVar
return fmt.Errorf("map name: %v sources: %v", err, sources)
}

sourceMapping := []provider.SourceMapping{}
for nameVariantClient, transformationTableName := range sourceMap {
sourceMapping = append(sourceMapping, provider.SourceMapping{Template: nameVariantClient, Source: transformationTableName})
sourceMapping, err := getOrderedSourceMappings(sources, sourceMap)
if err != nil {
return fmt.Errorf("failed to get ordered source mappings due to %v", err)
}

c.Logger.Debugw("Created transformation query")
Expand All @@ -556,6 +556,19 @@ func (c *Coordinator) runDFTransformationJob(transformSource *metadata.SourceVar
return nil
}

func getOrderedSourceMappings(sources []metadata.NameVariant, sourceMap map[string]string) ([]provider.SourceMapping, error) {
sourceMapping := make([]provider.SourceMapping, len(sources))
for i, nv := range sources {
sourceKey := nv.ClientString()
tableName, hasKey := sourceMap[sourceKey]
if !hasKey {
return nil, fmt.Errorf("key %s not in source map", sourceKey)
}
sourceMapping[i] = provider.SourceMapping{Template: sourceKey, Source: tableName}
}
return sourceMapping, nil
}

func (c *Coordinator) runPrimaryTableJob(transformSource *metadata.SourceVariant, resID metadata.ResourceID, offlineStore provider.OfflineStore, schedule string) error {
c.Logger.Info("Running primary table job on resource: ", resID)
providerResourceID := provider.ResourceID{Name: resID.Name, Variant: resID.Variant, Type: provider.Primary}
Expand Down
96 changes: 96 additions & 0 deletions coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,3 +1806,99 @@ func TestGetSourceMappingError(t *testing.T) {
t.Fatalf("getSourceMapping did not catch error: templateString {%v} and wrongReplacement {%v}", templateString, wrongReplacements)
}
}

func TestGetOrderedSourceMappings(t *testing.T) {
type testCase struct {
name string
sources []metadata.NameVariant
sourceMap map[string]string
expectedSourceMap []provider.SourceMapping
expectError bool
}

testCases := []testCase{
{
name: "test ordered source mappings",
sources: []metadata.NameVariant{
{Name: "name1", Variant: "variant1"},
{Name: "name2", Variant: "variant2"},
},
sourceMap: map[string]string{
"name1.variant1": "tableA",
"name2.variant2": "tableB",
},
expectedSourceMap: []provider.SourceMapping{
{
Template: "name1.variant1",
Source: "tableA",
},
{
Template: "name2.variant2",
Source: "tableB",
},
},
expectError: false,
},
{
name: "test unordered source mappings",
sources: []metadata.NameVariant{
{Name: "name1", Variant: "variant1"},
{Name: "name2", Variant: "variant2"},
{Name: "name3", Variant: "variant3"},
{Name: "name4", Variant: "variant4"},
},
sourceMap: map[string]string{
"name2.variant2": "tableB",
"name4.variant4": "tableD",
"name1.variant1": "tableA",
"name3.variant3": "tableC",
},
expectedSourceMap: []provider.SourceMapping{
{
Template: "name1.variant1",
Source: "tableA",
},
{
Template: "name2.variant2",
Source: "tableB",
},
{
Template: "name3.variant3",
Source: "tableC",
},
{
Template: "name4.variant4",
Source: "tableD",
},
},
expectError: false,
},
{
name: "test missing key in source map",
sources: []metadata.NameVariant{
{Name: "name1", Variant: "variant1"},
{Name: "name2", Variant: "variant2"},
},
sourceMap: map[string]string{
"name1.variant1": "tableA",
},
expectedSourceMap: nil,
expectError: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
sourceMap, err := getOrderedSourceMappings(tc.sources, tc.sourceMap)
if tc.expectError && err == nil {
t.Fatalf("Expected error, but did not get one")
}
if !tc.expectError && err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(sourceMap, tc.expectedSourceMap) {
t.Fatalf("source mapping did not generate the SourceMapping correctly. Expected %v, got %v", sourceMap, tc.expectedSourceMap)
}
})
}
}
16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/prometheus/client_golang v1.12.2
github.com/prometheus/client_model v0.2.0
github.com/redis/rueidis v1.0.4-go1.18
github.com/segmentio/parquet-go v0.0.0-20221005185849-771b3e358a03
github.com/segmentio/parquet-go v0.0.0-20230712180008-5d42db8f0d47
github.com/snowflakedb/gosnowflake v1.6.8
github.com/stretchr/testify v1.8.3
go.etcd.io/etcd/api/v3 v3.5.6
Expand Down Expand Up @@ -108,7 +108,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
Expand All @@ -120,7 +120,7 @@ require (
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/oauth2 v0.7.0
golang.org/x/sys v0.8.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
Expand Down Expand Up @@ -152,7 +152,7 @@ require (
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/v11 v11.0.0 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.9 // indirect
Expand Down Expand Up @@ -185,19 +185,19 @@ require (
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-runewidth v0.0.10 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/rivo/uniseg v0.1.0 // indirect
github.com/segmentio/encoding v0.3.5 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/segmentio/encoding v0.3.6 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.37.1-0.20220607072126-8a320890c08d // indirect
Expand Down
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j
github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 h1:q4dksr6ICHXqG5hm0ZW5IHyeEJXoIJSOZeBLmWPNeIQ=
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs=
Expand Down Expand Up @@ -1125,6 +1127,8 @@ github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e
github.com/klauspost/compress v1.15.6/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
Expand Down Expand Up @@ -1203,6 +1207,8 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.10 h1:CoZ3S2P7pvtP45xOtBw+/mDL2z0RKI576gSkzRRpdGg=
github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
github.com/mattn/go-shellwords v1.0.6/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y=
Expand Down Expand Up @@ -1375,6 +1381,8 @@ github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuR
github.com/pierrec/lz4/v4 v4.1.11/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrre/gotestcover v0.0.0-20160517101806-924dca7d15f0/go.mod h1:4xpMLz7RBWyB+ElzHu8Llua96TRCB3YwX+l5EP1wmHk=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ=
Expand Down Expand Up @@ -1459,6 +1467,9 @@ github.com/redis/rueidis v1.0.4-go1.18 h1:N/q9FhQiSevyo5XpPYBOSnmbKamB+B3XZkffbg
github.com/redis/rueidis v1.0.4-go1.18/go.mod h1:aJiezBQL+bZKAZ+d7YOuj6xKQhrXvEPBiOfotEhG5R8=
github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/robertkrimen/godocdown v0.0.0-20130622164427-0bfa04905481/go.mod h1:C9WhFzY47SzYBIvzFqSvHIR6ROgDo4TtdTuRaOMjF/s=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down Expand Up @@ -1488,8 +1499,12 @@ github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod
github.com/segmentio/asm v1.1.3/go.mod h1:Ld3L4ZXGNcSLRg4JBsZ3//1+f/TjYl0Mzen/DQy1EJg=
github.com/segmentio/encoding v0.3.5 h1:UZEiaZ55nlXGDL92scoVuw00RmiRCazIEmvPSbSvt8Y=
github.com/segmentio/encoding v0.3.5/go.mod h1:n0JeuIqEQrQoPDGsjo8UNd1iA0U8d8+oHAA4E3G3OxM=
github.com/segmentio/encoding v0.3.6 h1:E6lVLyDPseWEulBmCmAKPanDd3jiyGDo5gMcugCRwZQ=
github.com/segmentio/encoding v0.3.6/go.mod h1:n0JeuIqEQrQoPDGsjo8UNd1iA0U8d8+oHAA4E3G3OxM=
github.com/segmentio/parquet-go v0.0.0-20221005185849-771b3e358a03 h1:kUWYzrhMsSyiwngg2Qzn5LrpgKsmgGavydHd8qC/g2M=
github.com/segmentio/parquet-go v0.0.0-20221005185849-771b3e358a03/go.mod h1:SclLlCfB7c7CH0YerV+OtYmZExyK5rhVOd6UT90erVw=
github.com/segmentio/parquet-go v0.0.0-20230712180008-5d42db8f0d47 h1:5am1AKPVBj3ncaEsqsGQl/cvsW5mSrO9NSPqWWhH8OA=
github.com/segmentio/parquet-go v0.0.0-20230712180008-5d42db8f0d47/go.mod h1:+J0xQnJjm8DuQUHBO7t57EnmPbstT6+b45+p3DC9k1Q=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
Expand Down Expand Up @@ -2110,6 +2125,8 @@ golang.org/x/sys v0.0.0-20220731174439-a90be440212d/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
50 changes: 39 additions & 11 deletions provider/filepath.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,21 @@ import (
)

type Filepath interface {
// Returns the name of the bucket (S3) or container (Azure Blob Storage)
Bucket() string
Prefix() string
// Returns the key to the object (S3) or blob (Azure Blob Storage)
Path() string
FullPathWithBucket() string
FullPathWithoutBucket() string
// Consumes a URI (e.g. abfss://<container>@<storage_account>/path/to/file) and parses it into
// the specific parts that the implementation expects.
ParseFullPath(path string) error
Ext() (FileType, error)
IsDir() bool
}

// TODO: Add support for additional params, such as service account (Azure Blob Storage)
func NewFilepath(storeType pc.FileStoreType, bucket string, prefix string, path string) (Filepath, error) {
switch storeType {
case S3:
Expand All @@ -32,6 +39,14 @@ func NewFilepath(storeType pc.FileStoreType, bucket string, prefix string, path
path: strings.TrimPrefix(path, "/"),
},
}, nil
case Azure:
return &AzureFilepath{
filePath: filePath{
bucket: strings.Trim(bucket, "/"),
prefix: strings.Trim(prefix, "/"),
path: strings.Trim(path, "/"),
},
}, nil
default:
return nil, fmt.Errorf("unknown store type '%s'", storeType)
}
Expand Down Expand Up @@ -109,6 +124,25 @@ func (fp *filePath) ParseFullPath(fullPath string) error {
return nil
}

// Get the file extension from the path; if no extension is found, default to Parquet.
// This method will fail for extensions that are not currently supported (e.g. .avro)
func (fp *filePath) Ext() (FileType, error) {
pathComponents := strings.Split(fp.path, ".")
if len(pathComponents) < 2 {
return Parquet, nil
}
ext := pathComponents[len(pathComponents)-1]
if !IsValidFileType(ext) {
return "", fmt.Errorf("invalid file type '%s'", ext)
}
return FileType(ext), nil
}

func (fp *filePath) IsDir() bool {
pathComponents := strings.Split(fp.path, ".")
return len(pathComponents) == 1
}

type S3Filepath struct {
filePath
}
Expand All @@ -133,19 +167,13 @@ func (azure *AzureFilepath) FullPathWithBucket() string {

func (azure *AzureFilepath) ParseFullPath(fullPath string) error {
abfssRegex := regexp.MustCompile(`abfss://(.+?)@(.+?)\.dfs.core.windows.net/(.+)`)
matches := abfssRegex.FindStringSubmatch(fullPath)

// If the regex matches all parts of the ABFS path, then we can parse the
// bucket, storage account, and path components. Otherwise, we can just set
// the path for standard Azure Blob Storage paths.
if len(matches) == 4 {
azure.filePath.bucket = matches[1]
azure.storageAccount = matches[2]
azure.filePath.path = matches[3]
if matches := abfssRegex.FindStringSubmatch(fullPath); len(matches) != 4 {
return fmt.Errorf("invalid path '%s'; expected format abfss://<container/bucket>@<storage_account>.dfs.core.windows.net/path", fullPath)
} else {
azure.filePath.path = fullPath
azure.filePath.bucket = strings.Trim(matches[1], "/")
azure.storageAccount = strings.Trim(matches[2], "/")
azure.filePath.path = strings.Trim(matches[3], "/")
}

return nil
}

Expand Down

0 comments on commit a28ff65

Please sign in to comment.