forked from twitter/the-algorithm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
FavTfgBasedTopicEmbeddings.scala
172 lines (160 loc) · 8.3 KB
/
FavTfgBasedTopicEmbeddings.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package com.twitter.simclusters_v2.scalding.embedding.tfg
import com.twitter.dal.client.dataset.KeyValDALDataset
import com.twitter.dal.client.dataset.SnapshotDALDatasetBase
import com.twitter.scalding._
import com.twitter.scalding_internal.dalv2.DAL
import com.twitter.scalding_internal.dalv2.DALWrite.D
import com.twitter.scalding_internal.dalv2.DALWrite.WriteExtension
import com.twitter.scalding_internal.dalv2.remote_access.AllowCrossClusterSameDC
import com.twitter.scalding_internal.multiformat.format.keyval.KeyVal
import com.twitter.simclusters_v2.hdfs_sources.EntityEmbeddingsSources
import com.twitter.simclusters_v2.scalding.embedding.common.EmbeddingUtil
import com.twitter.simclusters_v2.thriftscala.EmbeddingType
import com.twitter.simclusters_v2.thriftscala.ModelVersion
import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
import com.twitter.simclusters_v2.thriftscala.TfgTopicEmbeddings
import com.twitter.simclusters_v2.thriftscala.UserToInterestedInClusterScores
import com.twitter.simclusters_v2.thriftscala.{SimClustersEmbedding => ThriftSimClustersEmbedding}
import com.twitter.wtf.scalding.jobs.common.AdhocExecutionApp
import com.twitter.wtf.scalding.jobs.common.ScheduledExecutionApp
import java.util.TimeZone
/**
* Jobs to generate Fav-based Topic-Follow-Graph (TFG) topic embeddings
* A topic's fav-based TFG embedding is the sum of its followers' fav-based InterestedIn
*/
/**
./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding/tfg:fav_tfg_topic_embeddings-adhoc
scalding remote run \
--user cassowary \
--keytab /var/lib/tss/keys/fluffy/keytabs/client/cassowary.keytab \
--principal service_acoount@TWITTER.BIZ \
--cluster bluebird-qus1 \
--main-class com.twitter.simclusters_v2.scalding.embedding.tfg.FavTfgTopicEmbeddingsAdhocApp \
--target src/scala/com/twitter/simclusters_v2/scalding/embedding/tfg:fav_tfg_topic_embeddings-adhoc \
--hadoop-properties "scalding.with.reducers.set.explicitly=true mapreduce.job.reduces=4000" \
-- --date 2020-12-08
*/
object FavTfgTopicEmbeddingsAdhocApp extends TfgBasedTopicEmbeddingsBaseApp with AdhocExecutionApp {
override val isAdhoc: Boolean = true
override val embeddingType: EmbeddingType = EmbeddingType.FavTfgTopic
override val embeddingSource: KeyValDALDataset[
KeyVal[SimClustersEmbeddingId, ThriftSimClustersEmbedding]
] = EntityEmbeddingsSources.FavTfgTopicEmbeddingsDataset
override val pathSuffix: String = "fav_tfg_topic_embedding"
override val modelVersion: ModelVersion = ModelVersion.Model20m145kUpdated
override val parquetDataSource: SnapshotDALDatasetBase[TfgTopicEmbeddings] =
EntityEmbeddingsSources.FavTfgTopicEmbeddingsParquetDataset
override def scoreExtractor: UserToInterestedInClusterScores => Double = scores =>
scores.favScore.getOrElse(0.0)
}
/**
./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding/tfg:fav_tfg_topic_embeddings
capesospy-v2 update --build_locally --start_cron fav_tfg_topic_embeddings src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object FavTfgTopicEmbeddingsScheduledApp
extends TfgBasedTopicEmbeddingsBaseApp
with ScheduledExecutionApp {
override val isAdhoc: Boolean = false
override val embeddingType: EmbeddingType = EmbeddingType.FavTfgTopic
override val embeddingSource: KeyValDALDataset[
KeyVal[SimClustersEmbeddingId, ThriftSimClustersEmbedding]
] = EntityEmbeddingsSources.FavTfgTopicEmbeddingsDataset
override val pathSuffix: String = "fav_tfg_topic_embedding"
override val modelVersion: ModelVersion = ModelVersion.Model20m145kUpdated
override val parquetDataSource: SnapshotDALDatasetBase[TfgTopicEmbeddings] =
EntityEmbeddingsSources.FavTfgTopicEmbeddingsParquetDataset
override def scoreExtractor: UserToInterestedInClusterScores => Double = scores =>
scores.favScore.getOrElse(0.0)
override val firstTime: RichDate = RichDate("2020-05-25")
override val batchIncrement: Duration = Days(1)
}
/**
./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding/tfg:fav_tfg_topic_embeddings_2020-adhoc
scalding remote run \
--user cassowary \
--keytab /var/lib/tss/keys/fluffy/keytabs/client/cassowary.keytab \
--principal service_acoount@TWITTER.BIZ \
--cluster bluebird-qus1 \
--main-class com.twitter.simclusters_v2.scalding.embedding.tfg.FavTfgTopicEmbeddings2020AdhocApp \
--target src/scala/com/twitter/simclusters_v2/scalding/embedding/tfg:fav_tfg_topic_embeddings_2020-adhoc \
--hadoop-properties "scalding.with.reducers.set.explicitly=true mapreduce.job.reduces=4000" \
-- --date 2020-12-08
*/
object FavTfgTopicEmbeddings2020AdhocApp
extends TfgBasedTopicEmbeddingsBaseApp
with AdhocExecutionApp {
override val isAdhoc: Boolean = true
override val embeddingType: EmbeddingType = EmbeddingType.FavTfgTopic
override val embeddingSource: KeyValDALDataset[
KeyVal[SimClustersEmbeddingId, ThriftSimClustersEmbedding]
] = EntityEmbeddingsSources.FavTfgTopicEmbeddings2020Dataset
override val pathSuffix: String = "fav_tfg_topic_embedding"
override val modelVersion: ModelVersion = ModelVersion.Model20m145k2020
override val parquetDataSource: SnapshotDALDatasetBase[TfgTopicEmbeddings] =
EntityEmbeddingsSources.FavTfgTopicEmbeddings2020ParquetDataset
override def scoreExtractor: UserToInterestedInClusterScores => Double = scores =>
scores.favScore.getOrElse(0.0)
}
/**
./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding/tfg:fav_tfg_topic_embeddings_2020
capesospy-v2 update --build_locally --start_cron fav_tfg_topic_embeddings_2020 src/scala/com/twitter/simclusters_v2/capesos_config/atla_proc3.yaml
*/
object FavTfgTopicEmbeddings2020ScheduledApp
extends TfgBasedTopicEmbeddingsBaseApp
with ScheduledExecutionApp {
override val isAdhoc: Boolean = false
override val embeddingType: EmbeddingType = EmbeddingType.FavTfgTopic
override val embeddingSource: KeyValDALDataset[
KeyVal[SimClustersEmbeddingId, ThriftSimClustersEmbedding]
] = EntityEmbeddingsSources.FavTfgTopicEmbeddings2020Dataset
override val pathSuffix: String = "fav_tfg_topic_embedding"
override val modelVersion: ModelVersion = ModelVersion.Model20m145k2020
override val parquetDataSource: SnapshotDALDatasetBase[TfgTopicEmbeddings] =
EntityEmbeddingsSources.FavTfgTopicEmbeddings2020ParquetDataset
override def scoreExtractor: UserToInterestedInClusterScores => Double = scores =>
scores.favScore.getOrElse(0.0)
override val firstTime: RichDate = RichDate("2021-03-10")
override val batchIncrement: Duration = Days(1)
}
/**
./bazel bundle src/scala/com/twitter/simclusters_v2/scalding/embedding/tfg:fav_tfg_topic_embeddings_2020_copy
scalding scalding remote run --target src/scala/com/twitter/simclusters_v2/scalding/embedding/tfg:fav_tfg_topic_embeddings_2020_copy
*/
/**
* This is a copy job where we copy the previous version of TFG and write to a new one.
* The dependent dataset for TFG has been deleted.
* Instead of restarting the entire job, we create this temp hacky solution to keep TFG dataset alive until we deprecate topics.
* Having a table TFG doesn't lead to a big quality concern b/c TFG is built from topic follows, which is relative stable
* and we don't have new topics anymore.
*/
object FavTfgTopicEmbeddings2020CopyScheduledApp extends ScheduledExecutionApp {
val isAdhoc: Boolean = false
val embeddingType: EmbeddingType = EmbeddingType.FavTfgTopic
val embeddingSource: KeyValDALDataset[
KeyVal[SimClustersEmbeddingId, ThriftSimClustersEmbedding]
] = EntityEmbeddingsSources.FavTfgTopicEmbeddings2020Dataset
val pathSuffix: String = "fav_tfg_topic_embedding"
val modelVersion: ModelVersion = ModelVersion.Model20m145k2020
override val firstTime: RichDate = RichDate("2023-01-20")
override val batchIncrement: Duration = Days(3)
def runOnDateRange(
args: Args
)(
implicit dateRange: DateRange,
timeZone: TimeZone,
uniqueID: UniqueID
): Execution[Unit] = {
DAL
.readMostRecentSnapshotNoOlderThan(
EntityEmbeddingsSources.FavTfgTopicEmbeddings2020Dataset,
Days(21))
.withRemoteReadPolicy(AllowCrossClusterSameDC)
.toTypedPipe
.writeDALVersionedKeyValExecution(
EntityEmbeddingsSources.FavTfgTopicEmbeddings2020Dataset,
D.Suffix(
EmbeddingUtil
.getHdfsPath(isAdhoc = isAdhoc, isManhattanKeyVal = true, modelVersion, pathSuffix))
)
}
}