2012-11-12 46 views
1

我正在使用以 Carbon 和 Ceres 作爲存儲後端的 Graphite 監控系統。我在修正不良數據時遇到了一些問題:由於各種原因,我最終得到了時間範圍互相重疊的文件。也就是說,由於 Carbon/Ceres 將數據存儲爲 <timestamp>@<frequency>.slice 形式的文件,同一個目錄下可能出現兩個或多個時間範圍重疊的文件。(原問題標題:Graphite/Carbon/Ceres 節點重疊)

有兩種重疊:

File A: +------------+  orig file 
File B:  +-----+   subset 
File C:   +---------+ overlap 

這會造成問題,因爲現有的工具(ceres-maintenance 的 defrag 和 rollup)無法處理這些重疊。相反,它們會跳過該目錄並繼續往下處理。顯然這是一個問題。

回答

0

我已經創建了一個修復此問題的腳本,內容如下:

  1. 對於子集,只是刪除子文件。

  2. 對於重疊,使用文件系統的 'truncate' 在下一個文件開始的位置截斷原始文件。雖然也可以切掉重疊文件的開頭並將其重新命名,但我認爲這樣做風險很大。

我發現有兩種方法可以做到這一點:

  1. 遍歷目錄樹並逐個處理文件,邊走邊修復,同時找出子集文件並將其刪除;

  2. 遍歷目錄樹,在進入下一個目錄之前,先修復當前目錄中的所有問題。這種方法要快得多,因爲遍歷目錄非常耗時。

代碼:

#!/usr/bin/env python2.6 
################################################################################ 

import io 
import os 
import time 
import sys 
import string 
import logging 
import unittest 
import datetime 
import random 
import zmq 
import json 
import socket 
import traceback 
import signal 
import select 
import simplejson 
import cPickle as pickle 
import re 
import shutil 
import collections 
from pymongo import Connection 
from optparse import OptionParser 
from pprint import pprint, pformat 

################################################################################ 

class SliceFile(object):
    """Description of one slice file, derived from its name and on-disk size.

    The last '/'-separated component of the path is expected to look like
    ``<start>@<freq>.<anything>``; both ``<start>`` and ``<freq>`` must parse
    as integers.  Size-dependent fields stay ``None`` until ``setVars()`` is
    called.
    """

    def __init__(self, fname):
        """Parse the start time and frequency out of *fname*.

        Raises ValueError when a numeric part does not parse as an integer
        and IndexError when the base name contains no '@'.
        """
        self.name = fname
        # Only the final path component carries the encoded metadata.
        leaf = fname.rsplit('/', 1)[-1]
        pieces = leaf.split('@')
        self.timeStart = int(pieces[0])
        # Everything after the first '.' of the second piece is ignored.
        freq_text = pieces[1].split('.')[0]
        self.freq = int(freq_text)
        # Filled in by setVars() once the file has been stat'ed.
        self.size = None
        self.numPoints = None
        self.timeEnd = None
        self.deleted = False

    def __repr__(self):
        """Return a one-line, human-readable summary of every field."""
        fields = (self.name, self.timeStart, self.timeEnd,
                  self.freq, self.size, self.numPoints)
        return "Name: %s, tstart=%s tEnd=%s, freq=%s, size=%s, npoints=%s." % fields

    def setVars(self):
        """Read the file size from disk and derive point count and end time.

        The point count is the size divided by 8, truncated to an integer;
        the end time is the start time plus one ``freq`` step per point.
        """
        size_in_bytes = os.path.getsize(self.name)
        point_count = int(size_in_bytes / 8)
        self.size = size_in_bytes
        self.numPoints = point_count
        self.timeEnd = self.timeStart + (point_count * self.freq)

################################################################################ 

