Skip to content

Commit 60d05a5

Browse files
author
pphust
committed
only use inode to identify files then taildirSource will support file rename/rotation
1 parent 59ffd4f commit 60d05a5

File tree

4 files changed

+63
-15
lines changed

4 files changed

+63
-15
lines changed

flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java

+38-14
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,17 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
6060
private boolean committed = true;
6161
private final boolean annotateFileName;
6262
private final String fileNameHeader;
63+
private boolean inodeOnly = false;
6364

6465
/**
6566
* Create a ReliableTaildirEventReader to watch the given directory.
6667
*/
6768
private ReliableTaildirEventReader(Map<String, String> filePaths,
6869
Table<String, String, String> headerTable, String positionFilePath,
6970
boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching,
70-
boolean annotateFileName, String fileNameHeader) throws IOException {
71+
boolean annotateFileName, String fileNameHeader, boolean inodeOnly)
72+
throws IOException {
73+
this.inodeOnly = inodeOnly;
7174
// Sanity checks
7275
Preconditions.checkNotNull(filePaths);
7376
Preconditions.checkNotNull(positionFilePath);
@@ -134,7 +137,13 @@ public void loadPositionFile(String filePath) {
134137
+ "inode: " + inode + ", pos: " + pos + ", path: " + path);
135138
}
136139
TailFile tf = tailFiles.get(inode);
137-
if (tf != null && tf.updatePos(path, inode, pos)) {
140+
boolean isLoaded = false;
141+
if (inodeOnly) {
142+
isLoaded = tf != null && tf.updatePosAndPath(path, inode, pos);
143+
} else {
144+
isLoaded = tf != null && tf.updatePos(path, inode, pos);
145+
}
146+
if (isLoaded) {
138147
tailFiles.put(inode, tf);
139148
} else {
140149
logger.info("Missing file: " + path + ", inode: " + inode + ", pos: " + pos);
@@ -251,7 +260,14 @@ public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
251260
continue;
252261
}
253262
TailFile tf = tailFiles.get(inode);
254-
if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
263+
264+
boolean isNewFile = false;
265+
if (inodeOnly) {
266+
isNewFile = tf == null;
267+
} else {
268+
isNewFile = tf == null || !tf.getPath().equals(f.getAbsolutePath());
269+
}
270+
if (isNewFile) {
255271
long startPos = skipToEnd ? f.length() : 0;
256272
tf = openFile(f, headers, inode, startPos);
257273
} else {
@@ -267,6 +283,7 @@ public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
267283
}
268284
}
269285
tf.setNeedTail(updated);
286+
tf.setPath(f.getAbsolutePath());
270287
}
271288
tailFiles.put(inode, tf);
272289
updatedInodes.add(inode);
@@ -285,19 +302,11 @@ private long getInode(File file) throws IOException {
285302
return inode;
286303
}
287304

288-
private TailFile openFile(File file, Map<String, String> headers, long inode, long pos) {
289-
try {
290-
logger.info("Opening file: " + file + ", inode: " + inode + ", pos: " + pos);
291-
return new TailFile(file, headers, inode, pos);
292-
} catch (IOException e) {
293-
throw new FlumeException("Failed opening file: " + file, e);
294-
}
295-
}
296-
297305
/**
298306
* Special builder class for ReliableTaildirEventReader
299307
*/
300308
public static class Builder {
309+
301310
private Map<String, String> filePaths;
302311
private Table<String, String, String> headerTable;
303312
private String positionFilePath;
@@ -308,7 +317,8 @@ public static class Builder {
308317
TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER;
309318
private String fileNameHeader =
310319
TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
311-
320+
private Boolean inodeOnly =
321+
TaildirSourceConfigurationConstants.DEFAULT_INODE_ONLY;
312322
public Builder filePaths(Map<String, String> filePaths) {
313323
this.filePaths = filePaths;
314324
return this;
@@ -349,11 +359,25 @@ public Builder fileNameHeader(String fileNameHeader) {
349359
return this;
350360
}
351361

362+
public Builder inodeOnly(boolean inodeOnly) {
363+
this.inodeOnly = inodeOnly;
364+
return this;
365+
}
366+
352367
public ReliableTaildirEventReader build() throws IOException {
353368
return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd,
354369
addByteOffset, cachePatternMatching,
355-
annotateFileName, fileNameHeader);
370+
annotateFileName, fileNameHeader, inodeOnly);
356371
}
372+
357373
}
358374

