
I'm running into problems using the Ingest Attachment Processor Plugin on Elasticsearch (5.5 on AWS, 5.6 locally). I'm developing in Python (3.6) with the elasticsearch-dsl library. How do I use the Elasticsearch Ingest Attachment Processor Plugin with the Python package elasticsearch-dsl?

I'm using the persistence layer and have my class set up like this:

import base64 
from elasticsearch.exceptions import NotFoundError 
from elasticsearch_dsl import DocType, Index, analyzer 
from elasticsearch_dsl.field import Attachment, Text 

lower_keyword = analyzer('keyword', tokenizer="keyword", filter=["lowercase"]) 

class ExampleIndex(DocType): 
    class Meta: 
        index = 'example' 
        doc_type = 'Example' 

    id = Text() 
    name = Text(analyzer=lower_keyword) 
    my_file = Attachment() 

I then have a function like this that I call to create the index and save the document:

def index_doc(a_file): 
    # Ensure that the Index is created before any documents are saved 
    try: 
        i = Index('example') 
        i.doc_type(ExampleIndex) 
        i.create() 

        # todo - Pipeline creation needs to go here - But how do you do it? 

    except Exception: 
        pass 

    # Check for existing index 
    indices = ExampleIndex() 
    try: 
        s = indices.search() 
        r = s.query('match', name=a_file.name).execute() 
        if r.success(): 
            for h in r: 
                indices = ExampleIndex.get(id=h.meta.id) 
                break 
    except NotFoundError: 
        pass 
    except Exception: 
        logger.exception("Something went wrong") 
        raise 

    # Populate the document 
    indices.name = a_file.name 
    with open(a_file.path_to_file, 'rb') as f: 
        contents = f.read() 
    indices.my_file = base64.b64encode(contents).decode("ascii") 

    indices.save(pipeline="attachment") if indices.my_file else indices.save() 

I have a text file whose contents are "This is a test document". When those contents are base64 encoded they become VGhpcyBpcyBhIHRlc3QgZG9jdW1lbnQK.
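The encoding step can be checked on its own with just the standard library (a standalone sketch of the same base64 step, assuming the file ends with a trailing newline):

```python
import base64

# The test document's contents, including the trailing newline
contents = b"This is a test document\n"

# Encode the raw bytes and decode to an ASCII string, as done before saving
encoded = base64.b64encode(contents).decode("ascii")
print(encoded)  # VGhpcyBpcyBhIHRlc3QgZG9jdW1lbnQK
```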

If I use curl directly then it works.

Create the pipeline:

curl -XPUT 'localhost:9200/_ingest/pipeline/attachment?pretty' \
    -H 'Content-Type: application/json' \
    -d '{
    "description" : "Extract attachment information",
    "processors" : [
        {
            "attachment" : {
                "field" : "my_file"
            }
        }
    ]
}'

Put the data:

curl -XPUT 'localhost:9200/example/Example/AV9nkyJMZAQ2lQ3CtsLb?pipeline=attachment&pretty'\ 
-H 'Content-Type: application/json' \ 
-d '{"my_file": "VGhpcyBpcyBhIHRlc3QgZG9jdW1lbnQK"}' 

Fetch the data: http://localhost:9200/example/Example/AV9nkyJMZAQ2lQ3CtsLb?pretty

{
    "_index" : "example",
    "_type" : "Example",
    "_id" : "AV9nkyJMZAQ2lQ3CtsLb",
    "_version" : 4,
    "found" : true,
    "_source" : {
        "my_file" : "VGhpcyBpcyBhIHRlc3QgZG9jdW1lbnQK",
        "attachment" : {
            "content_type" : "text/plain; charset=ISO-8859-1",
            "language" : "en",
            "content" : "This is a test document",
            "content_length" : 25
        }
    }
}
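Working backwards, the content the processor extracted matches the decoded my_file field. A standalone sketch using the values returned above (the charset comes from the content_type in the response):

```python
import base64

# The stored my_file value from the _source above
stored = "VGhpcyBpcyBhIHRlc3QgZG9jdW1lbnQK"

# Decoding recovers the original file contents that the attachment processor parsed
text = base64.b64decode(stored).decode("ISO-8859-1")
print(text)  # "This is a test document" plus a trailing newline
```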

The trouble is that I can't see how to do this using the elasticsearch-dsl Python library.

UPDATE: I can now get everything working except the initial creation of the pipeline. If I create the pipeline using curl, then I can use it by simply changing the .save() method call to .save(pipeline="attachment"). I've updated my function above to show this, with a comment marking where the pipeline creation needs to go.

Here is an example of the curl command that creates the pipeline:

curl -XPUT 'localhost:9200/_ingest/pipeline/attachment?pretty' \
    -H 'Content-Type: application/json' \
    -d '{"description": "Extract attachment information", "processors": [{"attachment": {"field": "my_file"}}]}'

Answer


The answer to the question is to use the IngestClient from the lower-level elasticsearch-py library to create the pipeline before using it:

from elasticsearch.client.ingest import IngestClient

p = IngestClient(es_connection)
p.put_pipeline(id='attachment', body={
    'description': "Extract attachment information",
    'processors': [
        {"attachment": {"field": "cv"}}
    ]
})
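Since put_pipeline just sends a JSON body, the dict can be sanity-checked offline before touching the cluster. A minimal sketch, no Elasticsearch connection assumed:

```python
import json

# The same pipeline definition passed to IngestClient.put_pipeline
pipeline_body = {
    'description': "Extract attachment information",
    'processors': [
        {"attachment": {"field": "cv"}}
    ]
}

# Round-trip through JSON to confirm the body serializes cleanly
assert json.loads(json.dumps(pipeline_body)) == pipeline_body
print(json.dumps(pipeline_body))
```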

A complete working example that creates a pipeline, an index and a document within Elasticsearch using the elasticsearch-dsl persistence flow (DocType) is:

import base64 
from uuid import uuid4 
from elasticsearch.client.ingest import IngestClient 
from elasticsearch.exceptions import NotFoundError 
from elasticsearch_dsl import analyzer, DocType, Index 
from elasticsearch_dsl.connections import connections 
from elasticsearch_dsl.field import Attachment, Text 


# Establish a connection 
host = '127.0.0.1' 
port = 9200 
es = connections.create_connection(host=host, port=port) 

# Some custom analyzers 
html_strip = analyzer('html_strip', tokenizer="standard",
                      filter=["standard", "lowercase", "stop", "snowball"],
                      char_filter=["html_strip"]) 
lower_keyword = analyzer('keyword', tokenizer="keyword", filter=["lowercase"]) 


class ExampleIndex(DocType): 
    class Meta: 
        index = 'example' 
        doc_type = 'Example' 

    id = Text() 
    uuid = Text() 
    name = Text() 
    town = Text(analyzer=lower_keyword) 
    my_file = Attachment(analyzer=html_strip) 


def save_document(doc): 
    """ 
    :param obj doc: Example object containing values to save 
    :return: 
    """ 
    try: 
        # Create the pipeline BEFORE creating the index 
        p = IngestClient(es) 
        p.put_pipeline(id='myattachment', body={ 
            'description': "Extract attachment information", 
            'processors': [ 
                { 
                    "attachment": { 
                        "field": "my_file" 
                    } 
                } 
            ] 
        }) 

        # Create the index. An exception will be raised if it already exists 
        i = Index('example') 
        i.doc_type(ExampleIndex) 
        i.create() 
    except Exception: 
        # todo - should be restricted to the expected Exception subclasses 
        pass 

    indices = ExampleIndex() 
    try: 
        s = indices.search() 
        r = s.query('match', uuid=doc.uuid).execute() 
        if r.success(): 
            for h in r: 
                indices = ExampleIndex.get(id=h.meta.id) 
                break 
    except NotFoundError: 
        # New record 
        pass 
    except Exception: 
        print("Unexpected error") 
        raise 

    # Now set the doc properties 
    indices.uuid = doc.uuid 
    indices.name = doc.name 
    indices.town = doc.town 
    if doc.my_file: 
        with open(doc.my_file, 'rb') as f: 
            contents = f.read() 
        indices.my_file = base64.b64encode(contents).decode("ascii") 

    # Save the index, using the attachment pipeline if a file was attached 
    return indices.save(pipeline="myattachment") if indices.my_file else indices.save() 


class MyObj(object): 
    uuid = uuid4() 
    name = '' 
    town = '' 
    my_file = '' 

    def __init__(self, name, town, file): 
        self.name = name 
        self.town = town 
        self.my_file = file 


me = MyObj("Steve", "London", '/home/steve/Documents/test.txt') 

res = save_document(me) 