hadoop中的recordreader和split以及block的关系是怎样的

这篇文章主要讲解了“hadoop中的recordreader和split以及block的关系是怎样的”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“hadoop中的recordreader和split以及block的关系是怎样的”吧!

recordreader的作用不言而喻。

通常来讲,Inputformat会为没有一个split产生一个recordreader来提供给maptask使用,进而,MapTask能够读取属于自己管辖处理的那部分split。

这里面,我们以linerecordreader为例子进行讲解:

几个核心的方法

hadoop中的recordreader和split以及block的关系是怎样的  hadoop 第1张

定义了linerecordreader基本的作用,即,是否有下一对kv,获得下一个key,获得下一个value。

而这个三个方法的使用地方如下。

暂时忽略...


因为文件在hdfs上分块存放的,那么split和block什么鬼?为啥不直接按照block去处理就行了呗。原因呢,是block中的数据可能不是连续的。可能某个重要的信息被两个block分隔了。因此,我们使用逻辑上的概念,即split来处理。

而split并不是真的将文件split了...而是逻辑上的标记下start,length,filepath等即可。

hadoop中的recordreader和split以及block的关系是怎样的  hadoop 第2张

根据Path,可以过得到FileSystem

  final Path file = split.getPath();
  // open the file and seek to the start of the split
   final FileSystem fs = file.getFileSystem(job);


每个maptask呢,都会使用使用一个linerecordreader,处理对应的split,中间通过了

private FSDataInputStream fileIn;

来维护一个流。

切记:这里的流并不是只针对这个split的,我们之前说过,split只是标记而已,没有分隔。

因此,这个流fileIn其实是指向整个文件的。

并且呢,这个流呢,会实现jdk中标准的方法,啥read啊之类的。读取到缓冲区中,但是如果涉及到不同的block呢,这个流会自动帮我们去找对应的block的,这个太复杂。反正记住fileIn屏蔽了顶层的不同block之前的切换,对我们来讲就像处理一个大的文件一样。


既然是流,那么就能够定位了,因此,不同的maptask就可以根据自己的split中的start位置,通过fileIn流直接定位到要处理文件的那个地方。

fileIn.seek(start);
in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
filePosition = fileIn;



可以看到其中的in对象,是借助fileIn生成,相比,in内部一定借助了这个fileIn流来实现某个功能。


典型的,readLine, 

in对象负责一行的读取逻辑,,而fileIn则负责从文件读取字符到byte缓冲区。

readline函数,最终会有一个这样的抵用,可以看到

 bufferLength = fillBuffer(in, buffer, prevCharCR);

调用fillbuffer函数,从in.read()中读取东西到buffer中,

private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
  throws IOException {
    /* We're reading data from in, but the head of the stream may be
     * already buffered in buffer, so we have several cases:
     * 1. No newline characters are in the buffer, so we need to copy
     *    everything and read another buffer from the stream.
     * 2. An unambiguously terminated line is in buffer, so we just
     *    copy to str.
     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
     *    in CR.  In this case we copy everything up to CR to str, but
     *    we also need to see what follows CR: if it's LF, then we
     *    need consume LF as well, so next call to readLine will read
     *    from after that.
     * We use a flag prevCharCR to signal if previous character was CR
     * and, if it happens to be at the end of the buffer, delay
     * consuming it until we have a chance to look at the char that
     * follows.
     */
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true of prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    if (bytesConsumed > Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }
    return (int)bytesConsumed;
  }



OK,那之后的linerecordreader三个主要的方法就简答了,读取就行了。略屌。

但是,有一个问题还没说。即一行信息如果被某个block分隔了咋办。

或者这个问题,这样说,我们知道Inputformat中的getSplit方法呢,就是根据文件的length等属性直接划分split的。

参照FileInputformat的getSplits方法

hadoop中的recordreader和split以及block的关系是怎样的  hadoop 第3张


那么一行数据,可能在不同的splits中,也可能在不同的block中。

在不同的block中呢,这个有fileIn对象帮我们处理的了,主要是读取read到缓冲区,属于物理上的问题,不是考虑的地方。

处于不同的split呢?这个情况有些问题,因为不同的split就是不同的划分,并且由不同的map task执行。

那么我们recordreader如何解决这个问题呢?

解决办法便是,突破split的start和end限制。

linerecordreader的解决办法:

只不start指向的位置不是文件的第一行,则默认的过滤掉一行(start位置可能是一行中的某一个位置)。

initialize()方法

if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
this.pos = start;


在nextKeyvalue方法中,多读取一些数据,补充完整的一行。

while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        newSize = skipUtfByteOrderMark();
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;
      }

      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }

OK,通过过滤掉一行,和多读取一行,就能保证被split分隔的一行,能够完成的读取,同时也不会重复处理一些数据。因为,所有的mapTask的linerecordreader都遵循这个方法。

感谢各位的阅读,以上就是“hadoop中的recordreader和split以及block的关系是怎样的”的内容了,经过本文的学习后,相信大家对hadoop中的recordreader和split以及block的关系是怎样的这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是蜗牛博客,小编将为大家推送更多相关知识点的文章,欢迎关注!

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:niceseo99@gmail.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

评论

有免费节点资源,我们会通知你!加入纸飞机订阅群

×
天气预报查看日历分享网页手机扫码留言评论Telegram