375+
private TailFile openFile(File file, Map<String, String> headers, long inode, long pos) {
376+
try {
377+
logger.info("Opening file: " + file + ", inode: " + inode + ", pos: " + pos);
378+
return new TailFile(file, headers, inode, pos);
379+
} catch (IOException e) {
380+
throw new FlumeException("Failed opening file: " + file, e);
381+
}
382+
}
359383
}

flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class TailFile {
4343
private static final int NEED_READING = -1;
4444

4545
private RandomAccessFile raf;
46-
private final String path;
46+
private String path;
4747
private final long inode;
4848
private long pos;
4949
private long lastUpdated;
@@ -107,6 +107,10 @@ public void setPos(long pos) {
107107
this.pos = pos;
108108
}
109109

110+
public void setPath(String path) {
111+
this.path = path;
112+
}
113+
110114
public void setLastUpdated(long lastUpdated) {
111115
this.lastUpdated = lastUpdated;
112116
}
@@ -128,6 +132,19 @@ public boolean updatePos(String path, long inode, long pos) throws IOException {
128132
}
129133
return false;
130134
}
135+
136+
public boolean updatePosAndPath(String path, long inode, long pos) throws IOException {
137+
if (this.inode == inode) {
138+
setPos(pos);
139+
setPath(path);
140+
updateFilePos(pos);
141+
logger.info("Updated position and path, file: " + path + ", inode: " + inode +
142+
", pos: " + pos);
143+
return true;
144+
}
145+
return false;
146+
}
147+
131148
public void updateFilePos(long pos) throws IOException {
132149
raf.seek(pos);
133150
lineReadPos = pos;

flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java

+3
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public class TaildirSource extends AbstractSource implements
8888
private boolean fileHeader;
8989
private String fileHeaderKey;
9090
private Long maxBatchCount;
91+
private boolean inodeOnly;
9192

9293
@Override
9394
public synchronized void start() {
@@ -102,6 +103,7 @@ public synchronized void start() {
102103
.cachePatternMatching(cachePatternMatching)
103104
.annotateFileName(fileHeader)
104105
.fileNameHeader(fileHeaderKey)
106+
.inodeOnly(inodeOnly)
105107
.build();
106108
} catch (IOException e) {
107109
throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
@@ -187,6 +189,7 @@ public synchronized void configure(Context context) {
187189
fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
188190
DEFAULT_FILENAME_HEADER_KEY);
189191
maxBatchCount = context.getLong(MAX_BATCH_COUNT, DEFAULT_MAX_BATCH_COUNT);
192+
inodeOnly = context.getBoolean(INODE_ONLY, DEFAULT_INODE_ONLY);
190193
if (maxBatchCount <= 0) {
191194
maxBatchCount = DEFAULT_MAX_BATCH_COUNT;
192195
logger.warn("Invalid maxBatchCount specified, initializing source "

flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java

+4
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,8 @@ public class TaildirSourceConfigurationConstants {
6767
/** The max number of batch reads from a file in one loop */
6868
public static final String MAX_BATCH_COUNT = "maxBatchCount";
6969
public static final Long DEFAULT_MAX_BATCH_COUNT = Long.MAX_VALUE;
70+
71+
/** Whether to support file rotation in case it only checks inode nor both inode and file name */
72+
public static final String INODE_ONLY = "inodeOnly";
73+
public static final Boolean DEFAULT_INODE_ONLY = false;
7074
}

0 commit comments

Comments
 (0)