Skip to content

Commit

Permalink
Log errors during message ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed May 13, 2024
1 parent 398b31b commit d4327af
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 97 deletions.
6 changes: 3 additions & 3 deletions crates/common/src/manager/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl Core {

for (async_handle, sync_handle) in [
self.backup_properties(&dest),
self.backup_term_index(&dest),
self.backup_fts_index(&dest),
self.backup_acl(&dest),
self.backup_blob(&dest),
self.backup_config(&dest),
Expand Down Expand Up @@ -223,9 +223,9 @@ impl Core {
)
}

fn backup_term_index(&self, dest: &Path) -> TaskHandle {
fn backup_fts_index(&self, dest: &Path) -> TaskHandle {
let store = self.storage.data.clone();
let (handle, writer) = spawn_writer(dest.join("term_index"));
let (handle, writer) = spawn_writer(dest.join("fts_index"));
(
tokio::spawn(async move {
writer
Expand Down
31 changes: 25 additions & 6 deletions crates/jmap/src/services/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,14 @@ impl JMAP {
.await
{
Ok(Some(raw_message)) => raw_message,
_ => {
result => {
tracing::error!(
context = "ingest",
rcpts = ?message.recipients,
error = ?result,
"Failed to fetch message blob."
);

return (0..message.recipients.len())
.map(|_| DeliveryResult::TemporaryFailure {
reason: "Temporary I/O error.".into(),
Expand All @@ -53,15 +60,27 @@ impl JMAP {
let mut recipients = Vec::with_capacity(message.recipients.len());
let mut deliver_names = AHashMap::with_capacity(message.recipients.len());
for rcpt in &message.recipients {
let uids = self
match self
.core
.email_to_ids(&self.core.storage.directory, rcpt)
.await
.unwrap_or_default();
for uid in &uids {
deliver_names.insert(*uid, (DeliveryResult::Success, rcpt));
{
Ok(uids) => {
for uid in &uids {
deliver_names.insert(*uid, (DeliveryResult::Success, rcpt));
}
recipients.push(uids);
}
Err(err) => {
tracing::error!(
context = "ingest",
error = ?err,
rcpt = rcpt,
"Failed to lookup recipient"
);
recipients.push(vec![]);
}
}
recipients.push(uids);
}

// Deliver to each recipient
Expand Down
88 changes: 2 additions & 86 deletions crates/store/src/fts/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,35 +206,16 @@ impl Store {
return Ok(());
}

// Serialize tokens
//let mut term_index = KeySerializer::new(tokens.len() * U64_LEN * 2);
// Serialize keys
let mut keys = Vec::with_capacity(tokens.len());

// Write index keys
for (hash, postings) in tokens.into_iter() {
//term_index = term_index.write(hash.hash.as_slice()).write(hash.len);
keys.push(Operation::Value {
class: ValueClass::FtsIndex(hash),
op: ValueOp::Set(postings.serialize().into()),
});
}

// Write term index
/*let mut batch = BatchBuilder::new();
let mut term_index = lz4_flex::compress_prepend_size(&term_index.finalize());
term_index.insert(0, TERM_INDEX_VERSION);
batch
.with_account_id(document.account_id)
.with_collection(document.collection)
.update_document(document.document_id)
.set(
ValueClass::FtsIndex(BitmapHash {
hash: [u8::MAX; 8],
len: u8::MAX,
}),
term_index,
);
self.write(batch.build()).await?;*/
// Commit index
let mut batch = BatchBuilder::new();
batch
.with_account_id(document.account_id)
Expand Down Expand Up @@ -358,68 +339,3 @@ impl Store {
Ok(())
}
}

/*
struct TermIndex {
ops: Vec<Operation>,
}
impl Deserialize for TermIndex {
fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
if bytes.first().copied().unwrap_or_default() != TERM_INDEX_VERSION {
return Err(Error::InternalError(
"Unsupported term index version".to_string(),
));
}
let bytes = lz4_flex::decompress_size_prepended(bytes.get(1..).unwrap_or_default())
.map_err(|_| Error::InternalError("Failed to decompress term index".to_string()))?;
let mut ops = Vec::new();
// Skip bigrams
let (num_items, pos) =
bytes
.as_slice()
.read_leb128::<usize>()
.ok_or(Error::InternalError(
"Failed to read term index marker".to_string(),
))?;
let mut bytes = bytes
.get(pos + (num_items * 8)..)
.unwrap_or_default()
.iter()
.peekable();
while bytes.peek().is_some() {
let mut hash = BitmapHash {
hash: [0; 8],
len: 0,
};
for byte in hash.hash.iter_mut() {
*byte = *bytes.next().ok_or(Error::InternalError(
"Unexpected EOF reading term index".to_string(),
))?;
}
hash.len = *bytes.next().ok_or(Error::InternalError(
"Unexpected EOF reading term index".to_string(),
))?;
let num_fields = *bytes.next().ok_or(Error::InternalError(
"Unexpected EOF reading term index".to_string(),
))?;
for _ in 0..num_fields {
let field = *bytes.next().ok_or(Error::InternalError(
"Unexpected EOF reading term index".to_string(),
))?;
ops.push(Operation::Bitmap {
class: BitmapClass::Text { field, token: hash },
set: false,
});
}
}
Ok(Self { ops })
}
}
*/
4 changes: 2 additions & 2 deletions tests/src/jmap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ password = "password"
type = "elasticsearch"
url = "https://localhost:9200"
user = "elastic"
password = "RtQ-Lu6+o4rxx=XJplVJ"
allow-invalid-certs = true
password = "changeme"
tls.allow-invalid-certs = true
disable = true
[certificate.default]
Expand Down

0 comments on commit d4327af

Please sign in to comment.