2016-01-13

How to populate a PostgreSQL database using mrjob and Hadoop

I want to populate a PostgreSQL database from a mapper, using mrjob and Hadoop 2.7.1. I am currently using the following code:

# -*- coding: utf-8 -*- 
#Script for storing the sparse data into a database by using Hadoop 
import psycopg2 
import re 
from mrjob.job import MRJob 

args_d = False 
args_c = True 
args_s = True 
args_n = 'es_word_space' 


def unicodize(segment): 
    if re.match(r'\\u[0-9a-f]{4}', segment): 
     return segment.decode('unicode-escape') 
    return segment.decode('utf-8') 

def create_tables(cr): 
    cr.execute("create table word_list(id serial primary key, word character varying not null)") 
    cr.execute("""create table word_sparse(
     id serial primary key, 
     word_id integer references word_list(id) not null, 
     pos integer not null, 
     val float not null)""") 

def delete_tables(cr): 
    cr.execute("drop table word_sparse") 
    cr.execute("drop table word_list") 

class MRwordStore(MRJob): 
    def mapper(self, _, line): 
     global cr 

     item = line.strip().split('\t') 
     replaced = u"".join((unicodize(seg) for seg in re.split(r'(\\u[0-9a-f]{4})', item[0]))) 
     key = u''.join((c for c in replaced if c != '"')) 

     cr.execute("insert into word_list(word) values(%s) returning id", (key,)) 
     word_id = cr.fetchone()[0] 

      #Parse the list, literal_eval is avoided because of memory issues 
     inside = False 
     number = "" 
     pos = 0 
     val = 0 
     for c in item[1]: 
      if c == '[': 
       inside = True 
      elif c.isdigit(): 
       number += c 
      elif c == ',': 
       if inside: 
        pos = int(number) 
        number = "" 
      elif c == ']': 
       if inside: 
        val = int(number) 
        number = "" 
        cr.execute("insert into word_sparse(word_id, pos, val) values (%s, %s, %s)", (word_id, pos, val)) 
       inside = False 

if __name__ == "__main__": 
    """ 
    Stores words in the database. 

    The first time, run with the arguments -cs. 
    If the database has to be recreated, run again with the d argument (-dcs) 

    It also assumes the owner of the database is a user named semeval with password semeval 
    """ 
    global cr 

    conn = psycopg2.connect("dbname=%s user=semeval password=semeval" % args_n) 
    cr = conn.cursor() 
    if args_d: 
     delete_tables(cr) 
    if args_c: 
     create_tables(cr) 
    if args_s: 
     MRwordStore().run() 

    conn.commit() 
    conn.close() 

I tried running it without a reducer. When I call my script I get this output:

$ python db_store_hadoop.py -r hadoop /almac/ignacio/data/wdSp_sparse.txt 
no configs found; falling back on auto-configuration 
no configs found; falling back on auto-configuration 
creating tmp directory /tmp/db_store_hadoop.hduser.20160113.012419.718376 
writing wrapper script to /tmp/db_store_hadoop.hduser.20160113.012419.718376/setup-wrapper.sh 
Using Hadoop version 2.7.1 
Copying local files into hdfs:///user/hduser/tmp/mrjob/db_store_hadoop.hduser.20160113.012419.718376/files/ 

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols 

Nothing more is printed after that, and the job appears to hang. Here is a sample of my input file:

"\u00e1gil" [[1572, 1], [1590, 1], [4, 1], [774, 1]] 
"\u00e1guila" [[10, 5], [1116, 2], [15, 1], [1590, 1], [1641, 2], [1704, 1], [1740, 3], [183, 1], [3, 1], [428, 2], [900, 3]] 
"\u00e1guilas" [[1043, 1], [248, 1], [618, 1], [701, 2], [862, 2], [864, 2]] 
"\u00e1lava" [[1572, 1], [1576, 2], [1590, 1], [726, 2]] 

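The file is 1.5 GB in size. I have already created the database, and it is empty. Any help is much appreciated, since I suspect I am misunderstanding several things.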

Answer

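Each mapper needs its own database connection. Create the connection in mapper_init() and close it in mapper_final(). A cursor opened in the driver script's `__main__` block is not available to the mapper tasks that Hadoop launches as separate processes. You also need to create the database and its tables independently of the mrjob script. You have not started off the right way: try some very simple mrjob scripts first, and work through the examples in the documentation.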
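As a rough illustration of that advice, here is a minimal sketch of the job with a per-mapper connection. It is not a tested solution for your cluster. Several things in it are assumptions: the connection string is copied from the question and would probably need a `host=` entry reachable from every task node, the tables are assumed to exist already, and the helper `parse_line` is a hypothetical name. It relies on the sample lines looking like a JSON string, a tab, and a JSON list, which would let `json.loads` replace the hand-written parser if memory allows.

```python
# -*- coding: utf-8 -*-
import json

try:
    from mrjob.job import MRJob
except ImportError:
    # Fallback so parse_line() can be tried without mrjob installed;
    # running the job itself requires mrjob.
    MRJob = object

# Connection string copied from the question; on a real cluster a host=...
# entry reachable from every task node would likely be needed (assumption).
DSN = "dbname=es_word_space user=semeval password=semeval"


def parse_line(line):
    """Split one input line into (word, [[pos, val], ...]).

    Assumes each line is a JSON string, a tab, then a JSON list of pairs.
    """
    key_json, value_json = line.rstrip('\n').split('\t', 1)
    return json.loads(key_json), json.loads(value_json)


class MRWordStore(MRJob):

    def mapper_init(self):
        # Runs once per mapper task: open this task's own connection here.
        import psycopg2  # imported here so it is only needed where the mapper runs
        self.conn = psycopg2.connect(DSN)
        self.cur = self.conn.cursor()

    def mapper(self, _, line):
        word, pairs = parse_line(line)
        self.cur.execute(
            "insert into word_list(word) values (%s) returning id", (word,))
        word_id = self.cur.fetchone()[0]
        self.cur.executemany(
            "insert into word_sparse(word_id, pos, val) values (%s, %s, %s)",
            [(word_id, pos, val) for pos, val in pairs])
        # Emitting a small record is an illustrative choice, not required.
        yield word, len(pairs)

    def mapper_final(self):
        # Runs once after the last line of this task: commit and clean up.
        self.conn.commit()
        self.cur.close()
        self.conn.close()
```

To run it as a job you would add the usual `if __name__ == '__main__': MRWordStore.run()` guard at the bottom, and create the tables beforehand in a separate script. Committing once in mapper_final() keeps the sketch short; for a 1.5 GB input, committing every few thousand lines may be a safer choice.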