{"id":1030,"date":"2018-12-14T01:18:31","date_gmt":"2018-12-14T01:18:31","guid":{"rendered":"http:\/\/kusuaks7\/?p=635"},"modified":"2023-08-10T14:38:41","modified_gmt":"2023-08-10T14:38:41","slug":"scale-your-machine-learning-pipeline","status":"publish","type":"post","link":"https:\/\/www.experfy.com\/blog\/ai-ml\/scale-your-machine-learning-pipeline\/","title":{"rendered":"Scale Your Machine Learning Pipeline"},"content":{"rendered":"<blockquote>\n<h4>How to parallelize and distribute your Python machine learning pipelines with Luigi, Docker, and Kubernetes<\/h4>\n<\/blockquote>\n<p style=\"text-align: center;\"><img decoding=\"async\" style=\"width: 600px; height: 333px;\" src=\"https:\/\/images.ctfassets.net\/2yr4wv2jga4w\/2ef5X68QNGIMca0QIEuwKK\/94eebeeddebad91410cb2f5e3ef7d556\/Scaling.gif\" alt=\"image\" \/><\/p>\n<p>This article presents a simple way to turn your machine learning application from a plain Python program into\u00a0<strong>a scalable pipeline that runs on a cluster<\/strong>.<\/p>\n<p>Check out the\u00a0<a href=\"https:\/\/github.com\/datarevenue-berlin\/tac-example\" rel=\"noopener\">GitHub repository<\/a>\u00a0for ready-to-use example code.<\/p>\n<h2 id=\"overview\">Overview<\/h2>\n<h3 id=\"what-you-will-learn\">What you will learn:<\/h3>\n<ul>\n<li>How to use\u00a0<a href=\"http:\/\/luigi.readthedocs.io\/\" rel=\"noopener\"><code>luigi<\/code><\/a>\u00a0to manage tasks<\/li>\n<li>How to quickly create a command line interface for a Python script with\u00a0<a href=\"http:\/\/click.pocoo.org\/6\/\" rel=\"noopener\"><code>click<\/code><\/a><\/li>\n<li>How to run the pipeline in multiple 
Docker containers<\/li>\n<li>How to deploy a small cluster on your machine<\/li>\n<li>How to run your application on the cluster<\/li>\n<\/ul>\n<h2 id=\"dont-calculate-things-twice--luigify-your-pipeline\">Don\u2019t calculate things twice \u2013 Luigify your pipeline<\/h2>\n<p>Some of the functions in your application may be time-consuming and return huge outputs, so if your pipeline fails along the way, for any reason,\u00a0<strong>it\u2019ll cost you a lot of time and frustration to fix a small bug and rerun everything.<\/strong><\/p>\n<p>Let\u2019s do something about it.<\/p>\n<p>Suppose your pipeline needs to do the following things:<\/p>\n<ol>\n<li>fetch the data for the last 10 days;<\/li>\n<li>transform the data;<\/li>\n<li>make predictions with two different models.<\/li>\n<\/ol>\n<p>You could code the workflow like this:<\/p>\n<pre><code>data = []\r\nfor date in dates:\r\n    data.append(fetch_data(date))\r\ntransformed = transform_data(data)\r\npredictions = {}\r\nfor name, model in models.items():\r\n    predictions[name] = predict(transformed, model)\r\n<\/code><\/pre>\n<p>But this code doesn\u2019t cope well with failures, such as errors while downloading the data: one network error, and all the work you\u2019ve done is lost. Moreover, if you download data for the last ten days today and you\u2019re planning to run the same pipeline again tomorrow,\u00a0<strong>it doesn\u2019t make much sense to download 90% of the necessary data all over again.<\/strong><\/p>\n<blockquote><p>So how can you avoid doing the same thing twice?<\/p><\/blockquote>\n<p>Sure, you could come up with your own way to save and reuse intermediate results, but there\u2019s no need to code it yourself.<\/p>\n<p>I recommend using the\u00a0<a href=\"http:\/\/luigi.readthedocs.io\/\" rel=\"noopener\"><code>luigi<\/code><\/a>\u00a0package. 
It lets you easily divide your code into separate data-processing units \u2013 called tasks \u2013 each with well-defined requirements and outputs.<\/p>\n<p>One of your tasks could look like this:<\/p>\n<pre><code>from datetime import timedelta\r\n\r\nimport luigi\r\n\r\n# FetchData, transform_data and save_result are defined elsewhere in your pipeline\r\n\r\n\r\nclass TransformData(luigi.Task):\r\n    date = luigi.DateParameter()\r\n\r\n    def requires(self):\r\n        # one FetchData task for each of the ten days before self.date\r\n        for delta in range(1, 11):\r\n            yield FetchData(date=self.date - timedelta(days=delta))\r\n\r\n    def output(self):\r\n        return luigi.LocalTarget(\r\n            path='.\/data\/transformed\/{:%Y-%m-%d}.csv'.format(self.date)\r\n        )\r\n\r\n    def run(self):\r\n        result = transform_data(paths=[item.path for item in self.input()])\r\n        save_result(data=result, path=self.output().path)\r\n<\/code><\/pre>\n<p>From this snippet, we can see that:<\/p>\n<ul>\n<li>The name of the task is\u00a0<code>TransformData<\/code>;<\/li>\n<li>The task has one parameter, namely\u00a0<code>date<\/code>;<\/li>\n<li>It depends on ten tasks from the\u00a0<code>FetchData<\/code>\u00a0class, one for each of the ten previous days;<\/li>\n<li>It saves its output in a CSV file named after the\u00a0<code>date<\/code>\u00a0parameter.<\/li>\n<\/ul>\n<p>I\u2019ve given a complete example of a dummy pipeline below. 
Take a moment to analyse how the tasks\u2019 dependencies create a logical pipeline:<\/p>\n<pre><code># task-dummy.py\r\nfrom datetime import timedelta\r\nfrom time import sleep\r\nimport luigi\r\nimport os\r\n\r\n\r\nclass SourceData(luigi.ExternalTask):\r\n    date = luigi.DateParameter()\r\n\r\n    def output(self):\r\n        return luigi.LocalTarget(\r\n            path='.\/data\/source\/{:%Y-%m-%d}.csv'.format(self.date)\r\n        )\r\n\r\n    def complete(self):\r\n        \"\"\"Hack so we don't have to create input files manually.\r\n\r\n        Luigi will always think that this task is done, without checking for\r\n        presence of source files.\r\n        \"\"\"\r\n        return True\r\n\r\n\r\nclass FetchData(luigi.Task):\r\n    date = luigi.DateParameter()\r\n\r\n    def requires(self):\r\n        return SourceData(date=self.date)\r\n\r\n    def output(self):\r\n        return luigi.LocalTarget(\r\n            path='.\/data\/raw\/{:%Y-%m-%d}.csv'.format(self.date)\r\n        )\r\n\r\n    def run(self):\r\n        print('Reading from {} and writing to {}'\r\n              .format(self.input().path, self.output().path))\r\n        sleep(1)\r\n        self.output().makedirs()\r\n        self.output().open('w').close()  # write output file\r\n\r\n\r\nclass TransformData(luigi.Task):\r\n    date = luigi.DateParameter()\r\n\r\n    def requires(self):\r\n        for delta in range(1, 11):\r\n            yield FetchData(date=self.date - timedelta(days=delta))\r\n\r\n    def output(self):\r\n        return luigi.LocalTarget(\r\n            path='.\/data\/transformed\/{:%Y-%m-%d}.csv'.format(self.date)\r\n        )\r\n\r\n    def run(self):\r\n        # result = transform_data(paths=[item.path for item in self.input()])\r\n        # save_result(data=result, path=self.output().path)\r\n        print('Loading data, transforming and saving to {}'\r\n              .format(self.output().path))\r\n        sleep(1)\r\n        self.output().makedirs()\r\n        self.output().open('w').close()\r\n\r\n\r\nclass Predict(luigi.Task):\r\n    date = luigi.DateParameter()\r\n    model_name = luigi.Parameter()\r\n\r\n    def requires(self):\r\n        return TransformData(date=self.date)\r\n\r\n    def output(self):\r\n        return luigi.LocalTarget(\r\n            path='.\/data\/predictions\/{:%Y-%m-%d}_{}.csv'\r\n            .format(self.date, self.model_name)\r\n        )\r\n\r\n    def run(self):\r\n        print('Predicting with model {} and saving to {}'\r\n              .format(self.model_name, self.output().path))\r\n        sleep(1)\r\n        self.output().makedirs()\r\n        self.output().open('w').close()\r\n\r\n\r\nclass MakePredictions(luigi.WrapperTask):\r\n    date = luigi.DateParameter()\r\n\r\n    def requires(self):\r\n        for model_name in ['A', 'B']:\r\n            yield Predict(date=self.date, model_name=model_name)\r\n\r\n<\/code><\/pre>\n<p>Now, when you run the\u00a0<code>MakePredictions<\/code>\u00a0task, Luigi makes sure all the upstream tasks run beforehand. Install Luigi with\u00a0<code>pip install luigi<\/code>, save the above example in\u00a0<code>task-dummy.py<\/code>, and run this command:<\/p>\n<pre><code>PYTHONPATH=. luigi --module task-dummy MakePredictions --date 2018-01-01 --local-scheduler\r\n<\/code><\/pre>\n<p>Furthermore, Luigi won\u2019t run a task whose output is already present. Run the same command again, and Luigi will report that\u00a0<code>MakePredictions<\/code>\u00a0for the given date has already been done.<\/p>\n<p><a href=\"https:\/\/luigi.readthedocs.io\/en\/stable\/example_top_artists.html\" rel=\"noopener\">Here<\/a>\u00a0you can find another good example that will help you get started with Luigi.<\/p>\n<h2 id=\"parallelism-for-free--luigi-workers\">Parallelism for free \u2013 Luigi workers<\/h2>\n<blockquote><p>Can I run multiple tasks at the same time?<\/p><\/blockquote>\n<p>Yes, you can! Luigi provides this functionality out of the box. 
For example, just by adding\u00a0<code>--workers 4<\/code>\u00a0to the command, you let Luigi run up to four tasks simultaneously.<\/p>\n<p>Let\u2019s use this opportunity to present Luigi\u2019s graphical interface.<\/p>\n<p>Open a second terminal and run the following command:<\/p>\n<pre><code>luigid\r\n<\/code><\/pre>\n<p>This will start a so-called\u00a0<em>central Luigi scheduler<\/em>\u00a0that listens on port 8082 by default. You can check out its pretty dashboard in your browser at:\u00a0<a href=\"http:\/\/localhost:8082\/\">http:\/\/localhost:8082<\/a>.<\/p>\n<p>Now go back to the first terminal and run the Luigi command again, this time without the\u00a0<code>--local-scheduler<\/code>\u00a0argument (don\u2019t forget to delete the files you\u2019ve already created, or choose another date, if you want to see the tasks executing). If you want parallelism, add\u00a0<code>--workers 4<\/code>\u00a0to the command. After refreshing the dashboard page, you should see a list of scheduled tasks. Click on the tree icon next to the\u00a0<code>MakePredictions<\/code>\u00a0task to see all its dependencies (isn\u2019t it pretty?):<\/p>\n<p style=\"text-align: center;\"><img decoding=\"async\" style=\"width: 700px; height: 469px;\" src=\"https:\/\/i.imgur.com\/rPiTcDd.png\" alt=\"luigi dashboard\" \/><\/p>\n<h2 id=\"parallelism-at-scale--moving-to-a-cluster\">Parallelism at scale \u2013 Moving to a cluster<\/h2>\n<p>Now we\u2019re getting serious \u2013 if one machine is not enough for you to run your tasks in parallel, you can take your pipeline to the next level and deploy it on a cluster. 
I\u2019ll walk you through all the necessary steps.<\/p>\n<h3 id=\"share-files-between-machines--use-aws-s3\">Share files between machines \u2013 Use AWS S3<\/h3>\n<p>In the previous example, all the files were saved locally on the same machine the tasks were executed on.<\/p>\n<blockquote><p>So how can I share files between multiple machines in the cluster?<\/p><\/blockquote>\n<p>There are many answers to this question, but we\u2019ll focus on one of the possible ways \u2013 using Amazon\u2019s S3.<\/p>\n<p>AWS S3 (Amazon\u00a0<em>Simple Storage Service<\/em>) lets you store your files in the cloud. Instead of\u00a0<code>\/home\/user\/data\/file.csv<\/code>, you save your file under\u00a0<code>s3:\/\/bucket\/data\/file.csv<\/code>. Python tools such as Luigi make it easy to switch from local storage to S3.<\/p>\n<hr \/>\n<p><em>Info<\/em>: To keep this tutorial simple, if you want to follow the instructions below, you\u2019ll need to\u00a0<strong>set up a free AWS account<\/strong>\u00a0for storing your files. You can do it\u00a0<a href=\"https:\/\/portal.aws.amazon.com\/billing\/signup\" rel=\"noopener\">here<\/a>; it\u2019s free of charge for one year within the Free Tier usage limits. You can close the account after this period if you no longer need it.<\/p>\n<p>After creating the account, go\u00a0<a href=\"https:\/\/s3.console.aws.amazon.com\/s3\" rel=\"noopener\">here<\/a>\u00a0and create a\u00a0<em>bucket<\/em>. Think of a bucket as a partition on a hard drive.<\/p>\n<hr \/>\n<p>To read data from and write data to S3, we\u2019ll use the\u00a0<code>luigi.contrib.s3.S3Target<\/code>\u00a0class. 
You can modify the dummy example by simply adding the import\u00a0<code>from luigi.contrib.s3 import S3Target<\/code>\u00a0and replacing\u00a0<code>LocalTarget<\/code>\u00a0in the task definitions, as I\u2019ve done here:<\/p>\n<pre><code>     def output(self):\r\n-        return luigi.LocalTarget(\r\n-            path='.\/data\/predictions\/{:%Y-%m-%d}_{}.csv'\r\n+        return S3Target(\r\n+            path='s3:\/\/your-bucket\/data\/predictions\/{:%Y-%m-%d}_{}.csv'\r\n             .format(self.date, self.model_name)\r\n         )\r\n<\/code><\/pre>\n<p>You\u2019ll also need to remove all\u00a0<code>self.output().makedirs()<\/code>\u00a0calls, because you don\u2019t need to create folders on S3.<\/p>\n<p>To use Luigi\u2019s S3 functionality, you must\u00a0<code>pip install boto3<\/code>.<\/p>\n<p>You\u2019ll also need to give your application credentials for S3 authentication. Let\u2019s use the simplest approach: create a new Access Key on\u00a0<a href=\"https:\/\/console.aws.amazon.com\/iam\/home#\/security_credential\" rel=\"noopener\">this site<\/a>. You\u2019ll get an Access Key ID and a Secret Access Key \u2013 save them in the environment variables\u00a0<code>AWS_ACCESS_KEY_ID<\/code>\u00a0and\u00a0<code>AWS_SECRET_ACCESS_KEY<\/code>, respectively.<\/p>\n<p>Now your application should be able to read data from and write data to AWS S3. Try it out by running the pipeline again.<\/p>\n<h3 id=\"containerize-your-pipeline-to-put-it-on-a-cluster\">Containerize your pipeline to put it on a cluster<\/h3>\n<p>Sadly, you can\u2019t just put your Python code on a cluster and execute it. 
However, you can run a specific command in a specific container.<\/p>\n<p>I\u2019ll show you how to refactor your pipeline to run each task in a separate Docker container.<\/p>\n<h4 id=\"turn-your-tasks-into-mini-programs--add-a-simple-cli-with-click\">Turn your tasks into mini programs \u2013 Add a simple CLI with Click<\/h4>\n<p>The first step towards running tasks in containers is making them runnable from the command line.<\/p>\n<blockquote><p>What\u2019s the fastest way to write a CLI in Python?<\/p><\/blockquote>\n<p>Answer:\u00a0<a href=\"http:\/\/click.pocoo.org\/6\/\" rel=\"noopener\">Click<\/a>. Click is an awesome package that makes creating command line interfaces very easy.<\/p>\n<p>Let\u2019s get back to the\u00a0<code>TransformData<\/code>\u00a0task example (not the dummy one). Its\u00a0<code>run()<\/code>\u00a0method calls two functions:\u00a0<code>transform_data<\/code>\u00a0and\u00a0<code>save_result<\/code>. Let\u2019s say these functions are defined in a file called\u00a0<code>transform.py<\/code>:<\/p>\n<pre><code># transform.py\r\nimport os\r\nfrom time import sleep\r\nfrom luigi.contrib.s3 import S3Target\r\nimport boto3.s3.transfer  # Luigi's bug workaround, sorry about this\r\n\r\ndef transform_data(paths):\r\n    # dummy transformation: simulate some work and return a placeholder result\r\n    print('Transforming data')\r\n    sleep(3)\r\n    return 123\r\n\r\ndef save_result(data, path):\r\n    # dummy save: create an empty file at the given S3 path\r\n    print('Saving result')\r\n    sleep(3)\r\n    S3Target(path).open('w').close()\r\n<\/code><\/pre>\n<p>Now let\u2019s make these functions runnable from the command line:<\/p>\n<pre><code># transform.py\r\nimport click\r\n\r\ndef transform_data(paths): ...  # as defined above\r\ndef save_result(data, path): ...  # as defined above\r\n\r\n@click.command()\r\n@click.argument('output-path')  # first argument\r\n@click.argument('input-paths', nargs=-1)  # all other arguments\r\ndef cli(output_path, input_paths):\r\n    result = transform_data(paths=input_paths)\r\n    save_result(data=result, path=output_path)\r\n\r\nif __name__ == '__main__':\r\n    cli()\r\n<\/code><\/pre>\n<p>Here, 
we defined a function (<code>cli<\/code>) that will be called when we run this script from the command line. We specified that the first argument will be an output path, and all further arguments will make up a tuple of input paths. After running\u00a0<code>pip install click<\/code>, we can invoke the data transformation from the command line:<\/p>\n<pre><code>python transform.py s3:\/\/your-bucket\/output.csv input1.csv input2.csv\r\n<\/code><\/pre>\n<p>For convenience, let\u2019s call our project\u00a0<code>tac<\/code>. If you add\u00a0<code>setup.py<\/code>\u00a0to your project and\u00a0<code>pip install<\/code>\u00a0it (take a look at the\u00a0<a href=\"https:\/\/github.com\/datarevenue-berlin\/tac-example\" rel=\"noopener\">example project<\/a>\u00a0to see how the project should be structured, and don\u2019t forget to add\u00a0<code>__init__.py<\/code>\u00a0to the package directory), you should be able to run your script with:<\/p>\n<pre><code>python -m tac.transform s3:\/\/your-bucket\/output.csv input1.csv input2.csv\r\n<\/code><\/pre>\n<h4 id=\"make-things-portable--run-tasks-in-containers\">Make things portable \u2013 Run tasks in containers<\/h4>\n<p>Now the question is:<\/p>\n<blockquote><p>How can I easily create a Docker container in which to run my command?<\/p><\/blockquote>\n<p>Well, that\u2019s simple.<\/p>\n<p>First, create a\u00a0<code>requirements.txt<\/code>\u00a0file in the project root directory. You\u2019ll need the following packages:<\/p>\n<pre><code>click\r\nluigi\r\nboto3\r\npykube  # we'll talk about this one later\r\n<\/code><\/pre>\n<p>Now, create a\u00a0<code>Dockerfile<\/code>\u00a0in the project root directory and put this inside:<\/p>\n<pre><code>FROM python\r\n\r\nCOPY requirements.txt \/requirements.txt\r\nRUN pip install -r \/requirements.txt\r\nCOPY . 
\/tac\r\nRUN pip install \/tac\r\n\r\nARG AWS_ACCESS_KEY_ID\r\nENV AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}\r\n\r\nARG AWS_SECRET_ACCESS_KEY\r\nENV AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}\r\n<\/code><\/pre>\n<p>Let\u2019s break it down:<\/p>\n<ul>\n<li><code>FROM python<\/code>\u00a0gives us a base image with Python installed.<\/li>\n<li><code>COPY requirements.txt \/requirements.txt<\/code>\u00a0and\u00a0<code>RUN pip install -r \/requirements.txt<\/code>\u00a0install all the required packages.<\/li>\n<li><code>COPY . \/tac<\/code>\u00a0and\u00a0<code>RUN pip install \/tac<\/code>\u00a0install our project.<\/li>\n<li>The last four lines let us set AWS credentials inside the image at build time (baking credentials into an image isn\u2019t good practice, but let\u2019s keep this tutorial simple).<\/li>\n<\/ul>\n<p>Now you can build a Docker image containing your project by executing this from your project root directory (assuming your AWS credentials are still set in environment variables):<\/p>\n<pre><code>docker build -t tac-example:v1 . --build-arg AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID --build-arg AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY\r\n<\/code><\/pre>\n<p>So you\u2019ve just built a Docker image tagged\u00a0<code>tac-example:v1<\/code>. Let\u2019s see if it works:<\/p>\n<pre><code>docker run tac-example:v1 python -m tac.transform s3:\/\/your-bucket\/docker-output.csv input1.csv input2.csv\r\n<\/code><\/pre>\n<p>This should save a\u00a0<code>docker-output.csv<\/code>\u00a0file in your S3 bucket.<\/p>\n<h4 id=\"talk-to-the-cluster--prepare-your-tasks-to-be-run-in-kubernetes\">Talk to the cluster \u2013 Prepare your tasks to be run in Kubernetes<\/h4>\n<p>If you want to run all \u2013 or just some \u2013 of your pipeline tasks in a cluster, Luigi comes with a solution.<\/p>\n<p>Take a look at\u00a0<code>luigi.contrib.kubernetes.KubernetesJobTask<\/code>.<\/p>\n<p>Long story short, Kubernetes is a system that manages a cluster. 
If you want to interact with a cluster, talk to Kubernetes.<\/p>\n<p>To run a piece of code in a cluster, you need to provide the following information to Kubernetes:<\/p>\n<ul>\n<li>the image that should be used to create a container;<\/li>\n<li>the name the container should be given;<\/li>\n<li>the command that should be executed in the container.<\/li>\n<\/ul>\n<p>Let\u2019s modify our good old\u00a0<code>TransformData<\/code>\u00a0task from the dummy pipeline to conform to these requirements.<\/p>\n<ul>\n<li>First, change the base class to\u00a0<code>KubernetesJobTask<\/code>:\n<pre><code>from luigi.contrib.kubernetes import KubernetesJobTask\r\n\r\nclass TransformData(KubernetesJobTask):\r\n    date = luigi.DateParameter()\r\n<\/code><\/pre>\n<\/li>\n<li>Give it a name:\n<pre><code>    @property\r\n    def name(self):\r\n        return 'transform-data'\r\n<\/code><\/pre>\n<\/li>\n<li>Define the command that should be run (here as a helper property,\u00a0<code>cmd<\/code>):\n<pre><code>    @property\r\n    def cmd(self):\r\n        # python -m tac.transform OUTPUT_PATH INPUT_PATH...\r\n        command = ['python', '-m', 'tac.transform', self.output().path]\r\n        command += [item.path for item in self.input()]\r\n        return command\r\n<\/code><\/pre>\n<\/li>\n<li>Provide the information to be passed on to Kubernetes:\n<pre><code>    @property\r\n    def spec_schema(self):\r\n        return {\r\n            \"containers\": [{\r\n                \"name\": self.name,\r\n                \"image\": 'tac-example:v1',\r\n                \"command\": self.cmd\r\n            }],\r\n        }\r\n<\/code><\/pre>\n<\/li>\n<li>And delete the\u00a0<code>run()<\/code>\u00a0method, since\u00a0<code>KubernetesJobTask<\/code>\u00a0implements it.<\/li>\n<li>Also, run\u00a0<code>pip install pykube<\/code>, since it\u2019s a requirement for\u00a0<code>KubernetesJobTask<\/code>.<\/li>\n<\/ul>\n<p>You should end up with something similar to what you can see in the\u00a0<a href=\"https:\/\/github.com\/datarevenue-berlin\/tac-example\/blob\/master\/tac\/task-dummy.py\" rel=\"noopener\">example project<\/a>.<\/p>\n<p>But we can\u2019t run it until we connect to a cluster. 
Keep reading.<\/p>\n<h3 id=\"cluster-at-home--kubernetes-and-minikube\">Cluster at home \u2013 Kubernetes and Minikube<\/h3>\n<blockquote><p>How can I run my pipeline in a cluster \u2013 without having access to a cluster?<\/p><\/blockquote>\n<p>The cool thing is, you can actually run a mini version of a real cluster on your laptop!<\/p>\n<p>You can do this with\u00a0<a href=\"https:\/\/kubernetes.io\/docs\/setup\/minikube\/\" rel=\"noopener\">Minikube<\/a>. Minikube runs a single-node (single-machine) cluster inside a virtual machine on your computer.<\/p>\n<p>Take a moment now to install Minikube. You can find instructions\u00a0<a href=\"https:\/\/kubernetes.io\/docs\/tasks\/tools\/install-minikube\/\" rel=\"noopener\">here<\/a>. You\u2019ll need all the components mentioned in these instructions.<\/p>\n<p>After installation, you should be able to run<\/p>\n<pre><code>minikube start\r\n<\/code><\/pre>\n<p>to spin up your local cluster. Be patient, as this may take a while, especially the first time. Verify that your cluster is running with:<\/p>\n<pre><code>kubectl cluster-info\r\n<\/code><\/pre>\n<p>You should see something similar to:<\/p>\n<pre><code>Kubernetes master is running at https:\/\/192.168.99.100:8443\r\nKubeDNS is running at https:\/\/192.168.99.100:8443\/api\/v1\/namespaces\/kube-system\/services\/kube-dns:dns\/proxy\r\n<\/code><\/pre>\n<p>If everything is okay, you should be able to access the Kubernetes dashboard, which shows the current status of your cluster:<\/p>\n<pre><code>minikube dashboard\r\n<\/code><\/pre>\n<p>A new browser tab will open and show you this:<\/p>\n<p style=\"text-align: center;\"><img decoding=\"async\" style=\"width: 700px; height: 409px;\" src=\"https:\/\/i.imgur.com\/ju3wZpk.png\" alt=\"Kubernetes dashboard\" \/><\/p>\n<p>Because the cluster runs in a separate (virtual) machine and you haven\u2019t pushed your Docker image to any online registry, the cluster can\u2019t access the image. 
We\u2019ll use a little trick to overcome this.<\/p>\n<p>The following command configures your current shell session so that docker commands use the cluster VM\u2019s Docker Engine instead of your local one:<\/p>\n<pre><code>eval $(minikube docker-env)\r\n<\/code><\/pre>\n<p>Now all you need to do is run the\u00a0<code>docker build<\/code>\u00a0command again. This time, your image will be built inside the VM:<\/p>\n<pre><code>docker build -t tac-example:v1 . --build-arg AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID --build-arg AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY\r\n<\/code><\/pre>\n<p>And here comes the moment of truth.<\/p>\n<p>We\u2019re going to execute our pipeline inside the cluster we\u2019ve just configured.<\/p>\n<p>If everything went well, just calling the Luigi command should be enough. Minikube has already set the proper configuration, so\u00a0<code>KubernetesJobTask<\/code>\u00a0knows where the target Kubernetes cluster is running.<\/p>\n<p>So try executing this command from the directory where\u00a0<code>task-dummy.py<\/code>\u00a0lives (with\u00a0<code>luigid<\/code>\u00a0still running, or add\u00a0<code>--local-scheduler<\/code>):<\/p>\n<pre><code>PYTHONPATH=. luigi --module task-dummy MakePredictions --date 2018-01-01\r\n<\/code><\/pre>\n<p>and watch how your\u00a0<code>TransformData<\/code>\u00a0job runs in the cluster:<\/p>\n<p><img decoding=\"async\" style=\"width: 700px; height: 135px;\" src=\"https:\/\/i.imgur.com\/ohGPot9.png\" alt=\"Job running on Kubernetes\" \/><\/p>\n<h2 id=\"endnotes\">Endnotes<\/h2>\n<ul>\n<li>If\u00a0<code>KubernetesJobTask<\/code>\u00a0reports a message like this:\u00a0<code>No pod scheduled by transform-data-20180716075521-bc4f349a74f44ddf<\/code>\u00a0and fails to run, it\u2019s probably a bug rather than your mistake. Check the dashboard to see if the\u00a0<code>transform-data-...<\/code>\u00a0pod has the status\u00a0<code>Terminated:Completed<\/code>. If so, the task has actually finished, and running your pipeline again should solve the problem. 
It\u2019s probably Minikube\u2019s fault.<\/li>\n<li>Consider Google Kubernetes Engine for spinning up a real cluster.<\/li>\n<li>When using Google\u2019s cluster, consider switching from AWS S3 to Google Cloud Storage to significantly speed up data access.\u00a0<a href=\"http:\/\/luigi.readthedocs.io\/en\/stable\/api\/luigi.contrib.gcs.html\" rel=\"noopener\">This module<\/a>\u00a0should be helpful.<\/li>\n<li>Read more about speeding up your pipeline with\u00a0<a href=\"https:\/\/dask.readthedocs.io\/en\/latest\/\" rel=\"noopener\">Dask<\/a>\u00a0and\u00a0<a href=\"https:\/\/github.com\/dask\/dask-kubernetes\" rel=\"noopener\">integrating it with Kubernetes<\/a>.<\/li>\n<\/ul>\n","protected":false},"excerpt":{"rendered":"<p>Ready to learn Machine Learning? Browse Machine Learning Training and Certification courses developed by industry thought leaders and Experfy in Harvard Innovation Lab. How to parallelize and distribute your Python machine learning pipelines with Luigi, Docker, and Kubernetes &nbsp; This article presents the easiest way to turn your machine learning application from a simple Python<\/p>\n","protected":false},"author":314,"featured_media":23827,"comment_status":"open","ping_status":"open","sticky":false,"template":"single-post-2.php","format":"standard","meta":{"content-type":"","footnotes":""},"categories":[183],"tags":[92],"ppma_author":[2069],"class_list":["post-1030","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-ai-ml","tag-machine-learning"],"authors":[{"term_id":2069,"user_id":314,"is_guest":0,"slug":"markus-schmitt","display_name":"Markus Schmitt","avatar_url":"https:\/\/secure.gravatar.com\/avatar\/?s=96&d=mm&r=g","user_url":"","last_name":"Schmitt","first_name":"Markus","job_title":"","description":"Markus Schmitt is the founder and head of data science at Data Revenue, a Machine Learning Agency based in Berlin, Germany, where he builds custom end-to-end machine learning systems for 
Medical, Finance and Marketing clients. Before Data Revenue he developed new ventures for the company builder Team Europe and studied Mathematics &amp; Economics at Warwick."}],"_links":{"self":[{"href":"https:\/\/www.experfy.com\/blog\/wp-json\/wp\/v2\/posts\/1030","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.experfy.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.experfy.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.experfy.com\/blog\/wp-json\/wp\/v2\/users\/314"}],"replies":[{"embeddable":true,"href":"https:\/\/www.experfy.com\/blog\/wp-json\/wp\/v2\/comments?post=1030"}],"version-history":[{"count":2,"href":"https:\/\/www.experfy.com\/blog\/wp-json\/wp\/v2\/posts\/1030\/revisions"}],"predecessor-version":[{"id":23829,"href":"https:\/\/www.experfy.com\/blog\/wp-json\/wp\/v2\/posts\/1030\/revisions\/23829"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.experfy.com\/blog\/wp-json\/wp\/v2\/media\/23827"}],"wp:attachment":[{"href":"https:\/\/www.experfy.com\/blog\/wp-json\/wp\/v2\/media?parent=1030"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.experfy.com\/blog\/wp-json\/wp\/v2\/categories?post=1030"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.experfy.com\/blog\/wp-json\/wp\/v2\/tags?post=1030"},{"taxonomy":"author","embeddable":true,"href":"https:\/\/www.experfy.com\/blog\/wp-json\/wp\/v2\/ppma_author?post=1030"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}