This repository has been archived by the owner on Oct 22, 2021. It is now read-only.

12540 post dup docs2 #59

Open · wants to merge 2 commits into `master`
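
Summary of the change: `update_docs/4` previously keyed per-document results and errors by `{Id, Revs}`, so a `_bulk_docs` request containing the same document more than once collapsed duplicates into a single entry. This patch pairs every incoming `#doc{}` with a unique reference from `make_ref()` and threads the `{Doc, Ref}` tuple through grouping, validation, revision generation, and the final results lookup, so each submitted document keeps its own result row.

The core idea, as a minimal standalone sketch (illustrative only; the module and the names `tag/1` and `results_for/2` are not part of the patch):

```erlang
-module(ref_tag_sketch).
-export([tag/1, results_for/2]).

%% Pair every item with a reference that is unique for the lifetime
%% of the node, so byte-identical items stay distinguishable.
tag(Items) ->
    [{Item, make_ref()} || Item <- Items].

%% Recover each item's result by its reference, preserving input order.
%% ResultsDict is assumed to be a dict keyed by those references.
results_for(Tagged, ResultsDict) ->
    [begin
         {ok, Result} = dict:find(Ref, ResultsDict),
         Result
     end || {_Item, Ref} <- Tagged].
```

Usage would be `results_for(tag(Docs), ResultsDict)`; the diff below applies the same pairing inside `update_docs/4`.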
149 changes: 80 additions & 69 deletions apps/couch/src/couch_db.erl
@@ -73,16 +73,16 @@ open_int(DbName, Options) ->
 % it ensures that the http userCtx is a valid reader
 open(DbName, Options) ->
     case couch_server:open(DbName, Options) of
-    {ok, Db} ->
-        try
-            check_is_reader(Db),
-            {ok, Db}
-        catch
-            throw:Error ->
-                close(Db),
-                throw(Error)
-        end;
-    Else -> Else
+        {ok, Db} ->
+            try
+                check_is_reader(Db),
+                {ok, Db}
+            catch
+                throw:Error ->
+                    close(Db),
+                    throw(Error)
+            end;
+        Else -> Else
     end.

 reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
@@ -154,7 +154,7 @@ apply_open_options({ok, Doc},Options) ->
     apply_open_options2(Doc,Options);
 apply_open_options(Else,_Options) ->
     Else.

 apply_open_options2(Doc,[]) ->
     {ok, Doc};
 apply_open_options2(#doc{atts=Atts,revs=Revs}=Doc,
@@ -421,22 +421,22 @@ update_docs(Db, Docs) ->
 % group_alike_docs groups the sorted documents into sublist buckets, by id.
 % ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
 group_alike_docs(Docs) ->
-    Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
+    Sorted = lists:sort(fun({#doc{id=A},_},{#doc{id=B},_})-> A < B end, Docs),
     group_alike_docs(Sorted, []).

 group_alike_docs([], Buckets) ->
     lists:reverse(Buckets);
 group_alike_docs([Doc|Rest], []) ->
     group_alike_docs(Rest, [[Doc]]);
-group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
-    [#doc{id=BucketId}|_] = Bucket,
+group_alike_docs([{Doc,Ref}|Rest], [Bucket|RestBuckets]) ->
+    [{#doc{id=BucketId},_Ref}|_] = Bucket,
     case Doc#doc.id == BucketId of
     true ->
         % add to existing bucket
-        group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
+        group_alike_docs(Rest, [[{Doc,Ref}|Bucket]|RestBuckets]);
     false ->
         % add to new bucket
-        group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
+        group_alike_docs(Rest, [[{Doc,Ref}]|[Bucket|RestBuckets]])
     end.

 validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
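
With the reference attached, `group_alike_docs/2` sorts and buckets `{Doc, Ref}` pairs instead of bare docs: duplicates of one id still land in the same bucket, but each element keeps its own handle. A self-contained sketch of the same bucketing shape on generic `{Key, Ref}` pairs (illustrative; `group_sketch` is not part of the patch):

```erlang
-module(group_sketch).
-export([demo/0]).

%% Bucket a keysorted [{Key, Ref}] list by Key, newest-first within a
%% bucket, mirroring the accumulator shape used by group_alike_docs/2.
group([], Acc) ->
    lists:reverse(Acc);
group([{K, R} | T], [[{K, _} | _] = Bucket | Rest]) ->
    group(T, [[{K, R} | Bucket] | Rest]);
group([Pair | T], Acc) ->
    group(T, [[Pair] | Acc]).

%% [a, a, b] buckets to [[{a,RefA2},{a,RefA1}], [{b,RefB}]]:
%% the two a's share a bucket but remain distinct by reference.
demo() ->
    Tagged = [{K, make_ref()} || K <- [a, a, b]],
    group(lists:keysort(1, Tagged), []).
```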
@@ -523,10 +523,8 @@ prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
     {AccPrepped, AccFatalErrors};
 prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
         AllowConflict, AccPrepped, AccErrors) ->
-    [#doc{id=Id}|_]=DocBucket,
-    % no existing revs are known,
     {PreppedBucket, AccErrors3} = lists:foldl(
-        fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
+        fun({#doc{revs=Revs}=Doc, Ref}, {AccBucket, AccErrors2}) ->
             case couch_doc:has_stubs(Doc) of
             true ->
                 couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
@@ -536,13 +534,13 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
             {0, []} ->
                 case validate_doc_update(Db, Doc, fun() -> nil end) of
                 ok ->
-                    {[Doc | AccBucket], AccErrors2};
+                    {[{Doc, Ref} | AccBucket], AccErrors2};
                 Error ->
-                    {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
+                    {AccBucket, [{Ref, Error} | AccErrors2]}
                 end;
             _ ->
                 % old revs specified but none exist, a conflict
-                {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
+                {AccBucket, [{Ref, conflict} | AccErrors2]}
             end
         end,
         {[], AccErrors}, DocBucket),
@@ -556,14 +554,14 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets],
     LeafRevsDict = dict:from_list([{{Start, RevId}, {Del, Ptr, Revs}} ||
             {#leaf{deleted=Del, ptr=Ptr}, {Start, [RevId|_]}=Revs} <- Leafs]),
     {PreppedBucket, AccErrors3} = lists:foldl(
-        fun(Doc, {Docs2Acc, AccErrors2}) ->
+        fun({Doc,Ref}, {Docs2Acc, AccErrors2}) ->
             case prep_and_validate_update(Db, Doc, OldFullDocInfo,
                     LeafRevsDict, AllowConflict) of
             {ok, Doc2} ->
-                {[Doc2 | Docs2Acc], AccErrors2};
-            {Error, #doc{id=Id,revs=Revs}} ->
+                {[{Doc2, Ref} | Docs2Acc], AccErrors2};
+            {Error, #doc{}} ->
                 % Record the error
-                {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
+                {Docs2Acc, [{Ref, Error} |AccErrors2]}
             end
         end,
         {[], AccErrors}, DocBucket),
@@ -583,15 +581,15 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
     case OldInfo of
     not_found ->
         {ValidatedBucket, AccErrors3} = lists:foldl(
-            fun(Doc, {AccPrepped2, AccErrors2}) ->
+            fun({Doc,Ref}, {AccPrepped2, AccErrors2}) ->
                 case couch_doc:has_stubs(Doc) of
                 true ->
                     couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
                 false -> ok
                 end,
                 case validate_doc_update(Db, Doc, fun() -> nil end) of
                 ok ->
-                    {[Doc | AccPrepped2], AccErrors2};
+                    {[{Doc,Ref} | AccPrepped2], AccErrors2};
                 Error ->
                     {AccPrepped2, [{Doc, Error} | AccErrors2]}
                 end
@@ -600,7 +598,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
         prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
     {ok, #full_doc_info{rev_tree=OldTree}} ->
         NewRevTree = lists:foldl(
-            fun(NewDoc, AccTree) ->
+            fun({NewDoc,_Ref}, AccTree) ->
                 {NewTree, _} = couch_key_tree:merge(AccTree,
                     couch_db:doc_to_tree(NewDoc), Db#db.revs_limit),
                 NewTree
@@ -610,16 +608,16 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
         LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
         {ValidatedBucket, AccErrors3} =
             lists:foldl(
-                fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) ->
+                fun({#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, Ref}, {AccValidated, AccErrors2}) ->
                     case dict:find({Pos, RevId}, LeafRevsFullDict) of
                     {ok, {Start, Path}} ->
                         % our unflushed doc is a leaf node. Go back on the path
                         % to find the previous rev that's on disk.

                         LoadPrevRevFun = fun() ->
                             make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
                         end,

                         case couch_doc:has_stubs(Doc) of
                         true ->
                             DiskDoc = LoadPrevRevFun(),
@@ -629,10 +627,10 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
                             Doc2 = Doc,
                             GetDiskDocFun = LoadPrevRevFun
                         end,

                         case validate_doc_update(Db, Doc2, GetDiskDocFun) of
                         ok ->
-                            {[Doc2 | AccValidated], AccErrors2};
+                            {[{Doc2, Ref} | AccValidated], AccErrors2};
                         Error ->
                             {AccValidated, [{Doc, Error} | AccErrors2]}
                         end;
@@ -664,10 +662,10 @@ new_revs([], OutBuckets, IdRevsAcc) ->
     {lists:reverse(OutBuckets), IdRevsAcc};
 new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
     {NewBucket, IdRevsAcc3} = lists:mapfoldl(
-        fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
+        fun({#doc{revs={Start, RevIds}}=Doc, Ref}, IdRevsAcc2)->
             NewRevId = new_revid(Doc),
-            {Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
-                [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
+            {{Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, Ref},
+                [{Ref, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
         end, IdRevsAcc, Bucket),
     new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
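
The accumulator entry in `new_revs/3` changes from `{{Id, {Start, RevIds}}, Result}` to `{Ref, Result}`. With the old key, two duplicates of a brand-new doc both map to `{Id, {0, []}}`, so one entry silently overwrites the other when the list becomes a dict; references cannot collide. A self-contained illustration (`dup_key_sketch` and the literal values are invented for the example):

```erlang
-module(dup_key_sketch).
-export([demo/0]).

demo() ->
    %% Duplicate {Id, Revs} keys collapse when the list becomes a dict:
    Old = dict:from_list([{{<<"doc1">>, {0, []}}, result_one},
                          {{<<"doc1">>, {0, []}}, result_two}]),
    1 = dict:size(Old),
    %% References never collide, so every duplicate keeps its result:
    New = dict:from_list([{make_ref(), result_one},
                          {make_ref(), result_two}]),
    2 = dict:size(New),
    ok.
```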

@@ -686,16 +684,21 @@ check_dup_atts2(_) ->

 update_docs(Db, Docs, Options, replicated_changes) ->
     increment_stat(Db, {couchdb, database_writes}),
-    DocBuckets = group_alike_docs(Docs),
+
+    % associate reference with each doc in order to track duplicates
+    Docs2 = lists:map(fun(Doc) ->
+        {Doc, make_ref()}
+    end,Docs),
+    DocBuckets = group_alike_docs(Docs2),

     case (Db#db.validate_doc_funs /= []) orelse
         lists:any(
-            fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
-            (#doc{atts=Atts}) ->
+            fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>},_Ref}) -> true;
+            ({#doc{atts=Atts},_Ref}) ->
                 Atts /= []
-        end, Docs) of
+        end, Docs2) of
     true ->
-        Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+        Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
         ExistingDocs = get_full_doc_infos(Db, Ids),

         {DocBuckets2, DocErrors} =
@@ -705,8 +708,8 @@ update_docs(Db, Docs, Options, replicated_changes) ->
             DocErrors = [],
             DocBuckets3 = DocBuckets
         end,
-    DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
-            || Doc <- Bucket] || Bucket <- DocBuckets3],
+    DocBuckets4 = [[{doc_flush_atts(check_dup_atts(Doc), Db#db.fd), Ref}
+            || {Doc, Ref} <- Bucket] || Bucket <- DocBuckets3],
     {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
     {ok, DocErrors};
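
Both `update_docs/4` clauses now begin with the same tagging step. `make_ref/0` returns a reference that is unique for the lifetime of the node, and `lists:map/2` preserves order, so the tagged list lines up positionally with `Docs` while giving even byte-identical duplicates distinct identities. A one-line sketch of the resulting shape (the atoms stand in for `#doc{}` records):

```erlang
%% Yields [{doc_a, Ref1}, {doc_a, Ref2}] with Ref1 =/= Ref2.
Tagged = [{Doc, make_ref()} || Doc <- [doc_a, doc_a]].
```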

@@ -715,28 +718,33 @@ update_docs(Db, Docs, Options, interactive_edit) ->
     AllOrNothing = lists:member(all_or_nothing, Options),
     % go ahead and generate the new revision ids for the documents.
     % separate out the NonRep documents from the rest of the documents
-    {Docs2, NonRepDocs} = lists:foldl(
-        fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
+
+    % associate reference with each doc in order to track duplicates
+    Docs2 = lists:map(fun(Doc) ->
+        {Doc, make_ref()}
+    end,Docs),
+    {Docs3, NonRepDocs} = lists:foldl(
+        fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) ->
             case Id of
             <<?LOCAL_DOC_PREFIX, _/binary>> ->
                 {DocsAcc, [Doc | NonRepDocsAcc]};
             Id->
                 {[Doc | DocsAcc], NonRepDocsAcc}
             end
-        end, {[], []}, Docs),
-    DocBuckets = group_alike_docs(Docs2),
+        end, {[], []}, Docs2),
+
+    DocBuckets = group_alike_docs(Docs3),

     case (Db#db.validate_doc_funs /= []) orelse
         lists:any(
-            fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
+            fun({#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>},_Ref}) ->
                 true;
-            (#doc{atts=Atts}) ->
+            ({#doc{atts=Atts},_Ref}) ->
                 Atts /= []
-        end, Docs2) of
+        end, Docs3) of
     true ->
         % lookup the doc by id and get the most recent
-        Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+        Ids = [Id || [{#doc{id=Id}, _Ref}|_] <- DocBuckets],
         ExistingDocInfos = get_full_doc_infos(Db, Ids),

         {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
Expand All @@ -750,29 +758,32 @@ update_docs(Db, Docs, Options, interactive_edit) ->
end,

if (AllOrNothing) and (PreCommitFailures /= []) ->
{aborted, lists:map(
fun({{Id,{Pos, [RevId|_]}}, Error}) ->
{{Id, {Pos, RevId}}, Error};
({{Id,{0, []}}, Error}) ->
{{Id, {0, <<>>}}, Error}
end, PreCommitFailures)};
{aborted,
lists:foldl(fun({#doc{id=Id,revs={Pos, RevIds}}, Ref},Acc) ->
case lists:keyfind(Ref,1,PreCommitFailures) of
{Ref, Error} ->
[{{Id,{Pos,RevIds}}, Error} | Acc];
false ->
Acc
end
end,[],Docs3)};
true ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
DocBuckets3 = [[
doc_flush_atts(set_new_att_revpos(
check_dup_atts(Doc)), Db#db.fd)
|| Doc <- B] || B <- DocBuckets2],
{doc_flush_atts(set_new_att_revpos(
check_dup_atts(Doc)), Db#db.fd), Ref}
|| {Doc, Ref} <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),

{ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),

ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
{ok, lists:map(
fun(#doc{id=Id,revs={Pos, RevIds}}) ->
{ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
fun({#doc{}, Ref}) ->
{ok, Result} = dict:find(Ref, ResultsDict),
Result
end, Docs)}
end, Docs2)}
end.

% Returns the first available document on disk. Input list is a full rev path
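
On the `all_or_nothing` abort path, failures are now recorded as `{Ref, Error}` pairs, so the response is rebuilt by walking the tagged `Docs3` list and looking each reference up with `lists:keyfind/3`; the user-visible key stays `{Id, {Pos, RevIds}}`. A self-contained sketch of that reshaping (`abort_sketch` and the `{{Id, Revs}, Ref}` stand-in tuples are illustrative, not from the patch):

```erlang
-module(abort_sketch).
-export([aborted/2]).

%% Tagged            :: [{{Id, Revs}, Ref}]  (stands in for [{#doc{}, Ref}])
%% PreCommitFailures :: [{Ref, Error}]
%% Returns one {{Id, Revs}, Error} row per failing submission, even
%% when several submissions share the same Id and Revs.
aborted(Tagged, PreCommitFailures) ->
    lists:foldl(fun({{Id, Revs}, Ref}, Acc) ->
        case lists:keyfind(Ref, 1, PreCommitFailures) of
            {Ref, Error} -> [{{Id, Revs}, Error} | Acc];
            false -> Acc
        end
    end, [], Tagged).
```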
@@ -831,7 +842,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets,
         % This can happen if the db file we wrote to was swapped out by
         % compaction. Retry by reopening the db and writing to the current file
         {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
-        DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+        DocBuckets2 = [[{doc_flush_atts(Doc, Db2#db.fd), Ref} || {Doc,Ref} <- Bucket] || Bucket <- DocBuckets],
         % We only retry once
         close(Db2),
         Pid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
@@ -852,7 +863,7 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
         (Att) ->
             Att#att{revpos=RevPos+1}
         end, Atts)}.


 doc_flush_atts(Doc, Fd) ->
     Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.
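
Net effect: a `_bulk_docs` POST that repeats a document id now produces one response row per submitted document, assembled via `dict:find(Ref, ResultsDict)` in submission order, instead of having duplicates share a single `{Id, Revs}` entry. A final illustrative module tying the pieces together (the module name and literal revs are invented for the example):

```erlang
-module(bulk_dup_sketch).
-export([demo/0]).

demo() ->
    %% Two submissions of the same id get distinct refs...
    Tagged = [{<<"mydoc">>, make_ref()}, {<<"mydoc">>, make_ref()}],
    %% ...so the results dict holds one entry per submission...
    Results = dict:from_list(
        [{Ref, {ok, {1, <<"rev-1">>}}} || {_Id, Ref} <- Tagged]),
    %% ...and the response keeps both rows, in order.
    [begin
         {ok, Row} = dict:find(Ref, Results),
         Row
     end || {_Id, Ref} <- Tagged].
```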