Мне нужно регулярно запускать конвейер потока данных. FAQ для Dataflow гласит следующее:
Вы можете автоматизировать выполнение конвейера с помощью Google App Engine или настраиваемых (CRON) процессов заданий на GCE. В будущих выпусках SDK будут поддерживаться параметры командной строки для более детального контроля над управлением заданиями.
Я попытался запустить очень простой конвейер из своего Java-приложения, используя этот код:
public class MyAnalyticsServlet extends HttpServlet {
@Override
public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
resp.setContentType("text/plain");
if (req.getRequestURI().equals("/dataflow/test")) {
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setProject("redacted");
options.setRunner(DataflowPipelineRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("TestInput").from("gs://redacted/test/in.txt"))
.apply(new TestTransform())
.apply(TextIO.Write.named("TestOutput")
.to("gs://redacted/test")
.withNumShards(0));
p.run();
} else {
resp.setStatus(404);
resp.getWriter().println("Not Found");
return;
}
resp.getWriter().println("OK");
}
}
Я получаю следующую ошибку:
java.lang.IllegalArgumentException: Methods [setRunner(Class), getRunner()] on [com.google.cloud.dataflow.sdk.options.PipelineOptions] do not conform to being bean properties.
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:145)
at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.validateClass(PipelineOptionsFactory.java:1059)
...
Любые идеи?