diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..8731f78
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,54 @@
+
+*.class
+.*.swp
+.beamer
+# Package Files #
+*.jar
+*.war
+*.ear
+*.versionsBackup
+
+# Intellij Files & Dir #
+*.iml
+*.ipr
+*.iws
+atlassian-ide-plugin.xml
+out/
+.DS_Store
+./lib/
+.idea
+
+# Gradle Files & Dir #
+build/
+.gradle/
+.stickyStorage
+.build/
+target/
+
+# Node log
+npm-*.log
+logs/
+
+# Singlenode and test data files.
+/templates/
+/data/
+/data-fabric-tests/data/
+
+# ANTLR4
+/core/gen
+*.tokens
+DirectivesLexer.java
+DirectivesParser.java
+DirectivesBaseListener.java
+DirectivesBaseVisitor.java
+DirectivesListener.java
+DirectivesVisitor.java
+
+# generated by docs build
+*.py
+
+# Remove release.properties
+release.properties
+
+# Remove dev directory.
+dev
\ No newline at end of file
diff --git a/checkstyle.xml b/checkstyle.xml
new file mode 100644
index 0000000..04c6eda
--- /dev/null
+++ b/checkstyle.xml
@@ -0,0 +1,397 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/docs/MarketoReportingPlugin-batchsource.md b/docs/MarketoReportingPlugin-batchsource.md
new file mode 100644
index 0000000..5451214
--- /dev/null
+++ b/docs/MarketoReportingPlugin-batchsource.md
@@ -0,0 +1,26 @@
+# Marketo Reporting Batch Source
+
+Description
+-----------
+This plugin is used to query Leads or Activities entities for specified date range from Marketo.
+
+Properties
+----------
+### General
+
+**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc.
+
+**Rest API endpoint:** Marketo rest API endpoint, unique for each client.
+### Authentication
+
+**Client ID:** Client ID.
+
+**Client Secret:** Client secret.
+
+### Report
+
+**Report Type:** Type of report. One of 'leads' or 'activities'.
+
+**Start Date:** Start date of report. In ISO 8601 format(1997-07-16T19:20:30+01:00).
+
+**End Date:** End date of report. In ISO 8601 format(1997-07-16T19:20:30+01:00).
\ No newline at end of file
diff --git a/icons/MarketoReportingPlugin-batchsource.png b/icons/MarketoReportingPlugin-batchsource.png
new file mode 100644
index 0000000..4abe7e7
Binary files /dev/null and b/icons/MarketoReportingPlugin-batchsource.png differ
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f7a867b
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,439 @@
+
+
+
+ 4.0.0
+
+ io.cdap.plugin
+ marketo-entity-plugin
+ 1.0.0-SNAPSHOT
+
+
+
+ sonatype
+ https://oss.sonatype.org/content/groups/public
+
+
+ sonatype-snapshots
+ https://oss.sonatype.org/content/repositories/snapshots
+
+
+
+
+ 6.1.0-SNAPSHOT
+ 2.8.0
+ 4.5.9
+ 2.3.0-SNAPSHOT
+ 2.1.3
+
+
+
+
+ io.cdap.cdap
+ cdap-api
+ ${cdap.version}
+ provided
+
+
+ io.cdap.cdap
+ cdap-etl-api
+ ${cdap.version}
+ provided
+
+
+ io.cdap.cdap
+ cdap-formats
+ ${cdap.version}
+
+
+ org.apache.avro
+ avro
+
+
+ io.thekraken
+ grok
+
+
+
+
+ io.cdap.plugin
+ hydrator-common
+ ${hydrator.version}
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+ provided
+
+
+ commons-logging
+ commons-logging
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ org.apache.avro
+ avro
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+ com.google.guava
+ guava
+
+
+ jersey-core
+ com.sun.jersey
+
+
+ jersey-json
+ com.sun.jersey
+
+
+ jersey-server
+ com.sun.jersey
+
+
+ servlet-api
+ javax.servlet
+
+
+ org.mortbay.jetty
+ jetty
+
+
+ org.mortbay.jetty
+ jetty-util
+
+
+ jasper-compiler
+ tomcat
+
+
+ jasper-runtime
+ tomcat
+
+
+ jsp-api
+ javax.servlet.jsp
+
+
+ slf4j-api
+ org.slf4j
+
+
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-core
+ ${hadoop.version}
+ provided
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ com.google.inject.extensions
+ guice-servlet
+
+
+ com.sun.jersey
+ jersey-core
+
+
+ com.sun.jersey
+ jersey-server
+
+
+ com.sun.jersey
+ jersey-json
+
+
+ com.sun.jersey.contribs
+ jersey-guice
+
+
+ javax.servlet
+ servlet-api
+
+
+ com.google.guava
+ guava
+
+
+
+
+
+ io.cdap.cdap
+ cdap-etl-api-spark
+ ${cdap.version}
+ provided
+
+
+ org.apache.spark
+ spark-streaming_2.11
+ ${spark2.version}
+ provided
+
+
+ org.apache.spark
+ spark-core_2.11
+ ${spark2.version}
+ provided
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ log4j
+ log4j
+
+
+ org.apache.hadoop
+ hadoop-client
+
+
+ com.esotericsoftware.reflectasm
+ reflectasm
+
+
+ org.apache.curator
+ curator-recipes
+
+
+
+ org.scala-lang
+ scala-compiler
+
+
+ org.scala-lang
+ scala-reflect
+
+
+ org.eclipse.jetty.orbit
+ javax.servlet
+
+
+
+ net.java.dev.jets3t
+ jets3t
+
+
+ asm
+ asm
+
+
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ ${httpcomponents.version}
+
+
+ com.google.code.gson
+ gson
+ 2.8.6
+
+
+ org.apache.commons
+ commons-csv
+ 1.7
+
+
+ commons-io
+ commons-io
+ 2.6
+
+
+ org.awaitility
+ awaitility
+ 4.0.1
+
+
+
+
+
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+ com.github.tomakehurst
+ wiremock-jre8-standalone
+ 2.25.1
+ test
+
+
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+ 0.13
+
+
+ org.apache.maven.doxia
+ doxia-core
+ 1.6
+
+
+ xerces
+ xercesImpl
+
+
+
+
+
+
+ rat-check
+ validate
+
+ check
+
+
+
+ LICENSE*.txt
+
+ *.rst
+ *.md
+ **/*.cdap
+ **/*.yaml
+ **/*.md
+ logs/**
+ .git/**
+ .idea/**
+ **/grok/patterns/**
+ conf/**
+ data/**
+ plugins/**
+ **/*.patch
+ **/logrotate.d/**
+ **/limits.d/**
+ **/*.json
+ **/*.json.template
+ **/MANIFEST.MF
+
+ **/org/apache/hadoop/**
+
+ **/resources/**
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 2.17
+
+
+ validate
+ process-test-classes
+
+ checkstyle.xml
+ suppressions.xml
+ UTF-8
+ true
+ true
+ true
+ **/org/apache/cassandra/**,**/org/apache/hadoop/**
+
+
+ check
+
+
+
+
+
+ com.puppycrawl.tools
+ checkstyle
+ 8.18
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 8
+ 8
+
+
+
+ io.cdap
+ cdap-maven-plugin
+ 1.1.0
+
+
+ system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)
+
+
+
+
+ create-artifact-config
+ prepare-package
+
+ create-plugin-json
+
+
+
+
+
+ org.apache.felix
+ maven-bundle-plugin
+ 3.5.0
+ true
+
+
+ <_exportcontents>io.cdap.plugin.marketo.*
+ *;inline=false;scope=compile
+ true
+ lib
+
+
+
+
+ package
+
+ bundle
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/Helpers.java b/src/main/java/io/cdap/plugin/marketo/common/api/Helpers.java
new file mode 100644
index 0000000..bda2387
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/Helpers.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import io.cdap.plugin.marketo.common.api.entities.DateRange;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URIBuilder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Various helper methods.
+ */
+public class Helpers {
+ public static String streamToString(InputStream inputStream) {
+ try {
+ return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("Failed to read stream completely due to '%s'", e.getMessage()));
+ }
+ }
+
+ public static T streamToObject(InputStream inputStream, Class cls) {
+ return Marketo.GSON.fromJson(new InputStreamReader(inputStream), cls);
+ }
+
+ public static RuntimeException failForMethodAndUri(String method, URI uri, Exception ex) {
+ String message = ex.getMessage();
+ if (Strings.isNullOrEmpty(message)) {
+ if (ex.getCause() != null) {
+ message = ex.getCause().getMessage();
+ if (Strings.isNullOrEmpty(message)) {
+ message = "Unknown failure";
+ }
+ }
+ }
+
+ URIBuilder uriBuilder = new URIBuilder(uri);
+ List queryParameters = uriBuilder.getQueryParams();
+ queryParameters.removeIf(queryParameter -> queryParameter.getName().equals("access_token"));
+ uriBuilder.setParameters(queryParameters);
+ try {
+ String uriString = uriBuilder.build().toString();
+ return new RuntimeException(String.format("Failed '%s' '%s' - '%s'", method, uriString, message));
+ } catch (URISyntaxException e) {
+ // this will never happen since we rebuilding already validated uri, just make compiler happy
+ return new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Splits date range in 30 day ranges.
+ * Return date range as is if difference is less or equals to 30 days.
+ *
+ * @param beginDate
+ * @param endDate
+ * @return
+ */
+ public static List getDateRanges(String beginDate, String endDate) {
+ OffsetDateTime start = OffsetDateTime.parse(beginDate);
+ OffsetDateTime end = OffsetDateTime.parse(endDate);
+
+ if (start.compareTo(end) > 0) {
+ throw new IllegalArgumentException("Start date cannot be greater than the end date.");
+ }
+
+ int compareResult = start.plusDays(30).compareTo(end);
+ if (compareResult >= 0) {
+ // we are in range of 30 days, dates are okay
+ return ImmutableList.of(new DateRange(start.toString(), end.toString()));
+ } else {
+ List result = new ArrayList<>();
+ OffsetDateTime currentStart = start;
+
+ while (currentStart.compareTo(end) < 0) {
+ OffsetDateTime nextEnd = currentStart.plusDays(30);
+ result.add(new DateRange(currentStart.toString(),
+ min(nextEnd.minusSeconds(1), end).toString()));
+ currentStart = nextEnd;
+ }
+
+ return result;
+ }
+ }
+
+ public static > T min(T o1, T o2) {
+ if (o1.compareTo(o2) < 0) {
+ return o1;
+ } else {
+ return o2;
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/Marketo.java b/src/main/java/io/cdap/plugin/marketo/common/api/Marketo.java
new file mode 100644
index 0000000..4ccd799
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/Marketo.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import io.cdap.plugin.marketo.common.api.entities.activities.ActivitiesExport;
+import io.cdap.plugin.marketo.common.api.entities.activities.ActivitiesExportRequest;
+import io.cdap.plugin.marketo.common.api.entities.activities.ActivitiesExportResponse;
+import io.cdap.plugin.marketo.common.api.entities.activities.ActivityTypeResponse;
+import io.cdap.plugin.marketo.common.api.entities.leads.LeadsDescribeResponse;
+import io.cdap.plugin.marketo.common.api.entities.leads.LeadsExport;
+import io.cdap.plugin.marketo.common.api.entities.leads.LeadsExportRequest;
+import io.cdap.plugin.marketo.common.api.entities.leads.LeadsExportResponse;
+import io.cdap.plugin.marketo.common.api.job.ActivitiesExportJob;
+import io.cdap.plugin.marketo.common.api.job.LeadsExportJob;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Class that expose marketo rest api endpoints.
+ */
+public class Marketo extends MarketoHttp {
+ private static final Logger LOG = LoggerFactory.getLogger(Marketo.class);
+ static final Gson GSON = new Gson();
+ public static final List ACTIVITY_FIELDS = ImmutableList.of("marketoGUID", "leadId", "activityDate",
+ "activityTypeId", "campaignId",
+ "primaryAttributeValueId",
+ "primaryAttributeValue", "attributes");
+ /**
+ * Job queue will be checked every 60 seconds.
+ */
+ private static final long JOB_QUEUE_POLL_INTERVAL = 60;
+ /**
+ * Wait for 10 seconds before trying to enqueue job, this will minimize chance of race condition.
+ */
+ private static final long JOB_QUEUE_POLL_DELAY = 10;
+
+ public Marketo(String marketoEndpoint, String clientId, String clientSecret) {
+ super(marketoEndpoint, clientId, clientSecret);
+ }
+
+ public List describeLeads() {
+ return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
+ iteratePage(Urls.LEADS_DESCRIBE, LeadsDescribeResponse.class, LeadsDescribeResponse::getResult),
+ Spliterator.ORDERED), false).collect(Collectors.toList());
+ }
+
+ public List describeBuildInActivities() {
+ return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
+ iteratePage(Urls.BUILD_IN_ACTIVITIES_TYPES, ActivityTypeResponse.class, ActivityTypeResponse::getResult),
+ Spliterator.ORDERED), false).collect(Collectors.toList());
+ }
+
+ public LeadsExportJob exportLeads(LeadsExportRequest request) {
+ LeadsExportResponse export = validatedPost(Urls.BULK_EXPORT_LEADS_CREATE, Collections.emptyMap(),
+ Marketo::streamToLeadsExport,
+ request,
+ GSON::toJson);
+ return new LeadsExportJob(export.singleExport(), this);
+ }
+
+ public LeadsExport leadsExportJobStatus(String jobId) {
+ LeadsExportResponse currentResp = validatedGet(
+ String.format(Urls.BULK_EXPORT_LEADS_STATUS, jobId),
+ Collections.emptyMap(), Marketo::streamToLeadsExport);
+ return currentResp.singleExport();
+ }
+
+ public ActivitiesExportJob exportActivities(ActivitiesExportRequest request) {
+ ActivitiesExportResponse export = validatedPost(Urls.BULK_EXPORT_ACTIVITIES_CREATE, Collections.emptyMap(),
+ Marketo::streamToActivitiesExport,
+ request,
+ GSON::toJson);
+ return new ActivitiesExportJob(export.singleExport(), this);
+ }
+
+ public ActivitiesExport activitiesExportJobStatus(String jobId) {
+ ActivitiesExportResponse currentResp = validatedGet(
+ String.format(Urls.BULK_EXPORT_ACTIVITIES_STATUS, jobId),
+ Collections.emptyMap(), Marketo::streamToActivitiesExport);
+ return currentResp.singleExport();
+ }
+
+ /**
+ * Waits until bulk extract queue has available slot and executes given action.
+ *
+ * @param action action to execute once slot is available
+ * @param timeoutSeconds timeout in seconds
+ */
+ public void onBulkExtractQueueAvailable(Callable action, long timeoutSeconds) {
+ try {
+ Awaitility.given()
+ .ignoreException(TooManyJobsException.class) // ignore exception in case another reader took our slot
+ .atMost(timeoutSeconds, TimeUnit.SECONDS)
+ .pollInterval(JOB_QUEUE_POLL_INTERVAL, TimeUnit.SECONDS)
+ .pollDelay(JOB_QUEUE_POLL_DELAY, TimeUnit.SECONDS)
+ .until(action);
+ } catch (ConditionTimeoutException ex) {
+ throw new RuntimeException("Failed to get slot in bulk export queue due to timeout");
+ }
+ }
+
+ public static LeadsExportResponse streamToLeadsExport(InputStream inputStream) {
+ return Helpers.streamToObject(inputStream, LeadsExportResponse.class);
+ }
+
+ public static ActivitiesExportResponse streamToActivitiesExport(InputStream inputStream) {
+ return Helpers.streamToObject(inputStream, ActivitiesExportResponse.class);
+ }
+
+ /**
+ * Check if job can be enqueued.
+ *
+ * @return true, if job can be enqueued
+ */
+ public boolean canEnqueueJob() {
+ LeadsExportResponse leadsExportResponseJobs = validatedGet(Urls.BULK_EXPORT_LEADS_LIST,
+ ImmutableMap.of("status", "queued,processing"),
+ Marketo::streamToLeadsExport
+ );
+
+ int jobsInQueue = leadsExportResponseJobs.getResult().size();
+
+ ActivitiesExportResponse activitiesExportResponceJobs = validatedGet(Urls.BULK_EXPORT_ACTIVITIES_LIST,
+ ImmutableMap.of("status", "queued,processing"),
+ Marketo::streamToActivitiesExport
+ );
+ jobsInQueue += activitiesExportResponceJobs.getResult().size();
+
+ LOG.debug("Jobs in queue: {}", jobsInQueue);
+
+ return jobsInQueue < 10;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/MarketoHttp.java b/src/main/java/io/cdap/plugin/marketo/common/api/MarketoHttp.java
new file mode 100644
index 0000000..44c8698
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/MarketoHttp.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import io.cdap.plugin.marketo.common.api.entities.BaseResponse;
+import io.cdap.plugin.marketo.common.api.entities.Error;
+import io.cdap.plugin.marketo.common.api.entities.MarketoToken;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * Class that encapsulates common http functions for marketo rest api.
+ */
+class MarketoHttp {
+ private static final Logger LOG = LoggerFactory.getLogger(Marketo.class);
+ private static final Gson GSON = new Gson();
+ private String marketoEndpoint;
+ private String clientId;
+ private String clientSecret;
+ private MarketoToken token;
+ private HttpClientContext httpClientContext = HttpClientContext.create();
+
+ MarketoHttp(String marketoEndpoint, String clientId, String clientSecret) {
+ this.marketoEndpoint = marketoEndpoint;
+ this.clientId = clientId;
+ this.clientSecret = clientSecret;
+ token = refreshToken();
+ }
+
+ private T getPage(String queryUrl, Class pageClass) {
+ return validatedGet(queryUrl, Collections.emptyMap(),
+ inputStream -> Helpers.streamToObject(inputStream, pageClass));
+ }
+
+ T getNextPage(T currentPage, String queryUrl, Class pageClass) {
+ if (!Strings.isNullOrEmpty(currentPage.getNextPageToken())) {
+ return validatedGet(queryUrl,
+ ImmutableMap.of("nextPageToken", currentPage.getNextPageToken()),
+ inputStream -> Helpers.streamToObject(inputStream, pageClass));
+ }
+ return null;
+ }
+
+ MarketoPageIterator iteratePage(String queryUrl,
+ Class pageClass,
+ Function> resultsGetter) {
+ return new MarketoPageIterator<>(getPage(queryUrl, pageClass), this, queryUrl, pageClass, resultsGetter);
+ }
+
+ T validatedGet(String queryUrl, Map parameters,
+ Function deserializer) {
+ String logUri = "GET " + buildUri(queryUrl, parameters, false).toString();
+ return retryableValidate(logUri, () -> {
+ URI queryUri = buildUri(queryUrl, parameters, true);
+ return get(queryUri, deserializer);
+ });
+ }
+
+ public T validatedPost(String queryUrl, Map parameters,
+ Function deserializer,
+ B body, Function qSerializer) {
+ String logUri = "POST " + buildUri(queryUrl, parameters, false).toString();
+ return retryableValidate(logUri, () -> {
+ URI queryUri = buildUri(queryUrl, parameters, true);
+ return post(queryUri, deserializer, body, qSerializer);
+ });
+ }
+
+ // code: 1029, message: Too many jobs (10) in queue
+ private T retryableValidate(String logUri, Supplier tryQuery) {
+ T result = tryQuery.get();
+ // check for expired token
+ if (!result.isSuccess()) {
+ for (Error error : result.getErrors()) {
+ if (error.getCode() == 602 && error.getMessage().equals("Access token expired")) {
+ // refresh token and retry
+ token = refreshToken();
+ LOG.info("Refreshed token");
+ return tryQuery.get();
+ }
+ }
+ }
+
+ // log warnings if required
+ if (result.getWarnings().size() > 0) {
+ String warnings = result.getWarnings().stream()
+ .map(error -> String.format("code: %s, message: %s", error.getCode(), error.getMessage()))
+ .collect(Collectors.joining("; "));
+ LOG.warn("Warnings when calling '{}' - {}", logUri, warnings);
+ }
+
+ if (!result.isSuccess()) {
+ String msg = String.format("Errors when calling '%s'", logUri);
+ // log errors if required
+ if (result.getErrors().size() > 0) {
+ String errors = result.getErrors().stream()
+ .map(error -> String.format("code: %s, message: %s", error.getCode(), error.getMessage()))
+ .collect(Collectors.joining("; "));
+ msg = msg + " - " + errors;
+ LOG.error(msg);
+ }
+ throw mapErrorsToException(result.getErrors(), msg);
+ }
+ return result;
+ }
+
+ private RuntimeException mapErrorsToException(List errors, String defaultMessage) {
+ if (errors.size() == 1) {
+ Error e = errors.get(0);
+ String message = e.getMessage();
+ if (e.getCode() == 1029 && message != null && message.contains("many jobs")) {
+ return new TooManyJobsException();
+ } else {
+ // this error don't require specific handling
+ return new RuntimeException(defaultMessage);
+ }
+ } else {
+ // something outstanding happened and we have more than one error, we can't handle this in specific way
+ return new RuntimeException(defaultMessage);
+ }
+ }
+
+ public T get(URI uri, Function deserializer) {
+ try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+ HttpGet request = new HttpGet(uri);
+ try (CloseableHttpResponse response = httpClient.execute(request, httpClientContext)) {
+ checkResponseCode(response);
+ return deserializer.apply(response.getEntity().getContent());
+ }
+ } catch (Exception e) {
+ throw Helpers.failForMethodAndUri("GET", uri, e);
+ }
+ }
+
+ private T post(URI uri, Function respDeserializer, B body, Function qSerializer) {
+ try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+ HttpPost request = new HttpPost(uri);
+ if (body != null) {
+ Objects.requireNonNull(qSerializer, "body serializer must be specified with body");
+ request.setEntity(new StringEntity(qSerializer.apply(body), ContentType.APPLICATION_JSON));
+ }
+ try (CloseableHttpResponse response = httpClient.execute(request, httpClientContext)) {
+ checkResponseCode(response);
+ return respDeserializer.apply(response.getEntity().getContent());
+ }
+ } catch (Exception e) {
+ throw Helpers.failForMethodAndUri("POST", uri, e);
+ }
+ }
+
+ private static void checkResponseCode(CloseableHttpResponse response) throws IOException {
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode >= 300) {
+ String responseBody = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+ throw new RuntimeException(String.format("Http code '%s', response '%s'", statusCode, responseBody));
+ }
+ }
+
+ public URI buildUri(String queryUrl, Map parameters) {
+ return buildUri(queryUrl, parameters, true);
+ }
+
+ URI buildUri(String queryUrl, Map parameters, boolean includeToken) {
+ try {
+ URIBuilder builder = new URIBuilder(marketoEndpoint + queryUrl);
+ parameters.forEach(builder::setParameter);
+ if (includeToken) {
+ builder.setParameter("access_token", token.getAccessToken());
+ }
+ return builder.build();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(String.format("'%s' is invalid URI", marketoEndpoint + queryUrl));
+ }
+ }
+
+ MarketoToken getCurrentToken() {
+ return this.token;
+ }
+
+ private MarketoToken refreshToken() {
+ LOG.debug("Requesting marketo token");
+ URI getTokenUri = buildUri("/identity/oauth/token",
+ ImmutableMap.of("grant_type", "client_credentials", "client_id", clientId,
+ "client_secret", clientSecret), false);
+ return get(getTokenUri, inputStream -> GSON.fromJson(new InputStreamReader(inputStream), MarketoToken.class));
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/MarketoPageIterator.java b/src/main/java/io/cdap/plugin/marketo/common/api/MarketoPageIterator.java
new file mode 100644
index 0000000..1de07bf
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/MarketoPageIterator.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api;
+
+import io.cdap.plugin.marketo.common.api.entities.BaseResponse;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+
+/**
+ * Marketo page iterator.
+ *
+ * @param type of page response
+ * @param type of page item entity
+ */
+public class MarketoPageIterator implements Iterator {
+ private T currentPage;
+ private MarketoHttp marketo;
+ private String queryUrl;
+ private Class pageClass;
+ private Function> resultsGetter;
+ private Iterator currentPageResultIterator;
+
+ MarketoPageIterator(T page, MarketoHttp marketo, String queryUrl, Class pageClass,
+ Function> resultsGetter) {
+ this.currentPage = page;
+ this.marketo = marketo;
+ this.queryUrl = queryUrl;
+ this.pageClass = pageClass;
+ this.resultsGetter = resultsGetter;
+ currentPageResultIterator = resultsGetter.apply(this.currentPage).iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (currentPageResultIterator.hasNext()) {
+ return true;
+ } else {
+ T nextPage = marketo.getNextPage(currentPage, queryUrl, pageClass);
+ if (nextPage != null) {
+ currentPage = nextPage;
+ currentPageResultIterator = resultsGetter.apply(this.currentPage).iterator();
+ return hasNext();
+ } else {
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public I next() {
+ if (hasNext()) {
+ return currentPageResultIterator.next();
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/TooManyJobsException.java b/src/main/java/io/cdap/plugin/marketo/common/api/TooManyJobsException.java
new file mode 100644
index 0000000..ca23321
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/TooManyJobsException.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api;
+
+/**
+ * Exception thrown if too many jobs already queued.
+ */
+public class TooManyJobsException extends RuntimeException {
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/Urls.java b/src/main/java/io/cdap/plugin/marketo/common/api/Urls.java
new file mode 100644
index 0000000..32479bb
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/Urls.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api;
+
+/**
+ * Marketo API urls.
+ */
+public class Urls {
+ public static final String LEADS_DESCRIBE = "/rest/v1/leads/describe.json";
+ public static final String BULK_EXPORT_LEADS_LIST = "/bulk/v1/leads/export.json";
+ public static final String BULK_EXPORT_LEADS_CREATE = "/bulk/v1/leads/export/create.json";
+ public static final String BULK_EXPORT_LEADS_ENQUEUE = "/bulk/v1/leads/export/%s/enqueue.json";
+ public static final String BULK_EXPORT_LEADS_STATUS = "/bulk/v1/leads/export/%s/status.json";
+ public static final String BULK_EXPORT_LEADS_FILE = "/bulk/v1/leads/export/%s/file.json";
+ public static final String BULK_EXPORT_ACTIVITIES_LIST = "/bulk/v1/activities/export.json";
+ public static final String BULK_EXPORT_ACTIVITIES_CREATE = "/bulk/v1/activities/export/create.json";
+ public static final String BULK_EXPORT_ACTIVITIES_ENQUEUE = "/bulk/v1/activities/export/%s/enqueue.json";
+ public static final String BULK_EXPORT_ACTIVITIES_STATUS = "/bulk/v1/activities/export/%s/status.json";
+ public static final String BULK_EXPORT_ACTIVITIES_FILE = "/bulk/v1/activities/export/%s/file.json";
+ public static final String BUILD_IN_ACTIVITIES_TYPES = "/rest/v1/activities/types.json";
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/BaseResponse.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/BaseResponse.java
new file mode 100644
index 0000000..2f679ca
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/BaseResponse.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represents common parts for all responses.
+ */
+public class BaseResponse {
+
+ private boolean success = false;
+ private List errors = Collections.emptyList();
+ private List warnings = Collections.emptyList();
+ private String requestId;
+ private boolean moreResult = false;
+ private String nextPageToken;
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+ public List getErrors() {
+ return errors;
+ }
+
+ public void setErrors(List errors) {
+ this.errors = errors;
+ }
+
+ public List getWarnings() {
+ return warnings;
+ }
+
+ public void setWarnings(List warnings) {
+ this.warnings = warnings;
+ }
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ public boolean isMoreResult() {
+ return moreResult;
+ }
+
+ public void setMoreResult(boolean moreResult) {
+ this.moreResult = moreResult;
+ }
+
+ public String getNextPageToken() {
+ return nextPageToken;
+ }
+
+ public void setNextPageToken(String nextPageToken) {
+ this.nextPageToken = nextPageToken;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/DateRange.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/DateRange.java
new file mode 100644
index 0000000..9e71503
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/DateRange.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities;
+
+/**
+ * Represents date range.
+ */
+public class DateRange {
+ String endAt;
+ String startAt;
+
+ public DateRange() {
+
+ }
+
+ public DateRange(String startAt, String endAt) {
+ this.endAt = endAt;
+ this.startAt = startAt;
+ }
+
+ public String getEndAt() {
+ return endAt;
+ }
+
+ public String getStartAt() {
+ return startAt;
+ }
+
+ @Override
+ public String toString() {
+ return startAt + " -- " + endAt;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/Error.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/Error.java
new file mode 100644
index 0000000..594e9cb
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/Error.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities;
+
+/**
+ * Represents error message.
+ */
+public class Error {
+ private int code;
+ private String message;
+
+ public Error(int code, String message) {
+ this.code = code;
+ this.message = message;
+ }
+
+ public Error() {
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("code: %d, message: %s", code, message);
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/MarketoToken.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/MarketoToken.java
new file mode 100644
index 0000000..258d93f
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/MarketoToken.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities;
+
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Represents marketo token response.
+ */
+public class MarketoToken {
+ @SerializedName("access_token")
+ private String accessToken;
+ private String scope;
+ @SerializedName("expires_in")
+ private String expiresIn;
+ @SerializedName("token_type")
+ private String tokenType;
+
+ public MarketoToken(String accessToken, String scope, String expiresIn, String tokenType) {
+ this.accessToken = accessToken;
+ this.scope = scope;
+ this.expiresIn = expiresIn;
+ this.tokenType = tokenType;
+ }
+
+ public String getAccessToken() {
+ return accessToken;
+ }
+
+ public String getScope() {
+ return scope;
+ }
+
+ public String getExpiresIn() {
+ return expiresIn;
+ }
+
+ public String getTokenType() {
+ return tokenType;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/Warning.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/Warning.java
new file mode 100644
index 0000000..b1167fc
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/Warning.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities;
+
+/**
+ * Represents warning message.
+ */
+public class Warning {
+ private int code;
+ private String message;
+
+ public Warning(int code, String message) {
+ this.code = code;
+ this.message = message;
+ }
+
+ public Warning() {
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("code: %d, message: %s", code, message);
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExport.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExport.java
new file mode 100644
index 0000000..2cfc72e
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExport.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities.activities;
+
+/**
+ * Represents export response item.
+ */
+public class ActivitiesExport {
+ String createdAt;
+ String errorMsg;
+ String exportId;
+ int fileSize;
+ String fileChecksum;
+ String finishedAt;
+ String format;
+ int numberOfRecords;
+ String queuedAt;
+ String startedAt;
+ String status;
+
+ public String getCreatedAt() {
+ return createdAt;
+ }
+
+ public String getErrorMsg() {
+ return errorMsg;
+ }
+
+ public String getExportId() {
+ return exportId;
+ }
+
+ public int getFileSize() {
+ return fileSize;
+ }
+
+ public String getFileChecksum() {
+ return fileChecksum;
+ }
+
+ public String getFinishedAt() {
+ return finishedAt;
+ }
+
+ public String getFormat() {
+ return format;
+ }
+
+ public int getNumberOfRecords() {
+ return numberOfRecords;
+ }
+
+ public String getQueuedAt() {
+ return queuedAt;
+ }
+
+ public String getStartedAt() {
+ return startedAt;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExportRequest.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExportRequest.java
new file mode 100644
index 0000000..31fc681
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExportRequest.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities.activities;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents activities bulk export request.
+ */
+public class ActivitiesExportRequest {
+
+ Map columnHeaderNames = null;
+ List fields = null;
+ ExportActivityFilter filter = null;
+ String format = "CSV";
+
+ public ActivitiesExportRequest(List fields, ExportActivityFilter filter) {
+ this.fields = fields;
+ this.filter = filter;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExportResponse.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExportResponse.java
new file mode 100644
index 0000000..0b19058
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivitiesExportResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities.activities;
+
+import io.cdap.plugin.marketo.common.api.entities.BaseResponse;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represents activities bulk export response.
+ */
+public class ActivitiesExportResponse extends BaseResponse {
+
+ List result = Collections.emptyList();
+
+ public ActivitiesExport singleExport() {
+ if (result.size() != 1) {
+ throw new IllegalStateException(
+ String.format("Expected single export job result, but found '%s' results.", result.size()));
+ }
+ return result.get(0);
+ }
+
+ public List getResult() {
+ return result;
+ }
+
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivityTypeResponse.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivityTypeResponse.java
new file mode 100644
index 0000000..b853484
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ActivityTypeResponse.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities.activities;
+
+import io.cdap.plugin.marketo.common.api.entities.BaseResponse;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Activity type response.
+ */
+public class ActivityTypeResponse extends BaseResponse {
+ /**
+ * Activity type attribute.
+ */
+ public static class ActivityTypeAttribute {
+ private String apiName;
+ private String dataType;
+ private String name;
+
+ public String getApiName() {
+ return apiName;
+ }
+
+ public String getDataType() {
+ return dataType;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s(%s, %s)", getApiName(), getDataType(), getName());
+ }
+ }
+
+ /**
+ * Attribute type.
+ */
+ public static class ActivityType {
+ private String apiName;
+ private List attributes;
+ private String description;
+ private Integer id;
+ private String name;
+ private ActivityTypeAttribute primaryAttribute;
+
+ public String getApiName() {
+ return apiName;
+ }
+
+ public List getAttributes() {
+ return attributes;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public Integer getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public ActivityTypeAttribute getPrimaryAttribute() {
+ return primaryAttribute;
+ }
+ }
+
+ List result = Collections.emptyList();
+
+ public List getResult() {
+ return result;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ExportActivityFilter.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ExportActivityFilter.java
new file mode 100644
index 0000000..5e225cf
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/activities/ExportActivityFilter.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities.activities;
+
+import io.cdap.plugin.marketo.common.api.entities.DateRange;
+
+import java.util.List;
+
+/**
+ * Represents request filter.
+ */
+public class ExportActivityFilter {
+ private List activityTypeIds;
+ private DateRange createdAt;
+
+ /**
+ * Builder.
+ */
+ public static class Builder {
+
+ private List activityTypeIds = null;
+ private DateRange createdAt = null;
+
+ public Builder() {
+ }
+
+ Builder(List activityTypeIds, DateRange createdAt) {
+ this.activityTypeIds = activityTypeIds;
+ this.createdAt = createdAt;
+ }
+
+ public Builder activityTypeIds(List activityTypeIds) {
+ this.activityTypeIds = activityTypeIds;
+ return Builder.this;
+ }
+
+ public Builder addActivityTypeIds(Integer activityTypeIds) {
+ this.activityTypeIds.add(activityTypeIds);
+ return Builder.this;
+ }
+
+ public Builder createdAt(DateRange createdAt) {
+ this.createdAt = createdAt;
+ return Builder.this;
+ }
+
+ public ExportActivityFilter build() {
+
+ return new ExportActivityFilter(this);
+ }
+ }
+
+ private ExportActivityFilter(Builder builder) {
+ this.activityTypeIds = builder.activityTypeIds;
+ this.createdAt = builder.createdAt;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsDescribeResponse.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsDescribeResponse.java
new file mode 100644
index 0000000..afd89ec
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsDescribeResponse.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities.leads;
+
+import io.cdap.plugin.marketo.common.api.entities.BaseResponse;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represents leads describe response.
+ */
+public class LeadsDescribeResponse extends BaseResponse {
+ /**
+ * Represents lead field description.
+ */
+ public static class LeadAttribute {
+ String dataType;
+ String displayName;
+ int id;
+ int length;
+ LeadMapAttribute rest;
+ LeadMapAttribute soap;
+
+ public String getDataType() {
+ return dataType;
+ }
+
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public LeadMapAttribute getRest() {
+ return rest;
+ }
+
+ public LeadMapAttribute getSoap() {
+ return soap;
+ }
+ }
+
+ /**
+ * Represents leads field name.
+ */
+ public static class LeadMapAttribute {
+ String name;
+ boolean readOnly = true;
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isReadOnly() {
+ return readOnly;
+ }
+ }
+
+ List result = Collections.emptyList();
+
+ public List getResult() {
+ return result;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExport.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExport.java
new file mode 100644
index 0000000..f833207
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExport.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities.leads;
+
+/**
+ * Represents export response item.
+ */
+public class LeadsExport {
+ String createdAt;
+ String errorMsg;
+ String exportId;
+ int fileSize;
+ String fileChecksum;
+ String finishedAt;
+ String format;
+ int numberOfRecords;
+ String queuedAt;
+ String startedAt;
+ String status;
+
+ public String getCreatedAt() {
+ return createdAt;
+ }
+
+ public String getErrorMsg() {
+ return errorMsg;
+ }
+
+ public String getExportId() {
+ return exportId;
+ }
+
+ public int getFileSize() {
+ return fileSize;
+ }
+
+ public String getFileChecksum() {
+ return fileChecksum;
+ }
+
+ public String getFinishedAt() {
+ return finishedAt;
+ }
+
+ public String getFormat() {
+ return format;
+ }
+
+ public int getNumberOfRecords() {
+ return numberOfRecords;
+ }
+
+ public String getQueuedAt() {
+ return queuedAt;
+ }
+
+ public String getStartedAt() {
+ return startedAt;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExportRequest.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExportRequest.java
new file mode 100644
index 0000000..e2035ba
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExportRequest.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities.leads;
+
+import io.cdap.plugin.marketo.common.api.entities.DateRange;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents leads bulk export request.
+ */
+public class LeadsExportRequest {
+ /**
+ * Represents request filter.
+ */
+ public static class ExportLeadFilter {
+ DateRange createdAt = null;
+ Integer smartListId = null;
+ String smartListName = null;
+ Integer staticListId = null;
+ Integer staticListName = null;
+ DateRange updatedAt = null;
+
+ /**
+ * Builder for ExportLeadFilter.
+ */
+ public static class Builder {
+ private DateRange createdAt = null;
+ private Integer smartListId = null;
+ private String smartListName = null;
+ private Integer staticListId = null;
+ private Integer staticListName = null;
+ private DateRange updatedAt = null;
+
+ public Builder() {
+ }
+
+ public Builder createdAt(DateRange createdAt) {
+ this.createdAt = createdAt;
+ return Builder.this;
+ }
+
+ public Builder smartListId(Integer smartListId) {
+ this.smartListId = smartListId;
+ return Builder.this;
+ }
+
+ public Builder smartListName(String smartListName) {
+ this.smartListName = smartListName;
+ return Builder.this;
+ }
+
+ public Builder staticListId(Integer staticListId) {
+ this.staticListId = staticListId;
+ return Builder.this;
+ }
+
+ public Builder staticListName(Integer staticListName) {
+ this.staticListName = staticListName;
+ return Builder.this;
+ }
+
+ public Builder updatedAt(DateRange updatedAt) {
+ this.updatedAt = updatedAt;
+ return Builder.this;
+ }
+
+ public ExportLeadFilter build() {
+
+ return new ExportLeadFilter(this);
+ }
+ }
+
+ private ExportLeadFilter(Builder builder) {
+ this.createdAt = builder.createdAt;
+ this.smartListId = builder.smartListId;
+ this.smartListName = builder.smartListName;
+ this.staticListId = builder.staticListId;
+ this.staticListName = builder.staticListName;
+ this.updatedAt = builder.updatedAt;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+ }
+
+ Map columnHeaderNames = null;
+ List fields = null;
+ ExportLeadFilter filter = null;
+ String format = "CSV";
+
+ public LeadsExportRequest(List fields, ExportLeadFilter filter) {
+ this.fields = fields;
+ this.filter = filter;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExportResponse.java b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExportResponse.java
new file mode 100644
index 0000000..bb58467
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/entities/leads/LeadsExportResponse.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.entities.leads;
+
+import io.cdap.plugin.marketo.common.api.entities.BaseResponse;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represents leads bulk export response.
+ */
+public class LeadsExportResponse extends BaseResponse {
+
+ List result = Collections.emptyList();
+
+ public LeadsExport singleExport() {
+ if (result.size() != 1) {
+ throw new IllegalStateException("Expected single export job result.");
+ }
+ return result.get(0);
+ }
+
+ public List getResult() {
+ return result;
+ }
+
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/job/AbstractBulkExportJob.java b/src/main/java/io/cdap/plugin/marketo/common/api/job/AbstractBulkExportJob.java
new file mode 100644
index 0000000..de21ca8
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/job/AbstractBulkExportJob.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.job;
+
+import io.cdap.plugin.marketo.common.api.Helpers;
+import io.cdap.plugin.marketo.common.api.Marketo;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base bulk export job wrapper.
+ *
+ * @param object that represents job status
+ */
+public abstract class AbstractBulkExportJob {
+ private static final List WAIT_ABLE_STATE = Arrays.asList("Queued", "Processing");
+ private static final String ENQUEUE_ABLE_STATUS = "Created";
+ private static final String COMPLETED_STATUS = "Completed";
+
+ private String jobId;
+ private T lastState;
+ private Marketo marketo;
+
+ /**
+ * @param jobId job id
+ * @param lastState last status
+ * @param marketo marketo api instance
+ */
+ public AbstractBulkExportJob(String jobId,
+ T lastState,
+ Marketo marketo) {
+
+ this.jobId = jobId;
+ this.lastState = lastState;
+ this.marketo = marketo;
+ getLogger().info("{} - created job '{}'", getLogPrefix(), this.jobId);
+ }
+
+ public abstract Logger getLogger();
+
+ public abstract T getFreshState();
+
+ public abstract String getStateStatus(T state);
+
+ public abstract String getFileUrlTemplate();
+
+ public abstract String getLogPrefix();
+
+ protected abstract T enqueueImpl();
+
+ public T getLastState() {
+ return lastState;
+ }
+
+ public Marketo getMarketo() {
+ return marketo;
+ }
+
+ public void waitCompletion() {
+ if (!WAIT_ABLE_STATE.contains(getStateStatus(getLastState()))) {
+ throw new IllegalStateException("Job must be enqueued before waiting for completion.");
+ }
+
+ do {
+ try {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(30));
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Failed to wait for job completion - interrupted");
+ }
+
+ T newState = getFreshState();
+ logStatusChange(getStateStatus(getLastState()), getStateStatus(newState));
+ lastState = newState;
+ } while (WAIT_ABLE_STATE.contains(getStateStatus(getLastState())));
+
+ if (!getStateStatus(getLastState()).equals(COMPLETED_STATUS)) {
+ throw new IllegalStateException("Job expected to be in Completed state, but was in " +
+ getStateStatus(getLastState()));
+ }
+ }
+
+ public boolean enqueue() {
+ if (!getStateStatus(getLastState()).equals(ENQUEUE_ABLE_STATUS)) {
+ throw new IllegalStateException("Job must be in Created status before enqueuing, but was in " +
+ getStateStatus(getLastState()));
+ }
+
+ if (!marketo.canEnqueueJob()) {
+ return false;
+ }
+
+ T newState = enqueueImpl();
+
+ logStatusChange(getStateStatus(getLastState()), getStateStatus(newState));
+
+ if (!(getStateStatus(newState).equals("Queued") || getStateStatus(newState).equals("Processing"))) {
+ throw new IllegalStateException(
+ String.format("Expected Queued|Processing state for job '%s' but got '%s'", jobId, getStateStatus(newState)));
+ }
+
+ lastState = newState;
+
+ return true;
+ }
+
+ private void logStatusChange(String oldStatus, String newStatus) {
+ if (!oldStatus.equals(newStatus)) {
+ getLogger().info("{} - job '{}' changed state '{}' -> '{}'", getLogPrefix(), jobId, oldStatus, newStatus);
+ }
+ }
+
+ public String getFile() {
+ return marketo.get(marketo.buildUri(String.format(getFileUrlTemplate(), jobId), Collections.emptyMap()),
+ Helpers::streamToString);
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/job/ActivitiesExportJob.java b/src/main/java/io/cdap/plugin/marketo/common/api/job/ActivitiesExportJob.java
new file mode 100644
index 0000000..7a6c35d
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/job/ActivitiesExportJob.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.job;
+
+import io.cdap.plugin.marketo.common.api.Helpers;
+import io.cdap.plugin.marketo.common.api.Marketo;
+import io.cdap.plugin.marketo.common.api.Urls;
+import io.cdap.plugin.marketo.common.api.entities.activities.ActivitiesExport;
+import io.cdap.plugin.marketo.common.api.entities.activities.ActivitiesExportResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * Activities export job.
+ */
+public class ActivitiesExportJob extends AbstractBulkExportJob {
+ private static final Logger LOG = LoggerFactory.getLogger(ActivitiesExportJob.class);
+
+ public ActivitiesExportJob(ActivitiesExport lastState, Marketo marketo) {
+ super(lastState.getExportId(), lastState, marketo);
+ }
+
+ @Override
+ public Logger getLogger() {
+ return LOG;
+ }
+
+ @Override
+ public ActivitiesExport getFreshState() {
+ return getMarketo().activitiesExportJobStatus(getJobId());
+ }
+
+ @Override
+ public String getStateStatus(ActivitiesExport state) {
+ return state.getStatus();
+ }
+
+ @Override
+ public String getFileUrlTemplate() {
+ return Urls.BULK_EXPORT_ACTIVITIES_FILE;
+ }
+
+ @Override
+ public String getLogPrefix() {
+ return "BULK ACTIVITIES EXPORT";
+ }
+
+ @Override
+ protected ActivitiesExport enqueueImpl() {
+ return getMarketo().validatedPost(
+ String.format(Urls.BULK_EXPORT_ACTIVITIES_ENQUEUE, getJobId()),
+ Collections.emptyMap(),
+ inputStream -> Helpers.streamToObject(inputStream, ActivitiesExportResponse.class),
+ null, null).singleExport();
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/common/api/job/LeadsExportJob.java b/src/main/java/io/cdap/plugin/marketo/common/api/job/LeadsExportJob.java
new file mode 100644
index 0000000..f96587f
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/common/api/job/LeadsExportJob.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api.job;
+
+import io.cdap.plugin.marketo.common.api.Helpers;
+import io.cdap.plugin.marketo.common.api.Marketo;
+import io.cdap.plugin.marketo.common.api.Urls;
+import io.cdap.plugin.marketo.common.api.entities.leads.LeadsExport;
+import io.cdap.plugin.marketo.common.api.entities.leads.LeadsExportResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+/**
+ * Leads export job.
+ */
+public class LeadsExportJob extends AbstractBulkExportJob {
+ private static final Logger LOG = LoggerFactory.getLogger(LeadsExportJob.class);
+
+ public LeadsExportJob(LeadsExport lastState, Marketo marketo) {
+ super(lastState.getExportId(), lastState, marketo);
+ }
+
+ @Override
+ public Logger getLogger() {
+ return LOG;
+ }
+
+ @Override
+ public LeadsExport getFreshState() {
+ return getMarketo().leadsExportJobStatus(getJobId());
+ }
+
+ @Override
+ public String getStateStatus(LeadsExport state) {
+ return state.getStatus();
+ }
+
+ @Override
+ public String getFileUrlTemplate() {
+ return Urls.BULK_EXPORT_LEADS_FILE;
+ }
+
+ @Override
+ public String getLogPrefix() {
+ return "BULK LEADS EXPORT";
+ }
+
+ @Override
+ protected LeadsExport enqueueImpl() {
+ return getMarketo().validatedPost(
+ String.format(Urls.BULK_EXPORT_LEADS_ENQUEUE, getJobId()),
+ Collections.emptyMap(),
+ inputStream -> Helpers.streamToObject(inputStream, LeadsExportResponse.class),
+ null, null).singleExport();
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoInputFormat.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoInputFormat.java
new file mode 100644
index 0000000..096b842
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoInputFormat.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.source.batch;
+
+import com.google.gson.Gson;
+import io.cdap.plugin.marketo.common.api.Helpers;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * InputFormat for mapreduce job, which provides a single split of data.
+ */
+public class MarketoInputFormat extends InputFormat {
+ private static final Gson GSON = new Gson();
+
+ @Override
+ public List getSplits(JobContext jobContext) {
+ Configuration conf = jobContext.getConfiguration();
+ MarketoReportingSourceConfig config = GSON.fromJson(
+ conf.get(MarketoInputFormatProvider.PROPERTY_CONFIG_JSON), MarketoReportingSourceConfig.class);
+
+ return Helpers.getDateRanges(config.getStartDate(), config.getEndDate()).stream()
+ .map(dateRange -> new MarketoReportingSplit(dateRange.getStartAt(), dateRange.getEndAt()))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
+ MarketoReportingSplit split = (MarketoReportingSplit) inputSplit;
+ return new MarketoRecordReader(split.getBeginDate(), split.getEndDate());
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoInputFormatProvider.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoInputFormatProvider.java
new file mode 100644
index 0000000..8a70ed1
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoInputFormatProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.source.batch;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.cdap.cdap.api.data.batch.InputFormatProvider;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * InputFormatProvider used by cdap to provide configurations to mapreduce job
+ */
+public class MarketoInputFormatProvider implements InputFormatProvider {
+ public static final String PROPERTY_CONFIG_JSON = "cdap.marketo.reporter.config";
+ private static final Gson gson = new GsonBuilder().create();
+ private final Map conf;
+
+
+ MarketoInputFormatProvider(MarketoReportingSourceConfig config) {
+ this.conf = Collections.unmodifiableMap(new HashMap() {{
+ put(PROPERTY_CONFIG_JSON, gson.toJson(config));
+ }});
+ }
+
+ @Override
+ public String getInputFormatClassName() {
+ return MarketoInputFormat.class.getName();
+ }
+
+ @Override
+ public Map getInputFormatConfiguration() {
+ return conf;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoRecordReader.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoRecordReader.java
new file mode 100644
index 0000000..4af14f6
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoRecordReader.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.source.batch;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.marketo.common.api.Marketo;
+import io.cdap.plugin.marketo.common.api.entities.DateRange;
+import io.cdap.plugin.marketo.common.api.entities.activities.ActivitiesExportRequest;
+import io.cdap.plugin.marketo.common.api.entities.activities.ExportActivityFilter;
+import io.cdap.plugin.marketo.common.api.entities.leads.LeadsExportRequest;
+import io.cdap.plugin.marketo.common.api.job.AbstractBulkExportJob;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * RecordReader implementation, which reads events from Marketo api.
+ */
+public class MarketoRecordReader extends RecordReader> {
+ private static final Logger LOG = LoggerFactory.getLogger(MarketoRecordReader.class);
+ private static final Gson GSON = new GsonBuilder().create();
+ /**
+ * Wait for 25 minutes for available slot in queue.
+ */
+ private static final long JOB_ENQUEUE_TIMEOUT = 25 * 60;
+ private Map current = null;
+ private Iterator iterator = null;
+ private String beginDate;
+ private String endDate;
+
+ public MarketoRecordReader(String beginDate, String endDate) {
+ this.beginDate = beginDate;
+ this.endDate = endDate;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
+ Configuration conf = taskAttemptContext.getConfiguration();
+ String configJson = conf.get(MarketoInputFormatProvider.PROPERTY_CONFIG_JSON);
+ MarketoReportingSourceConfig config = GSON.fromJson(configJson, MarketoReportingSourceConfig.class);
+
+ Marketo marketo = config.getMarketo();
+
+ DateRange dateRange = new DateRange(beginDate, endDate);
+ AbstractBulkExportJob job;
+ switch (config.getReportType()) {
+ case LEADS:
+ List leadsFields = config.getSchema().getFields().stream()
+ .map(Schema.Field::getName)
+ .collect(Collectors.toList());
+
+ LeadsExportRequest.ExportLeadFilter leadsFilter = LeadsExportRequest.ExportLeadFilter.builder()
+ .createdAt(dateRange)
+ .build();
+
+ job = marketo.exportLeads(new LeadsExportRequest(leadsFields, leadsFilter));
+ break;
+ case ACTIVITIES:
+ ExportActivityFilter activitiesFilter = ExportActivityFilter.builder()
+ .createdAt(dateRange)
+ .build();
+
+ job = marketo.exportActivities(new ActivitiesExportRequest(Marketo.ACTIVITY_FIELDS, activitiesFilter));
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid report type " + config.getReportType());
+ }
+
+ LOG.info("BULK EXPORT JOB - job '{}' has date range '{}'", job.getJobId(), dateRange);
+
+ // wait for 25 minutes for available slot and enqueue job
+ marketo.onBulkExtractQueueAvailable(job::enqueue, JOB_ENQUEUE_TIMEOUT);
+
+ job.waitCompletion();
+
+ String data = job.getFile();
+ CSVParser parser = CSVFormat.DEFAULT.withHeader().parse(new StringReader(data));
+ iterator = parser.iterator();
+ }
+
+ @Override
+ public boolean nextKeyValue() {
+ if (iterator.hasNext()) {
+ current = iterator.next().toMap();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public NullWritable getCurrentKey() {
+ return null;
+ }
+
+ @Override
+ public Map getCurrentValue() {
+ return current;
+ }
+
+ @Override
+ public float getProgress() {
+ return 0;
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingPlugin.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingPlugin.java
new file mode 100644
index 0000000..585814d
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingPlugin.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.source.batch;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.batch.Input;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.lib.KeyValue;
+import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+import io.cdap.plugin.common.IdUtils;
+import io.cdap.plugin.common.LineageRecorder;
+import org.apache.hadoop.io.NullWritable;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Plugin that reads entities from Marketo api.
+ */
+@Plugin(type = BatchSource.PLUGIN_TYPE)
+@Name(MarketoReportingPlugin.NAME)
+@Description("Reads Leads or Activities from Marketo.")
+public class MarketoReportingPlugin extends BatchSource, StructuredRecord> {
+ public static final String NAME = "MarketoReportingPlugin";
+
+ private final MarketoReportingSourceConfig config;
+
+ public MarketoReportingPlugin(MarketoReportingSourceConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ validateConfiguration(pipelineConfigurer.getStageConfigurer().getFailureCollector());
+ pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
+ }
+
+ @Override
+ public void prepareRun(BatchSourceContext batchSourceContext) {
+ validateConfiguration(batchSourceContext.getFailureCollector());
+ LineageRecorder lineageRecorder = new LineageRecorder(batchSourceContext, config.referenceName);
+ lineageRecorder.createExternalDataset(config.getSchema());
+ lineageRecorder.recordRead("Read", "Reading Marketo entities",
+ Objects.requireNonNull(config.getSchema().getFields()).stream()
+ .map(Schema.Field::getName)
+ .collect(Collectors.toList()));
+
+ batchSourceContext.setInput(Input.of(config.referenceName, new MarketoInputFormatProvider(config)));
+ }
+
+ @Override
+ public void transform(KeyValue> input, Emitter emitter) {
+ emitter.emit(MarketoReportingSchemaHelper.getRecord(config.getSchema(), input.getValue()));
+ }
+
+ private void validateConfiguration(FailureCollector failureCollector) {
+ config.validate(failureCollector);
+ failureCollector.getOrThrowException();
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSchemaHelper.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSchemaHelper.java
new file mode 100644
index 0000000..0a578fd
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSchemaHelper.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.source.batch;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.marketo.common.api.Marketo;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Various methods to deal with schema and record.
+ */
+public class MarketoReportingSchemaHelper {
+ public static Schema getActivitySchema() {
+ List fields = Marketo.ACTIVITY_FIELDS.stream()
+ .map(s -> Schema.Field.of(s, Schema.nullableOf(Schema.of(Schema.Type.STRING))))
+ .collect(Collectors.toList());
+ return Schema.recordOf("activityRecord", fields);
+ }
+
+ public static Schema getSchema(MarketoReportingSourceConfig config) {
+ switch (config.getReportType()) {
+ case LEADS:
+ List fields = config.getMarketo().describeLeads().stream().map(
+ leadAttribute -> {
+ if (leadAttribute.getRest() != null) {
+ return Schema.Field.of(leadAttribute.getRest().getName(),
+ Schema.nullableOf(Schema.of(Schema.Type.STRING)));
+ } else {
+ return null;
+ }
+ }
+ ).filter(Objects::nonNull).collect(Collectors.toList());
+
+ return Schema.recordOf("LeadsRecord", fields);
+ case ACTIVITIES:
+ return getActivitySchema();
+ }
+ throw new IllegalArgumentException("Failed to get schema for type " + config.getReportType());
+ }
+
+ public static StructuredRecord getRecord(Schema schema, Map fields) {
+ StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+ schema.getFields().forEach(
+ field -> {
+ if (fields.containsKey(field.getName())) {
+ builder.set(field.getName(), fields.get(field.getName()));
+ }
+ }
+ );
+ return builder.build();
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSourceConfig.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSourceConfig.java
new file mode 100644
index 0000000..e7282aa
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSourceConfig.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.source.batch;
+
+import com.google.common.base.Strings;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.common.IdUtils;
+import io.cdap.plugin.common.ReferencePluginConfig;
+import io.cdap.plugin.marketo.common.api.Marketo;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeParseException;
+
+/**
+ * Provides all required configuration for reading Marketo entities.
+ */
+public class MarketoReportingSourceConfig extends ReferencePluginConfig {
+ public static final String PROPERTY_CLIENT_ID = "clientId";
+ public static final String PROPERTY_CLIENT_SECRET = "clientSecret";
+ public static final String PROPERTY_REST_API_ENDPOINT = "restApiEndpoint";
+ public static final String PROPERTY_REPORT_TYPE = "reportType";
+ public static final String PROPERTY_START_DATE = "startDate";
+ public static final String PROPERTY_END_DATE = "endDate";
+
+ @Name(PROPERTY_CLIENT_ID)
+ @Description("Marketo Client ID.")
+ @Macro
+ protected String clientId;
+
+ @Name(PROPERTY_CLIENT_SECRET)
+ @Description("Marketo Client secret.")
+ @Macro
+ protected String clientSecret;
+
+ @Name(PROPERTY_REST_API_ENDPOINT)
+ @Description("REST API endpoint URL.")
+ @Macro
+ protected String restApiEndpoint;
+
+ @Name(PROPERTY_REPORT_TYPE)
+ @Description("Report type format, leads or activities.")
+ @Macro
+ protected String reportType;
+
+ @Name(PROPERTY_START_DATE)
+ @Description("Start date for the report.")
+ @Macro
+ protected String startDate;
+
+ @Name(PROPERTY_END_DATE)
+ @Description("End date for the report.")
+ @Macro
+ protected String endDate;
+
+ private transient Schema schema = null;
+ private transient Marketo marketo = null;
+
+ public MarketoReportingSourceConfig(String referenceName) {
+ super(referenceName);
+ }
+
+ public Schema getSchema() {
+ if (schema == null) {
+ schema = MarketoReportingSchemaHelper.getSchema(this);
+ }
+ return schema;
+ }
+
+ public Marketo getMarketo() {
+ if (marketo == null) {
+ marketo = new Marketo(getRestApiEndpoint(), getClientId(), getClientSecret());
+ }
+ return marketo;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public String getClientSecret() {
+ return clientSecret;
+ }
+
+ public String getRestApiEndpoint() {
+ return restApiEndpoint;
+ }
+
+ public String getStartDate() {
+ return startDate;
+ }
+
+ public String getEndDate() {
+ return endDate;
+ }
+
+ public ReportType getReportType() {
+ return ReportType.fromString(reportType);
+ }
+
+ void validate(FailureCollector failureCollector) {
+ IdUtils.validateReferenceName(referenceName, failureCollector);
+ validateDate(failureCollector);
+ validateReportType(failureCollector);
+ validateMarketoEndpoint(failureCollector);
+ validateSecrets(failureCollector);
+ }
+
+ void validateDate(FailureCollector failureCollector) {
+ if (!containsMacro(PROPERTY_START_DATE)) {
+ try {
+ OffsetDateTime.parse(getStartDate());
+ } catch (DateTimeParseException ex) {
+ failureCollector.addFailure("Failed to parse start date.",
+ "Correct date to ISO 8601 format.")
+ .withConfigProperty(PROPERTY_START_DATE);
+ }
+ }
+
+ if (!containsMacro(PROPERTY_END_DATE)) {
+ try {
+ OffsetDateTime.parse(getStartDate());
+ } catch (DateTimeParseException ex) {
+ failureCollector.addFailure("Failed to parse end date.",
+ "Correct date to ISO 8601 format.")
+ .withConfigProperty(PROPERTY_END_DATE);
+ }
+ }
+
+ if (!(containsMacro(PROPERTY_START_DATE) && containsMacro(PROPERTY_END_DATE))) {
+ try {
+ OffsetDateTime start = OffsetDateTime.parse(getStartDate());
+ OffsetDateTime end = OffsetDateTime.parse(getEndDate());
+
+ if (start.compareTo(end) > 0) {
+ failureCollector.addFailure("Start date cannot be greater than the end date.", "Swap dates.")
+ .withConfigProperty(PROPERTY_START_DATE).withConfigProperty(PROPERTY_END_DATE);
+ }
+ } catch (DateTimeParseException ex) {
+ // silently ignore parsing exceptions, we already pushed messages for malformed dates
+ }
+ }
+ }
+
+ void validateReportType(FailureCollector failureCollector) {
+ if (!containsMacro(PROPERTY_REPORT_TYPE)) {
+ try {
+ getReportType();
+ } catch (IllegalArgumentException ex) {
+ failureCollector.addFailure(String.format("Incorrect reporting type '%s'.", reportType),
+ "Set reporting type to 'activities' or 'leads'.")
+ .withConfigProperty(PROPERTY_REPORT_TYPE);
+ }
+ }
+ }
+
+ void validateSecrets(FailureCollector failureCollector) {
+ if (!containsMacro(PROPERTY_CLIENT_ID) && Strings.isNullOrEmpty(getClientId())) {
+ failureCollector.addFailure("Client ID is empty.", null)
+ .withConfigProperty(PROPERTY_CLIENT_ID);
+ }
+
+ if (!containsMacro(PROPERTY_CLIENT_SECRET) && Strings.isNullOrEmpty(getClientSecret())) {
+ failureCollector.addFailure("Client Secret is empty.", null)
+ .withConfigProperty(PROPERTY_CLIENT_SECRET);
+ }
+ }
+
+ void validateMarketoEndpoint(FailureCollector failureCollector) {
+ if (!containsMacro(PROPERTY_REST_API_ENDPOINT)) {
+ try {
+ new URL(getRestApiEndpoint());
+ } catch (MalformedURLException e) {
+ failureCollector
+ .addFailure(String.format("Malformed Marketo Rest API endpoint URL '%s'.", getRestApiEndpoint()), null)
+ .withConfigProperty(PROPERTY_REST_API_ENDPOINT);
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSplit.java b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSplit.java
new file mode 100644
index 0000000..7e12e27
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/source/batch/MarketoReportingSplit.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.source.batch;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A no-op split.
+ */
+public class MarketoReportingSplit extends InputSplit implements Writable {
+ private String beginDate;
+ private String endDate;
+
+ public MarketoReportingSplit() {
+ }
+
+ public MarketoReportingSplit(String beginDate, String endDate) {
+ this.beginDate = beginDate;
+ this.endDate = endDate;
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ beginDate = dataInput.readUTF();
+ endDate = dataInput.readUTF();
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ dataOutput.writeUTF(beginDate);
+ dataOutput.writeUTF(endDate);
+ }
+
+ @Override
+ public long getLength() {
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() {
+ return new String[0];
+ }
+
+ public String getBeginDate() {
+ return beginDate;
+ }
+
+ public String getEndDate() {
+ return endDate;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/marketo/source/batch/ReportType.java b/src/main/java/io/cdap/plugin/marketo/source/batch/ReportType.java
new file mode 100644
index 0000000..13193f0
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/marketo/source/batch/ReportType.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.source.batch;
+
+/**
+ * Represents report type.
+ */
+public enum ReportType {
+ LEADS("leads"),
+ ACTIVITIES("activities");
+
+ private String type;
+
+ ReportType(String type) {
+ this.type = type;
+ }
+
+ public static ReportType fromString(String reportType) {
+ for (ReportType rt : ReportType.values()) {
+ if (rt.type.equals(reportType)) {
+ return rt;
+ }
+ }
+ throw new IllegalArgumentException("Unknown report type: " + reportType);
+ }
+}
diff --git a/src/test/java/io/cdap/plugin/marketo/common/api/MarketoHttpTest.java b/src/test/java/io/cdap/plugin/marketo/common/api/MarketoHttpTest.java
new file mode 100644
index 0000000..80c7071
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/marketo/common/api/MarketoHttpTest.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.marketo.common.api;
+
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import com.github.tomakehurst.wiremock.stubbing.Scenario;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
+import io.cdap.plugin.marketo.common.api.entities.BaseResponse;
+import io.cdap.plugin.marketo.common.api.entities.Error;
+import io.cdap.plugin.marketo.common.api.entities.MarketoToken;
+import io.cdap.plugin.marketo.common.api.entities.Warning;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class MarketoHttpTest {
+ private static final Gson GSON = new Gson();
+
+ @Rule
+ public WireMockRule wireMockRule = new WireMockRule(
+ WireMockConfiguration.wireMockConfig().dynamicPort()
+ );
+
+ public static class StubResponse extends BaseResponse {
+ StubResponse(boolean success, List errors, List warnings) {
+ setSuccess(success);
+ setErrors(errors);
+ setWarnings(warnings);
+ }
+ }
+
+ public static class PageResponse extends BaseResponse {
+ private List results;
+
+ PageResponse(boolean moreResults, String nextPageToken, String... items) {
+ setSuccess(true);
+ setMoreResult(moreResults);
+ results = Arrays.asList(items);
+ setNextPageToken(nextPageToken);
+ }
+
+ public List getResults() {
+ return results;
+ }
+ }
+
+ @Test
+ public void testPaging() {
+ setupToken();
+
+ WireMock.stubFor(
+ WireMock.get(WireMock.urlPathMatching("/rest/v1/paged.json")).inScenario("page")
+ .whenScenarioStateIs(Scenario.STARTED)
+ .willReturn(
+ WireMock.aResponse().withBody(
+ GSON.toJson(new PageResponse(true, "page1", "1", "2", "3"))
+ )
+ )
+ .willSetStateTo("page1")
+ );
+ WireMock.stubFor(
+ WireMock.get(WireMock.urlPathMatching("/rest/v1/paged.json")).inScenario("page")
+ .whenScenarioStateIs("page1")
+ .withQueryParam("nextPageToken", WireMock.equalTo("page1"))
+ .willReturn(
+ WireMock.aResponse().withBody(
+ GSON.toJson(new PageResponse(true, "page2", "4", "5", "6"))
+ )
+ )
+ .willSetStateTo("page2")
+ );
+ WireMock.stubFor(
+ WireMock.get(WireMock.urlPathMatching("/rest/v1/paged.json")).inScenario("page")
+ .whenScenarioStateIs("page2")
+ .withQueryParam("nextPageToken", WireMock.equalTo("page2"))
+ .willReturn(
+ WireMock.aResponse().withBody(
+ GSON.toJson(new PageResponse(false, null, "7", "8", "9"))
+ )
+ )
+ .willSetStateTo("page3")
+ );
+
+ MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret");
+
+ List results = StreamSupport.stream(Spliterators.spliteratorUnknownSize(
+ m.iteratePage("/rest/v1/paged.json", PageResponse.class, PageResponse::getResults),
+ Spliterator.ORDERED), false).sorted().collect(Collectors.toList());
+
+ Assert.assertArrayEquals(new String[]{"1", "2", "3", "4", "5", "6", "7", "8", "9"}, results.toArray());
+ }
+
+ @Test
+ public void testToken() {
+ WireMock.stubFor(
+ WireMock.get(WireMock.urlPathMatching("/identity/oauth/token"))
+ .withQueryParam("grant_type", WireMock.equalTo("client_credentials"))
+ .withQueryParam("client_id", WireMock.equalTo("clientNiceId"))
+ .withQueryParam("client_secret", WireMock.equalTo("clientNiceSecret"))
+ .willReturn(
+ WireMock.aResponse().withBody(
+ GSON.toJson(new MarketoToken("niceToken", "hello@world.com", "3600", "bearer")
+ )
+ )
+ )
+ );
+
+ MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret");
+ Assert.assertEquals("niceToken", m.getCurrentToken().getAccessToken());
+ }
+
+ @Test
+ public void testTokenRefresh() {
+ setupToken();
+
+ WireMock.stubFor(
+ WireMock.get(WireMock.urlPathMatching("/rest/v1/stub.json")).inScenario("retry")
+ .whenScenarioStateIs(Scenario.STARTED)
+ .willReturn(
+ WireMock.aResponse().withBody(
+ GSON.toJson(new StubResponse(false,
+ Collections.singletonList(
+ new Error(602, "Access token expired")),
+ Collections.emptyList()))
+ )
+ )
+ .willSetStateTo("refreshed")
+ );
+ WireMock.stubFor(
+ WireMock.get(WireMock.urlPathMatching("/rest/v1/stub.json")).inScenario("retry")
+ .whenScenarioStateIs("refreshed")
+ .willReturn(
+ WireMock.aResponse().withBody(
+ GSON.toJson(new StubResponse(true, Collections.emptyList(), Collections.emptyList()))
+ )
+ )
+ );
+ MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret");
+ m.validatedGet("/rest/v1/stub.json", Collections.emptyMap(),
+ inputStream -> Helpers.streamToObject(inputStream, StubResponse.class));
+ WireMock.verify(WireMock.exactly(2),
+ WireMock.getRequestedFor(WireMock.urlPathEqualTo("/identity/oauth/token")));
+ WireMock.verify(WireMock.exactly(2),
+ WireMock.getRequestedFor(WireMock.urlPathEqualTo("/rest/v1/stub.json")));
+ }
+
+ @Test
+ public void testPost() {
+ setupToken();
+
+ WireMock.stubFor(
+ WireMock.post(WireMock.urlPathMatching("/rest/v1/post.json"))
+ .willReturn(
+ WireMock.aResponse().withBody(
+ GSON.toJson(new StubResponse(true, Collections.emptyList(), Collections.emptyList()))
+ )
+ )
+ );
+
+ MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret");
+ m.validatedPost("/rest/v1/post.json", Collections.emptyMap(),
+ inputStream -> Helpers.streamToObject(inputStream, StubResponse.class), "body", String::toString);
+
+ WireMock.verify(
+ WireMock.postRequestedFor(WireMock.urlPathEqualTo("/rest/v1/post.json"))
+ .withRequestBody(WireMock.equalTo("body"))
+ );
+ }
+
+ @Test
+ public void testMessages() {
+ setupToken();
+
+ WireMock.stubFor(
+ WireMock.get(WireMock.urlPathMatching("/rest/v1/justWarnings.json"))
+ .willReturn(
+ WireMock.aResponse().withBody(
+ GSON.toJson(new StubResponse(true, Collections.emptyList(), Arrays.asList(
+ new Warning(700, "Reversed agent 007"),
+ new Warning(777, "Result of 1000 - 333")
+ ))))
+ )
+ );
+
+ WireMock.stubFor(
+ WireMock.get(WireMock.urlPathMatching("/rest/v1/errors.json"))
+ .willReturn(
+ WireMock.aResponse().withBody(
+ GSON.toJson(new StubResponse(false,
+ Collections.singletonList(new Error(123, "No way")),
+ Collections.emptyList())))
+ )
+ );
+
+ MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret");
+ m.validatedGet("/rest/v1/justWarnings.json", Collections.emptyMap(),
+ inputStream -> Helpers.streamToObject(inputStream, StubResponse.class));
+
+ try {
+ m.validatedGet("/rest/v1/errors.json", Collections.emptyMap(),
+ inputStream -> Helpers.streamToObject(inputStream, StubResponse.class));
+ Assert.fail("This call expected to fail.");
+ } catch (RuntimeException ex) {
+ Assert.assertTrue(ex.getMessage().contains("123"));
+ Assert.assertTrue(ex.getMessage().contains("No way"));
+ }
+ }
+
+ @Test
+ public void testBuildUri() {
+ setupToken();
+ MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret");
+ String uriWithToken = m.buildUri("/hello", Collections.emptyMap()).toString();
+ Assert.assertTrue(uriWithToken.contains("access_token"));
+ String uriWithoutToken = m.buildUri("/hello", ImmutableMap.of("param", "value"), false).toString();
+ Assert.assertFalse(uriWithoutToken.contains("access_token"));
+ Assert.assertTrue(uriWithoutToken.contains("param=value"));
+ }
+
+ @Test
+ public void testHttpError() {
+ setupToken();
+
+ WireMock.stubFor(
+ WireMock.get(WireMock.urlPathMatching("/rest/v1/fail.json"))
+ .willReturn(
+ WireMock.aResponse().withStatus(500).withBody("GJ server.")
+ )
+ );
+
+ try {
+ MarketoHttp m = new MarketoHttp(getApiUrl(), "clientNiceId", "clientNiceSecret");
+ m.validatedGet("/rest/v1/fail.json", Collections.emptyMap(),
+ inputStream -> Helpers.streamToObject(inputStream, StubResponse.class));
+ Assert.fail("This call expected to fail.");
+ } catch (RuntimeException ex) {
+ Assert.assertTrue(ex.getMessage().contains("GJ server"));
+ }
+ }
+
+ @Test
+ public void invalidEndpoint() {
+ try {
+ new MarketoHttp("%^%^&%^", "clientNiceId", "clientNiceSecret");
+ Assert.fail("This call expected to fail.");
+ } catch (IllegalArgumentException ex) {
+ Assert.assertEquals("'%^%^&%^/identity/oauth/token' is invalid URI", ex.getMessage());
+ }
+ }
+
+ void setupToken() {
+ WireMock.stubFor(
+ WireMock.get(WireMock.urlPathMatching("/identity/oauth/token"))
+ .willReturn(
+ WireMock.aResponse().withBody(
+ GSON.toJson(new MarketoToken("niceToken", "hello@world.com", "3600", "bearer")
+ )
+ )
+ )
+ );
+ }
+
+ String getApiUrl() {
+ return String.format("http://localhost:%d", wireMockRule.port());
+ }
+}
diff --git a/suppressions.xml b/suppressions.xml
new file mode 100644
index 0000000..600350b
--- /dev/null
+++ b/suppressions.xml
@@ -0,0 +1,33 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/widgets/MarketoReportingPlugin-batchsource.json b/widgets/MarketoReportingPlugin-batchsource.json
new file mode 100644
index 0000000..e5390ba
--- /dev/null
+++ b/widgets/MarketoReportingPlugin-batchsource.json
@@ -0,0 +1,77 @@
+{
+ "metadata": {
+ "spec-version": "1.0"
+ },
+ "display-name": "Marketo Reporting",
+ "configuration-groups": [
+ {
+ "label": "General",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Reference Name",
+ "name": "referenceName"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Rest API endpoint",
+ "name": "restApiEndpoint"
+ }
+ ]
+ },
+ {
+ "label": "Authentication",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Client ID",
+ "name": "clientId"
+ },
+ {
+ "widget-type": "password",
+ "label": "Client Secret",
+ "name": "clientSecret"
+ }
+ ]
+ },
+ {
+ "label": "Report",
+ "properties": [
+ {
+ "widget-type": "select",
+ "label": "Report Type",
+ "name": "reportType",
+ "widget-attributes": {
+ "default": "leads",
+ "values": [
+ "leads",
+ "activities"
+ ]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Start Date",
+ "name": "startDate",
+ "widget-attributes": {
+ "placeholder": "Start date in ISO 8601 format(1997-07-16T19:20:30+01:00)"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "End Date",
+ "name": "endDate",
+ "widget-attributes": {
+ "placeholder": "End date in ISO 8601 format(1997-07-16T19:20:30+01:00)"
+ }
+ }
+ ]
+ }
+ ],
+ "outputs": [
+ {
+ "widget-type": "non-editable-schema-editor",
+ "schema": {}
+ }
+ ]
+}
\ No newline at end of file