
When using a zipped file, is there any reason why the zipped file is not removed from the directory once unzipped? #603

Open
rouellet99 opened this issue Feb 29, 2024 · 12 comments
Labels
question Further information is requested

Comments

@rouellet99
Contributor

rouellet99 commented Feb 29, 2024

FilePulse version 2.13, Docker Desktop 4.22.1

When using a zipped file, is there any reason why the zipped file is not removed from the directory once unzipped?

The reason for my question: when using
"offset.attributes.string": "name+lastModified",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LocalMoveCleanupPolicy",

the zipped file is not removed, so once the unzipped file has been processed, the file that was zipped is unzipped and processed again because its lastModified value differs from the previous one. This ends up in an infinite loop.

Is there a way to make sure that the zipped file is removed once it is unzipped to avoid this situation?

@rouellet99 rouellet99 added the question Further information is requested label Feb 29, 2024
@rouellet99 rouellet99 changed the title from "When using a zip file, is there any reason why the zip file is not removed from the directory once unzipped?" to "When using a zipped file, is there any reason why the zipped file is not removed from the directory once unzipped?" Feb 29, 2024
@Markynice

Hi, try "offset.attributes.string": "name+hash".

@Markynice

In my opinion, this is an issue with lastModified: this attribute is used as part of the key by which processed files are located, but it is overwritten every time a message is sent to the "tasks.file.status.storage.topic" topic, so the connector thinks it is a new file since the key is different.
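
A rough illustration of why that matters (a simplification for this discussion, not the actual FilePulse internals; the key format and timestamps below are made up based on the "name+lastModified" setting):

public class OffsetKeySketch {

    // Mirrors "offset.attributes.string": "name+lastModified": the offset key is
    // assumed to be the file name plus its last-modified timestamp.
    static String key(String name, long lastModified) {
        return name + "+" + lastModified;
    }

    public static void main(String[] args) {
        // First extraction of data.csv from the archive (timestamps are invented).
        String first = key("data.csv", 1709164800000L);
        // The archive is still in the directory, so it gets extracted again,
        // overwriting data.csv with a new last-modified timestamp.
        String second = key("data.csv", 1709251200000L);
        // Different key -> the connector sees a "new" file and reprocesses it.
        System.out.println(first.equals(second)); // false
    }
}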

@rouellet99
Contributor Author

Thanks for your answers, but I am already aware that removing lastModified from "offset.attributes.string" avoids this issue. However, my question is more about the possibility of removing the zipped file once it has been unzipped. Is that doable?

@Markynice

OK, try
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.DeleteCleanupPolicy"

@rouellet99
Contributor Author

rouellet99 commented Mar 1, 2024

Thanks for the suggestion, but I just tested it and the same issue occurs: the CSV file is deleted once processed, but since the zipped file is not removed, even with this configuration, the file that was zipped is unzipped and processed again when lastModified is used with name in "offset.attributes.string". This ends up in an infinite loop as well.

I think this is an issue when processing zipped files: the archive should be removed as soon as it is unzipped.

@Markynice

If you set the reader to process CSV files, then yes, it will not delete the zip; that is not provided for.
There was already a similar question: #225

@rouellet99
Contributor Author

Thanks for letting me know. As far as I can see, issue #225 was closed with no action taken. Maybe it wasn't considered an issue, but in my opinion it is one.

Concerning your comment, what do you mean by "if you set the reader to process csv files, then yes, it will not delete the zip, it is not provided for"? Looking at the code, this behavior applies to all supported file types when using LocalFSDirectoryListing and SftpFilesystemListing.

@rouellet99
Contributor Author

Hello @fhussonnois

Within the listEligibleFiles method of the LocalFSDirectoryListing class, would it be an option to add this code:
try {
    final Path decompressed = codec.decompress(file).toPath();
    listingLocalFiles.addAll(listEligibleFiles(decompressed));
    decompressedDirs.add(decompressed);
    if (decompressed.toFile().exists()) {
        file.delete();
    }
    LOG.debug("Compressed file deleted successfully : {}", path);
} catch (IOException | SecurityException e) {
    if (e instanceof IOException) {
        LOG.warn("Error while decompressing input file '{}'. Skip and continue.", path, e);
    } else if (e instanceof SecurityException) {
        LOG.warn("Error while deleting input file '{}'. Skip and continue.", path, e);
    }
}

instead of this
try {
    final Path decompressed = codec.decompress(file).toPath();
    listingLocalFiles.addAll(listEligibleFiles(decompressed));
    decompressedDirs.add(decompressed);
} catch (IOException e) {
    LOG.warn("Error while decompressing input file '{}'. Skip and continue.", path, e);
}

This way, the compressed file would be removed once it has been decompressed successfully, unless it is mandatory to keep the compressed file in the directory.

@fhussonnois
Member

Hi @rouellet99, it should be OK to remove the compressed file once it has been uncompressed. But this behavior should be enabled through a connector config property to keep the current behavior if needed. Maybe we should add a property fs.delete.compress.files.enabled (default value: false). Contributions are welcome; thanks :)
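
A minimal sketch of how such a flag could gate the deletion in the decompression step, assuming the property name suggested above; the Decompressor interface and all class, field, and method names here are hypothetical stand-ins, not the actual FilePulse code:

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;

public class CompressedFileCleanupSketch {

    /** Hypothetical stand-in for the connector's compression codec. */
    interface Decompressor {
        Path decompress(File compressed) throws IOException;
    }

    private final Decompressor codec;
    private final boolean deleteCompressFilesEnabled; // fs.delete.compress.files.enabled (default: false)

    CompressedFileCleanupSketch(final Decompressor codec, final boolean deleteCompressFilesEnabled) {
        this.codec = codec;
        this.deleteCompressFilesEnabled = deleteCompressFilesEnabled;
    }

    Path decompressAndMaybeDelete(final File file) throws IOException {
        final Path decompressed = codec.decompress(file);
        // Remove the archive only when the flag is enabled, so the current
        // behavior (keeping the compressed file) stays the default.
        if (deleteCompressFilesEnabled && decompressed.toFile().exists() && !file.delete()) {
            // Deletion failure should be non-fatal: log a warning and continue,
            // as the listing already does for decompression errors.
        }
        return decompressed;
    }
}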

@rouellet99
Contributor Author

Hello @fhussonnois, for this improvement, should I create a new branch from master?

@fhussonnois
Member

Hi @rouellet99, yes you can create a branch from the master and push your pull request for review. Thank you very much :)

@rouellet99
Contributor Author

Hello @fhussonnois, I had to create a fork since I don't have write permission on the repo. I hope that is not an issue. The following PR has been created: #629.

rouellet99 added a commit to rouellet99/kafka-connect-file-pulse that referenced this issue Apr 3, 2024
fix: (plugin): change the code for the configuration to delete the compressed file after extraction

LocalFSDirectoryListing.java:[104,5] (metrics) CyclomaticComplexity: Cyclomatic Complexity is 16 (max allowed is 15).

fhussonnois pushed a commit that referenced this issue May 7, 2024
fix: (plugin): change the code for the configuration to delete the compressed file after extraction

github-actions bot added a commit that referenced this issue May 7, 2024
fix: (plugin): change the code for the configuration to delete the compressed file after extraction 695c6f2