Pipelines
Pipelines are a key concept in the aggregation framework.
Therefore, the Pipeline class is also a central class in the package.
Building a Pipeline
The Pipeline class includes a method for each stage of the aggregation framework.
Each stage of the aggregation framework also has its own class in the package, and each stage class has a mirror method in the Pipeline class. For more information, see the stages page.
For example, the Match stage has a match method in the Pipeline class, which can be called as pipeline.match(), as in the code snippet below.
Each such call adds the corresponding stage instance to the pipeline and returns the pipeline instance.
That way, you can chain the stages together to build your pipeline.
from monggregate import Pipeline

pipeline = Pipeline()

# The below pipeline will return (when executed)
# the most recent movie with the title "A Star Is Born"
pipeline.match(
    title="A Star Is Born"
).sort(
    by="year"
).limit(
    value=1
)
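To make the chaining idea concrete, here is a minimal sketch of how a method can register a stage and then return the pipeline itself. ToyPipeline and its two methods are hypothetical stand-ins written for this illustration; they are not monggregate's implementation and only mimic the behavior described above.

```python
class ToyPipeline:
    """Hypothetical stand-in for illustration, not monggregate's Pipeline."""

    def __init__(self):
        self.stages = []

    def match(self, **query):
        # Record the stage, then return the pipeline so calls can be chained.
        self.stages.append({"$match": query})
        return self

    def limit(self, value):
        self.stages.append({"$limit": value})
        return self


pipeline = ToyPipeline()
returned = pipeline.match(title="A Star Is Born").limit(value=1)

# The chain hands back the very same object it started from.
print(returned is pipeline)
print(pipeline.stages)
```

Because every method returns the pipeline, the order of the chained calls is also the order of the stages.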
Executing a Pipeline
So far, we have built our pipeline object. But what do we do with it?
monggregate offers a bilateral integration with the tool of your choice to execute the pipeline.
Bilateral because you can either integrate your pipeline into your tool or your tool into the pipeline.
In the following examples, I'll use pymongo because, at the end of the day, motor, beanie, and mongoengine all use pymongo under the hood.
Passing Your Pipeline to Your Tool
The Pipeline class has an export method that returns a list of dictionaries of raw MongoDB aggregation language, which is the format expected by pymongo.
import pymongo
from monggregate import Pipeline

# Set up your pymongo connection
MONGODB_URI = "insert_your_own_uri"
client = pymongo.MongoClient(MONGODB_URI)
db = client["sample_mflix"]

# Create your pipeline
pipeline = Pipeline()

# Build your pipeline
pipeline.match(
    title="A Star Is Born"
).sort(
    by="year"
).limit(
    value=1
)

# Execute your pipeline
cursor = db["movies"].aggregate(pipeline.export())
results = list(cursor)
print(results)
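For reference, here is a hand-written example of the shape that pymongo's aggregate method accepts: a list of dictionaries, one per stage, each keyed by a MongoDB stage operator. This list is written by hand for illustration and is not captured from export(); in particular, the descending sort direction is my assumption, chosen to match the "most recent" intent, so check what export() actually produces on your side.

```python
# Hand-written raw aggregation pipeline (an illustration, not export() output).
raw_pipeline = [
    {"$match": {"title": "A Star Is Born"}},
    {"$sort": {"year": -1}},  # assumption: -1 means descending, newest first
    {"$limit": 1},
]

# Each stage is a dictionary with a single key: the stage operator.
stage_operators = [next(iter(stage)) for stage in raw_pipeline]
print(stage_operators)
```

Printing pipeline.export() before passing it to aggregate is an easy way to compare the generated stages with what you would have written by hand.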
Passing Your Tool to Your Pipeline
The Pipeline class also has a run method, as well as _db and collection attributes that you can set. Together they make your pipelines callable and runnable by making them aware of your database connection.
Thus, you could write the above example like this:
import pymongo
from monggregate import Pipeline

# Set up your pymongo connection
MONGODB_URI = "insert_your_own_uri"
client = pymongo.MongoClient(MONGODB_URI)
db = client["sample_mflix"]

# Create your database-aware pipeline
pipeline = Pipeline(_db=db, collection="movies")

# Build your pipeline
# (This does not change)
pipeline.match(
    title="A Star Is Born"
).sort(
    by="year"
).limit(
    value=1
)

# Execute your pipeline
# (The execution is simpler)
results = pipeline.run()
print(results)
It also has a __call__ method, so you could call the pipeline object directly, as pipeline(), instead of calling pipeline.run().
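As a rough illustration of what a __call__ method buys you, here is a toy class in which calling the object simply delegates to run. ToyRunnablePipeline and fake_runner are hypothetical names made up for this sketch; the fake runner stands in for a database, and nothing here reflects how monggregate implements run or __call__ internally.

```python
class ToyRunnablePipeline:
    """Hypothetical stand-in for illustration, not monggregate's Pipeline."""

    def __init__(self, runner):
        # `runner` is a made-up callable that plays the role of the database.
        self._runner = runner
        self.stages = []

    def run(self):
        return self._runner(self.stages)

    def __call__(self):
        # Calling the object is just another spelling of run().
        return self.run()


def fake_runner(stages):
    # Pretend to execute the stages and report how many were received.
    return [{"executed_stages": len(stages)}]


pipeline = ToyRunnablePipeline(fake_runner)
pipeline.stages.append({"$limit": 1})

results_from_run = pipeline.run()
results_from_call = pipeline()
print(results_from_call)
```

Both spellings go through the same code path, so the choice between them is purely a matter of taste.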
How to Choose a Method?
It is up to you to choose the method that suits you the best.
I personally use the first method for now.
There are plans to replace the _db attribute with a uri attribute and make the database connection happen under the hood, but this is not implemented yet. Once it is added, the second method will become more appealing.
An Alternative Way to Build Pipelines
Another way to build your pipeline is to access the stage classes directly. All the stages are accessible in the monggregate.stages namespace.
As such, you could write the above example like this:
import pymongo
from monggregate import Pipeline, stages

# Set up your pymongo connection
MONGODB_URI = "insert_your_own_uri"
client = pymongo.MongoClient(MONGODB_URI)
db = client["sample_mflix"]

# Prepare your stages
match_stage = stages.Match(query={"title": "A Star Is Born"})
sort_stage = stages.Sort(by="year")
limit_stage = stages.Limit(value=1)
# Named pipeline_stages so the imported stages module is not shadowed
pipeline_stages = [match_stage, sort_stage, limit_stage]

# Create your pipeline ready to be executed
pipeline = Pipeline(stages=pipeline_stages)

# Execute your pipeline
cursor = db["movies"].aggregate(pipeline.export())
results = list(cursor)
print(results)
This approach might be more readable for some people, but it is also more verbose.
However, there are still a couple of other advantages with this approach:
- You can reuse the stages in multiple pipelines
- You can easily reorder the stages
The second point is particularly relevant given the utility methods in the Pipeline class.
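Reordering matters because each stage consumes the output of the previous one. The toy below mimics that with plain Python functions over an in-memory list; the documents and helper names are made up for the illustration, and no MongoDB or monggregate call is involved.

```python
# Toy documents, standing in for a collection.
movies = [
    {"title": "A Star Is Born", "year": 1937},
    {"title": "A Star Is Born", "year": 2018},
    {"title": "A Star Is Born", "year": 1976},
]


def sort_by_year_desc(docs):
    return sorted(docs, key=lambda doc: doc["year"], reverse=True)


def limit_one(docs):
    return docs[:1]


def run(stage_functions, docs):
    # Feed the output of each stage into the next one, in order.
    for stage in stage_functions:
        docs = stage(docs)
    return docs


# The same two reusable stages, assembled in two different orders.
sorted_then_limited = run([sort_by_year_desc, limit_one], movies)
limited_then_sorted = run([limit_one, sort_by_year_desc], movies)

print(sorted_then_limited)
print(limited_then_sorted)
```

Sorting before limiting keeps the newest document, while limiting first keeps whichever document happened to come first, which is why being able to move stages around easily is handy.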
Pipeline Utilities
The Pipeline class has a few utility methods to help you build your pipeline.
Indeed, it implements most of the Python list methods, so you do not have to access the stages attribute to perform list operations.
In the examples above, len(pipeline) would return 3.
You could also, for example, add stages to an existing pipeline: the append, extend, and insert methods are available directly on the pipeline object.
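One common way to give a wrapper class list-like behavior in Python is collections.abc.MutableSequence, which derives append and extend from a handful of required methods. The sketch below uses that approach on a hypothetical ToyListPipeline holding raw stage dictionaries; this page does not say how monggregate implements its list methods, so treat it as an illustration of the idea only.

```python
from collections.abc import MutableSequence


class ToyListPipeline(MutableSequence):
    """Hypothetical stand-in for illustration, not monggregate's Pipeline."""

    def __init__(self, stages=None):
        self.stages = list(stages or [])

    # The five methods MutableSequence requires; the rest come for free.
    def __getitem__(self, index):
        return self.stages[index]

    def __setitem__(self, index, stage):
        self.stages[index] = stage

    def __delitem__(self, index):
        del self.stages[index]

    def __len__(self):
        return len(self.stages)

    def insert(self, index, stage):
        self.stages.insert(index, stage)


pipeline = ToyListPipeline([
    {"$match": {"title": "A Star Is Born"}},
    {"$limit": 1},
])

pipeline.insert(1, {"$sort": {"year": -1}})   # slot a stage in the middle
pipeline.append({"$project": {"title": 1}})   # add a stage at the end
pipeline.extend([{"$count": "n"}])            # add several stages at once

operators = [next(iter(stage)) for stage in pipeline]
print(len(pipeline), operators)
```

The list operations act on the wrapper itself, so there is no need to reach into the stages attribute.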