2014-01-20 70 views
1

請參閱下面的更新!marklogic刪除>插入>對新文檔的cpf操作

我有以下問題:我們正在將(百萬)文檔(推文)收集到ML中,並且在插入時我們有一個爲每個文檔創建元數據的cpf作業。更精確的說,它會根據位置添加地理標記(如果存在位置或座標)。

現在我們有一個數據庫,它已經收集了沒有激活geotagger的推文。我們希望通過刪除並重新插入每個尚未具有適當的元數據地理標籤元素的文檔來處理存儲的tweets和這個cpf作業。 然後cpf完成它的工作並對「新」文檔進行地理標記。

我們編寫了下面的代碼來刪除和插入文檔,但是我們得到了一個XDMP-CONFLICTUPDATES錯誤。我一直在閱讀有關交易並嘗試了幾件事情,「;」招。包裝在xdmp:eval中,或者分開刪除,並從xdmp:spawn中插入兩個獨立的函數調用。

仍然沒有運氣。

產卵,rename.xqy

xquery version "1.0-ml"; 

declare namespace j = "http://marklogic.com/xdmp/json/basic"; 
declare variable $to_process external; 

declare function local:document-rename(
    $old-uri as xs:string, $new-uri as xs:string) 
    as empty-sequence() 
{ 
    (:xdmp:set-transaction-mode("update"),:) 
    xdmp:eval(xdmp:document-delete($old-uri)), 
    (:xdmp:commit():) 

    let $permissions := xdmp:document-get-permissions($old-uri) 
    let $collections := xdmp:document-get-collections($old-uri) 
    return xdmp:document-insert(
     $new-uri, doc($old-uri), 
     if ($permissions) then $permissions 
     else xdmp:default-permissions(), 
     if ($collections) then $collections 
     else xdmp:default-collections(), 
     xdmp:document-get-quality($old-uri) 
    ) 
}; 

for $d in map:keys($to_process) 
let $rename := local:document-rename($d, map:get($to_process,$d)) 
return true() 

並運行一組特定的文件的工作,我們使用:

xquery version "1.0-ml"; 
declare namespace j = "http://marklogic.com/xdmp/json/basic"; 
declare namespace dikw = 'http://www.example.com/dikw_functions.xqy'; 
import module namespace json = "http://marklogic.com/xdmp/json" at "/MarkLogic/json/json.xqy"; 

let $foo := cts:uris((),(), cts:not-query(cts:element-query(xs:QName("j:dikwmetadata"), cts:element-query(xs:QName("j:data"), cts:and-query(()))))) 
let $items := cts:uri-match("/twitter/403580066367815680.json") (:any valid uri or set of uris:) 

let $map := map:map() 

    let $f := doc($items[1]) 
    let $id := $f/j:json/j:id/text() 
    let $oldUri := xdmp:node-uri($f) 
    let $newUri := fn:concat("/twitter/", $f/j:json/j:id/text(), ".json") 
    let $put := map:put($map,$oldUri,$newUri) 

    let $spawn := xdmp:spawn("/Modules/DIKW/spawn-rename-split.xqy", (xs:QName("to_process"), $map)) 

return ($oldUri, " - ", $newUri) 

問:

我如何設置代碼以便它在一個單獨的事務中首先刪除映射中的文檔並稍後插入它們以便cpf可以執行地理標記?


UPDATE

好了,所以每grtjn他的意見 我嘗試重寫我的代碼如下所示(THX爲止!):

xquery version "1.0-ml"; 
declare namespace j = "http://marklogic.com/xdmp/json/basic"; 

let $entries := cts:uri-match("//twitter/*") 
let $entry-count := fn:count($entries) 

let $transaction-size := 100 (: batch size $max :) 
let $total-transactions := ceiling($entry-count div $transaction-size) 

