Feature/fi reindex integration #2352
base: integration
Conversation
Resolved review threads on:
...ouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reindex/ShardReindexMapper.java
warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reindex/ShardReindexJob.java
```java
    if (tableProperties == null) {
        throw new IllegalArgumentException("configured output table: " + table + " does not exist");
    }
} catch (TableNotFoundException tnfe) {
```
Is there a better way than catching one exception just to throw another?
I am okay with this pattern since we're converting a checked exception into an unchecked one. This is us saying that if the table lookup fails, there is no reasonable recovery. The conversion to an unchecked exception means you don't have to chain `throws TableNotFoundException` up your entire call stack.
One thing I would do is pass the originating exception as the cause of the one we're throwing, for stack-trace traceability.
Sounds reasonable to me.
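A minimal sketch of the pattern discussed above: wrapping the checked exception in an unchecked one while passing the original along as the cause. The class and method names here are illustrative, not taken from the PR.

```java
// Illustrative sketch: convert a checked exception into an unchecked one,
// chaining the original as the cause so the stack trace stays traceable.
public class TableLookup {

    // Stand-in for Accumulo's checked TableNotFoundException
    public static class TableNotFoundException extends Exception {
        public TableNotFoundException(String msg) {
            super(msg);
        }
    }

    // Hypothetical checked lookup; always fails in this sketch.
    public static Object fetchTableProperties(String table) throws TableNotFoundException {
        throw new TableNotFoundException(table);
    }

    public static Object getTableProperties(String table) {
        try {
            return fetchTableProperties(table);
        } catch (TableNotFoundException tnfe) {
            // pass tnfe as the cause so callers see the full stack trace
            throw new IllegalArgumentException("configured output table: " + table + " does not exist", tnfe);
        }
    }
}
```

Callers of `getTableProperties` no longer need a `throws` clause, and `getCause()` on the `IllegalArgumentException` still yields the original `TableNotFoundException`.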
```java
ByteSequence cf = key.getColumnFamilyData();
String keyType;
if (isKeyD(cf)) {
```
I wonder if it would be valuable to have a getKeyType(cf) that returns a string to assign to keyType, and then switch on it like:

```java
switch (keyType) {
    case "d":
        processDKey(key, value, context);
        break;
    case "tf":
        processTFKey(key, value, context);
        break;
    case "fi":
        processFIKey(key, value, context);
        break;
    default:
        processEventKey(key, value, context);
        break;
}
```

Or, even better, if the d, tf, and fi cases can be distinguished by an integer, we can switch on the length of the bytes in the cf:

```java
int keyTypeCode = cf.length();
switch (keyTypeCode) {
    case 1:
        processDKey(key, value, context);
        break;
    case 2:
        processTFKey(key, value, context);
        break;
    case 4:
        // possible extra check on starting bytes
        processFIKey(key, value, context);
        break;
    default:
        processEventKey(key, value, context);
        break;
}
```

It could eliminate the need for the isKey* methods and might be more efficient.
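A hedged sketch of what the suggested getKeyType helper might look like. It uses a plain byte[] instead of Accumulo's ByteSequence to stay self-contained, and the column-family layouts ("d", "tf", and "fi" followed by a null byte and the field name) are assumptions about the shard schema, not code from the PR.

```java
// Hypothetical getKeyType: classify a shard-table column family by its
// leading bytes rather than exact length, since "fi" column families
// include a variable-length field name after the "fi\0" prefix.
public class KeyTypes {
    public static String getKeyType(byte[] cf) {
        // d keys: column family is exactly "d"
        if (cf.length == 1 && cf[0] == 'd') {
            return "d";
        }
        // tf keys: column family is exactly "tf"
        if (cf.length == 2 && cf[0] == 't' && cf[1] == 'f') {
            return "tf";
        }
        // fi keys: column family starts with "fi" + null byte, then the field name
        if (cf.length > 3 && cf[0] == 'f' && cf[1] == 'i' && cf[2] == 0) {
            return "fi";
        }
        // anything else is treated as an event key
        return "event";
    }
}
```

Prefix checks like these avoid the ambiguity of switching on length alone, at the cost of a few extra byte comparisons.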
```java
Text indexCq = null;
boolean indexed = false;
```

```java
if (key.isDeleted() && !this.propagateDeletes) {
```
Consider changing to:

```java
if (key.isDeleted()) {
    if (!this.propagateDeletes) {
        incrementCounter("deletes", "skipped");
        return;
    }
    incrementCounter("deletes", "propagated");
}
```
```java
}
```

```java
// if the field is indexed and index only or events aren't being reprocessed
if (helper.isIndexedField(this.normalizedFieldName) && (!this.reprocessEvents || helper.isIndexOnlyField(this.normalizedFieldName))) {
```
You might want to hoist `(!this.reprocessEvents || helper.isIndexOnlyField(this.normalizedFieldName))` into a single if statement that encompasses both the `helper.isIndexedField(this.normalizedFieldName)` and `helper.isReverseIndexedField(this.normalizedFieldName)` checks.
```java
 * @param timestamp
 * @return the original timestamp if this.floorTimestamps is false, otherwise the timestamp set to the same day at 0:0:0.000
 */
private long floorTimestamp(long timestamp) {
```
Might want to use java.time instead of Calendar:

```java
private long floorTimestamp(long timestamp) {
    if (!this.floorTimestamps) {
        return timestamp;
    }
    return java.time.Instant.ofEpochMilli(timestamp)
            .atZone(java.time.ZoneId.systemDefault()) // adjust the zone as necessary
            .toLocalDate()
            .atStartOfDay(java.time.ZoneId.systemDefault())
            .toInstant()
            .toEpochMilli();
}
```
Added support for batch processing, fixed bugs with tf processing offsets for multivalued fields, and extended unit test coverage.