class CeresOverlapFixup(object):
    """Walk a directory tree and repair slice files whose time ranges overlap.

    Only directories containing a '.ceres-node' file are processed.  Within
    such a directory:

    * a slice whose time range ends no later than the end of an earlier
      slice (a subset) is deleted;
    * a slice that runs into the start of the next slice is truncated so
      that it ends where the next one begins.

    Progress and errors go to stdout and to 'ceresOverlapFixup.log'
    (opened in append mode in the current working directory).

    Counters:
        subsets       number of files deleted by removeFile().
        truncated     number of files successfully truncated.
        dirsExamined  number of directories visited by walkDirStructure().
    """

    def __del__(self):
        """Write a closing log line and close the log file if it is open."""
        # __init__ may have failed before LOGFILE was assigned.
        log_file = getattr(self, "LOGFILE", None)
        if log_file is None or log_file.closed:
            return
        import datetime
        try:
            self.writeLog("Ending at %s" % (str(datetime.datetime.today())))
            log_file.flush()
        finally:
            log_file.close()

    def __init__(self):
        """Open the log file and zero all counters."""
        self.verbose = False
        self.debug = False
        self.LOGFILE = open("ceresOverlapFixup.log", "a")
        self.badFilesList = set()
        self.truncated = 0
        self.subsets = 0
        self.dirsExamined = 0
        self.lastStatusTime = 0

    def getOptionParser(self):
        """Return a fresh, empty OptionParser."""
        return OptionParser()

    def getOptions(self):
        """Parse the command line into self.debug, self.verbose, self.basedir.

        Exits through ``parser.error`` (usage message, status 2) when no base
        directory is given; unlike an ``assert`` this check survives ``-O``.
        """
        parser = self.getOptionParser()
        parser.add_option(
            "-d", "--debug", action="store_true", dest="debug", default=False,
            help="debug mode for this program, writes debug messages to logfile.")
        parser.add_option(
            "-v", "--verbose", action="store_true", dest="verbose", default=False,
            help="verbose mode for this program, prints a lot to stdout.")
        parser.add_option(
            "-b", "--basedir", action="store", type="string", dest="basedir", default=None,
            help="base directory location to start converting.")
        (options, args) = parser.parse_args()
        self.debug = options.debug
        self.verbose = options.verbose
        self.basedir = options.basedir
        if not self.basedir:
            parser.error("must provide base directory.")

    # Slice file names encode "<start time>@<seconds per point>.<extension>".
    # Illustrative example (not taken from real data):
    #   ./updateOperations/1346805360@60.slice

    def getFileData(self, inFilename):
        """Return a SliceFile for *inFilename* with its size fields filled in."""
        ret = SliceFile(inFilename)
        ret.setVars()
        return ret

    def removeFile(self, inFilename):
        """Delete *inFilename* and count it as a removed subset file.

        Errors from os.remove propagate to the caller.
        """
        os.remove(inFilename)
        self.subsets += 1

    def truncateFile(self, fname, newSize):
        """Truncate the existing file *fname* to *newSize* bytes.

        Failures are logged rather than raised, so one bad file does not stop
        the walk.  self.truncated is incremented only on success.
        """
        if self.verbose:
            self.writeLog("Truncating file, name=%s, newsize=%s" % (pformat(fname), pformat(newSize)))
        try:
            # No O_CREAT: a path that vanished must not be re-created as a
            # zero-filled file.
            fd = os.open(fname, os.O_RDWR)
            try:
                os.ftruncate(fd, newSize)
            finally:
                os.close(fd)
        except Exception:
            self.writeLog("Exception during truncate: %s" % (traceback.format_exc()))
            return
        self.truncated += 1

    def printStatus(self, force=False):
        """Log a progress line, at most once every 10 seconds unless *force*."""
        now = self.getNowTime()
        if force or ((now - self.lastStatusTime) > 10):
            self.writeLog("Status: time=%d, Walked %s dirs, subsetFilesRemoved=%s, truncated %s files." % (now, self.dirsExamined, self.subsets, self.truncated))
            self.lastStatusTime = now

    def fixupThisDir(self, inPath, inFiles):
        """Repair overlapping slice files among *inFiles* in directory *inPath*.

        Does nothing unless '.ceres-node' is listed in *inFiles*.  Files whose
        names cannot be parsed or stat'ed, or whose frequency is not positive,
        are logged, added to self.badFilesList and left untouched.
        """
        if '.ceres-node' not in inFiles:
            return

        fileObjList = []
        for thisFile in sorted(inFiles):
            if thisFile == '.ceres-node' or '@' not in thisFile:
                continue
            wholeFilename = os.path.join(inPath, thisFile)
            try:
                curFile = self.getFileData(wholeFilename)
            except Exception:
                self.badFilesList.add(wholeFilename)
                self.writeLog("ERROR: file %s, %s" % (wholeFilename, traceback.format_exc()))
                continue
            if curFile.freq <= 0:
                # A zero frequency would divide by zero in the truncate
                # arithmetic below; never modify such a file.
                self.badFilesList.add(wholeFilename)
                self.writeLog("ERROR: file %s, non-positive frequency %s; skipping." % (wholeFilename, curFile.freq))
                continue
            fileObjList.append(curFile)

        # Order by numeric start time; a plain string sort of the names is
        # only correct while every timestamp has the same number of digits.
        fileObjList.sort(key=lambda thisObj: (thisObj.timeStart, thisObj.name))

        while fileObjList:
            self.printStatus()
            firstFile = fileObjList[0]

            # Every later file starts at or after firstFile, so one that also
            # ends no later than firstFile is entirely contained in it.
            survivors = [firstFile]
            for curFile in fileObjList[1:]:
                if curFile.timeEnd <= firstFile.timeEnd:
                    self.removeFile(curFile.name)
                    if self.verbose:
                        self.writeLog("Subset file situation. First=%s, overlap=%s" % (firstFile, curFile))
                else:
                    survivors.append(curFile)
            fileObjList = survivors
            if len(fileObjList) < 2:
                break
            secondFile = fileObjList[1]

            # Strictly-less-than is right: timeEnd is the first free slot
            # after the file.  A file starting at 100 with freq 1 and two
            # points uses slots 100 and 101 and has timeEnd 102, so a next
            # file starting at 102 does not overlap.
            if secondFile.timeStart < firstFile.timeEnd:
                # firstFile:  +---------+
                # secondFile:      +----------+
                # Cut firstFile at the point where secondFile starts.
                keepSeconds = secondFile.timeStart - firstFile.timeStart
                keepPoints = keepSeconds // firstFile.freq
                # Each datapoint occupies 8 bytes (the same assumption
                # SliceFile.setVars makes when deriving the point count).
                keepBytes = keepPoints * 8
                if keepBytes:
                    self.truncateFile(firstFile.name, keepBytes)
                    if self.verbose:
                        self.writeLog("Truncate situation. First=%s, overlap=%s" % (firstFile, secondFile))
                elif self.verbose:
                    # Truncating to zero bytes would empty the file; it is
                    # left as it is instead.
                    self.writeLog("Overlap leaves no whole datapoint, not truncating. First=%s, overlap=%s" % (firstFile, secondFile))

            # firstFile is settled; continue with the remaining files.
            fileObjList = fileObjList[1:]

    def getNowTime(self):
        """Return the current time in seconds since the epoch."""
        return time.time()

    def walkDirStructure(self):
        """Walk self.basedir, fixing each directory before moving on."""
        startTime = self.getNowTime()
        self.lastStatusTime = startTime
        self.okayFiles = 0

        for (thisPath, theseDirs, theseFiles) in os.walk(self.basedir):
            self.printStatus()
            self.fixupThisDir(thisPath, theseFiles)
            # Counted here only, so every walked directory is counted once.
            self.dirsExamined += 1

        endTime = self.getNowTime()
        # Always emit the final totals, regardless of the 10-second throttle.
        self.printStatus(force=True)
        self.writeLog("now = %s, started at %s, elapsed time = %s seconds." % (endTime, startTime, endTime - startTime))
        self.writeLog("Done.")

    def writeLog(self, instring):
        """Write *instring* plus a newline to stdout and to the log file."""
        line = "%s\n" % (instring,)
        sys.stdout.write(line)
        self.LOGFILE.write(line)
        self.LOGFILE.flush()

    def main(self):
        """Parse the command line, then walk and repair the tree."""
        self.getOptions()
        self.walkDirStructure()
+0

導入列表看起來有點粗放...... –

+0

沒錯。可以清理,同意。但是,whatchagonnado?我可以花很長時間讓這個更緊。如果您有更好的版本,請發佈。 –