(: set total documents and total transactions so UI displays collecting :) 
(: skip 84 85 
let $set-total := infodev:ticket-set-total-documents($ticket-id, $entry-count) 
let $set-trans := infodev:ticket-set-total-transactions($ticket-id,$total-transactions) 
:) 
    (: create transactions by breaking document set into maps 
each maps's documents are saved to the db in their own transaction :) 
let $transactions := 
    for $i at $index in 1 to $total-transactions 
    let $map := map:map() 
    let $start := (($i -1) *$transaction-size) + 1 
    let $finish := min((($start - 1 + $transaction-size),$entry-count)) 
    let $put := 
     for $entry in ($entries)[$start to $finish] 
     (: 96 
     let $id := fn:concat(fn:string($entry/atom:id),".xml") 
     :) 
     let $id := fn:doc($entry)/j:json/j:id/text() 
     return map:put($map,$id,$entry) 
    return $map 

(: the callback function for ingest 
skip 101 let $function := xdmp:function(xs:QName("feed:process-file")) 
:) 
let $ingestion := 
    for $transaction at $index in $transactions 
    return true() 
    return $ingestion (: this second return statement seems odd? :) 
    (: do spawn here? :) 
    (: xdmp:spawn("/modules/spawn-move.xqy", (xs:QName("to_process"), $map)) :) 

現在,我百思不得其解,得到這個「工作'我需要補充最後的回報,這看起來不對。此外,我想弄清楚究竟發生了什麼,如果我運行查詢,它會返回一個超時錯誤。 我想先了解交易的實際情況。 對不起我的無知,但似乎執行一個(相對簡單)的任務,因爲重命名一些文檔看起來並不那麼簡單?

的完整性我重生,move.qry這裏:

xquery version "1.0-ml"; 

declare namespace j = "http://marklogic.com/xdmp/json/basic"; 
declare variable $to_process external; 


declare function local:document-move(
    $id as xs:string, $doc as xs:string) 
    as empty-sequence() 
{ 
    let $newUri := fn:concat("/twitter/", $id, ".json") 
    let $ins := xdmp:document-insert($newUri,fn:doc($doc)) 
    let $del := xdmp:document-delete($doc) 
    return true() 
}; 

for $d in map:keys($to_process) 
let $move := local:document-move($d, map:get($to_process,$d)) 
return true() 

回答

1

我懷疑你實際上並沒有重命名這些文件,而只是重新插入它們。如果$old-uri$new-uri相同,則您引用的rename函數不會預測該情況,並且會執行多餘的document-delete。在這種情況下,在刪除周圍添加一個if以跳過它。保留其他所有內容以保留權限,集合,質量和屬性。在實際插入之前,document-insert函數已經刪除了原有的文檔。另請參見:

http://docs.marklogic.com/xdmp:document-insert

您也可以考慮加入一點邏輯的做多產卵。根據硬件和森林配置,您希望理想地批量重新插入100到500個文檔的文檔。還有就是如何在GitHub上這個infostudio收藏家計算「交易」一個很好的例子(從線80開始):

https://github.com/marklogic/infostudio-plugins/blob/master/collectors/collector-feed.xqy

你也可以考慮做地質工作的交易裏面,而不是委託那對CPF。但是,如果您的地理位置查詢涉及外部呼叫(可能比較慢),請使用CPF。

HTH!

+0

thx grtjn,我有兩個用例,一個確實只是重新插入現有的文檔,但也有一個在其中有500.000文檔插入錯誤的uri,以「// twitter /」開頭而不是「/ twitter /」,因此看到的cpf作業「/ twitter /」錯過了這些文檔。我正在尋找一種方法來處理這些。我很困惑如何以毫秒爲單位完成這些工作,而不會遇到查詢超時等問題。使用指向第80行的指針我有一個問題,那就是我需要插入並運行它,以便我可以處理一次去500000文檔。 –

+1

@hugo-koopmans複製第80行到第104行,定義一個變量$包含所有uris的條目(您有$ foo/$條目),用條目數量定義變量$ entry-count,將$ max替換爲100(即你的批量大小),跳過第84/85/96/101行,編輯第97行,將$ entry作爲鍵放入地圖和一個虛擬值中,並在第104行後添加你的產卵。這樣你就可以在產卵之前跳過計算$ newURI ,但更好的是在產生的任務中做到這一點。 – grtjn

0

它看起來像你的樣品,你正試圖刪除和寫入文件相同的URI在同一介入。你可以用xdmp:commit()來解決這個問題。但是,另一種解決方案是首先將文檔重命名爲一批(將它們全部移出),然後在完成後將它們批量移回。

+0

thx for response ,,我試過了xdmp:commit()(在第一個代碼中我已經嘗試過,但註釋掉了)但是這不起作用,它需要一個分號來結束事務,成各種問題,我不明白。我正在研究你的第二個方向,但對我來說這似乎很遲鈍?也cpf過程似乎非常緩慢的任何提示加速cpf? –

+0

http://blakeley.com/blogofile/2013/06/21/introduction-to-multi-statement-transactions/可能會幫助你理解'xdmp:commit'和分號。 – mblakele

1

其實,如果你有你的公積金管線配置爲處理更新喜歡創造(這是默認配置),然後就重新插入文檔就足夠了:

xdmp:文檔插入($ d,DOC($ d ))